Goby v2
pAcommsHandler.cpp
1 // Copyright 2009-2018 Toby Schneider (http://gobysoft.org/index.wt/people/toby)
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 //
5 //
6 // This file is part of the Goby Underwater Autonomy Project Binaries
7 // ("The Goby Binaries").
8 //
9 // The Goby Binaries are free software: you can redistribute them and/or modify
10 // them under the terms of the GNU General Public License as published by
11 // the Free Software Foundation, either version 2 of the License, or
12 // (at your option) any later version.
13 //
14 // The Goby Binaries are distributed in the hope that they will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
21 
22 #include <cctype>
23 #include <dlfcn.h>
24 
25 #include <boost/algorithm/string.hpp>
26 #include <boost/date_time/posix_time/posix_time.hpp>
27 #include <boost/foreach.hpp>
28 #include <boost/lexical_cast.hpp>
29 #include <boost/math/special_functions/fpclassify.hpp>
30 #include <boost/numeric/conversion/cast.hpp>
31 
32 #include "goby/acomms/modemdriver/benthos_atm900_driver.h"
33 #include "goby/acomms/modemdriver/iridium_driver.h"
34 #include "goby/acomms/modemdriver/iridium_shore_driver.h"
35 #include "goby/acomms/modemdriver/udp_driver.h"
36 #include "goby/moos/moos_bluefin_driver.h"
38 #include "goby/moos/moos_ufield_sim_driver.h"
39 #include "goby/moos/protobuf/bluefin_driver.pb.h"
40 #include "goby/moos/protobuf/frontseat.pb.h"
41 #include "goby/moos/protobuf/ufield_sim_driver.pb.h"
42 #include "goby/pb/pb_modem_driver.h"
43 #include "goby/util/sci.h"
44 #include "pAcommsHandler.h"
45 
46 using namespace goby::common::tcolor;
47 using namespace goby::common::logger;
48 using goby::acomms::operator<<;
49 using goby::moos::operator<<;
50 using goby::util::as;
51 using google::protobuf::uint32;
52 
53 using goby::glog;
54 
// Definitions of the class's static data members.
// cfg_: the application configuration; its address is handed to the GobyMOOSApp base in the
// constructor and it is read (and partly modified) throughout this file.
55 pAcommsHandlerConfig CpAcommsHandler::cfg_;
// inst_: the singleton instance, created on demand by get_instance().
56 CpAcommsHandler* CpAcommsHandler::inst_ = 0;
// driver_plugins_: library handles keyed by name; create_driver() looks an entry up by
// driver_name() and passes the stored handle to dlsym().
57 std::map<std::string, void*> CpAcommsHandler::driver_plugins_;
58 
59 CpAcommsHandler* CpAcommsHandler::get_instance()
60 {
61  if (!inst_)
62  inst_ = new CpAcommsHandler();
63  return inst_;
64 }
65 
66 void CpAcommsHandler::delete_instance() { delete inst_; }
67 
// Constructs the application: passes the static cfg_ to the GobyMOOSApp base, builds the
// translator from the configured origin and modem id lookup path, connects queue (and MAC)
// signals to their handlers, processes the configuration, binds the drivers, queue manager
// and optional router together, and subscribes to the MOOS control variables.
68 CpAcommsHandler::CpAcommsHandler()
69  : GobyMOOSApp(&cfg_),
70  translator_(goby::moos::protobuf::TranslatorEntry(), cfg_.common().lat_origin(),
71  cfg_.common().lon_origin(), cfg_.modem_id_lookup_path()),
72  dccl_(goby::acomms::DCCLCodec::get()), work_(timer_io_service_), router_(0)
73 {
74 #ifdef ENABLE_GOBY_V1_TRANSITIONAL_SUPPORT
75  transitional_dccl_.convert_to_v2_representation(&cfg_);
76  glog.is(DEBUG2) && glog << group("pAcommsHandler")
77  << "Configuration after transitional configuration modifications: \n"
78  << cfg_ << std::flush;
79 #else
80  if (cfg_.has_transitional_cfg())
81  glog.is(WARN) && glog << "transitional_cfg is set but pAcommsHandler was not compiled with "
82  "the CMake flag 'enable_goby_v1_transitional_support' set to ON"
83  << std::endl;
84 #endif
85 
86  translator_.add_entry(cfg_.translator_entry());
87 
    // received messages are translated and published by handle_queue_receive()
88  goby::acomms::connect(&queue_manager_.signal_receive, this,
89  &CpAcommsHandler::handle_queue_receive);
90 
91  // informational 'queue' signals
    // handle_goby_signal() publishes each message to prefix + variable name and skips a pair
    // whose variable name is empty, so the (_1, var, _1, "") form publishes the message once.
92  goby::acomms::connect(&queue_manager_.signal_ack,
93  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
94  cfg_.moos_var().queue_ack_transmission(), _2,
95  cfg_.moos_var().queue_ack_original_msg()));
96  goby::acomms::connect(&queue_manager_.signal_receive,
97  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
98  cfg_.moos_var().queue_receive(), _1, ""));
99  goby::acomms::connect(&queue_manager_.signal_expire,
100  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
101  cfg_.moos_var().queue_expire(), _1, ""));
102  goby::acomms::connect(&queue_manager_.signal_queue_size_change,
103  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
104  cfg_.moos_var().queue_size(), _1, ""));
105 
106  // informational 'mac' signals
    // NOTE(review): listing lines 107 and 111 are absent from this excerpt. The two bind
    // expressions below presumably complete connect() calls on MAC signals whose first
    // lines are missing - confirm against the original file.
108  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
109  cfg_.moos_var().mac_initiate_transmission(), _1, ""));
110 
112  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
113  cfg_.moos_var().mac_slot_start(), _1, ""));
114 
115  goby::acomms::connect(&queue_manager_.signal_data_on_demand, this,
116  &CpAcommsHandler::handle_encode_on_demand);
117 
    // creates the drivers (and optionally router_) that the binding steps below rely on
118  process_configuration();
119 
120  driver_bind();
121 
    // every driver in drivers_ (primary and listen drivers) is bound to the queue manager
