24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_H
29 #include <sys/types.h>
33 #include <google/protobuf/io/zero_copy_stream_impl.h>
69 template <
typename Derived,
typename InnerTransporter>
73 public Poller<InterVehicleTransporterBase<Derived, InnerTransporter>>
98 << request.ShortDebugString() << std::endl;
100 switch (request.request())
103 omit_publish_metadata_.erase(request.key().type());
106 omit_publish_metadata_.insert(request.key().type());
116 template <
typename Data>
static constexpr
int scheme()
120 "Can only use DCCL messages with InterVehicleTransporters");
130 "goby::middleware::Group must have non-zero numeric "
131 "value to publish on the InterVehicle layer");
141 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
146 "Can only use DCCL messages with InterVehicleTransporters");
148 Data data_with_group = data;
149 publisher.set_group(data_with_group,
group);
151 static_cast<Derived*
>(
this)->
template _publish<Data>(data_with_group,
group, publisher);
153 this->
inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
155 this->
inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(data_with_group,
166 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
171 "Can only use DCCL messages with InterVehicleTransporters");
175 std::shared_ptr<Data> data_with_group(data->New());
176 data_with_group->CopyFrom(*data);
178 publisher.set_group(*data_with_group,
group);
180 static_cast<Derived*
>(
this)->
template _publish<Data>(*data_with_group,
group,
184 this->
inner().template publish_dynamic<Data, MarshallingScheme::DCCL>(data_with_group,
186 this->
inner().template publish_dynamic<Data, MarshallingScheme::PROTOBUF>(
187 data_with_group,
group, publisher);
198 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
202 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data),
group, publisher);
212 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
217 "Can only use DCCL messages with InterVehicleTransporters");
218 auto pointer_ref_lambda = [=](std::shared_ptr<const Data> d) { f(*d); };
219 static_cast<Derived*
>(
this)->
template _subscribe<Data>(
230 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
236 "Can only use DCCL messages with InterVehicleTransporters");
237 static_cast<Derived*
>(
this)->
template _subscribe<Data>(f,
group, subscriber,
247 template <typename Data, int scheme = goby::middleware::scheme<Data>()>
252 "Can only use DCCL messages with InterVehicleTransporters");
253 static_cast<Derived*
>(
this)->
template _subscribe<Data>(
254 std::function<
void(std::shared_ptr<const Data>)>(),
group, subscriber,
259 template <
typename Data>
260 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
265 std::stringstream ss;
266 ss <<
"Error: Publisher must have set_group_func in order to publish to a "
267 "non-broadcast Group ("
269 <<
"). The set_group_func modifies the contents of the outgoing message to store "
270 "the group information.";
278 auto ack_handler = std::make_shared<
282 auto expire_handler =
288 data, ack_handler, expire_handler);
291 if (!omit_publish_metadata_.count(data->key().type()))
292 _set_protobuf_metadata<Data>(data->mutable_key()->mutable_metadata(), d);
295 goby::glog <<
"Set up publishing for: " << data->ShortDebugString() << std::endl;
300 template <
typename Data>
301 std::shared_ptr<intervehicle::protobuf::Subscription>
313 std::stringstream ss;
314 ss <<
"Error: Subscriber must have group_func in order to subscribe to "
315 "non-broadcast Group ("
317 <<
"). The group_func returns the appropriate Group based on the contents "
318 "of the incoming message.";
325 std::stringstream ss;
326 ss <<
"Error: Broadcast subscriptions cannot have ack_required: true";
330 auto subscription = std::make_shared<
332 func,
group, subscriber);
346 std::stringstream ss;
347 ss <<
"Cannot unsubscribe to DCCL id: " << dccl_id
348 <<
" and group: " << std::string(
group) <<
" as no subscription was found.";
355 auto dccl_subscription =
356 this->
template _serialize_subscription<Data>(
group, subscriber, action);
364 auto subscribe_time = dccl_subscription->time_with_units();
365 subscription_publication->mutable_key()->set_serialize_time_with_units(subscribe_time);
371 auto expire_handler =
377 << subscription_publication->ShortDebugString()
380 this->pending_ack_.insert(std::make_pair(*subscription_publication,
381 std::make_tuple(ack_handler, expire_handler)));
383 return dccl_subscription;
386 template <
int tuple_index,
typename AckorExpirePair>
389 auto original = ack_or_expire_pair.serializer();
390 const auto& ack_or_expire_msg = ack_or_expire_pair.data();
392 original.key().type() ==
398 auto bytes_begin = original.data().begin(), bytes_end = original.data().end();
399 decltype(bytes_begin) actual_end;
403 auto subscription = Helper::parse(bytes_begin, bytes_end, actual_end);
404 subscription->mutable_header()->set_src(0);
406 std::vector<char> bytes(Helper::serialize(*subscription));
407 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
408 original.set_allocated_data(sbytes);
411 auto it = pending_ack_.find(original);
412 if (it != pending_ack_.end())
415 <<
" for: " << original.ShortDebugString() <<
", "
416 << ack_or_expire_msg.ShortDebugString()
419 std::get<tuple_index>(it->second)
420 ->post(original.data().begin(), original.data().end(), ack_or_expire_msg);
425 << (is_subscription ?
"subscription: " :
"data: ")
426 << original.ShortDebugString() << std::endl;
435 for (
const auto& packet : packets.
frame())
438 p.second->post(packet.data().begin(), packet.data().end(), packets.
header());
442 template <
typename Data>
443 std::shared_ptr<intervehicle::protobuf::Subscription>
448 auto dccl_subscription = std::make_shared<intervehicle::protobuf::Subscription>();
449 dccl_subscription->mutable_header()->set_src(0);
452 dccl_subscription->mutable_header()->add_dest(
id);
455 dccl_subscription->set_dccl_id(dccl_id);
456 dccl_subscription->set_group(
group.numeric());
457 dccl_subscription->set_time_with_units(
458 goby::time::SystemClock::now<goby::time::MicroTime>());
463 _set_protobuf_metadata<Data>(dccl_subscription->mutable_metadata());
464 *dccl_subscription->mutable_intervehicle() = subscriber.
cfg().
intervehicle();
465 return dccl_subscription;
469 int dccl_id, std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage> data,
477 this->pending_ack_.insert(
478 std::make_pair(*data, std::make_tuple(ack_handler, expire_handler)));
/// Poll hook of the base transporter.
///
/// First calls _expire_pending_ack(), which erases entries from pending_ack_
/// whose expiry time (plus a wait margin) has already passed, and then hands
/// `lock` to the Derived class's own _poll(), returning that result unchanged.
///
/// \param lock Passed through untouched to Derived::_poll().
/// \return Whatever Derived::_poll(lock) returns.
491 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
// Housekeeping runs on every poll, before the derived implementation.
493 _expire_pending_ack();
// Static downcast to Derived: the derived _poll is selected at compile time.
495 return static_cast<Derived*
>(
this)->_poll(
lock);
501 _insert_file_desc_with_dependencies(Data::descriptor()->file(), meta);
504 template <
typename Data>
505 void _set_protobuf_metadata(protobuf::SerializerProtobufMetadata* meta,
const Data& d)
507 meta->set_protobuf_name(
509 _insert_file_desc_with_dependencies(d.GetDescriptor()->file(), meta);
/// Appends the FileDescriptorProto for `file_desc`, and for every file it
/// depends on, to `meta`.
///
/// Each direct dependency is handled by a recursive call before `file_desc`
/// itself is appended, so a dependency's descriptor is added ahead of the
/// file that needs it.
///
/// \param file_desc File whose descriptor (and dependencies) should be added.
/// \param meta Metadata message receiving the descriptors via add_file_descriptor().
///
/// NOTE(review): no de-duplication is visible in the lines shown here, so a
/// file reachable through several dependency paths appears to be appended
/// once per path -- confirm against the full source.
513 void _insert_file_desc_with_dependencies(
const google::protobuf::FileDescriptor* file_desc,
514 protobuf::SerializerProtobufMetadata* meta)
// Depth-first: recurse into every direct dependency first.
516 for (
int i = 0, n = file_desc->dependency_count(); i < n; ++i)
517 _insert_file_desc_with_dependencies(file_desc->dependency(i), meta);
// Then add a new descriptor entry to `meta` and copy this file into it.
519 google::protobuf::FileDescriptorProto* file_desc_proto = meta->add_file_descriptor();
520 file_desc->CopyTo(file_desc_proto);
524 void _expire_pending_ack()
526 auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
527 for (
auto it = pending_ack_.begin(), end = pending_ack_.end(); it != end;)
530 ->FindFieldByName(
"ttl")
536 decltype(now) serialize_time(it->first.key().serialize_time_with_units());
537 decltype(now) expire_time(serialize_time + max_ttl);
540 const decltype(now) interprocess_wait(1.0 * boost::units::si::seconds);
544 if (now > expire_time + interprocess_wait)
547 << it->first.ShortDebugString() << std::endl;
548 it = pending_ack_.erase(it);
562 protobuf::SerializerTransporterMessage,
563 std::tuple<std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::AckData>>,
564 std::shared_ptr<SerializationHandlerBase<intervehicle::protobuf::ExpireData>>>>
// Type names (request.key().type()) for which protobuf metadata is left out
// when setting up a publication: entries are inserted/erased in response to
// incoming requests, and _set_protobuf_metadata() is only called when the
// outgoing message's key type is NOT in this set.
568 std::set<std::string> omit_publish_metadata_;
575 template <
typename InnerTransporter>
595 this->
inner().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
596 [
this](
const ack_pair_type& ack_pair)
597 { this->
template _handle_ack_or_expire<0>(ack_pair); });
600 this->
inner().template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
601 [
this](
const expire_pair_type& expire_pair)
602 { this->
template _handle_ack_or_expire<1>(expire_pair); });
610 template <
typename Data>
613 this->
inner().template publish<intervehicle::groups::modem_data_out>(
617 template <
typename Data>
618 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> func,
const Group&
group,
629 catch (
const InvalidUnsubscription&
e)
/// Poll hook for this transporter: always returns 0 and does not use `lock`.
/// NOTE(review): the return value is presumably a count of items handled
/// during the poll -- confirm against the Poller interface.
635 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock) {
return 0; }
641 template <
typename InnerTransporter>
668 for (
auto& modem_driver_data : modem_drivers_)
670 modem_driver_data->driver_thread_alive =
false;
671 if (modem_driver_data->underlying_thread)
672 modem_driver_data->underlying_thread->join();
679 template <
typename Data>
682 this->
innermost().template publish<intervehicle::groups::modem_data_out>(
686 template <
typename Data>
687 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> func,
const Group&
group,
694 this->
innermost().template publish<intervehicle::groups::modem_subscription_forward_tx>(
697 catch (
const InvalidUnsubscription&
e)
703 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
707 while (!received_.empty())
710 received_.pop_front();
723 using intervehicle::protobuf::Subscription;
724 auto subscribe_lambda = [=](std::shared_ptr<const Subscription> d)
728 intervehicle::protobuf::Subscription,
731 auto subscription = std::make_shared<
732 IntervehicleSerializationSubscription<Subscription, MarshallingScheme::DCCL>>(
735 auto dccl_id = SerializerParserHelper<Subscription, MarshallingScheme::DCCL>::id();
737 std::make_pair(subscription->subscribed_group(), subscription));
740 this->
innermost().template subscribe<intervehicle::groups::modem_data_in>(
741 [
this](
const intervehicle::protobuf::DCCLForwardedData&
msg)
742 { received_.push_back(
msg); });
747 using ack_pair_type = intervehicle::protobuf::AckMessagePair;
748 this->
innermost().template subscribe<intervehicle::groups::modem_ack_in, ack_pair_type>(
749 [
this](
const ack_pair_type& ack_pair)
750 { this->
template _handle_ack_or_expire<0>(ack_pair); });
752 using expire_pair_type = intervehicle::protobuf::ExpireMessagePair;
754 .template subscribe<intervehicle::groups::modem_expire_in, expire_pair_type>(
755 [
this](
const expire_pair_type& expire_pair)
756 { this->
template _handle_ack_or_expire<1>(expire_pair); });
758 this->
innermost().template subscribe<intervehicle::groups::modem_driver_ready, bool>(
759 [
this](
const bool& ready)
767 _set_up_persistent_subscriptions();
769 for (
int i = 0, n = cfg_.
link_size(); i < n; ++i)
774 link->mutable_mac()->set_modem_id(link->modem_id());
776 modem_drivers_.emplace_back(
new ModemDriverData);
777 ModemDriverData& data = *modem_drivers_.back();
779 data.underlying_thread.reset(
new std::thread(
784 data.modem_driver_thread.reset(
new intervehicle::ModemDriverThread(*link));
785 data.modem_driver_thread->run(data.driver_thread_alive);
787 catch (std::exception&
e)
790 goby::glog <<
"Modem driver thread had uncaught exception: " <<
e.what()
798 std::this_thread::sleep_for(std::chrono::milliseconds(250));
801 while (drivers_ready_ < modem_drivers_.size())
805 std::this_thread::sleep_for(std::chrono::seconds(1));
812 goby::glog <<
"Begin loading subscriptions from persistent storage..." << std::endl;
813 for (
const auto& sub : former_sub_collection_.
subscription())
816 intervehicle::protobuf::Subscription,
821 void _set_up_persistent_subscriptions()
828 std::stringstream file_name;
830 if (dir.back() !=
'/')
834 persist_sub_file_name_ = file_name.str();
836 std::ifstream persist_sub_ifs(persist_sub_file_name_.c_str());
839 if (persist_sub_ifs.is_open())
841 google::protobuf::TextFormat::Parser parser;
842 google::protobuf::io::IstreamInputStream iis(&persist_sub_ifs);
843 parser.Parse(&iis, &former_sub_collection_);
848 goby::glog <<
"Could not open persistent subscriptions file: "
849 << persist_sub_file_name_
850 <<
". Assuming no persistent subscriptions exist" << std::endl;
853 catch (
const std::exception&
e)
856 goby::glog <<
"Error reading persistent subscriptions file: " <<
e.what()
861 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
862 if (!persist_sub_ofs.is_open())
865 goby::glog <<
"Could not open persistent subscriptions file for writing: "
866 << persist_sub_file_name_ << std::endl;
868 remove(persist_sub_file_name_.c_str());
870 this->
innermost().template subscribe<intervehicle::groups::subscription_report>(
871 [
this](
const intervehicle::protobuf::SubscriptionReport&
report)
874 <<
report.ShortDebugString() << std::endl;
876 std::ofstream persist_sub_ofs(persist_sub_file_name_.c_str());
877 intervehicle::protobuf::SubscriptionPersistCollection collection;
878 collection.set_time_with_units(
879 goby::time::SystemClock::now<goby::time::MicroTime>());
880 for (
auto report_p : sub_reports_)
882 for (
const auto& sub : report_p.second.subscription())
883 *collection.add_subscription() = sub;
885 google::protobuf::TextFormat::Printer printer;
886 google::protobuf::io::OstreamOutputStream oos(&persist_sub_ofs);
888 goby::glog <<
"Collection: " << collection.ShortDebugString() << std::endl;
889 printer.Print(collection, &oos);
// Portal configuration. Its repeated `link` field (cfg_.link_size()) bounds
// the loop that creates one ModemDriverData entry per configured link.
894 intervehicle::protobuf::PortalConfig cfg_;
/// Bookkeeping for one modem driver and the thread associated with it.
/// One instance is appended to modem_drivers_ for each configured link.
896 struct ModemDriverData
// Thread object for this driver; joined (when non-null) after
// driver_thread_alive has been set to false.
// NOTE(review): appears to be the thread in which modem_driver_thread is
// created and run -- confirm against the full source.
898 std::unique_ptr<std::thread> underlying_thread;
// The driver itself; constructed from the link configuration and started
// with run(driver_thread_alive).
899 std::unique_ptr<intervehicle::ModemDriverThread> modem_driver_thread;
// Run flag, initially true. Passed to modem_driver_thread->run() and set to
// false before underlying_thread is joined.
900 std::atomic<bool> driver_thread_alive{
true};
// One heap-allocated ModemDriverData per configured link (appended with
// emplace_back while iterating cfg_'s links).
902 std::vector<std::unique_ptr<ModemDriverData>> modem_drivers_;
// Compared against modem_drivers_.size(): start-up sleeps and re-checks while
// this is smaller.
// NOTE(review): presumably incremented by the modem_driver_ready subscription
// callback -- the increment is not visible here; confirm.
903 unsigned drivers_ready_{0};

// Queue of forwarded DCCL data: the modem_data_in subscription callback
// pushes to the back, and _poll() pops from the front until it is empty.
905 std::deque<intervehicle::protobuf::DCCLForwardedData> received_;

// Subscriptions parsed (protobuf text format) from the persistent
// subscriptions file; its subscription() entries are iterated when loading
// subscriptions from persistent storage.
907 intervehicle::protobuf::SubscriptionPersistCollection former_sub_collection_;
// Path of the persistent subscriptions file (opened for reading and writing,
// and passed to remove()).
908 std::string persist_sub_file_name_;
// Subscription reports keyed by modem_id_type; every subscription in every
// stored report is copied into the collection written to the persist file.
909 std::map<modem_id_type, intervehicle::protobuf::SubscriptionReport> sub_reports_;