22 #include <boost/asio.hpp> 24 #include "goby/common/logger.h" 26 #include "goby/pb/application.h" 28 #include "goby/acomms/protobuf/mosh_packet.pb.h" 29 #include "mosh_relay_config.pb.h" 33 using boost::asio::ip::udp;
// Fragmenting constructor: splits `input` into MoshPacket fragments carrying at most
// MOSH_FRAGMENT_SIZE payload bytes each and stores them in fragments_. `dest` is written to the
// dest field of every fragment.
// NOTE(review): `src` is presumably written to the src field in the same way — confirm against
// the definition.
43 Packetizer(
int src,
int dest,
const std::vector<char>& input);
44 const std::set<protobuf::MoshPacket>& fragments() {
return fragments_; }
// Concatenates the stored fragments, in set order, back into a single byte vector.
// Returns an empty vector when no fragments are stored.
47 std::vector<char> reassemble();
// Fragments held by this object. std::set ordering comes from operator< on MoshPacket, which
// compares frag_num() only, so at most one fragment per frag_num is kept.
50 std::set<protobuf::MoshPacket> fragments_;
// Starts one asynchronous receive on socket_ into recv_buffer_; handle_udp_receive is bound as
// the completion handler.
61 void start_udp_receive();
// Completion handler for the asynchronous UDP receive. On success (or a message_size error) the
// received bytes are split into fragments with Packetizer and each fragment is published.
62 void handle_udp_receive(
const boost::system::error_code& error, std::size_t bytes_transferred);
// Completion handler bound to the async_send_to call that forwards a reassembled datagram.
64 void handle_udp_send(
const boost::system::error_code& error, std::size_t bytes_transferred);
// Boost.Asio I/O service; polled from loop().
68 boost::asio::io_service io_service_;
// Number of bytes recv_buffer_ is constructed with.
73 MOSH_UDP_PAYLOAD_SIZE = 1300
// Initialised from the configured IP address; passed to async_receive_from as the endpoint
// argument and used as the destination of async_send_to.
76 udp::endpoint remote_endpoint_;
// Buffer handed to async_receive_from; constructed with MOSH_UDP_PAYLOAD_SIZE zero bytes.
77 std::vector<char> recv_buffer_;
// Reassembly state indexed by the src() of incoming fragments; an entry is erased right after
// its reassembled datagram has been passed to async_send_to.
80 std::map<ModemId, Packetizer> packets_;
85 inline bool operator<(
const MoshPacket& a,
const MoshPacket& b)
87 return a.frag_num() < b.frag_num();
// Per-fragment payload size used by Packetizer. The value is not hard-coded: it is looked up at
// run time from the dccl::field extension attached to the "fragment" field of MoshPacket.
// NOTE(review): which member of the extension is read is not visible in this excerpt — confirm.
94 const int MOSH_FRAGMENT_SIZE = goby::acomms::protobuf::MoshPacket::descriptor()
95 ->FindFieldByName(
"fragment")
97 .GetExtension(dccl::field)
// Round-trip self-test for Packetizer: builds a `size`-byte input whose i-th byte is i % 256,
// feeds every fragment obtained from `pi` into `po`, and asserts that po.reassemble() equals
// the input.
// NOTE(review): the declarations of `pi`, `po` and `ready` are not visible here; presumably `pi`
// is a Packetizer constructed from `in` and `po` a default-constructed one — confirm.
100 void test_packetizer(
int size)
102 std::vector<char> in(size, 0);
103 for (
int i = 0; i < size; ++i) in[i] = i % 256;
105 const std::set<goby::acomms::protobuf::MoshPacket>& f = pi.fragments();
107 for (std::set<goby::acomms::protobuf::MoshPacket>::const_iterator it = f.begin(), end = f.end();
109 ready = po.add_fragment(*it);
// assert() expands to nothing when NDEBUG is defined, so this check is inactive in such builds.
112 assert(po.reassemble() == in);
// Entry point: runs the Packetizer round-trip self-test for four input sizes, then starts the
// MoshRelay application through goby::run.
115 int main(
int argc,
char* argv[])
// Exact multiple of the fragment size.
117 test_packetizer(MOSH_FRAGMENT_SIZE * 4);
// Same number as MOSH_UDP_PAYLOAD_SIZE.
118 test_packetizer(1300);
// Ten bytes short of one full fragment.
119 test_packetizer(MOSH_FRAGMENT_SIZE - 10);
// Five bytes short of two full fragments.
120 test_packetizer(MOSH_FRAGMENT_SIZE * 2 - 5);
123 goby::run<goby::acomms::MoshRelay>(argc, argv, &cfg);
// recv_buffer_ starts out as MOSH_UDP_PAYLOAD_SIZE zero bytes.
130 recv_buffer_(MOSH_UDP_PAYLOAD_SIZE, 0)
// Dump the configuration when DEBUG1 logging is enabled.
132 glog.is(DEBUG1) &&
glog << cfg_.DebugString() << std::endl;
// Open the socket for IPv4 UDP.
134 socket_.open(udp::v4());
// Bind using the address parsed from cfg_.ip_address() (the port argument is not visible here).
137 socket_.bind(udp::endpoint(boost::asio::ip::address::from_string(cfg_.ip_address()),
// remote_endpoint_ is initialised from the same configured address.
142 remote_endpoint_ = udp::endpoint(boost::asio::ip::address::from_string(cfg_.ip_address()),
// Register handle_goby_receive under the string "QueueRx" followed by the source modem id.
147 subscribe(&MoshRelay::handle_goby_receive,
this,
148 "QueueRx" + goby::util::as<std::string>(cfg_.src_modem_id()));
151 goby::acomms::MoshRelay::~MoshRelay() {}
153 void goby::acomms::MoshRelay::loop() { io_service_.poll(); }
155 void goby::acomms::MoshRelay::start_udp_receive()
157 socket_.async_receive_from(boost::asio::buffer(recv_buffer_), remote_endpoint_,
158 boost::bind(&MoshRelay::handle_udp_receive,
this,
159 boost::asio::placeholders::error,
160 boost::asio::placeholders::bytes_transferred));
// Completion handler for the receive started by start_udp_receive().
// `error` is the result of the receive; `bytes_transferred` is the number of bytes placed in
// recv_buffer_.
163 void goby::acomms::MoshRelay::handle_udp_receive(
const boost::system::error_code& error,
164 std::size_t bytes_transferred)
// A message_size error is handled exactly like success.
// NOTE(review): presumably this tolerates datagrams larger than recv_buffer_ — confirm.
166 if (!error || error == boost::asio::error::message_size)
168 glog.is(DEBUG1) &&
glog << remote_endpoint_ <<
": " << bytes_transferred <<
" Bytes" 172 cfg_.src_modem_id(), cfg_.dest_modem_id(),
173 std::vector<char>(recv_buffer_.begin(), recv_buffer_.begin() + bytes_transferred));
// Publish every fragment of the packetized datagram under "QueuePush" + source modem id.
174 const std::set<goby::acomms::protobuf::MoshPacket>& f = p.fragments();
175 for (std::set<goby::acomms::protobuf::MoshPacket>::const_iterator it = f.begin(),
178 publish(*it,
"QueuePush" + goby::util::as<std::string>(cfg_.src_modem_id()));
// Log every incoming fragment when DEBUG1 logging is enabled.
186 glog.is(DEBUG1) &&
glog <<
"> " << packet.ShortDebugString() << std::endl;
// Only act on packets whose dest equals our src_modem_id and whose src equals our dest_modem_id.
188 if (packet.dest() == (
int)cfg_.src_modem_id() && packet.src() == (
int)cfg_.dest_modem_id())
// operator[] creates an empty Packetizer for a source seen for the first time.
// add_fragment() returns the is_last_frag flag of the fragment just added.
190 if (packets_[packet.src()].add_fragment(packet))
// NOTE(review): reassemble() returns a temporary std::vector<char>; boost::asio::buffer() only
// references that memory and the temporary is destroyed at the end of this statement, while
// async_send_to requires the buffer to remain valid until the completion handler is called.
// Consider keeping the bytes in a member (or using a synchronous send_to) — confirm and fix.
192 socket_.async_send_to(
193 boost::asio::buffer(packets_[packet.src()].reassemble()), remote_endpoint_,
194 boost::bind(&MoshRelay::handle_udp_send,
this, boost::asio::placeholders::error,
195 boost::asio::placeholders::bytes_transferred));
// Discard the reassembly state for this source once its datagram has been handed to the socket.
196 packets_.erase(packet.src());
// Completion handler bound to the async_send_to call that forwards a reassembled datagram.
// The error_code parameter is left unnamed in this definition.
201 void goby::acomms::MoshRelay::handle_udp_send(
const boost::system::error_code& ,
206 goby::acomms::Packetizer::Packetizer() {}
// Fragmenting constructor: cuts `input` into consecutive pieces of at most MOSH_FRAGMENT_SIZE
// bytes, wraps each piece in a MoshPacket and inserts it into fragments_.
208 goby::acomms::Packetizer::Packetizer(
int src,
int dest,
const std::vector<char>& input)
212 packet.set_dest(dest);
// n is input.size() divided by MOSH_FRAGMENT_SIZE, rounded up; an empty input yields no fragments.
214 for (
int i = 0, n = (input.size() + MOSH_FRAGMENT_SIZE - 1) / MOSH_FRAGMENT_SIZE; i < n; ++i)
216 packet.set_frag_num(i);
// Number of input bytes left for this fragment, capped at MOSH_FRAGMENT_SIZE.
218 std::min(MOSH_FRAGMENT_SIZE, (
int)(input.size() - i * MOSH_FRAGMENT_SIZE)));
// The final loop iteration marks its fragment as the last one.
219 packet.set_is_last_frag(i + 1 == n);
// The payload string is always sized to the full MOSH_FRAGMENT_SIZE, even for a short final
// fragment; only the first frag_len() bytes are copied from the input below.
220 std::string* frag = packet.mutable_fragment();
221 frag->resize(MOSH_FRAGMENT_SIZE);
223 std::vector<char>::const_iterator begin = input.begin() + i * MOSH_FRAGMENT_SIZE,
224 end = begin + packet.frag_len();
225 std::copy(begin, end, frag->begin());
227 glog.is(DEBUG1) &&
glog << packet.DebugString() << std::endl;
// A copy of the packet is stored; ordering within the set is by frag_num().
229 fragments_.insert(packet);
// std::set::insert leaves the set unchanged if a fragment with an equivalent key (same
// frag_num(), per operator< on MoshPacket) is already stored.
235 fragments_.insert(frag);
// The last fragment has arrived but the number of stored fragments does not equal its
// frag_num() + 1: a warning is logged.
// NOTE(review): any further handling of this case is not visible in this excerpt — confirm.
238 if (frag.is_last_frag() && fragments_.size() != frag.frag_num() + 1)
241 glog.is(WARN) &&
glog <<
"Missed fragment" << std::endl;
// The result is the is_last_frag flag of the fragment that was just added.
245 return frag.is_last_frag();
248 std::vector<char> goby::acomms::Packetizer::reassemble()
250 if (!fragments_.size())
251 return std::vector<char>();
253 std::vector<char> out(MOSH_FRAGMENT_SIZE * fragments_.size(), 0);
255 std::vector<char>::iterator oit = out.begin();
257 for (std::set<protobuf::MoshPacket>::const_iterator it = fragments_.begin(),
258 end = fragments_.end();
260 { oit = std::copy(it->fragment().begin(), it->fragment().begin() + it->frag_len(), oit); }
261 out.resize(out.size() - (MOSH_FRAGMENT_SIZE - fragments_.rbegin()->frag_len()));
Base class provided for users to generate applications that participate in the Goby publish/subscribe...
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.