122  for (std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>,
123  goby::acomms::protobuf::DriverConfig*>::iterator it = drivers_.begin(),
124  end = drivers_.end();
125  it != end; ++it)
126  goby::acomms::bind(*(it->first), queue_manager_);
127 
128  if (router_)
129  {
130  bind(queue_manager_, *router_);
131  }
132 
133  // update comms cycle
134  subscribe(cfg_.moos_var().prefix() + cfg_.moos_var().mac_cycle_update(),
135  &CpAcommsHandler::handle_mac_cycle_update, this);
136 
137  subscribe(cfg_.moos_var().prefix() + cfg_.moos_var().queue_flush(),
138  &CpAcommsHandler::handle_flush_queue, this);
139 
140  subscribe(cfg_.moos_var().prefix() + cfg_.moos_var().config_file_request(),
141  &CpAcommsHandler::handle_config_file_request, this);
142 
    // this variable is also published by this application; the handler filters on the
    // message source to avoid reacting to its own posts
143  subscribe(cfg_.moos_var().prefix() + cfg_.moos_var().mac_initiate_transmission(),
144  &CpAcommsHandler::handle_external_initiate_transmission, this);
145 
146  subscribe(cfg_.moos_var().prefix() + cfg_.moos_var().driver_reset(),
147  &CpAcommsHandler::handle_driver_reset, this);
148 
149  subscribe_pb(cfg_.moos_var().prefix() + cfg_.moos_var().driver_cfg_update(),
150  &CpAcommsHandler::handle_driver_cfg_update, this);
151 }
152 
153 CpAcommsHandler::~CpAcommsHandler() {}
154 
// Periodic work of the application: polls the timer io_service, (re)starts drivers that
// are scheduled in driver_restart_time_, lets every driver that is not awaiting a restart
// do its work, then services the MAC (only while the primary driver is running) and the
// queue manager.
155 void CpAcommsHandler::loop()
156 {
157  timer_io_service_.poll();
158 
159  if (driver_restart_time_.size())
160  restart_drivers();
161 
162  for (std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>,
163  goby::acomms::protobuf::DriverConfig*>::iterator it = drivers_.begin(),
164  end = drivers_.end();
165  it != end; ++it)
166  {
    // drivers present in driver_restart_time_ are shut down / waiting to be started
167  if (!driver_restart_time_.count(it->first))
168  {
169  try
170  {
171  it->first->do_work();
172  }
    // NOTE(review): listing line 173 is absent from this excerpt; it presumably holds the
    // catch clause that declares `e` - confirm against the original file.
174  {
175  driver_reset(it->first, e);
176  break; // no longer valid drivers_ container
177  }
178  }
179  }
180 
181  // don't run the MAC if the primary driver is shutdown
182  if (!driver_restart_time_.count(driver_))
183  mac_.do_work();
184  queue_manager_.do_work();
185 }
186 
187 //
188 // Mail Handlers
189 //
190 
// Mail handler for MAC cycle updates. Parses the update from the MOOS string, ignores it
// unless its destination equals our modem id, applies the requested list operation to
// mac_, calls mac_.update(), and finally starts or stops the cycle (un-silencing or
// silencing every running MMDriver) when a cycle state is given.
191 void CpAcommsHandler::handle_mac_cycle_update(const CMOOSMsg& msg)
192 {
    // NOTE(review): listing line 193 is absent from this excerpt; it presumably declares
    // `update_msg` - confirm against the original file.
194  parse_for_moos(msg.GetString(), &update_msg);
195 
196  glog << group("pAcommsHandler") << "got update for MAC: " << update_msg << std::endl;
197 
198  if (update_msg.dest() != cfg_.modem_id())
199  {
200  glog << group("pAcommsHandler") << "update not for us" << std::endl;
201  return;
202  }
203 
204  goby::acomms::MACManager::iterator it1 = mac_.begin(), it2 = mac_.begin();
205 
    // advance the iterators to the positions named in the message
    // NOTE(review): no check against mac_.end() is visible here, so the positions are
    // trusted to lie within the current cycle.
206  for (int i = 0, n = update_msg.first_iterator(); i < n; ++i) ++it1;
207 
208  for (int i = 0, n = update_msg.second_iterator(); i < n; ++i) ++it2;
209 
210  switch (update_msg.update_type())
211  {
212  case goby::acomms::protobuf::MACUpdate::ASSIGN:
213  mac_.assign(update_msg.slot().begin(), update_msg.slot().end());
214  break;
215 
216  case goby::acomms::protobuf::MACUpdate::PUSH_BACK:
217  for (int i = 0, n = update_msg.slot_size(); i < n; ++i)
218  mac_.push_back(update_msg.slot(i));
219  break;
220 
221  case goby::acomms::protobuf::MACUpdate::PUSH_FRONT:
    // slots are pushed one at a time, so they end up at the front in reverse message order
222  for (int i = 0, n = update_msg.slot_size(); i < n; ++i)
223  mac_.push_front(update_msg.slot(i));
224  break;
225 
226  case goby::acomms::protobuf::MACUpdate::POP_BACK:
227  if (mac_.size())
228  mac_.pop_back();
229  else
230  glog.is(WARN) && glog << group("pAcommsHandler")
231  << "Cannot POP_BACK of empty MAC cycle" << std::endl;
232  break;
233 
234  case goby::acomms::protobuf::MACUpdate::POP_FRONT:
235  if (mac_.size())
236  mac_.pop_front();
237  else
238  glog.is(WARN) && glog << group("pAcommsHandler")
239  << "Cannot POP_FRONT of empty MAC cycle" << std::endl;
240  break;
241 
242  case goby::acomms::protobuf::MACUpdate::INSERT:
243  mac_.insert(it1, update_msg.slot().begin(), update_msg.slot().end());
244  break;
245 
246  case goby::acomms::protobuf::MACUpdate::ERASE:
    // a second_iterator of -1 selects the single-element erase at it1
247  if (update_msg.second_iterator() != -1)
248  mac_.erase(it1, it2);
249  else
250  mac_.erase(it1);
251  break;
252 
253  case goby::acomms::protobuf::MACUpdate::CLEAR: mac_.clear(); break;
254 
255  case goby::acomms::protobuf::MACUpdate::NO_CHANGE: break;
256  }
257 
258  mac_.update();
259 
260  if (update_msg.has_cycle_state())
261  {
262  switch (update_msg.cycle_state())
263  {
264  case goby::acomms::protobuf::MACUpdate::STARTED:
265  mac_.restart();
266 
    // NOTE(review): listing line 268 (the remainder of the iterator type) is absent from
    // this excerpt.
267  for (std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>,
269  it = drivers_.begin(),
270  end = drivers_.end();
271  it != end; ++it)
272  {
273  if (!driver_restart_time_.count(it->first))
274  {
    // only drivers that really are MMDriver objects are un-silenced
275  goby::acomms::MMDriver* driver =
276  dynamic_cast<goby::acomms::MMDriver*>(it->first.get());
277  if (driver)
278  driver->set_silent(false);
279  }
280  }
281  break;
282 
283  case goby::acomms::protobuf::MACUpdate::STOPPED:
    // NOTE(review): listing line 285 (the remainder of the iterator type) is absent from
    // this excerpt.
284  for (std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>,
286  it = drivers_.begin(),
287  end = drivers_.end();
288  it != end; ++it)
289  {
290  if (!driver_restart_time_.count(it->first))
291  {
292  goby::acomms::MMDriver* driver =
293  dynamic_cast<goby::acomms::MMDriver*>(it->first.get());
294  if (driver)
295  driver->set_silent(true);
296  }
297  }
298 
299  mac_.shutdown();
300  break;
301  }
302  }
303 }
304 
// Mail handler for queue flush requests: parses the request from the MOOS string, logs it
// and passes it to the queue manager.
305 void CpAcommsHandler::handle_flush_queue(const CMOOSMsg& msg)
306 {
    // NOTE(review): listing line 307 is absent from this excerpt; it presumably declares
    // `flush` - confirm against the original file.
308  parse_for_moos(msg.GetString(), &flush);
309 
310  glog.is(VERBOSE) && glog << group("pAcommsHandler") << "Queue flush request: " << flush
311  << std::endl;
312  queue_manager_.flush_queue(flush);
313 }
314 
315 void CpAcommsHandler::handle_config_file_request(const CMOOSMsg&)
316 {
317  publish(cfg_.moos_var().prefix() + cfg_.moos_var().config_file(),
318  dccl::b64_encode(cfg_.SerializeAsString()));
319 }
320 
// Mail handler for manual driver resets: resets the primary driver (driver_) with a
// "Manual reset" reason and MANUAL_RESET status. The message content is not used.
321 void CpAcommsHandler::handle_driver_reset(const CMOOSMsg& msg)
322 {
    // NOTE(review): listing line 324 is absent from this excerpt; it presumably starts the
    // construction of the exception object passed as the second argument - confirm against
    // the original file.
323  driver_reset(driver_,
325  "Manual reset", goby::acomms::protobuf::ModemDriverStatus::MANUAL_RESET));
326 }
327 
328 void CpAcommsHandler::handle_driver_cfg_update(const goby::acomms::protobuf::DriverConfig& cfg)
329 {
330  glog.is(VERBOSE) && glog << group("pAcommsHandler") << "Driver config update request: " << cfg
331  << std::endl;
332 
333  bool driver_found = false;
334  for (std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>,
335  goby::acomms::protobuf::DriverConfig*>::iterator it = drivers_.begin(),
336  end = drivers_.end();
337  it != end; ++it)
338  {
339  if (it->second->modem_id() == cfg.modem_id())
340  {
341  driver_found = true;
342  if (it->first && !driver_restart_time_.count(it->first))
343  it->first->update_cfg(cfg);
344  }
345  }
346  if (!driver_found)
347  glog.is(WARN) && glog << group("pAcommsHandler")
348  << "Could not find driver with modem id: " << cfg.modem_id()
349  << " to update";
350 }
351 
// Mail handler for transmissions initiated by another application: ignores messages that
// this application published itself, otherwise parses the transmission and hands it to
// the primary driver (if there is one).
352 void CpAcommsHandler::handle_external_initiate_transmission(const CMOOSMsg& msg)
353 {
354  // don't repost our own transmissions
355  if (msg.GetSource() == CMOOSApp::GetAppName())
356  return;
357 
358  if (driver_)
359  {
    // NOTE(review): listing line 360 is absent from this excerpt; it presumably declares
    // `transmission` - confirm against the original file.
361  parse_for_moos(msg.GetString(), &transmission);
362 
363  glog.is(VERBOSE) && glog << group("pAcommsHandler")
364  << "Initiating transmission: " << transmission << std::endl;
365  driver_->handle_initiate_transmission(transmission);
366  }
367 }
368 
369 void CpAcommsHandler::handle_goby_signal(const google::protobuf::Message& msg1,
370  const std::string& moos_var1,
371  const google::protobuf::Message& msg2,
372  const std::string& moos_var2)
373 
374 {
375  if (!moos_var1.empty())
376  publish_pb(cfg_.moos_var().prefix() + moos_var1, msg1);
377 
378  if (!moos_var2.empty())
379  publish_pb(cfg_.moos_var().prefix() + moos_var2, msg2);
380 }
381 
382 void CpAcommsHandler::handle_raw(const goby::acomms::protobuf::ModemRaw& msg,
383  const std::string& moos_var)
384 {
385  publish(cfg_.moos_var().prefix() + moos_var, msg.raw());
386 }
387 
388 //
389 // READ CONFIGURATION
390 //
391 
392 void CpAcommsHandler::process_configuration()
393 {
394  // create driver objects
395  create_driver(driver_, cfg_.driver_type(), cfg_.mutable_driver_cfg(), &mac_);
396  if (driver_)
397  drivers_.insert(std::make_pair(driver_, cfg_.mutable_driver_cfg()));
398 
399  // create receive only (listener) drivers
400  if (cfg_.listen_driver_type_size() == cfg_.listen_driver_cfg_size())
401  {
402  for (int i = 0, n = cfg_.listen_driver_type_size(); i < n; ++i)
403  {
404  boost::shared_ptr<goby::acomms::ModemDriverBase> driver;
405  create_driver(driver, cfg_.listen_driver_type(i), cfg_.mutable_listen_driver_cfg(i), 0);
406  drivers_.insert(std::make_pair(driver, cfg_.mutable_listen_driver_cfg(i)));
407  }
408  }
409  else
410  {
411  glog.is(DIE) && glog << "You must specify the same number of listen_driver_type fields to "
412  "correspond with the listen_driver_cfg fields."
413  << std::endl;
414  }
415 
416  if (cfg_.has_route_cfg() && cfg_.route_cfg().route().hop_size() > 0)
417  {
418  router_ = new goby::acomms::RouteManager;
419  }
420 
421  // check and propagate modem id
422  if (cfg_.modem_id() == goby::acomms::BROADCAST_ID)
423  glog.is(DIE) &&
424  glog << "modem_id = " << goby::acomms::BROADCAST_ID
425  << " is reserved for broadcast messages. You must specify a modem_id != "
426  << goby::acomms::BROADCAST_ID << " for this vehicle." << std::endl;
427 
428  publish("MODEM_ID", cfg_.modem_id());
429  publish("VEHICLE_ID", cfg_.modem_id());
430 
431  cfg_.mutable_queue_cfg()->set_modem_id(cfg_.modem_id());
432  cfg_.mutable_mac_cfg()->set_modem_id(cfg_.modem_id());
433  cfg_.mutable_transitional_cfg()->set_modem_id(cfg_.modem_id());
434 
435  for (std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>,
436  goby::acomms::protobuf::DriverConfig*>::iterator it = drivers_.begin(),
437  end = drivers_.end();
438  it != end; ++it)
439  {
440  if (!it->second->has_modem_id())
441  it->second->set_modem_id(cfg_.modem_id());
442  }
443 
444  std::vector<void*> handles;
445  // load all shared libraries
446  for (int i = 0, n = cfg_.load_shared_library_size(); i < n; ++i)
447  {
448  glog.is(VERBOSE) && glog << group("pAcommsHandler")
449  << "Loading shared library: " << cfg_.load_shared_library(i)
450  << std::endl;
451 
452  void* handle =
453  goby::util::DynamicProtobufManager::load_from_shared_lib(cfg_.load_shared_library(i));
454  handles.push_back(handle);
455 
456  if (!handle)
457  {
458  glog.is(DIE) && glog << "Failed ... check path provided or add to /etc/ld.so.conf "
459  << "or LD_LIBRARY_PATH" << std::endl;
460  }
461 
462  glog << group("pAcommsHandler") << "Loading shared library dccl codecs." << std::endl;
463  }
464 
465  // set id codec before shared library load
466  dccl_->set_cfg(cfg_.dccl_cfg());
467  for (int i = 0, n = handles.size(); i < n; ++i) dccl_->load_shared_library_codecs(handles[i]);
468 
469  // load all .proto files
470  goby::util::DynamicProtobufManager::enable_compilation();
471  for (int i = 0, n = cfg_.load_proto_file_size(); i < n; ++i)
472  {
473  glog.is(VERBOSE) && glog << group("pAcommsHandler")
474  << "Loading protobuf file: " << cfg_.load_proto_file(i)
475  << std::endl;
476 
477  if (!goby::util::DynamicProtobufManager::load_from_proto_file(cfg_.load_proto_file(i)))
478  glog.is(DIE) && glog << "Failed to load file." << std::endl;
479  }
480 
481  // start goby-acomms classes
482 
483  for (std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>,
484  goby::acomms::protobuf::DriverConfig*>::iterator it = drivers_.begin(),
485  end = drivers_.end();
486  it != end; ++it)
487  driver_restart_time_.insert(std::make_pair(it->first, 0));
488 
489  mac_.startup(cfg_.mac_cfg());
490  queue_manager_.set_cfg(cfg_.queue_cfg());
491  if (router_)
492  router_->set_cfg(cfg_.route_cfg());
493 
494  // process translator entries
495  for (int i = 0, n = cfg_.translator_entry_size(); i < n; ++i)
496  {
497  typedef boost::shared_ptr<google::protobuf::Message> GoogleProtobufMessagePointer;
498  glog.is(VERBOSE) && glog << group("pAcommsHandler") << "Checking translator entry: "
499  << cfg_.translator_entry(i).protobuf_name() << std::endl;
500 
501  // check that the protobuf file is loaded somehow
502  GoogleProtobufMessagePointer msg =
503  goby::util::DynamicProtobufManager::new_protobuf_message<GoogleProtobufMessagePointer>(
504  cfg_.translator_entry(i).protobuf_name());
505 
506  if (cfg_.translator_entry(i).trigger().type() ==
507  goby::moos::protobuf::TranslatorEntry::Trigger::TRIGGER_PUBLISH)
508  {
509  // subscribe for trigger publish variables
510  GobyMOOSApp::subscribe(cfg_.translator_entry(i).trigger().moos_var(),
511  boost::bind(&CpAcommsHandler::create_on_publish, this, _1,
512  cfg_.translator_entry(i)));
513  }
514  else if (cfg_.translator_entry(i).trigger().type() ==
515  goby::moos::protobuf::TranslatorEntry::Trigger::TRIGGER_TIME)
516  {
517  timers_.push_back(boost::shared_ptr<Timer>(new Timer(timer_io_service_)));
518 
519  Timer& new_timer = *timers_.back();
520 
521  new_timer.expires_from_now(
522  boost::posix_time::seconds(cfg_.translator_entry(i).trigger().period()));
523  // Start an asynchronous wait.
524  new_timer.async_wait(boost::bind(&CpAcommsHandler::create_on_timer, this, _1,
525  cfg_.translator_entry(i), &new_timer));
526  }
527 
528  for (int j = 0, m = cfg_.translator_entry(i).create_size(); j < m; ++j)
529  {
530  // subscribe for all create variables
531  subscribe(cfg_.translator_entry(i).create(j).moos_var());
532  }
533  }
534 
535  for (int i = 0, m = cfg_.multiplex_create_moos_var_size(); i < m; ++i)
536  {
537  GobyMOOSApp::subscribe(cfg_.multiplex_create_moos_var(i),
538  &CpAcommsHandler::create_on_multiplex_publish, this);
539  }
540 
541  for (int i = 0, n = cfg_.dccl_frontseat_forward_name_size(); i < n; ++i)
542  {
543  const google::protobuf::Descriptor* desc =
544  goby::util::DynamicProtobufManager::find_descriptor(
545  cfg_.dccl_frontseat_forward_name(i));
546  if (desc)
547  {
548  dccl_frontseat_forward_.insert(desc);
549  }
550  else
551  {
552  glog.is(DIE) && glog << "Invalid message name given to dccl_frontseat_forward_name: "
553  << cfg_.dccl_frontseat_forward_name(i) << std::endl;
554  }
555  }
556 }
557 
// Creates a modem driver and stores it in `driver`.
// When the driver configuration names a plugin (driver_name), the driver is built by the
// plugin's goby_make_driver() function, looked up with dlsym() in the handle stored in
// driver_plugins_. Otherwise the driver is chosen from `driver_type`; DRIVER_NONE leaves
// `driver` untouched.
// NOTE(review): listing lines 560-561 (the remaining parameters, which evidently include
// `driver_cfg` and `mac`) are absent from this excerpt.
558 void CpAcommsHandler::create_driver(boost::shared_ptr<goby::acomms::ModemDriverBase>& driver,
559  goby::acomms::protobuf::DriverType driver_type,
562 {
563  if (driver_cfg->has_driver_name())
564  {
565  std::map<std::string, void*>::const_iterator driver_it =
566  driver_plugins_.find(driver_cfg->driver_name());
567 
568  if (driver_it == driver_plugins_.end())
569  glog.is(DIE) && glog << "Could not find driver_plugin_name '"
570  << driver_cfg->driver_name()
571  << "'. Make sure it is loaded using the PACOMMSHANDLER_PLUGINS "
572  "environmental var"
573  << std::endl;
574  else
575  {
    // the plugin must export a function named "goby_make_driver" that returns a new driver
576  goby::acomms::ModemDriverBase* (*driver_function)(void) =
577  (goby::acomms::ModemDriverBase * (*)(void))
578  dlsym(driver_it->second, "goby_make_driver");
579 
580  if (!driver_function)
581  {
582  glog.is(DIE) && glog << "Could not load goby::acomms::ModemDriverBase* "
583  "goby_make_driver() for driver name '"
584  << driver_cfg->driver_name() << "'." << std::endl;
585  }
586  else
587  {
588  driver.reset((*driver_function)());
589  }
590  }
591  }
592  else
593  {
594  switch (driver_type)
595  {
596  case goby::acomms::protobuf::DRIVER_WHOI_MICROMODEM:
597  driver.reset(new goby::acomms::MMDriver);
598  break;
599 
600  case goby::acomms::protobuf::DRIVER_BENTHOS_ATM900:
601  driver.reset(new goby::acomms::BenthosATM900Driver);
602  break;
603 
604  case goby::acomms::protobuf::DRIVER_ABC_EXAMPLE_MODEM:
605  driver.reset(new goby::acomms::ABCDriver);
606  break;
607 
608  case goby::acomms::protobuf::DRIVER_UFIELD_SIM_DRIVER:
609  driver.reset(new goby::moos::UFldDriver);
    // the driver configuration receives the application's modem id lookup path
610  driver_cfg->SetExtension(goby::moos::protobuf::Config::modem_id_lookup_path,
611  cfg_.modem_id_lookup_path());
612  break;
613 
614  case goby::acomms::protobuf::DRIVER_PB_STORE_SERVER:
    // NOTE(review): listing line 616 (the end of the push_back expression) is absent from
    // this excerpt.
615  zeromq_service_.push_back(boost::shared_ptr<goby::common::ZeroMQService>(
617  driver.reset(new goby::pb::PBDriver(zeromq_service_.back().get()));
618  break;
619 
620  case goby::acomms::protobuf::DRIVER_IRIDIUM:
621  driver.reset(new goby::acomms::IridiumDriver);
622  break;
623 
624  case goby::acomms::protobuf::DRIVER_UDP:
    // the io_service is kept in asio_service_; the driver only receives a raw pointer to it
625  asio_service_.push_back(
626  boost::shared_ptr<boost::asio::io_service>(new boost::asio::io_service));
627  driver.reset(new goby::acomms::UDPDriver(asio_service_.back().get()));
628  break;
629 
630  case goby::acomms::protobuf::DRIVER_BLUEFIN_MOOS:
631  driver.reset(new goby::moos::BluefinCommsDriver(mac));
    // the driver configuration receives this application's MOOS server host and port
632  driver_cfg->SetExtension(goby::moos::protobuf::BluefinConfig::moos_server,
633  cfg_.common().server_host());
634  driver_cfg->SetExtension(goby::moos::protobuf::BluefinConfig::moos_port,
635  cfg_.common().server_port());
636  break;
637 
638  case goby::acomms::protobuf::DRIVER_IRIDIUM_SHORE:
639  driver.reset(new goby::acomms::IridiumShoreDriver);
640  break;
641 
642  case goby::acomms::protobuf::DRIVER_NONE: break;
643  }
644  }
645 }
646 
// Slot for messages received by the queue manager. Translates the message into MOOS
// messages and publishes them, forwards the DCCL-encoded message to the frontseat when
// its type is in dccl_frontseat_forward_, and applies a RouteCommand to the router when
// one is configured.
647 void CpAcommsHandler::handle_queue_receive(const google::protobuf::Message& msg)
648 {
649  try
650  {
651  std::multimap<std::string, CMOOSMsg> out;
652 
653  out = translator_.protobuf_to_moos(msg);
654 
655  for (std::multimap<std::string, CMOOSMsg>::iterator it = out.begin(), n = out.end();
656  it != n; ++it)
657  {
658  glog.is(DEBUG2) && glog << group("pAcommsHandler") << "Publishing: " << it->second
659  << std::endl;
660  publish(it->second);
661  }
662  }
    // a translation failure is only logged; the forwarding and command handling below still run
663  catch (std::runtime_error& e)
664  {
665  glog.is(WARN) && glog << group("pAcommsHandler")
666  << "Failed to translate received message: " << e.what() << std::endl;
667  }
668 
669  // forward to frontseat driver
670  if (dccl_frontseat_forward_.count(msg.GetDescriptor()))
671  {
    // NOTE(review): listing line 672 is absent from this excerpt; it presumably declares
    // `fs_data` - confirm against the original file.
673  dccl_->encode(fs_data.mutable_dccl_message(), msg);
674  publish_pb(cfg_.moos_var().ifrontseat_data_out(), fs_data);
675  }
676 
677  // handle various commands
678 
679  if (router_ && msg.GetDescriptor() == goby::acomms::protobuf::RouteCommand::descriptor())
680  {
    // NOTE(review): listing line 681 is absent from this excerpt; it presumably declares
    // `route_cmd` - confirm against the original file.
682  route_cmd.CopyFrom(msg);
683  glog.is(VERBOSE) && glog << group("pAcommsHandler")
684  << "Received RouteCommand: " << msg.DebugString() << std::endl;
    // a copy of the configured route settings is used, with only the route replaced;
    // cfg_ itself is not modified
685  goby::acomms::protobuf::RouteManagerConfig cfg = cfg_.route_cfg();
686  cfg.mutable_route()->CopyFrom(route_cmd.new_route());
687  router_->set_cfg(cfg);
688  }
689 }
690 
// Slot for the queue manager's data-on-demand signal: builds a message of data_msg's type
// from the current MOOS variables and copies it into data_msg.
// NOTE(review): listing line 692 (the first parameter, evidently `request_msg`) is absent
// from this excerpt.
691 void CpAcommsHandler::handle_encode_on_demand(
693  google::protobuf::Message* data_msg)
694 {
695  glog.is(VERBOSE) && glog << group("pAcommsHandler")
696  << "Received encode on demand request: " << request_msg << std::endl;
697 
    // the message type to create is taken from the descriptor of the output message
698  boost::shared_ptr<google::protobuf::Message> created_message =
699  translator_.moos_to_protobuf<boost::shared_ptr<google::protobuf::Message> >(
700  dynamic_vars().all(), data_msg->GetDescriptor()->full_name());
701 
702  data_msg->CopyFrom(*created_message);
703 }
704 
// Trigger handler for publish-triggered translator entries: translates and queues the
// entry's message unless the trigger requires mandatory content that the triggering
// MOOS string does not contain.
// NOTE(review): listing line 706 (the second parameter, evidently `entry`) is absent from
// this excerpt.
705 void CpAcommsHandler::create_on_publish(const CMOOSMsg& trigger_msg,
707 {
708  glog.is(DEBUG2) && glog << group("pAcommsHandler")
709  << "Received trigger: " << trigger_msg.GetKey() << std::endl;
710 
    // mandatory content is matched as a plain substring of the trigger string
711  if (!entry.trigger().has_mandatory_content() ||
712  trigger_msg.GetString().find(entry.trigger().mandatory_content()) != std::string::npos)
713  translate_and_push(entry);
714  else
715  glog.is(DEBUG2) &&
716  glog << group("pAcommsHandler")
717  << "Message missing mandatory content for: " << entry.protobuf_name() << std::endl;
718 }
719 
720 void CpAcommsHandler::create_on_multiplex_publish(const CMOOSMsg& moos_msg)
721 {
722  boost::shared_ptr<google::protobuf::Message> msg = dynamic_parse_for_moos(moos_msg.GetString());
723 
724  if (!msg)
725  {
726  glog.is(WARN) && glog << group("pAcommsHandler")
727  << "Multiplex receive failed: Unknown Protobuf type for "
728  << moos_msg.GetString()
729  << "; be sure it is compiled in or directly loaded into the "
730  "goby::util::DynamicProtobufManager."
731  << std::endl;
732  return;
733  }
734 
735  std::multimap<std::string, CMOOSMsg> out;
736 
737  try
738  {
739  out = translator_.protobuf_to_inverse_moos(*msg);
740 
741  for (std::multimap<std::string, CMOOSMsg>::iterator it = out.begin(), n = out.end();
742  it != n; ++it)
743  {
744  glog.is(VERBOSE) && glog << group("pAcommsHandler")
745  << "Inverse Publishing: " << it->second.GetKey() << std::endl;
746  publish(it->second);
747  }
748  }
749  catch (std::exception& e)
750  {
751  glog.is(WARN) && glog << group("pAcommsHandler")
752  << "Failed to inverse publish: " << e.what() << std::endl;
753  }
754 }
755 
// Timer handler for time-triggered translator entries. When the wait completed without
// error it computes the next expiry, re-arms the timer with itself as handler, and then
// translates and queues the entry's message. Nothing is done when `error` is set.
// NOTE(review): listing line 757 (the middle parameter, evidently `entry`) is absent from
// this excerpt.
756 void CpAcommsHandler::create_on_timer(const boost::system::error_code& error,
758  Timer* timer)
759 {
760  if (!error)
761  {
    // difference between the current time and the time the timer was set to expire
762  double skew_seconds = std::abs(goby::common::goby_time<double>() -
763  goby::util::as<double>(timer->expires_at()));
764  if (skew_seconds > ALLOWED_TIMER_SKEW_SECONDS)
765  {
766  glog.is(VERBOSE) && glog << group("pAcommsHandler") << warn << "clock skew of "
767  << skew_seconds << " seconds detected, resetting timer."
768  << std::endl;
    // NOTE(review): listing line 770 (the first operand of the new expiry time) is absent
    // from this excerpt. The period is wrapped in boost::posix_time::seconds twice on the
    // line below, as it appears in the listing.
769  timer->expires_at(
771  boost::posix_time::seconds(boost::posix_time::seconds(entry.trigger().period())));
772  }
773  else
774  {
775  // reset the timer
    // advancing from the previous expiry (rather than from now) keeps the period from drifting
776  timer->expires_at(timer->expires_at() +
777  boost::posix_time::seconds(entry.trigger().period()));
778  }
779 
780  timer->async_wait(boost::bind(&CpAcommsHandler::create_on_timer, this, _1, entry, timer));
781 
782  glog.is(DEBUG2) && glog << group("pAcommsHandler")
783  << "Received trigger for: " << entry.protobuf_name() << std::endl;
784  glog.is(DEBUG2) && glog << group("pAcommsHandler") << "Next expiry: " << timer->expires_at()
785  << std::endl;
786 
787  translate_and_push(entry);
788  }
789 }
790 
791 void CpAcommsHandler::translate_and_push(const goby::moos::protobuf::TranslatorEntry& entry)
792 {
793  try
794  {
795  boost::shared_ptr<google::protobuf::Message> created_message =
796  translator_.moos_to_protobuf<boost::shared_ptr<google::protobuf::Message> >(
797  dynamic_vars().all(), entry.protobuf_name());
798 
799  glog.is(DEBUG2) && glog << group("pAcommsHandler") << "Created message: \n"
800  << created_message->DebugString() << std::endl;
801 
802  queue_manager_.push_message(*created_message);
803  }
804  catch (std::runtime_error& e)
805  {
806  glog.is(WARN) && glog << group("pAcommsHandler")
807  << "Failed to translate or queue message: " << e.what() << std::endl;
808  }
809 }
810 
// Handles a failed (or manually reset) driver: logs the exception, shuts the driver down
// and then applies the failure technique. For the two MOVE_LISTEN_DRIVER_TO_PRIMARY
// variants a failed primary driver is replaced by the next driver in drivers_; the
// DISABLE variant then erases the failed driver, otherwise the failed driver is scheduled
// for a restart after the configured backoff, as in CONTINUALLY_RESTART_DRIVER.
// NOTE(review): listing line 813 (the middle parameter, evidently the exception `e`) is
// absent from this excerpt.
811 void CpAcommsHandler::driver_reset(
812  boost::shared_ptr<goby::acomms::ModemDriverBase> driver,
814  pAcommsHandlerConfig::DriverFailureApproach::DriverFailureTechnique
815  technique /* = cfg_.driver_failure_approach().technique() */)
816 {
817  glog.is(WARN) && glog << group("pAcommsHandler") << "Driver exception: " << e.what()
818  << std::endl;
819  glog.is(WARN) && glog << group("pAcommsHandler") << "Shutting down driver: " << driver
820  << std::endl;
821  driver->shutdown();
822 
823  switch (technique)
824  {
825  case pAcommsHandlerConfig::DriverFailureApproach::DISABLE_AND_MOVE_LISTEN_DRIVER_TO_PRIMARY:
826  case pAcommsHandlerConfig::DriverFailureApproach::MOVE_LISTEN_DRIVER_TO_PRIMARY:
827  {
    // only a failure of the primary driver causes a switch of primary
828  if (driver == driver_)
829  {
830  glog.is(WARN) && glog << group("pAcommsHandler")
831  << "Now using listen driver as new primary." << std::endl;
832  // unbind signals to old driver
833  driver_unbind();
834 
835  if (drivers_.size() == 1)
836  {
837  glog.is(DIE) && glog << "No more drivers to try..." << std::endl;
838  }
839  else
840  {
841  std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>,
842  goby::acomms::protobuf::DriverConfig*>::iterator old_it =
843  drivers_.find(driver);
844  std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>,
845  goby::acomms::protobuf::DriverConfig*>::iterator new_it = old_it;
846 
847  // try the next one after the current driver_, otherwise the first driver
848  ++new_it;
849  if (new_it == drivers_.end())
850  new_it = drivers_.begin();
851 
852  // new primary driver_
853  driver_ = new_it->first;
    // a driver that is currently running is shut down first; it is restarted below
854  if (!driver_restart_time_.count(driver_))
855  driver_->shutdown();
856 
857  goby::acomms::protobuf::DriverConfig& new_config = *(new_it->second);
858  goby::acomms::protobuf::DriverConfig& old_config = *(old_it->second);
859 
860  // swap the modem ids
861  int new_id = old_config.modem_id();
862  old_config.set_modem_id(new_config.modem_id());
863  new_config.set_modem_id(new_id);
864 
865  // bind the correct signals
866  driver_bind();
867 
868  // restart the new primary driver
869  driver_restart_time_.insert(std::make_pair(driver_, 0));
870  }
871  }
872 
    // NOTE(review): listing line 873 is absent from this excerpt; it presumably opens an
    // `if` comparing `technique` with the enumerator on the next line - confirm against
    // the original file.
874  DISABLE_AND_MOVE_LISTEN_DRIVER_TO_PRIMARY)
875  {
876  // erase old driver
877  drivers_.erase(driver);
878  driver_restart_time_.erase(driver);
879  break;
880  }
881  }
882  // fall through intentional (no break) - want to restart old driver if MOVE_LISTEN_DRIVER_TO_PRIMARY
883  case pAcommsHandlerConfig::DriverFailureApproach::CONTINUALLY_RESTART_DRIVER:
884  {
885  glog.is(WARN) && glog << group("pAcommsHandler") << "Attempting to restart driver in "
886  << cfg_.driver_failure_approach().driver_backoff_sec()
887  << " seconds." << std::endl;
    // restart_drivers() starts the driver once this time has passed
888  driver_restart_time_.insert(
889  std::make_pair(driver, goby::common::goby_time<double>() +
890  cfg_.driver_failure_approach().driver_backoff_sec()));
891  }
892  break;
893  }
894 }
895 
// Starts every driver whose scheduled restart time has passed. Due drivers are first
// removed from driver_restart_time_ and collected, then started with their stored
// configuration; a driver that fails to start is handed to driver_reset().
896 void CpAcommsHandler::restart_drivers()
897 {
898  double now = goby::common::goby_time<double>();
899  std::set<boost::shared_ptr<goby::acomms::ModemDriverBase> > drivers_to_start;
900 
901  for (std::map<boost::shared_ptr<goby::acomms::ModemDriverBase>, double>::iterator it =
902  driver_restart_time_.begin();
903  it != driver_restart_time_.end();)
904  {
905  if (it->second < now)
906  {
907  drivers_to_start.insert(it->first);
    // post-increment moves the iterator on before the element it referred to is erased
908  driver_restart_time_.erase(it++);
909  }
910  else
911  {
912  ++it;
913  }
914  }
915 
    // the drivers are started in a second pass because driver_reset() may modify
    // driver_restart_time_ again
916  for (std::set<boost::shared_ptr<goby::acomms::ModemDriverBase> >::iterator
917  it = drivers_to_start.begin(),
918  end = drivers_to_start.end();
919  it != end; ++it)
920  {
921  boost::shared_ptr<goby::acomms::ModemDriverBase> driver = *it;
922  try
923  {
924  glog.is(DEBUG1) && glog << "Starting up driver: " << driver << std::endl;
925  driver->startup(*drivers_[driver]);
926  }
    // NOTE(review): listing line 927 is absent from this excerpt; it presumably holds the
    // catch clause that declares `e` - confirm against the original file.
928  {
929  driver_reset(driver, e);
930  }
931  }
932 }
933 
// Binds the MAC to the primary driver and connects the driver's informational signals to
// the publishing handlers. Does nothing when there is no primary driver.
934 void CpAcommsHandler::driver_bind()
935 {
936  // bind the lower level pieces of goby-acomms together
937  if (driver_)
938  {
939  goby::acomms::bind(mac_, *driver_);
940 
941  // informational 'driver' signals
    // handle_goby_signal() publishes the message to prefix + the given variable name
942  goby::acomms::connect(&driver_->signal_receive,
943  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
944  cfg_.moos_var().driver_receive(), _1, ""));
945 
946  goby::acomms::connect(&driver_->signal_transmit_result,
947  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
948  cfg_.moos_var().driver_transmit(), _1, ""));
949 
950  goby::acomms::connect(&driver_->signal_raw_incoming,
951  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
952  cfg_.moos_var().driver_raw_msg_in(), _1, ""));
953  goby::acomms::connect(&driver_->signal_raw_outgoing,
954  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
955  cfg_.moos_var().driver_raw_msg_out(), _1, ""));
956 
    // NOTE(review): listing lines 957 and 961 are absent from this excerpt; they presumably
    // open the two connect() calls whose arguments follow - confirm against the original
    // file. handle_raw() publishes only the raw() field of the message.
958  &driver_->signal_raw_incoming,
959  boost::bind(&CpAcommsHandler::handle_raw, this, _1, cfg_.moos_var().driver_raw_in()));
960 
962  &driver_->signal_raw_outgoing,
963  boost::bind(&CpAcommsHandler::handle_raw, this, _1, cfg_.moos_var().driver_raw_out()));
964  }
965 }
966 
// Counterpart of driver_bind(): unbinds the MAC from the primary driver and disconnects
// the same signal/handler pairs. Does nothing when there is no primary driver.
967 void CpAcommsHandler::driver_unbind()
968 {
969  // unbind the lower level pieces of goby-acomms together
970  if (driver_)
971  {
972  goby::acomms::unbind(mac_, *driver_);
973 
974  // informational 'driver' signals
975  goby::acomms::disconnect(&driver_->signal_receive,
976  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
977  cfg_.moos_var().driver_receive(), _1, ""));
978 
979  goby::acomms::disconnect(&driver_->signal_transmit_result,
980  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
981  cfg_.moos_var().driver_transmit(), _1, ""));
982 
983  goby::acomms::disconnect(&driver_->signal_raw_incoming,
984  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
985  cfg_.moos_var().driver_raw_msg_in(), _1, ""));
986  goby::acomms::disconnect(&driver_->signal_raw_outgoing,
987  boost::bind(&CpAcommsHandler::handle_goby_signal, this, _1,
988  cfg_.moos_var().driver_raw_msg_out(), _1, ""));
989 
    // NOTE(review): listing lines 990 and 994 are absent from this excerpt; they presumably
    // open the two disconnect() calls whose arguments follow - confirm against the original
    // file.
