22 #include "goby/common/logger.h" 23 #include "goby/common/logger/term_color.h" 24 #include "goby/common/zeromq_service.h" 26 #include "goby/pb/application.h" 28 #include "goby/acomms/amac.h" 29 #include "goby/acomms/queue.h" 30 #include "goby/acomms/route.h" 32 #include "goby/acomms/bind.h" 33 #include "goby/acomms/connect.h" 35 #include "goby/acomms/protobuf/queue.pb.h" 37 #include "goby/acomms/protobuf/file_transfer.pb.h" 38 #include "goby/acomms/protobuf/mm_driver.pb.h" 39 #include "goby/acomms/protobuf/modem_driver_status.pb.h" 40 #include "goby/acomms/protobuf/mosh_packet.pb.h" 41 #include "goby/acomms/protobuf/time_update.pb.h" 43 #include "bridge_config.pb.h" 68 void handle_external_push(boost::shared_ptr<google::protobuf::Message> msg,
75 catch (std::exception& e)
77 glog.is(WARN) &&
glog <<
"Failed to push message: " << e.what() << std::endl;
88 goby::acomms::protobuf::NetworkAck::AckType type);
89 void generate_time_update_network_ack(
QueueManager* in_queue,
90 goby::acomms::protobuf::NetworkAck::AckType type);
95 std::vector<boost::shared_ptr<QueueManager> > q_managers_;
96 std::vector<boost::shared_ptr<MACManager> > mac_managers_;
100 boost::shared_ptr<micromodem::protobuf::HardwareControlCommand> pending_hw_ctl_;
101 boost::shared_ptr<goby::acomms::protobuf::TimeUpdateResponse> pending_time_update_;
107 int main(
int argc,
char* argv[])
110 goby::run<goby::acomms::Bridge>(argc, argv, &cfg);
116 : Application(cfg), DynamicProtobufNode(&Application::zeromq_service()), cfg_(*cfg),
117 time_update_request_time_(0)
119 glog.is(DEBUG1) &&
glog << cfg_.DebugString() << std::endl;
124 for (
int i = 0, n = cfg_.load_shared_library_size(); i < n; ++i)
126 glog.is(DEBUG1) &&
glog <<
"Loading shared library: " << cfg_.load_shared_library(i)
130 goby::util::DynamicProtobufManager::load_from_shared_lib(cfg_.load_shared_library(i));
134 glog.is(DIE) &&
glog <<
"Failed ... check path provided or add to /etc/ld.so.conf " 135 <<
"or LD_LIBRARY_PATH" << std::endl;
138 glog.is(DEBUG1) &&
glog <<
"Loading shared library dccl codecs." << std::endl;
144 goby::util::DynamicProtobufManager::enable_compilation();
145 for (
int i = 0, n = cfg_.load_proto_file_size(); i < n; ++i)
147 glog.is(DEBUG1) &&
glog <<
"Loading protobuf file: " << cfg_.load_proto_file(i)
150 if (!goby::util::DynamicProtobufManager::load_from_proto_file(cfg_.load_proto_file(i)))
151 glog.is(DIE) &&
glog <<
"Failed to load file." << std::endl;
154 r_manager_.set_cfg(cfg_.route_cfg());
155 q_managers_.resize(cfg_.subnet_size());
156 mac_managers_.resize(cfg_.subnet_size());
157 for (
int i = 0, n = cfg_.subnet_size(); i < n; ++i)
163 q_managers_[i]->set_cfg(qcfg);
165 mac_managers_[i]->startup(cfg_.subnet(i).mac_cfg());
170 &(q_managers_[i]->signal_ack),
171 boost::bind(&Bridge::handle_link_ack,
this, _1, _2, q_managers_[i].
get()));
174 &(q_managers_[i]->signal_receive),
175 boost::bind(&Bridge::handle_queue_receive,
this, _1, q_managers_[i].
get()));
177 Application::subscribe<goby::acomms::protobuf::ModemTransmission>(
178 boost::bind(&Bridge::handle_modem_receive,
this, _1, q_managers_[i].
get()),
179 "Rx" + goby::util::as<std::string>(qcfg.modem_id()));
181 DynamicProtobufNode::subscribe(
182 goby::common::PubSubNodeWrapperBase::SOCKET_SUBSCRIBE,
183 boost::bind(&Bridge::handle_external_push,
this, _1, q_managers_[i].
get()),
184 "QueuePush" + goby::util::as<std::string>(qcfg.modem_id()));
186 Application::subscribe<goby::acomms::protobuf::ModemTransmission>(
187 boost::bind(&Bridge::handle_data_request,
this, _1, i),
188 "DataRequest" + goby::util::as<std::string>(qcfg.modem_id()));
190 Application::subscribe<goby::acomms::protobuf::ModemDriverStatus>(
191 boost::bind(&Bridge::handle_driver_status,
this, _1, i),
192 "Status" + goby::util::as<std::string>(qcfg.modem_id()));
195 boost::bind(&Bridge::handle_initiate_transmission,
this, _1, i));
199 goby::acomms::Bridge::~Bridge() {}
201 void goby::acomms::Bridge::loop()
203 for (std::vector<boost::shared_ptr<QueueManager> >::iterator it = q_managers_.begin(),
204 end = q_managers_.end();
206 { (*it)->do_work(); }
208 for (std::vector<boost::shared_ptr<MACManager> >::iterator it = mac_managers_.begin(),
209 end = mac_managers_.end();
211 { (*it)->do_work(); }
goby::uint64 now = goby::common::goby_time<goby::uint64>();
212 if (pending_hw_ctl_ && (pending_hw_ctl_->time() + cfg_.special_command_ttl() * 1000000 < now))
214 glog.is(VERBOSE) &&
glog <<
"HardwareControlCommand expired." << std::endl;
216 generate_hw_ctl_network_ack(q_managers_.at(0).get(),
217 goby::acomms::protobuf::NetworkAck::EXPIRE);
218 pending_hw_ctl_.reset();
221 if (pending_time_update_ &&
222 (pending_time_update_->time() + cfg_.special_command_ttl() * 1000000 < now))
224 glog.is(VERBOSE) &&
glog <<
"TimeUpdateRequest expired." << std::endl;
226 generate_time_update_network_ack(q_managers_.at(0).get(),
227 goby::acomms::protobuf::NetworkAck::EXPIRE);
228 pending_time_update_.reset();
235 publish(msg,
"QueueRx" + goby::util::as<std::string>(from_queue->
modem_id()));
238 if (msg.GetDescriptor() == goby::acomms::protobuf::RouteCommand::descriptor())
241 route_cmd.CopyFrom(msg);
242 glog.is(VERBOSE) &&
glog <<
"Received RouteCommand: " << msg.DebugString() << std::endl;
244 cfg.mutable_route()->CopyFrom(route_cmd.new_route());
245 r_manager_.set_cfg(cfg);
247 else if (msg.GetDescriptor() == micromodem::protobuf::HardwareControlCommand::descriptor())
250 pending_hw_ctl_->CopyFrom(msg);
251 if (!pending_hw_ctl_->has_hw_ctl_dest())
252 pending_hw_ctl_->set_hw_ctl_dest(pending_hw_ctl_->command_dest());
254 glog.is(VERBOSE) &&
glog <<
"Received HardwareControlCommand: " << msg.DebugString()
257 else if (msg.GetDescriptor() == goby::acomms::protobuf::TimeUpdateRequest::descriptor())
260 request.CopyFrom(msg);
263 pending_time_update_->set_time(request.time());
264 time_update_request_time_ = request.time();
265 pending_time_update_->set_request_src(request.src());
266 pending_time_update_->set_src(from_queue->
modem_id());
267 pending_time_update_->set_dest(request.update_time_for_id());
269 glog.is(VERBOSE) &&
glog <<
"Received TimeUpdateRequest: " << msg.DebugString()
279 publish(orig_msg,
"QueueAckOrig" + goby::util::as<std::string>(from_queue->
modem_id()));
282 void goby::acomms::Bridge::handle_modem_receive(
289 if (cfg_.forward_cacst())
291 for (
int i = 0, n = message.ExtensionSize(micromodem::protobuf::receive_stat); i < n;
295 message.GetExtension(micromodem::protobuf::receive_stat, i);
297 glog.is(VERBOSE) &&
glog <<
"Forwarding statistics message to topside: " << cacst
299 r_manager_.handle_in(in_queue->meta_from_msg(cacst), cacst, in_queue->
modem_id());
303 if (cfg_.forward_ranging_reply() &&
304 message.HasExtension(micromodem::protobuf::ranging_reply))
307 message.GetExtension(micromodem::protobuf::ranging_reply);
309 glog.is(VERBOSE) &&
glog <<
"Forwarding ranging message to topside: " << ranging
311 r_manager_.handle_in(in_queue->meta_from_msg(ranging), ranging, in_queue->
modem_id());
314 if (pending_time_update_)
316 if (message.type() == goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC &&
317 message.GetExtension(micromodem::protobuf::type) ==
318 micromodem::protobuf::MICROMODEM_TWO_WAY_PING)
321 message.GetExtension(micromodem::protobuf::ranging_reply);
323 if (range_reply.one_way_travel_time_size() > 0)
324 pending_time_update_->set_time_of_flight_microsec(
325 range_reply.one_way_travel_time(0) * 1e6);
327 glog.is(VERBOSE) &&
glog <<
"Received time of flight of " 328 << pending_time_update_->time_of_flight_microsec()
329 <<
" microseconds" << std::endl;
331 else if (message.type() == goby::acomms::protobuf::ModemTransmission::ACK &&
332 pending_time_update_->has_time_of_flight_microsec())
334 if (message.acked_frame_size() && message.acked_frame(0) == 0)
337 glog.is(VERBOSE) &&
glog <<
"Received ack for TimeUpdateResponse" << std::endl;
339 generate_time_update_network_ack(in_queue,
340 goby::acomms::protobuf::NetworkAck::ACK);
341 pending_time_update_.reset();
346 if (pending_hw_ctl_ &&
347 message.type() == goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC &&
348 message.GetExtension(micromodem::protobuf::type) ==
349 micromodem::protobuf::MICROMODEM_HARDWARE_CONTROL_REPLY)
352 message.GetExtension(micromodem::protobuf::hw_ctl);
354 if (message.src() == pending_hw_ctl_->hw_ctl_dest() &&
355 message.dest() == in_queue->
modem_id())
357 glog.is(VERBOSE) &&
glog <<
"Received hardware control response: " << control
358 <<
" to our command: " << *pending_hw_ctl_ << std::endl;
360 generate_hw_ctl_network_ack(in_queue, goby::acomms::protobuf::NetworkAck::ACK);
361 pending_hw_ctl_.reset();
365 catch (std::exception& e)
367 glog.is(WARN) &&
glog <<
"Failed to handle incoming message: " << e.what() << std::endl;
371 void goby::acomms::Bridge::generate_hw_ctl_network_ack(
372 QueueManager* in_queue, goby::acomms::protobuf::NetworkAck::AckType type)
375 ack.set_ack_src(pending_hw_ctl_->hw_ctl_dest());
376 ack.set_message_dccl_id(DCCLCodec::get()->
id(pending_hw_ctl_->GetDescriptor()));
378 ack.set_message_src(pending_hw_ctl_->command_src());
379 ack.set_message_dest(pending_hw_ctl_->command_dest());
380 ack.set_message_time(pending_hw_ctl_->time());
381 ack.set_ack_type(type);
383 r_manager_.handle_in(in_queue->meta_from_msg(ack), ack, in_queue->
modem_id());
386 void goby::acomms::Bridge::generate_time_update_network_ack(
387 QueueManager* in_queue, goby::acomms::protobuf::NetworkAck::AckType type)
390 ack.set_ack_src(pending_time_update_->dest());
391 ack.set_message_dccl_id(
392 DCCLCodec::get()->
id(goby::acomms::protobuf::TimeUpdateRequest::descriptor()));
394 ack.set_message_src(pending_time_update_->request_src());
395 ack.set_message_dest(pending_time_update_->dest());
396 ack.set_message_time(time_update_request_time_);
397 ack.set_ack_type(type);
399 r_manager_.handle_in(in_queue->meta_from_msg(ack), ack, in_queue->
modem_id());
406 if (pending_time_update_ &&
410 if (!pending_time_update_->has_time_of_flight_microsec())
412 new_transmission.set_dest(pending_time_update_->dest());
413 new_transmission.set_type(goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC);
414 new_transmission.SetExtension(micromodem::protobuf::type,
415 micromodem::protobuf::MICROMODEM_TWO_WAY_PING);
420 new_transmission.set_type(goby::acomms::protobuf::ModemTransmission::DATA);
421 new_transmission.set_ack_requested(
true);
422 new_transmission.set_dest(pending_time_update_->dest());
427 *pending_time_update_);
429 publish(new_transmission,
430 "Tx" + goby::util::as<std::string>(cfg_.subnet(subnet).queue_cfg().modem_id()));
433 else if (pending_hw_ctl_ &&
437 new_transmission.set_dest(pending_hw_ctl_->hw_ctl_dest());
438 new_transmission.set_type(goby::acomms::protobuf::ModemTransmission::DRIVER_SPECIFIC);
439 new_transmission.SetExtension(micromodem::protobuf::type,
440 micromodem::protobuf::MICROMODEM_HARDWARE_CONTROL);
441 new_transmission.MutableExtension(micromodem::protobuf::hw_ctl)
442 ->CopyFrom(pending_hw_ctl_->control());
443 publish(new_transmission,
444 "Tx" + goby::util::as<std::string>(cfg_.subnet(subnet).queue_cfg().modem_id()));
448 publish(m,
"Tx" + goby::util::as<std::string>(cfg_.subnet(subnet).queue_cfg().modem_id()));
456 q_managers_[subnet]->handle_modem_data_request(&msg);
457 publish(msg,
"DataResponse" +
458 goby::util::as<std::string>(cfg_.subnet(subnet).queue_cfg().modem_id()));
463 glog.is(VERBOSE) &&
glog <<
"Forwarding modemdriver status message to topside: " 464 << m.ShortDebugString() << std::endl;
467 r_manager_.handle_in(in_queue->meta_from_msg(m), m, in_queue->
modem_id());
provides an API to the goby-acomms Queuing Library.
Base class provided for users to generate applications that participate in the Goby publish/subscribe...
uint64 goby_time< uint64 >()
Returns current UTC time as integer microseconds since 1970-01-01 00:00:00.
static DCCLCodec * get()
DCCLCodec is a singleton class; use this to get a pointer to the class.
void handle_modem_receive(const protobuf::ModemTransmission &message)
Receive incoming data from the modem.
provides an API to the goby-acomms MAC library. MACManager is essentially a std::list<protobuf::Modem...
void connect(Signal *signal, Slot slot)
connect a signal to a slot (e.g. function pointer)
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.
int modem_id()
The current modem ID (MAC address) of this node.
google::protobuf::uint64 uint64
an unsigned 64 bit integer
void bind(ModemDriverBase &driver, QueueManager &queue_manager)
binds the driver link-layer callbacks to the QueueManager
void push_message(const google::protobuf::Message &new_message)
Push a message (and add the queue if it does not exist)
const int QUERY_DESTINATION_ID
special modem id used internally to goby-acomms for indicating that the MAC layer (amac) is agnostic ...