Goby3  3.1.4
2024.02.22
intervehicle.h
Go to the documentation of this file.
1 // Copyright 2016-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
26 
27 #include <atomic>
28 #include <functional>
29 #include <sys/types.h>
30 #include <thread>
31 #include <unistd.h>
32 
33 #include <google/protobuf/io/zero_copy_stream_impl.h>
34 
36 
38 #include "goby/middleware/transport/interthread.h" // used for InterVehiclePortal implementation
42 
43 namespace goby
44 {
45 namespace middleware
46 {
48 {
49  public:
50  InvalidSubscription(const std::string& e) : Exception(e) {}
51 };
52 
54 {
55  public:
56  InvalidPublication(const std::string& e) : Exception(e) {}
57 };
58 
60 {
61  public:
62  InvalidUnsubscription(const std::string& e) : Exception(e) {}
63 };
64 
69 template <typename Derived, typename InnerTransporter>
71  : public StaticTransporterInterface<InterVehicleTransporterBase<Derived, InnerTransporter>,
72  InnerTransporter>,
73  public Poller<InterVehicleTransporterBase<Derived, InnerTransporter>>
74 {
75  using InterfaceType =
77  InnerTransporter>;
78 
80 
81  public:
82  enum class SubscriptionAction
83  {
84  SUBSCRIBE,
86  };
87 
89  : InterfaceType(inner), PollerType(&this->inner())
90  {
91  // handle request from Portal to omit or include metadata on future publications for a given data type
92  this->inner()
95  [this](const protobuf::SerializerMetadataRequest& request)
96  {
97  glog.is_debug3() && glog << "Received DCCL metadata request: "
98  << request.ShortDebugString() << std::endl;
99 
100  switch (request.request())
101  {
103  omit_publish_metadata_.erase(request.key().type());
104  break;
106  omit_publish_metadata_.insert(request.key().type());
107  break;
108  }
109  });
110  }
112 
113  virtual ~InterVehicleTransporterBase() = default;
114 
116  template <typename Data> static constexpr int scheme()
117  {
120  "Can only use DCCL messages with InterVehicleTransporters");
122  }
123 
127  template <const Group& group> void check_validity()
128  {
129  static_assert(group.numeric() != Group::invalid_numeric_group,
130  "goby::middleware::Group must have non-zero numeric "
131  "value to publish on the InterVehicle layer");
132  }
133 
141  template <typename Data, int scheme = goby::middleware::scheme<Data>()>
142  void publish_dynamic(const Data& data, const Group& group = Group(),
143  const Publisher<Data>& publisher = Publisher<Data>())
144  {
145  static_assert(scheme == MarshallingScheme::DCCL,
146  "Can only use DCCL messages with InterVehicleTransporters");
147 
148  Data data_with_group = data;
149  publisher.set_group(data_with_group, group);
150 
151  static_cast<Derived*>(this)->template _publish<Data>(data_with_group, group, publisher);
152  // publish to interprocess as both DCCL and Protobuf
153  this->inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
154  group, publisher);
155  this->inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(data_with_group,
156  group, publisher);
157  }
158 
166  template <typename Data, int scheme = goby::middleware::scheme<Data>()>
167  void publish_dynamic(std::shared_ptr<const Data> data, const Group& group = Group(),
168  const Publisher<Data>& publisher = Publisher<Data>())
169  {
170  static_assert(scheme == MarshallingScheme::DCCL,
171  "Can only use DCCL messages with InterVehicleTransporters");
172  if (data)
173  {
174  // copy this way as it allows us to copy Data == google::protobuf::Message abstract base class
175  std::shared_ptr<Data> data_with_group(data->New());
176  data_with_group->CopyFrom(*data);
177 
178  publisher.set_group(*data_with_group, group);
179 
180  static_cast<Derived*>(this)->template _publish<Data>(*data_with_group, group,
181  publisher);
182 
183  // publish to interprocess as both DCCL and Protobuf
184  this->inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
185  group, publisher);
186  this->inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(
187  data_with_group, group, publisher);
188  }
189  }
190 
198  template <typename Data, int scheme = goby::middleware::scheme<Data>()>
199  void publish_dynamic(std::shared_ptr<Data> data, const Group& group = Group(),
200  const Publisher<Data>& publisher = Publisher<Data>())
201  {
202  publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data), group, publisher);
203  }
204 
212  template <typename Data, int scheme = goby::middleware::scheme<Data>()>
213  void subscribe_dynamic(std::function<void(const Data&)> f, const Group& group = Group(),
214  const Subscriber<Data>& subscriber = Subscriber<Data>())
215  {
216  static_assert(scheme == MarshallingScheme::DCCL,
217  "Can only use DCCL messages with InterVehicleTransporters");
218  auto pointer_ref_lambda = [=](std::shared_ptr<const Data> d) { f(*d); };
219  static_cast<Derived*>(this)->template _subscribe<Data>(
220  pointer_ref_lambda, group, subscriber, SubscriptionAction::SUBSCRIBE);
221  }
222 
230  template <typename Data, int scheme = goby::middleware::scheme<Data>()>
231  void subscribe_dynamic(std::function<void(std::shared_ptr<const Data>)> f,
232  const Group& group = Group(),
233  const Subscriber<Data>& subscriber = Subscriber<Data>())
234  {
235  static_assert(scheme == MarshallingScheme::DCCL,
236  "Can only use DCCL messages with InterVehicleTransporters");
237  static_cast<Derived*>(this)->template _subscribe<Data>(f, group, subscriber,
239  }
240 
247  template <typename Data, int scheme = goby::middleware::scheme<Data>()>
249  const Subscriber<Data>& subscriber = Subscriber<Data>())
250  {
251  static_assert(scheme == MarshallingScheme::DCCL,
252  "Can only use DCCL messages with InterVehicleTransporters");
253  static_cast<Derived*>(this)->template _subscribe<Data>(
254  std::function<void(std::shared_ptr<const Data>)>(), group, subscriber,
256  }
257 
258  protected:
259  template <typename Data>
260  std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
261  _set_up_publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
262  {
263  if (group.numeric() != Group::broadcast_group && !publisher.has_set_group_func())
264  {
265  std::stringstream ss;
266  ss << "Error: Publisher must have set_group_func in order to publish to a "
267  "non-broadcast Group ("
268  << group
269  << "). The set_group_func modifies the contents of the outgoing message to store "
270  "the group information.";
271  throw(InvalidPublication(ss.str()));
272  }
273 
274  auto data = intervehicle::serialize_publication(d, group, publisher);
275 
276  if (publisher.cfg().intervehicle().buffer().ack_required())
277  {
278  auto ack_handler = std::make_shared<
280  publisher.acked_func(), d);
281 
282  auto expire_handler =
283  std::make_shared<PublisherCallback<Data, MarshallingScheme::DCCL,
285  publisher.expired_func(), d);
286 
288  data, ack_handler, expire_handler);
289  }
290 
291  if (!omit_publish_metadata_.count(data->key().type()))
292  _set_protobuf_metadata<Data>(data->mutable_key()->mutable_metadata(), d);
293 
294  goby::glog.is_debug3() &&
295  goby::glog << "Set up publishing for: " << data->ShortDebugString() << std::endl;
296 
297  return data;
298  }
299 
300  template <typename Data>
301  std::shared_ptr<intervehicle::protobuf::Subscription>
302  _set_up_subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
303  const Subscriber<Data>& subscriber, SubscriptionAction action)
304  {
306 
307  switch (action)
308  {
310  {
311  if (group.numeric() != Group::broadcast_group && !subscriber.has_group_func())
312  {
313  std::stringstream ss;
314  ss << "Error: Subscriber must have group_func in order to subscribe to "
315  "non-broadcast Group ("
316  << group
317  << "). The group_func returns the appropriate Group based on the contents "
318  "of the incoming message.";
319  throw(InvalidSubscription(ss.str()));
320  }
321 
322  if (subscriber.cfg().intervehicle().broadcast() &&
323  subscriber.cfg().intervehicle().buffer().ack_required())
324  {
325  std::stringstream ss;
326  ss << "Error: Broadcast subscriptions cannot have ack_required: true";
327  throw(InvalidSubscription(ss.str()));
328  }
329 
330  auto subscription = std::make_shared<
332  func, group, subscriber);
333 
334  this->subscriptions_[dccl_id][group] = subscription;
335  }
336  break;
338  {
339  auto sub_it = this->subscriptions_[dccl_id].find(group);
340  if (sub_it != this->subscriptions_[dccl_id].end())
341  {
342  this->subscriptions_[dccl_id].erase(sub_it);
343  }
344  else
345  {
346  std::stringstream ss;
347  ss << "Cannot unsubscribe to DCCL id: " << dccl_id
348  << " and group: " << std::string(group) << " as no subscription was found.";
349  throw(InvalidUnsubscription(ss.str()));
350  }
351  }
352  break;
353  }
354 
355  auto dccl_subscription =
356  this->template _serialize_subscription<Data>(group, subscriber, action);
358  // insert pending subscription
359  auto subscription_publication = intervehicle::serialize_publication(
360  *dccl_subscription, intervehicle::groups::subscription_forward,
362 
363  // overwrite timestamps to ensure mapping with driver threads
364  auto subscribe_time = dccl_subscription->time_with_units();
365  subscription_publication->mutable_key()->set_serialize_time_with_units(subscribe_time);
366 
367  auto ack_handler = std::make_shared<PublisherCallback<Subscription, MarshallingScheme::DCCL,
369  subscriber.subscribed_func());
370 
371  auto expire_handler =
372  std::make_shared<PublisherCallback<Subscription, MarshallingScheme::DCCL,
374  subscriber.subscribe_expired_func());
375 
376  goby::glog.is_debug1() && goby::glog << "Inserting subscription ack handler for "
377  << subscription_publication->ShortDebugString()
378  << std::endl;
379 
380  this->pending_ack_.insert(std::make_pair(*subscription_publication,
381  std::make_tuple(ack_handler, expire_handler)));
382 
383  return dccl_subscription;
384  }
385 
386  template <int tuple_index, typename AckorExpirePair>
387  void _handle_ack_or_expire(const AckorExpirePair& ack_or_expire_pair)
388  {
389  auto original = ack_or_expire_pair.serializer();
390  const auto& ack_or_expire_msg = ack_or_expire_pair.data();
391  bool is_subscription = original.key().marshalling_scheme() == MarshallingScheme::DCCL &&
392  original.key().type() ==
394 
395  if (is_subscription)
396  {
397  // rewrite data to remove src()
398  auto bytes_begin = original.data().begin(), bytes_end = original.data().end();
399  decltype(bytes_begin) actual_end;
400 
403  auto subscription = Helper::parse(bytes_begin, bytes_end, actual_end);
404  subscription->mutable_header()->set_src(0);
405 
406  std::vector<char> bytes(Helper::serialize(*subscription));
407  std::string* sbytes = new std::string(bytes.begin(), bytes.end());
408  original.set_allocated_data(sbytes);
409  }
410 
411  auto it = pending_ack_.find(original);
412  if (it != pending_ack_.end())
413  {
414  goby::glog.is_debug3() && goby::glog << ack_or_expire_msg.GetDescriptor()->name()
415  << " for: " << original.ShortDebugString() << ", "
416  << ack_or_expire_msg.ShortDebugString()
417  << std::endl;
418 
419  std::get<tuple_index>(it->second)
420  ->post(original.data().begin(), original.data().end(), ack_or_expire_msg);
421  }
422  else
423  {
424  goby::glog.is_debug3() && goby::glog << "No pending Ack/Expire for "
425  << (is_subscription ? "subscription: " : "data: ")
426  << original.ShortDebugString() << std::endl;
427  }
428  }
429 
431  {
432  goby::glog.is_debug3() && goby::glog << "Received DCCLForwarded data: "
433  << packets.ShortDebugString() << std::endl;
434 
435  for (const auto& packet : packets.frame())
436  {
437  for (auto p : this->subscriptions_[packet.dccl_id()])
438  p.second->post(packet.data().begin(), packet.data().end(), packets.header());
439  }
440  }
441 
442  template <typename Data>
443  std::shared_ptr<intervehicle::protobuf::Subscription>
445  SubscriptionAction action)
446  {
448  auto dccl_subscription = std::make_shared<intervehicle::protobuf::Subscription>();
449  dccl_subscription->mutable_header()->set_src(0);
450 
451  for (auto id : subscriber.cfg().intervehicle().publisher_id())
452  dccl_subscription->mutable_header()->add_dest(id);
453 
454  dccl_subscription->set_api_version(GOBY_INTERVEHICLE_API_VERSION);
455  dccl_subscription->set_dccl_id(dccl_id);
456  dccl_subscription->set_group(group.numeric());
457  dccl_subscription->set_time_with_units(
458  goby::time::SystemClock::now<goby::time::MicroTime>());
459  dccl_subscription->set_action((action == SubscriptionAction::SUBSCRIBE)
462 
463  _set_protobuf_metadata<Data>(dccl_subscription->mutable_metadata());
464  *dccl_subscription->mutable_intervehicle() = subscriber.cfg().intervehicle();
465  return dccl_subscription;
466  }
467 
469  int dccl_id, std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage> data,
472  expire_handler)
473  {
474  goby::glog.is_debug3() && goby::glog << "Inserting ack handler for "
475  << data->ShortDebugString() << std::endl;
476 
477  this->pending_ack_.insert(
478  std::make_pair(*data, std::make_tuple(ack_handler, expire_handler)));
479  }
480 
481  protected:
482  // maps DCCL ID to map of Group->subscription
483  // only one subscription allowed per IntervehicleForwarder/Portal (new subscription overwrites old one)
484  std::unordered_map<
485  int, std::unordered_map<std::string, std::shared_ptr<const SerializationHandlerBase<
488 
489  private:
490  friend PollerType;
491  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
492  {
493  _expire_pending_ack();
494 
495  return static_cast<Derived*>(this)->_poll(lock);
496  }
497 
498  template <typename Data> void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta)
499  {
501  _insert_file_desc_with_dependencies(Data::descriptor()->file(), meta);
502  }
503 
504  template <typename Data>
505  void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta, const Data& d)
506  {
507  meta->set_protobuf_name(
509  _insert_file_desc_with_dependencies(d.GetDescriptor()->file(), meta);
510  }
511 
512  // used to populated InterVehicleSubscription file_descriptor fields
513  void _insert_file_desc_with_dependencies(const google::protobuf::FileDescriptor* file_desc,
514  protobuf::SerializerProtobufMetadata* meta)
515  {
516  for (int i = 0, n = file_desc->dependency_count(); i < n; ++i)
517  _insert_file_desc_with_dependencies(file_desc->dependency(i), meta);
518 
519  google::protobuf::FileDescriptorProto* file_desc_proto = meta->add_file_descriptor();
520  file_desc->CopyTo(file_desc_proto);
521  }
522 
523  // expire any pending_ack entries that are no longer relevant
524  void _expire_pending_ack()
525  {
526  auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
527  for (auto it = pending_ack_.begin(), end = pending_ack_.end(); it != end;)
528  {
530  ->FindFieldByName("ttl")
531  ->options()
532  .GetExtension(dccl::field)
533  .max() *
535 
536  decltype(now) serialize_time(it->first.key().serialize_time_with_units());
537  decltype(now) expire_time(serialize_time + max_ttl);
538 
539  // time to let any expire messages from the drivers propagate through the interprocess layer before we remove this
540  const decltype(now) interprocess_wait(1.0 * boost::units::si::seconds);
541 
542  // loop through pending ack, and clear any at the front that can be removed
543 
544  if (now > expire_time + interprocess_wait)
545  {
546  goby::glog.is_debug3() && goby::glog << "Erasing pending ack for "
547  << it->first.ShortDebugString() << std::endl;
548  it = pending_ack_.erase(it);
549  }
550  else
551  {
552  // pending_ack_ is ordered by serialize time, so we can bail now
553  break;
554  }
555  }
556  }
557 
558  private:
559  // maps data with ack_requested onto callbacks for when the data are acknowledged or expire
560  // ordered by serialize time
561  std::map<
562  protobuf::SerializerTransporterMessage,
563  std::tuple<std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::AckData>>,
564  std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::ExpireData>>>>
565  pending_ack_;
566 
567  // map of Protobuf names where we can omit metadata on publication
568  std::set<std::string> omit_publish_metadata_;
569 };
570 
575 template <typename InnerTransporter>
577  : public InterVehicleTransporterBase<InterVehicleForwarder<InnerTransporter>, InnerTransporter>
578 {
579  public:
580  using Base =
582 
586  InterVehicleForwarder(InnerTransporter& inner) : Base(inner)
587  {
588  this->inner()
592  { this->_receive(msg); });
593 
594  using ack_pair_type = intervehicle::protobuf::AckMessagePair;
595  this->inner().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
596  [this](const ack_pair_type& ack_pair)
597  { this->template _handle_ack_or_expire<0>(ack_pair); });
598 
599  using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
600  this->inner().template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
601  [this](const expire_pair_type& expire_pair)
602  { this->template _handle_ack_or_expire<1>(expire_pair); });
603  }
604 
605  virtual ~InterVehicleForwarder() = default;
606 
607  friend Base;
608 
609  private:
610  template <typename Data>
611  void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
612  {
613  this->inner().template publish<intervehicle::groups::modem_data_out>(
614  this->_set_up_publish(d, group, publisher));
615  }
616 
617  template <typename Data>
618  void _subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
619  const Subscriber<Data>& subscriber, typename Base::SubscriptionAction action)
620  {
621  try
622  {
623  this->inner()
627  this->_set_up_subscribe(func, group, subscriber, action));
628  }
629  catch (const InvalidUnsubscription& e)
630  {
631  goby::glog.is_warn() && goby::glog << e.what() << std::endl;
632  }
633  }
634 
635  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock) { return 0; }
636 };
637 
641 template <typename InnerTransporter>
643  : public InterVehicleTransporterBase<InterVehiclePortal<InnerTransporter>, InnerTransporter>
644 {
646 
647  public:
648  using Base =
650 
654  InterVehiclePortal(const intervehicle::protobuf::PortalConfig& cfg) : cfg_(cfg) { _init(); }
655 
661  : Base(inner), cfg_(cfg)
662  {
663  _init();
664  }
665 
667  {
668  for (auto& modem_driver_data : modem_drivers_)
669  {
670  modem_driver_data->driver_thread_alive = false;
671  if (modem_driver_data->underlying_thread)
672  modem_driver_data->underlying_thread->join();
673  }
674  }
675 
676  friend Base;
677 
678  private:
679  template <typename Data>
680  void _publish(const Data& d, const Group& group, const Publisher<Data>& publisher)
681  {
682  this->innermost().template publish<intervehicle::groups::modem_data_out>(
683  this->_set_up_publish(d, group, publisher));
684  }
685 
686  template <typename Data>
687  void _subscribe(std::function<void(std::shared_ptr<const Data> d)> func, const Group& group,
688  const Subscriber<Data>& subscriber, typename Base::SubscriptionAction action)
689  {
690  try
691  {
692  auto dccl_subscription = this->_set_up_subscribe(func, group, subscriber, action);
693 
694  this->innermost().template publish<intervehicle::groups::modem_subscription_forward_tx>(
695  dccl_subscription);
696  }
697  catch (const InvalidUnsubscription& e)
698  {
699  goby::glog.is_warn() && goby::glog << e.what() << std::endl;
700  }
701  }
702 
703  int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
704  {
705  int items = 0;
707  while (!received_.empty())
708  {
709  this->_receive(received_.front());
710  received_.pop_front();
711  ++items;
712  if (lock)
713  lock.reset();
714  }
715  return items;
716  }
717 
718  void _init()
719  {
720  // set up reception of forwarded (via acoustic) subscriptions,
721  // then re-publish to driver threads
722  {
723  using intervehicle::protobuf::Subscription;
724  auto subscribe_lambda = [=](std::shared_ptr<const Subscription> d)
725  {
726  this->innermost()
728  intervehicle::protobuf::Subscription,
730  };
731  auto subscription = std::make_shared<
732  IntervehicleSerializationSubscription<Subscription, MarshallingScheme::DCCL>>(
733  subscribe_lambda);
734 
735  auto dccl_id = SerializerParserHelper<Subscription, MarshallingScheme::DCCL>::id();
736  this->subscriptions_[dccl_id].insert(
737  std::make_pair(subscription->subscribed_group(), subscription));
738  }
739 
740  this->innermost().template subscribe<intervehicle::groups::modem_data_in>(
741  [this](const intervehicle::protobuf::DCCLForwardedData& msg)
742  { received_.push_back(msg); });
743 
744  // a message requiring ack can be disposed by either [1] ack, [2] expire (TTL exceeded), [3] having no subscribers, [4] queue size exceeded.
745  // post the correct callback (ack for [1] and expire for [2-4])
746  // and remove the pending ack message
747  using ack_pair_type = intervehicle::protobuf::AckMessagePair;
748  this->innermost().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
749  [this](const ack_pair_type& ack_pair)
750  { this->template _handle_ack_or_expire<0>(ack_pair); });
751 
752  using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
753  this->innermost()
754  .template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
755  [this](const expire_pair_type& expire_pair)
756  { this->template _handle_ack_or_expire<1>(expire_pair); });
757 
758  this->innermost().template subscribe<intervehicle::groups::modem_driver_ready, bool>(
759  [this](const bool& ready)
760  {
761  goby::glog.is_debug1() && goby::glog << "Received driver ready" << std::endl;
762  ++drivers_ready_;
763  });
764 
765  // set up before drivers ready to ensure we don't miss subscriptions
766  if (cfg_.has_persist_subscriptions())
767  _set_up_persistent_subscriptions();
768 
769  for (int i = 0, n = cfg_.link_size(); i < n; ++i)
770  {
771  auto* link = cfg_.mutable_link(i);
772 
773  link->mutable_driver()->set_modem_id(link->modem_id());
774  link->mutable_mac()->set_modem_id(link->modem_id());
775 
776  modem_drivers_.emplace_back(new ModemDriverData);
777  ModemDriverData& data = *modem_drivers_.back();
778 
779  data.underlying_thread.reset(new std::thread(
780  [&data, link]()
781  {
782  try
783  {
784  data.modem_driver_thread.reset(new intervehicle::ModemDriverThread(*link));
785  data.modem_driver_thread->run(data.driver_thread_alive);
786  }
787  catch (std::exception& e)
788  {
789  goby::glog.is_warn() &&
790  goby::glog << "Modem driver thread had uncaught exception: " << e.what()
791  << std::endl;
792  throw;
793  }
794  }));
795 
796  if (goby::glog.buf().is_gui())
797  // allows for visual grouping of each link in the NCurses gui
798  std::this_thread::sleep_for(std::chrono::milliseconds(250));
799  }
800 
801  while (drivers_ready_ < modem_drivers_.size())
802  {
803  goby::glog.is_debug1() && goby::glog << "Waiting for drivers to be ready." << std::endl;
804  this->poll();
805  std::this_thread::sleep_for(std::chrono::seconds(1));
806  }
807 
808  // write subscriptions after drivers ready to ensure they aren't missed
809  if (former_sub_collection_.subscription_size() > 0)
810  {
811  goby::glog.is_debug1() &&
812  goby::glog << "Begin loading subscriptions from persistent storage..." << std::endl;
813  for (const auto& sub : former_sub_collection_.subscription())
814  this->innermost()
816  intervehicle::protobuf::Subscription,
818  }
819  }
820 
821  void _set_up_persistent_subscriptions()
822  {
823  const auto& dir = cfg_.persist_subscriptions().dir();
824  if (dir.empty())
825  goby::glog.is_die() && goby::glog << "persist_subscriptions.dir cannot be empty"
826  << std::endl;
827 
828  std::stringstream file_name;
829  file_name << dir;
830  if (dir.back() != '/')
831  file_name << "/";
832  file_name << "goby_intervehicle_subscriptions_" << cfg_.persist_subscriptions().name()
833  << ".pb.txt";
834  persist_sub_file_name_ = file_name.str();
835  {
836  std::ifstream persist_sub_ifs(persist_sub_file_name_.c_str());
837  try
838  {
839  if (persist_sub_ifs.is_open())
840  {
841  google::protobuf::TextFormat::Parser parser;
842  google::protobuf::io::IstreamInputStream iis(&persist_sub_ifs);
843  parser.Parse(&iis, &former_sub_collection_);
844  }
845  else
846  {
847  goby::glog.is_debug1() &&
848  goby::glog << "Could not open persistent subscriptions file: "
849  << persist_sub_file_name_
850  << ". Assuming no persistent subscriptions exist" << std::endl;
851  }
852  }
853  catch (const std::exception& e)
854  {
855  goby::glog.is_warn() &&
856  goby::glog << "Error reading persistent subscriptions file: " << e.what()
857  << std::endl;
858  }
859  }
860 
861  std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
862  if (!persist_sub_ofs.is_open())
863  {
864  goby::glog.is_die() &&
865  goby::glog << "Could not open persistent subscriptions file for writing: "
866  << persist_sub_file_name_ << std::endl;
867  }
868  remove(persist_sub_file_name_.c_str());
869 
870  this->innermost().template subscribe<intervehicle::groups::subscription_report>(
871  [this](const intervehicle::protobuf::SubscriptionReport& report)
872  {
873  goby::glog.is_debug1() && goby::glog << "Received subscription report: "
874  << report.ShortDebugString() << std::endl;
875  sub_reports_[report.link_modem_id()] = report;
876  std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
877  intervehicle::protobuf::SubscriptionPersistCollection collection;
878  collection.set_time_with_units(
879  goby::time::SystemClock::now<goby::time::MicroTime>());
880  for (auto report_p : sub_reports_)
881  {
882  for (const auto& sub : report_p.second.subscription())
883  *collection.add_subscription() = sub;
884  }
885  google::protobuf::TextFormat::Printer printer;
886  google::protobuf::io::OstreamOutputStream oos(&persist_sub_ofs);
887  goby::glog.is_debug1() &&
888  goby::glog << "Collection: " << collection.ShortDebugString() << std::endl;
889  printer.Print(collection, &oos);
890  });
891  }
892 
893  private:
894  intervehicle::protobuf::PortalConfig cfg_;
895 
896  struct ModemDriverData
897  {
898  std::unique_ptr<std::thread> underlying_thread;
899  std::unique_ptr<intervehicle::ModemDriverThread> modem_driver_thread;
900  std::atomic<bool> driver_thread_alive{true};
901  };
902  std::vector<std::unique_ptr<ModemDriverData>> modem_drivers_;
903  unsigned drivers_ready_{0};
904 
905  std::deque<intervehicle::protobuf::DCCLForwardedData> received_;
906 
907  intervehicle::protobuf::SubscriptionPersistCollection former_sub_collection_;
908  std::string persist_sub_file_name_;
909  std::map<modem_id_type, intervehicle::protobuf::SubscriptionReport> sub_reports_;
910 };
911 } // namespace middleware
912 } // namespace goby
913 
914 #endif
goby::acomms::protobuf::ModemTransmission
Definition: modem_message.pb.h:166
goby::acomms::protobuf::DynamicBufferConfig::ttl_unit
boost::units::unit< ttl_dimension, boost::units::si::system > ttl_unit
Definition: buffer.pb.h:228
goby::middleware::InterVehiclePortal::Base
friend Base
Definition: intervehicle.h:676
goby::middleware::intervehicle::protobuf::Subscription
Definition: intervehicle.pb.h:822
goby::middleware::InterVehicleTransporterBase::publish_dynamic
void publish_dynamic(std::shared_ptr< const Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to const data variant)....
Definition: intervehicle.h:167
goby::middleware::intervehicle::protobuf::AckMessagePair
Definition: intervehicle.pb.h:1807
goby::middleware::intervehicle::groups::modem_subscription_forward_rx
constexpr Group modem_subscription_forward_rx
Definition: groups.h:50
goby::middleware::intervehicle::protobuf::TransporterConfig::broadcast
bool broadcast() const
Definition: intervehicle_transporter_config.pb.h:360
goby::middleware::intervehicle::protobuf::SubscriptionPersistCollection::subscription
const ::goby::middleware::intervehicle::protobuf::Subscription & subscription(int index) const
Definition: intervehicle.pb.h:4462
goby::middleware::StaticTransporterInterface
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition: interface.h:203
goby::middleware::InterVehicleForwarder
Implements the forwarder concept for the intervehicle layer.
Definition: intervehicle.h:576
goby::middleware::InterVehicleTransporterBase::SubscriptionAction::UNSUBSCRIBE
@ UNSUBSCRIBE
goby::acomms::protobuf::DriverConfig::set_modem_id
void set_modem_id(::google::protobuf::int32 value)
Definition: driver_base.pb.h:574
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::SerializerParserHelper
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition: interface.h:97
goby::util::FlexOstream::is_warn
bool is_warn()
Definition: flex_ostream.h:82
goby::middleware::StaticTransporterInterface< InterVehicleTransporterBase< Derived, InnerTransporter >, InnerTransporter >::subscribe
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition: interface.h:270
goby::middleware::intervehicle::protobuf::ExpireMessagePair
Definition: intervehicle.pb.h:1944
goby::middleware::Subscriber
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition: subscriber.h:36
intervehicle.pb.h
goby::middleware::InvalidPublication::InvalidPublication
InvalidPublication(const std::string &e)
Definition: intervehicle.h:56
goby::middleware::Subscriber::subscribe_expired_func
subscribe_expired_func_type subscribe_expired_func() const
Definition: subscriber.h:94
goby::middleware::intervehicle::protobuf::DCCLForwardedData
Definition: intervehicle.pb.h:1343
goby::util::FlexOstream::is_debug3
bool is_debug3()
Definition: flex_ostream.h:86
goby::middleware::detail::primitive_type::type
T type
Definition: primitive_type.h:38
goby::middleware::InterVehicleTransporterBase::InterVehicleTransporterBase
InterVehicleTransporterBase()
Definition: intervehicle.h:111
goby::middleware::intervehicle::protobuf::Subscription::UNSUBSCRIBE
static const Action UNSUBSCRIBE
Definition: intervehicle.pb.h:917
goby::middleware::InvalidSubscription::InvalidSubscription
InvalidSubscription(const std::string &e)
Definition: intervehicle.h:50
goby::middleware::intervehicle::protobuf::AckData
Definition: intervehicle.pb.h:1475
goby::middleware::Poller
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition: poller.h:37
goby::middleware::InterVehicleTransporterBase::subscribe_dynamic
void subscribe_dynamic(std::function< void(const Data &)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (const reference variant)....
Definition: intervehicle.h:213
goby::middleware::intervehicle::protobuf::PortalConfig::mutable_link
::goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig * mutable_link(int index)
Definition: intervehicle.pb.h:3175
goby::util::e
constexpr T e
Definition: constants.h:35
goby::middleware::InterVehicleTransporterBase< InterVehicleForwarder< InterProcessPortal< goby::middleware::InterThreadTransporter > >, InterProcessPortal< goby::middleware::InterThreadTransporter > >::SubscriptionAction
SubscriptionAction
Definition: intervehicle.h:82
goby::middleware::InvalidUnsubscription::InvalidUnsubscription
InvalidUnsubscription(const std::string &e)
Definition: intervehicle.h:62
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::Publisher
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: driver_thread.h:69
goby::middleware::MarshallingScheme::PROTOBUF
@ PROTOBUF
Definition: interface.h:53
goby::middleware::IntervehicleSerializationSubscription
Represents a subscription to a serialized data type (intervehicle layer).
Definition: serialization_handlers.h:204
goby::middleware::InterVehicleTransporterBase::InterVehicleTransporterBase
InterVehicleTransporterBase(InnerTransporter &inner)
Definition: intervehicle.h:88
goby::acomms::iridium::protobuf::report
extern ::google::protobuf::internal::ExtensionIdentifier< ::goby::acomms::protobuf::ModemReport, ::google::protobuf::internal::MessageTypeTraits< ::goby::acomms::iridium::protobuf::Report >, 11, false > report
Definition: iridium_driver.pb.h:1520
goby::middleware::InterVehicleTransporterBase::_set_up_publish
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > _set_up_publish(const Data &d, const Group &group, const Publisher< Data > &publisher)
Definition: intervehicle.h:261
goby::middleware::intervehicle::protobuf::TransporterConfig::publisher_id
::google::protobuf::int32 publisher_id(int index) const
Definition: intervehicle_transporter_config.pb.h:245
goby::middleware::intervehicle::protobuf::DCCLForwardedData::header
const ::goby::middleware::intervehicle::protobuf::Header & header() const
Definition: intervehicle.pb.h:3802
goby::middleware::Publisher::acked_func
acked_func_type acked_func() const
Returns the acked data callback (or an empty function if none is set)
Definition: publisher.h:91
goby::middleware::Subscriber::has_group_func
bool has_group_func() const
Definition: subscriber.h:96
goby::middleware::InterVehicleTransporterBase::subscriptions_
std::unordered_map< int, std::unordered_map< std::string, std::shared_ptr< const SerializationHandlerBase< intervehicle::protobuf::Header > > > > subscriptions_
Definition: intervehicle.h:487
goby::util::logger_lock::lock
@ lock
Definition: flex_ostreambuf.h:62
groups.h
goby::middleware::InvalidUnsubscription
Definition: intervehicle.h:59
goby::middleware::intervehicle::protobuf::PortalConfig::has_persist_subscriptions
bool has_persist_subscriptions() const
Definition: intervehicle.pb.h:3199
goby::middleware::protobuf::SerializerMetadataRequest
Definition: serializer_transporter.pb.h:599
goby::middleware::intervehicle::groups::subscription_forward
constexpr Group subscription_forward
Definition: groups.h:39
goby::middleware::InterVehicleTransporterBase::SubscriptionAction::SUBSCRIBE
@ SUBSCRIBE
goby::util::FlexOstream::is_debug1
bool is_debug1()
Definition: flex_ostream.h:84
goby::acomms::protobuf::DynamicBufferConfig::ack_required
bool ack_required() const
Definition: buffer.pb.h:292
goby::middleware::Subscriber::cfg
const goby::middleware::protobuf::TransporterConfig & cfg() const
Definition: subscriber.h:80
goby::middleware::protobuf::TransporterConfig::intervehicle
const ::goby::middleware::intervehicle::protobuf::TransporterConfig & intervehicle() const
Definition: transporter_config.pb.h:249
goby::middleware::protobuf::SerializerMetadataRequest::METADATA_INCLUDE
static const Request METADATA_INCLUDE
Definition: serializer_transporter.pb.h:692
goby::field
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::FieldOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyFieldOptions >, 11, false > field
Definition: option_extensions.pb.h:1323
goby::middleware::SerializerParserHelper::type_name
static std::string type_name()
The marshalling scheme specific string name for this type.
Definition: interface.h:107
goby::middleware::StaticTransporterInterface< InterVehicleTransporterBase< InterVehicleForwarder< InnerTransporter >, InnerTransporter >, InnerTransporter >::publish
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition: interface.h:215
goby::middleware::InterVehicleTransporterBase::unsubscribe_dynamic
void unsubscribe_dynamic(const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe from a specific run-time defined group and data type. Where possible, prefer the static v...
Definition: intervehicle.h:248
goby::middleware::intervehicle::protobuf::Header
Definition: intervehicle.pb.h:1066
goby::middleware::InterVehicleTransporterBase::_receive
void _receive(const intervehicle::protobuf::DCCLForwardedData &packets)
Definition: intervehicle.h:430
goby::middleware::InterVehicleTransporterBase::publish_dynamic
void publish_dynamic(const Data &data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (const reference variant)....
Definition: intervehicle.h:142
goby::middleware::PollerInterface::poll
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
Definition: interface.h:345
goby::middleware::InterVehicleForwarder::Base
friend Base
Definition: intervehicle.h:607
goby::middleware::InterVehicleTransporterBase::~InterVehicleTransporterBase
virtual ~InterVehicleTransporterBase()=default
goby::middleware::intervehicle::protobuf::DCCLForwardedData::frame
const ::goby::middleware::intervehicle::protobuf::DCCLPacket & frame(int index) const
Definition: intervehicle.pb.h:3859
interthread.h
goby::middleware::InterVehicleTransporterBase::_serialize_subscription
std::shared_ptr< intervehicle::protobuf::Subscription > _serialize_subscription(const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
Definition: intervehicle.h:444
goby::middleware::intervehicle::groups::modem_subscription_forward_tx
constexpr Group modem_subscription_forward_tx
Definition: groups.h:48
goby::middleware::SerializationHandlerBase
Base class for handling posting callbacks for serialized data types (interprocess and outer)
Definition: serialization_handlers.h:96
goby::acomms::protobuf::DynamicBufferConfig::descriptor
static const ::google::protobuf::Descriptor * descriptor()
goby::middleware::intervehicle::protobuf::Subscription::SUBSCRIBE
static const Action SUBSCRIBE
Definition: intervehicle.pb.h:915
goby::middleware::intervehicle::protobuf::PortalConfig
Definition: intervehicle.pb.h:573
goby::middleware::InterVehiclePortal::InterVehiclePortal
InterVehiclePortal(InnerTransporter &inner, const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration and a reference to an external inner transporter.
Definition: intervehicle.h:660
dccl.h
goby::middleware::intervehicle::protobuf::PortalConfig_PersistSubscriptions::name
const ::std::string & name() const
Definition: intervehicle.pb.h:3046
goby::middleware::intervehicle::protobuf::ExpireData
Definition: intervehicle.pb.h:1622
goby::middleware::intervehicle::ModemDriverThread::modem_id_type
goby::acomms::DynamicBuffer< buffer_data_type >::modem_id_type modem_id_type
Definition: driver_thread.h:136
goby::middleware::Publisher::cfg
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition: publisher.h:81
goby::middleware::intervehicle::protobuf::SubscriptionPersistCollection::subscription_size
int subscription_size() const
Definition: intervehicle.pb.h:4447
goby::msg
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Definition: option_extensions.pb.h:1327
goby::middleware::InterVehicleTransporterBase::_handle_ack_or_expire
void _handle_ack_or_expire(const AckorExpirePair &ack_or_expire_pair)
Definition: intervehicle.h:387
goby::middleware::intervehicle::groups::metadata_request
constexpr Group metadata_request
Definition: groups.h:54
goby::middleware::InnerTransporterInterface< InterVehicleTransporterBase< Derived, InnerTransporter >, InnerTransporter >::inner
InnerTransporter & inner()
Definition: interface.h:63
goby::util::FlexOstream::is_die
bool is_die()
Definition: flex_ostream.h:81
goby::middleware::InterVehicleTransporterBase
Base class for implementing transporters (both portal and forwarder) for the intervehicle layer.
Definition: intervehicle.h:70
goby::middleware::intervehicle::protobuf::PortalConfig::link_size
int link_size() const
Definition: intervehicle.pb.h:3169
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:58
goby::middleware::InterVehicleForwarder::~InterVehicleForwarder
virtual ~InterVehicleForwarder()=default
goby::middleware::InterVehiclePortal::~InterVehiclePortal
~InterVehiclePortal()
Definition: intervehicle.h:666
goby::middleware::InterVehiclePortal::InterVehiclePortal
InterVehiclePortal(const intervehicle::protobuf::PortalConfig &cfg)
Instantiate a portal with the given configuration (with the portal owning the inner transporter)
Definition: intervehicle.h:654
goby::middleware::InterVehiclePortal
Implements a portal for the intervehicle layer based on Goby Acomms.
Definition: intervehicle.h:642
goby::middleware::Group::broadcast_group
static constexpr std::uint32_t broadcast_group
Special group number representing the broadcast group (used when no grouping is required for a given ...
Definition: group.h:62
goby::middleware::intervehicle::protobuf::PortalConfig_PersistSubscriptions::dir
const ::std::string & dir() const
Definition: intervehicle.pb.h:3112
goby::middleware::MarshallingScheme::DCCL
@ DCCL
Definition: interface.h:54
goby::middleware::InterVehicleTransporterBase::_set_up_subscribe
std::shared_ptr< intervehicle::protobuf::Subscription > _set_up_subscribe(std::function< void(std::shared_ptr< const Data > d)> func, const Group &group, const Subscriber< Data > &subscriber, SubscriptionAction action)
Definition: intervehicle.h:302
goby::middleware::protobuf::SerializerProtobufMetadata
Definition: serializer_transporter.pb.h:109
goby::middleware::Publisher::has_set_group_func
bool has_set_group_func() const
Definition: publisher.h:95
goby::middleware::InterVehicleForwarder::InterVehicleForwarder
InterVehicleForwarder(InnerTransporter &inner)
Construct a forwarder for the intervehicle layer.
Definition: intervehicle.h:586
GOBY_INTERVEHICLE_API_VERSION
#define GOBY_INTERVEHICLE_API_VERSION
Definition: version.h:35
goby::middleware::protobuf::SerializerMetadataRequest::METADATA_EXCLUDE
static const Request METADATA_EXCLUDE
Definition: serializer_transporter.pb.h:694
goby::middleware::scheme
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition: cstr.h:65
goby::middleware::InvalidSubscription
Definition: intervehicle.h:47
goby::middleware::intervehicle::protobuf::Subscription::descriptor
static const ::google::protobuf::Descriptor * descriptor()
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
goby::middleware::InterVehicleTransporterBase::check_validity
void check_validity()
Check validity of the Group for interthread use (at compile time)
Definition: intervehicle.h:127
goby::middleware::intervehicle::groups::modem_data_in
constexpr Group modem_data_in
Definition: groups.h:44
goby::middleware::InvalidPublication
Definition: intervehicle.h:53
goby::middleware::intervehicle::protobuf::TransporterConfig::buffer
const ::goby::acomms::protobuf::DynamicBufferConfig & buffer() const
Definition: intervehicle_transporter_config.pb.h:305
goby::middleware::Group::invalid_numeric_group
static constexpr std::uint32_t invalid_numeric_group
Special group number representing an invalid numeric group (unsuitable for intervehicle and outer lay...
Definition: group.h:64
goby::glog
util::FlexOstream glog
Access the Goby logger through this object.
goby::middleware::InterVehicleTransporterBase::_insert_pending_ack
void _insert_pending_ack(int dccl_id, std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > data, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::AckData >> ack_handler, std::shared_ptr< SerializationHandlerBase< intervehicle::protobuf::ExpireData >> expire_handler)
Definition: intervehicle.h:468
goby::middleware::Publisher::expired_func
expired_func_type expired_func() const
Returns the expired data callback (or an empty function if none is set)
Definition: publisher.h:93
serialization_handlers.h
goby::middleware::protobuf::SerializerProtobufMetadata::set_protobuf_name
void set_protobuf_name(const ::std::string &value)
Definition: serializer_transporter.pb.h:784
driver_thread.h
goby::middleware::InterVehicleTransporterBase::subscribe_dynamic
void subscribe_dynamic(std::function< void(std::shared_ptr< const Data >)> f, const Group &group=Group(), const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific run-time defined group and data type (shared pointer variant)....
Definition: intervehicle.h:231
goby::middleware::Subscriber::subscribed_func
subscribed_func_type subscribed_func() const
Definition: subscriber.h:92
goby::middleware::intervehicle::protobuf::PortalConfig::persist_subscriptions
const ::goby::middleware::intervehicle::protobuf::PortalConfig_PersistSubscriptions & persist_subscriptions() const
Definition: intervehicle.pb.h:3215
goby::middleware::InterVehicleTransporterBase::publish_dynamic
void publish_dynamic(std::shared_ptr< Data > data, const Group &group=Group(), const Publisher< Data > &publisher=Publisher< Data >())
Publish a message using a run-time defined DynamicGroup (shared pointer to mutable data variant)....
Definition: intervehicle.h:199
goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig::mutable_driver
::goby::acomms::protobuf::DriverConfig * mutable_driver()
Definition: intervehicle.pb.h:2844
goby::middleware::intervehicle::serialize_publication
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
Definition: driver_thread.h:112
goby::middleware::InnerTransporterInterface< InterVehicleTransporterBase< InterVehiclePortal< InnerTransporter >, InnerTransporter >, InnerTransporter >::innermost
auto innermost()
Definition: interface.h:67
goby::middleware::InterVehicleTransporterBase::scheme
static constexpr int scheme()
returns the marshalling scheme id for a given data type on this layer. Only MarshallingScheme::DCCL i...
Definition: intervehicle.h:116
google::protobuf::Message::ShortDebugString
string ShortDebugString() const
goby::middleware::PublisherCallback
Represents a callback for a published data type (e.g. acked_func or expired_func)
Definition: serialization_handlers.h:280
int