24 #ifndef GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
25 #define GOBY_ZEROMQ_TRANSPORT_INTERPROCESS_H
31 #include <condition_variable>
42 #include <unordered_map>
65 #if ZMQ_VERSION <= ZMQ_MAKE_VERSION(4, 3, 1)
66 #define USE_OLD_ZMQ_CPP_API
69 #if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 7, 1)
70 #define USE_OLD_CPPZMQ_SETSOCKOPT
73 #if CPPZMQ_VERSION < ZMQ_MAKE_VERSION(4, 8, 0)
74 #define USE_OLD_CPPZMQ_POLL
81 template <
typename Data>
class Publisher;
// Looks up `k` in `map` and returns a reference to a string stored in that map.
// Only part of the body is visible in this excerpt: a find() on the key, and an
// insertion of (k, v) whose stored string is returned by reference.
// NOTE(review): how `v` is built and what happens when find() succeeds are on
// lines not shown here - confirm against the full file.
112 template <
typename Key>
113 const std::string&
id_component(
const Key& k, std::unordered_map<Key, std::string>& map)
115 auto it = map.find(k);
// insert() returns {iterator, inserted}; the reference returned below points at the
// element owned by `map`, so it is only usable while `map` (and that entry) is alive.
120 auto it_pair = map.insert(std::make_pair(k, v));
121 return it_pair.first->second;
127 std::unordered_map<int, std::string>* schemes_buffer =
nullptr,
128 std::unordered_map<std::thread::id, std::string>* threads_buffer =
nullptr)
135 auto thread = std::this_thread::get_id();
136 return (
"/" +
group +
"/" +
139 type_name +
"/" + process +
"/" +
145 return (
"/" +
group +
"/" +
148 type_name +
"/" + process +
"/");
152 return (
"/" +
group +
"/" +
160 #ifdef USE_OLD_ZMQ_CPP_API
175 #ifdef USE_OLD_CPPZMQ_SETSOCKOPT
176 control_socket_.setsockopt(ZMQ_LINGER, 0);
177 publish_socket_.setsockopt(ZMQ_LINGER, 0);
179 control_socket_.set(zmq::sockopt::linger, 0);
180 publish_socket_.set(zmq::sockopt::linger, 0);
194 void publish(
const std::string& identifier,
const char* bytes,
int size,
195 bool ignore_buffer =
false);
196 void subscribe(
const std::string& identifier);
// Accessor: returns a mutable reference to control_buffer_, so callers can both
// read and modify the queued InprocControl messages.
200 std::deque<protobuf::InprocControl>&
control_buffer() {
return control_buffer_; }
208 bool have_pubsub_sockets_{
false};
210 std::deque<std::pair<std::string, std::vector<char>>>
214 std::deque<protobuf::InprocControl> control_buffer_;
222 zmq::context_t& context, std::atomic<bool>& alive,
223 std::shared_ptr<std::condition_variable_any> poller_cv);
227 #ifdef USE_OLD_CPPZMQ_SETSOCKOPT
228 control_socket_.setsockopt(ZMQ_LINGER, 0);
229 subscribe_socket_.setsockopt(ZMQ_LINGER, 0);
230 manager_socket_.setsockopt(ZMQ_LINGER, 0);
232 control_socket_.set(zmq::sockopt::linger, 0);
233 subscribe_socket_.set(zmq::sockopt::linger, 0);
234 manager_socket_.set(zmq::sockopt::linger, 0);
239 void poll(
long timeout_ms = -1);
240 void control_data(
const zmq::message_t& zmq_msg);
241 void subscribe_data(
const zmq::message_t& zmq_msg);
242 void manager_data(
const zmq::message_t& zmq_msg);
251 std::atomic<bool>& alive_;
252 std::shared_ptr<std::condition_variable_any> poller_cv_;
253 std::vector<zmq::pollitem_t> poll_items_;
264 bool have_pubsub_sockets_{
false};
266 bool manager_waiting_for_reply_{
false};
271 std::chrono::milliseconds(100)};
274 template <
typename InnerTransporter,
275 template <
typename Derived,
typename InnerTransporterType>
class PortalBase>
277 :
public PortalBase<InterProcessPortalImplementation<InnerTransporter, PortalBase>,
286 zmq_context_(cfg.zeromq_number_io_threads()),
287 zmq_main_(zmq_context_),
288 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
297 zmq_context_(cfg.zeromq_number_io_threads()),
298 zmq_main_(zmq_context_),
299 zmq_read_thread_(cfg_, zmq_context_, zmq_alive_, middleware::PollerInterface::cv())
320 friend typename Base::Base;
328 zmq_thread_ = std::make_unique<std::thread>([
this]() { zmq_read_thread_.
run(); });
333 if (zmq_main_.
recv(&control_msg))
335 switch (control_msg.
type())
349 _subscribe<protobuf::ManagerResponse, middleware::MarshallingScheme::PROTOBUF>(
350 [
this](std::shared_ptr<const protobuf::ManagerResponse> response) {
352 << response->ShortDebugString() << std::endl;
354 response->client_pid() == getpid() &&
357 zmq_main_.set_hold_state(response->hold());
363 _unsubscribe<protobuf::ManagerResponse,
364 middleware::MarshallingScheme::PROTOBUF>(
365 groups::manager_response,
366 middleware::Subscriber<protobuf::ManagerResponse>());
372 template <
typename Data,
int scheme>
378 _publish_serialized(type_name,
scheme, bytes,
group, ignore_buffer);
// Publishes an already-serialized payload through zmq_main_.
// The identifier is _make_fully_qualified_identifier(type_name, scheme, group)
// followed by a single '\0' byte; _poll() searches received data for the first
// '\0' to separate the identifier from the payload.
// NOTE(review): the declarations of `group` and `ignore_buffer` are on lines not
// visible in this excerpt.
381 void _publish_serialized(std::string type_name,
int scheme,
const std::vector<char>& bytes,
384 std::string identifier = _make_fully_qualified_identifier(type_name,
scheme,
group) +
'\0';
// NOTE(review): `&bytes[0]` is undefined behaviour when `bytes` is empty;
// `bytes.data()` is well-defined for an empty vector. No emptiness check is
// visible here - confirm whether zero-length payloads can reach this call.
385 zmq_main_.
publish(identifier, &bytes[0], bytes.size(), ignore_buffer);
// Registers a typed subscription for `Data` marshalled with `scheme`.
// A SerializationSubscription is created and stored in portal_subscriptions_
// under `identifier`. The Subscriber<Data> argument is unnamed, i.e. unused in
// this function body.
// NOTE(review): the expression that computes `identifier`, the `group` parameter
// and the remaining constructor arguments are on lines not visible here.
388 template <
typename Data,
int scheme>
389 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
391 const middleware::Subscriber<Data>& )
393 std::string identifier =
396 auto subscription = std::make_shared<middleware::SerializationSubscription<Data, scheme>>(
// Group callback: ignores the data argument and always returns the (by-copy
// captured) `group`.
399 [=](
const Data& ) {
return group; }));
// True only when neither a forwarder nor a portal subscription already exists
// for this identifier. The guarded statement is not visible in this excerpt
// (presumably the socket-level subscribe - TODO confirm).
401 if (forwarder_subscriptions_.count(identifier) == 0 &&
402 portal_subscriptions_.count(identifier) == 0)
// portal_subscriptions_ is a multimap, so several subscriptions may share one identifier.
404 portal_subscriptions_.insert(std::make_pair(identifier, subscription));
// Creates a SerializationSubscriptionRegex from the handler `f`, the set of
// marshalling `schemes`, and the type/group regular expressions, then registers
// it through the single-argument _subscribe_regex() overload.
// NOTE(review): the tail of the std::function signature (including the name `f`)
// and the return statement are on lines not visible in this excerpt; the declared
// return type is a shared_ptr to the subscription.
407 std::shared_ptr<middleware::SerializationSubscriptionRegex> _subscribe_regex(
408 std::function<
void(
const std::vector<unsigned char>&,
int scheme,
const std::string&
type,
411 const std::set<int>& schemes,
const std::string& type_regex,
const std::string& group_regex)
413 auto new_sub = std::make_shared<middleware::SerializationSubscriptionRegex>(
414 f, schemes, type_regex, group_regex);
415 _subscribe_regex(new_sub);
419 template <
typename Data,
int scheme>
422 const middleware::Subscriber<Data>& = middleware::Subscriber<Data>())
424 std::string identifier =
427 portal_subscriptions_.erase(identifier);
430 if (forwarder_subscriptions_.count(identifier) == 0)
// Drops subscriptions held by this portal and by the subscriber `subscriber_id`.
// Visible steps: walk portal_subscriptions_ and then clear it, remove every
// forwarder subscription recorded for `subscriber_id`, and erase that
// subscriber's regex subscriptions.
// NOTE(review): the parameter list and the statements guarded by the `if`s below
// are on lines not visible in this excerpt.
434 void _unsubscribe_all(
440 for (
const auto& p : portal_subscriptions_)
442 const auto& identifier = p.first;
// Identifiers still needed by a forwarder subscription are treated differently;
// the guarded action itself is not visible here.
443 if (forwarder_subscriptions_.count(identifier) == 0)
446 portal_subscriptions_.clear();
// operator[] default-inserts an empty map for an unknown subscriber_id, so this
// loop is simply skipped in that case. Termination relies on
// _forwarder_unsubscribe() erasing the entry it is given.
450 while (forwarder_subscription_identifiers_[subscriber_id].size() > 0)
451 _forwarder_unsubscribe(
// The identifier passed here is a reference to the key of the very element that
// _forwarder_unsubscribe() erases as its last visible statement.
453 forwarder_subscription_identifiers_[subscriber_id].begin()->first);
457 if (regex_subscriptions_.size() > 0)
// Multimap erase-by-key: removes every regex subscription of this subscriber.
459 regex_subscriptions_.erase(subscriber_id);
460 if (regex_subscriptions_.empty())
// Drains control messages from zmq_main_ without blocking and dispatches them.
// For received data, the identifier is parsed, matching portal / forwarder /
// regex subscriptions are collected, and the payload following the first '\0'
// is posted to each of them. Another visible branch builds and publishes a
// protobuf::ManagerRequest.
// NOTE(review): many lines of this function (braces, case labels, the return
// statement, use of `lock`) are not visible in this excerpt; the comments below
// describe only what the visible statements do.
465 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
468 protobuf::InprocControl new_control_msg;
// Non-blocking receive flag, spelled according to the ZeroMQ C++ API in use.
470 #ifdef USE_OLD_ZMQ_CPP_API
471 int flags = ZMQ_NOBLOCK;
473 auto flags = zmq::recv_flags::dontwait;
476 while (zmq_main_.
recv(&new_control_msg, flags))
// NOTE(review): `control_msg` is declared on a line not shown here.
482 switch (control_msg.type())
490 const auto& data = control_msg.received_data();
// parse_identifier() yields (group, scheme, type, process, thread) in that order.
494 std::tie(
group,
scheme,
type, process, thread) = parse_identifier(data);
495 std::string identifier = _make_identifier(
// Subscriptions are collected as weak_ptr first and locked only when posting,
// so any subscription that has expired by then is skipped.
499 std::vector<std::weak_ptr<const middleware::SerializationHandlerBase<>>>
501 auto portal_range = portal_subscriptions_.equal_range(identifier);
502 for (
auto it = portal_range.first; it != portal_range.second; ++it)
503 subs_to_post.push_back(it->second);
504 auto forwarder_it = forwarder_subscriptions_.find(identifier);
505 if (forwarder_it != forwarder_subscriptions_.end())
506 subs_to_post.push_back(forwarder_it->second);
510 const auto& data = control_msg.received_data();
// The payload starts one past the first '\0' in the received data.
// NOTE(review): no check for "'\0' not found" is visible; in that case
// `null_delim_it + 1` would step past end(), which is undefined behaviour.
511 auto null_delim_it = std::find(std::begin(data), std::end(data),
'\0');
512 for (
auto& sub : subs_to_post)
514 if (
auto sub_sp = sub.lock())
515 sub_sp->post(null_delim_it + 1, data.end());
519 if (!regex_subscriptions_.empty())
521 auto null_delim_it = std::find(std::begin(data), std::end(data),
'\0');
// Tracks whether a forwarded regex subscription has already been posted for this
// message; once set, later forwarded subscriptions hit the guard below (the
// guarded statement itself is not visible - presumably a skip, TODO confirm).
523 bool forwarder_subscription_posted =
false;
524 for (
auto& sub : regex_subscriptions_)
527 bool is_forwarded_sub =
529 if (is_forwarded_sub && forwarder_subscription_posted)
532 if (sub.second->post(null_delim_it + 1, data.end(),
scheme,
type,
535 forwarder_subscription_posted =
true;
// Manager request branch: reports this client's ready state and process id.
543 protobuf::ManagerRequest req;
545 req.set_ready(ready_);
548 req.set_client_pid(getpid());
551 << req.ShortDebugString() << std::endl;
// The trailing `true` is the last argument of _publish; in the visible _publish
// body the value in that position is forwarded as `ignore_buffer` - TODO confirm
// against the full signature.
553 _publish<protobuf::ManagerRequest, middleware::MarshallingScheme::PROTOBUF>(
555 middleware::Publisher<protobuf::ManagerRequest>(),
true);
// Re-publishes a forwarded message over zmq_main_. The identifier is built from
// the message key (type, marshalling scheme, group) and the payload is msg.data().
// NOTE(review): the parameter declaration for `msg` and the remaining arguments
// of _make_identifier() are on lines not visible in this excerpt.
566 void _receive_publication_forwarded(
569 std::string identifier =
570 _make_identifier(
msg.key().type(),
msg.key().marshalling_scheme(),
msg.key().group(),
573 auto& bytes =
msg.data();
// Three-argument publish: no explicit ignore_buffer value is passed here, unlike
// the call in _publish_serialized().
574 zmq_main_.
publish(identifier, &bytes[0], bytes.size());
// Handles a subscription or unsubscription request forwarded from an inner
// subscriber. The identifier is derived from the subscription's type name,
// scheme and subscribed group, and the request is dispatched on
// subscription->action().
// NOTE(review): the case labels and several guarded statements are on lines not
// visible in this excerpt.
577 void _receive_subscription_forwarded(
578 const std::shared_ptr<
const middleware::SerializationHandlerBase<>>& subscription)
580 std::string identifier = _make_identifier(subscription->type_name(), subscription->scheme(),
581 subscription->subscribed_group(),
585 goby::glog <<
"Received subscription forwarded for identifier [" << identifier
586 <<
"] from subscriber id: " << subscription->subscriber_id() << std::endl;
588 switch (subscription->action())
593 if (forwarder_subscription_identifiers_[subscription->subscriber_id()].count(
597 if (forwarder_subscriptions_.count(identifier) == 0)
600 if (portal_subscriptions_.count(identifier) == 0)
604 forwarder_subscriptions_.insert(std::make_pair(identifier, subscription));
// Records, per subscriber, an iterator to the shared forwarder subscription entry.
// NOTE(review): forwarder_subscriptions_ is a std::unordered_map; an insert that
// triggers a rehash invalidates its iterators (references and pointers to elements
// stay valid). _forwarder_unsubscribe() later passes this stored iterator to
// erase(). Confirm the stored iterator cannot be stale, or store the key instead.
606 forwarder_subscription_identifiers_[subscription->subscriber_id()].insert(
607 std::make_pair(identifier, forwarder_subscriptions_.find(identifier)));
614 _forwarder_unsubscribe(subscription->subscriber_id(), identifier);
// Removes the record that `subscriber_id` is subscribed to `identifier`. When the
// scan below finds no forwarder subscriber for the identifier, the shared entry
// in forwarder_subscriptions_ is erased as well.
// Does nothing if the subscriber has no entry for `identifier`.
622 void _forwarder_unsubscribe(
const std::string& subscriber_id,
const std::string& identifier)
// operator[] default-inserts an empty map for an unknown subscriber_id.
624 auto it = forwarder_subscription_identifiers_[subscriber_id].find(identifier);
625 if (it != forwarder_subscription_identifiers_[subscriber_id].end())
627 bool no_forwarder_subscribers =
true;
// NOTE(review): as far as is visible here, this scan runs before the current
// subscriber's own entry is erased (last statement below). Unless a line not
// shown excludes `subscriber_id`, the scan always finds the current subscriber
// and the flag is always cleared - confirm against the full source.
628 for (
const auto& p : forwarder_subscription_identifiers_)
630 if (p.second.count(identifier) != 0)
632 no_forwarder_subscribers =
false;
638 if (no_forwarder_subscribers)
// it->second is the iterator into forwarder_subscriptions_ stored at subscribe time.
641 forwarder_subscriptions_.erase(it->second);
// Guarded statement not visible in this excerpt (presumably the socket-level
// unsubscribe when the portal itself has no subscription - TODO confirm).
644 if (portal_subscriptions_.count(identifier) == 0)
648 forwarder_subscription_identifiers_[subscriber_id].erase(it);
// Handles a regex subscription forwarded from an inner subscriber by registering
// it through _subscribe_regex(). The shared_ptr is taken by value.
652 void _receive_regex_subscription_forwarded(
653 std::shared_ptr<const middleware::SerializationSubscriptionRegex> subscription)
655 _subscribe_regex(subscription);
// Stores `new_sub` in regex_subscriptions_, keyed by its subscriber id. The
// container is a multimap, so one subscriber may hold several regex subscriptions.
// NOTE(review): the statement guarded by the empty() check (run only when this
// is the first regex subscription) is on a line not visible in this excerpt.
658 void _subscribe_regex(
659 const std::shared_ptr<const middleware::SerializationSubscriptionRegex>& new_sub)
661 if (regex_subscriptions_.empty())
664 regex_subscriptions_.insert(std::make_pair(new_sub->subscriber_id(), new_sub));
667 template <
typename Data,
int scheme>
674 std::string _make_fully_qualified_identifier(
const std::string& type_name,
int scheme,
675 const std::string&
group)
681 template <
typename Data,
int scheme>
689 std::string _make_identifier(
const std::string& type_name,
int scheme,
const std::string&
group,
// Splits a '/'-delimited identifier into five fields and returns them as a tuple.
// _poll() unpacks the result as (group, scheme, type, process, thread).
// The first field starts at index 1, so the character at index 0 is skipped.
// The fourth field is parsed as a decimal int and the fifth as a base-16
// unsigned integer; std::stoi / std::stoull throw std::invalid_argument or
// std::out_of_range on malformed input.
// NOTE(review): how the first two tuple elements are produced is on a line not
// visible in this excerpt.
696 std::tuple<std::string, int, std::string, int, std::size_t>
697 parse_identifier(
const std::string& identifier)
699 const int number_elements = 5;
700 std::string::size_type previous_slash = 0;
701 std::vector<std::string> elem;
702 for (
auto i = 0; i < number_elements; ++i)
// NOTE(review): find() returns std::string::npos when no further '/' exists and
// no check for that is visible here; identifiers with fewer than five delimited
// fields would then produce unintended substrings - confirm inputs are always
// well-formed.
704 auto slash_pos = identifier.find(
'/', previous_slash + 1);
705 elem.push_back(identifier.substr(previous_slash + 1, slash_pos - (previous_slash + 1)));
706 previous_slash = slash_pos;
709 elem[2], std::stoi(elem[3]), std::stoull(elem[4],
nullptr, 16));
// Data members. Non-static members are initialized in declaration order, and the
// constructor initializer lists build zmq_main_ from zmq_context_ and
// zmq_read_thread_ from cfg_, zmq_context_ and zmq_alive_ - so cfg_, zmq_alive_
// and zmq_context_ must stay declared before the two thread objects.
713 const protobuf::InterProcessPortalConfig cfg_;
// Thread that executes zmq_read_thread_.run().
715 std::unique_ptr<std::thread> zmq_thread_;
716 std::atomic<bool> zmq_alive_{
true};
717 zmq::context_t zmq_context_;
718 InterProcessPortalMainThread zmq_main_;
719 InterProcessPortalReadThread zmq_read_thread_;
// Subscriptions made directly on this portal, keyed by identifier (multimap:
// several subscriptions may share one identifier).
722 std::unordered_multimap<std::string,
723 std::shared_ptr<const middleware::SerializationHandlerBase<>>>
724 portal_subscriptions_;
// One shared entry per identifier for subscriptions forwarded from inner subscribers.
726 std::unordered_map<std::string, std::shared_ptr<const middleware::SerializationHandlerBase<>>>
727 forwarder_subscriptions_;
// Per subscriber id: identifier -> iterator into forwarder_subscriptions_.
// NOTE(review): unordered_map iterators are invalidated by a rehash; see the
// insert into forwarder_subscriptions_ in _receive_subscription_forwarded().
729 std::string, std::unordered_map<std::string,
typename decltype(
730 forwarder_subscriptions_)::const_iterator>>
731 forwarder_subscription_identifiers_;
// Regex subscriptions keyed by subscriber id.
733 std::unordered_multimap<std::string,
734 std::shared_ptr<const middleware::SerializationSubscriptionRegex>>
735 regex_subscriptions_;
// String maps keyed by scheme number and by thread id; their types match the
// optional schemes_buffer / threads_buffer parameters visible near the top of
// this file (presumably caches of identifier components - TODO confirm).
737 std::unordered_map<int, std::string> schemes_;
738 std::unordered_map<std::thread::id, std::string> threads_;
747 : context_(context), cfg_(cfg)
762 zmq::context_t& context_;
774 :
Manager(context, cfg, router)
776 for (
const auto& req_c : hold.
required_client()) required_clients_.insert(req_c);
788 std::set<std::string> reported_clients_;
789 std::set<std::string> required_clients_;
791 zmq::context_t& context_;
795 std::vector<zmq::pollitem_t> poll_items_;
799 SOCKET_SUBSCRIBE = 1,
806 std::unique_ptr<zmq::socket_t> manager_socket_;
807 std::unique_ptr<zmq::socket_t> subscribe_socket_;
808 std::unique_ptr<zmq::socket_t> publish_socket_;
811 middleware::SerializerParserHelper<
812 protobuf::ManagerRequest, middleware::scheme<protobuf::ManagerRequest>()>::type_name(),
816 std::string zmq_filter_rep_{
818 protobuf::ManagerResponse,
819 middleware::scheme<protobuf::ManagerResponse>()>::type_name(),
822 std::string(1,
'\0')};
825 template <
typename InnerTransporter = m
iddleware::NullTransporter>