991  &driver_->signal_raw_incoming,
992  boost::bind(&CpAcommsHandler::handle_raw, this, _1, cfg_.moos_var().driver_raw_in()));
993 
995  &driver_->signal_raw_outgoing,
996  boost::bind(&CpAcommsHandler::handle_raw, this, _1, cfg_.moos_var().driver_raw_out()));
997  }
998 }
Contains functions for adding color to Terminal window streams.
Definition: term_color.h:54
void startup(const protobuf::MACConfig &cfg)
Starts the MAC with given configuration.
Definition: mac_manager.cpp:65
provides an API to the imaginary ABC modem (as an example of how to write drivers)
Definition: abc_driver.h:39
void do_work()
Allows the MAC timer to do its work. Does not block. If you prefer more control you can directly cont...
Definition: mac_manager.h:76
provides a driver for the Bluefin Huxley communications infrastructure (initially uses SonarDyne as u...
void parse_for_moos(const std::string &in, google::protobuf::Message *msg)
Parses the string in to Google Protocol Buffers message msg. All errors are written to the goby::util...
Helpers for MOOS applications for serializing and parsed Google Protocol buffers messages.
provides an API to the WHOI Micro-Modem driver
Definition: mm_driver.h:41
double goby_time< double >()
Returns current UTC time as seconds and fractional seconds since 1970-01-01 00:00:00.
Definition: time.h:130
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
Definition: time.h:104
const int BROADCAST_ID
special modem id for the broadcast destination - no one is assigned this address. Analogous to 192...
provides an API to the goby-acomms MAC library. MACManager is essentially a std::list<protobuf::Modem...
Definition: mac_manager.h:51
void shutdown()
Shutdown the MAC.
void connect(Signal *signal, Slot slot)
connect a signal to a slot (e.g. function pointer)
Definition: connect.h:36
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.
provides a simulator driver to the uFldNodeComms MOOS module: http://oceanai.mit.edu/moos-ivp/pmwiki/pmwiki.php?n=Modules.UFldNodeComms
boost::signals2::signal< void(const protobuf::ModemTransmission &m)> signal_slot_start
Signals the start of all transmissions (even when we don't transmit)
Definition: mac_manager.h:97
provides an abstract base class for acoustic modem drivers. This is subclassed by the various drivers...
Definition: driver_base.h:47
void unbind(ModemDriverBase &driver, QueueManager &queue_manager)
unbinds the driver link-layer callbacks from the QueueManager
Definition: bind.h:76
void restart()
Restarts the MAC with original configuration.
Definition: mac_manager.cpp:97
void update()
You must call this after any change to the underlying list that would invalidate iterators or change ...
boost::signals2::signal< void(const protobuf::ModemTransmission &m)> signal_initiate_transmission
Signals when it is time for this platform to begin transmission of an acoustic message at the start o...
Definition: mac_manager.h:91
void bind(ModemDriverBase &driver, QueueManager &queue_manager)
binds the driver link-layer callbacks to the QueueManager
Definition: bind.h:43
void disconnect(Signal *signal, Slot slot)
disconnect a signal from a slot (e.g. function pointer)
Definition: connect.h:63