25 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
26 #define GOBY_MIDDLEWARE_TRANSPORT_INTERPROCESS_H
30 #include <sys/types.h>
// Class-template header fragment for the interprocess transporter base.
// `Derived` is the concrete transporter type that the member functions reach via
// static_cast<Derived*>(this); `InnerTransporter` is the type behind inner().
// NOTE(review): the class-name line and any other base classes are not visible in
// this view; only the public Poller<...> base is shown.
50 template <
typename Derived,
typename InnerTransporter>
54 public Poller<InterProcessTransporterBase<Derived, InnerTransporter>>
// Static constexpr helper that looks up the marshalling scheme for `Data` through
// goby::middleware::scheme<Data>() and stores it in a local named `scheme`.
// It is used as the default value of the `scheme` template parameter on the
// publish/subscribe members of this class.
// NOTE(review): the remainder of the body (including the return statement) is not
// visible in this view.
75 template <
typename Data>
static constexpr
int scheme()
77 int scheme = goby::middleware::scheme<Data>();
// Publish fragment: hands `data` to Derived::_publish<Data, scheme>() and then
// republishes the same `data` on the inner transporter via
// inner().publish_dynamic<Data, scheme>(). `scheme` defaults to
// InterProcessTransporterBase::scheme<Data>().
// NOTE(review): the declarator line is not visible; presumably this is the
// publish_dynamic overload taking the datum by reference - confirm.
91 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
// Interprocess publication, implemented by the derived class.
96 static_cast<Derived*
>(
this)->
template _publish<Data, scheme>(data,
group, publisher);
// Then pass the datum on to the inner transporter.
97 this->
inner().template publish_dynamic<Data, scheme>(data,
group, publisher);
// Publish fragment for pointer-like `data`: Derived::_publish<Data, scheme>()
// receives the dereferenced object (*data), while inner().publish_dynamic receives
// `data` itself, undereferenced.
// NOTE(review): the declarator and any null check guarding the dereference are not
// visible in this view - confirm that `data` cannot be null here.
107 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
114 static_cast<Derived*
>(
this)->
template _publish<Data, scheme>(*data,
group, publisher);
115 this->
inner().template publish_dynamic<Data, scheme>(data,
group, publisher);
// Convenience fragment: converts `data` to std::shared_ptr<const Data> and forwards
// to publish_dynamic<Data, scheme>() with the same group and publisher.
// NOTE(review): the declarator is not visible; presumably `data` is a
// std::shared_ptr<Data> - confirm.
126 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
130 publish_dynamic<Data, scheme>(std::shared_ptr<const Data>(data),
group, publisher);
// Forwards an already-serialized message (type_name, scheme, bytes, group) to
// Derived::_publish_serialized().
// NOTE(review): the declarator is not visible. No template header is shown for this
// fragment, so `scheme` is presumably a runtime argument here - confirm.
138 static_cast<Derived*
>(
this)->_publish_serialized(type_name,
scheme, bytes,
group);
// Subscribe fragment that adapts a by-reference callback: registers with
// Derived::_subscribe<Data, scheme>() a lambda that takes
// std::shared_ptr<const Data> and invokes f(*d).
// NOTE(review): the declarator is not visible; presumably `f` accepts const Data&.
148 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
153 static_cast<Derived*
>(
this)->
template _subscribe<Data, scheme>(
// [=] copies `f` into the closure; the received pointer is dereferenced
// unconditionally before the call.
154 [=](std::shared_ptr<const Data> d) { f(*d); },
group, subscriber);
// Subscribe fragment that passes the callback `f` unchanged, with `group` and
// `subscriber`, to Derived::_subscribe<Data, scheme>().
// NOTE(review): the declarator is not visible; presumably `f` already takes
// std::shared_ptr<const Data> - confirm.
164 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
169 static_cast<Derived*
>(
this)->
template _subscribe<Data, scheme>(f,
group, subscriber);
// Unsubscribe fragment: delegates to Derived::_unsubscribe<Data, scheme>() with the
// given `group` and `subscriber`.
// NOTE(review): the declarator is not visible in this view.
177 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
182 static_cast<Derived*
>(
this)->
template _unsubscribe<Data, scheme>(
group, subscriber);
// Regex subscription entry point: forwards (f, schemes, type_regex, group_regex) to
// Derived::_subscribe_regex() and returns the resulting
// std::shared_ptr<SerializationSubscriptionRegex>.
// Both `type_regex` and `group_regex` default to ".*".
// NOTE(review): the function-name line and the declaration of `f` are not visible.
195 std::shared_ptr<SerializationSubscriptionRegex>
199 const std::set<int>& schemes,
const std::string& type_regex =
".*",
200 const std::string& group_regex =
".*")
202 return static_cast<Derived*
>(
this)->_subscribe_regex(f, schemes, type_regex, group_regex);
// Subscribes to one group while filtering on a regex over the type name.
// The callback `f` receives std::shared_ptr<const Data> and the type string.
// `type_regex` defaults to ".*"; the only scheme subscribed to is `scheme`.
// NOTE(review): the function-name line is not visible, and neither is most of the
// body of regex_lambda (only its first statement is shown).
214 template <typename Data, int scheme = InterProcessTransporterBase::scheme<Data>()>
216 std::function<
void(std::shared_ptr<const Data>,
const std::string&
type)> f,
217 const Group&
group,
const std::string& type_regex =
".*")
// Character class of regex metacharacters that are escaped in the group string.
219 std::regex special_chars{R
"([-[\]{}()*+?.,\^$|#\s])"};
// Prefix every matched metacharacter with a backslash ("$&" is the matched text).
220 std::string sanitized_group =
221 std::regex_replace(std::string(group), special_chars, R"(\$&)");
// Adapter with the raw-bytes callback signature taken by _subscribe_regex.
223 auto regex_lambda = [=](
const std::vector<unsigned char>& data,
int schm,
224 const std::string&
type,
const Group& grp) {
// Iterators over the raw bytes; how they are consumed is not visible here.
225 auto data_begin = data.begin(), data_end = data.end(), actual_end = data.end();
// Anchor the escaped group with ^...$ so the group regex is the literal group text.
231 return static_cast<Derived*
>(
this)->_subscribe_regex(regex_lambda, {
scheme}, type_regex,
232 "^" + sanitized_group +
"$");
// Parameter fragment of another type-regex subscription overload: `scheme` defaults
// to InterProcessTransporterBase::scheme<Data>(), the callback `f` receives
// std::shared_ptr<const Data> plus the type string, and `type_regex` defaults to ".*".
// NOTE(review): the rest of the template header, the function name and the body are
// not visible; presumably the group is supplied as a template argument - confirm.
244 int scheme = InterProcessTransporterBase::scheme<Data>()>
246 std::function<
void(std::shared_ptr<const Data>,
const std::string&
type)> f,
247 const std::string& type_regex =
".*")
// Compile-time validity check: fails the build unless group.c_str() is non-null and
// its first character is not '\0', i.e. the group has a non-empty string.
// Because this is a static_assert, `group` must be usable in a constant expression.
// NOTE(review): the enclosing declarator is not visible in this view.
257 static_assert((
group.c_str() !=
nullptr) && (
group.c_str()[0] !=
'\0'),
258 "goby::middleware::Group must have non-zero length string to publish on the "
259 "InterProcess layer");
// Runtime validity check, testing the same condition as the static_assert variant:
// throws goby::Exception when group.c_str() is null or points at an empty string.
// NOTE(review): the enclosing declarator is not visible in this view.
265 if ((
group.c_str() ==
nullptr) || (
group.c_str()[0] ==
'\0'))
266 throw(
goby::Exception(
"Group must have a non-empty string for use on InterProcess"));
// Poll hook: forwards `lock` (a reference to a unique_ptr owning a
// std::unique_lock<std::timed_mutex>) to Derived::_poll() and returns its int result
// unchanged.
// NOTE(review): the meaning of the returned int is defined by Derived::_poll and is
// not visible here.
276 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
278 return static_cast<Derived*
>(
this)->_poll(
lock);
// Three template headers over <Derived, InnerTransporter>.
// NOTE(review): the declarations they introduce are not visible in this view;
// presumably out-of-class definitions of static members of the transporter base
// (Base::to_portal_group_ and Base::regex_group_ are referenced further down) - confirm.
282 template <
typename Derived,
typename InnerTransporter>
285 template <
typename Derived,
typename InnerTransporter>
288 template <
typename Derived,
typename InnerTransporter>
// InterProcessForwarder: publicly derives from InterProcessTransporterBase with itself
// as the `Derived` argument, so the base's static_cast<Derived*>(this) calls resolve
// to the _publish / _subscribe / _poll members of this class.
// NOTE(review): the class body delimiters are not visible in this view.
296 template <
typename InnerTransporter>
297 class InterProcessForwarder
298 :
public InterProcessTransporterBase<InterProcessForwarder<InnerTransporter>, InnerTransporter>
// Tail of a callback taking a shared pointer to a const SerializerTransporterMessage
// and handing it to _receive_regex_data_forwarded().
// NOTE(review): the enclosing call is not visible; presumably a subscription
// registered in the constructor - confirm.
313 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
314 msg) { _receive_regex_data_forwarded(
msg); });
// Drops every subscription held by this forwarder.
// NOTE(review): the enclosing function is not visible; presumably the destructor.
318 this->unsubscribe_all();
// Forwarder publish fragment: packs serialized bytes plus key metadata into a
// SerializerTransporterMessage and publishes it on the inner transporter under
// Base::to_portal_group_.
// NOTE(review): the declarator, the definition of `bytes`, and any statement that
// sets the key's type are not visible in this view.
327 template <
typename Data,
int scheme>
// Heap copy of the serialized bytes; handed to the message via set_allocated_data below.
// NOTE(review): this raw `new` runs before make_shared; if make_shared throws,
// `sbytes` would leak. set_allocated_data presumably takes ownership (protobuf
// convention) - confirm.
332 std::string* sbytes =
new std::string(bytes.begin(), bytes.end());
333 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
334 auto* key =
msg->mutable_key();
336 key->set_marshalling_scheme(
scheme);
338 key->set_group(std::string(
group));
339 msg->set_allocated_data(sbytes);
// Copy the publisher's configuration into the key.
341 *key->mutable_cfg() = publisher.
cfg();
343 this->inner().template publish<Base::to_portal_group_>(
msg);
// Publishes pre-serialized data: builds a SerializerTransporterMessage whose key
// carries the marshalling scheme, type name and group, copies `bytes` into the data
// field, and publishes it on the inner transporter under Base::to_portal_group_.
// Unlike the typed publish path, the payload is set with set_data() (by value).
// NOTE(review): the `group` parameter's declaration is on a line not visible here.
346 void _publish_serialized(std::string type_name,
int scheme,
const std::vector<char>& bytes,
349 auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
350 auto* key =
msg->mutable_key();
352 key->set_marshalling_scheme(
scheme);
353 key->set_type(type_name);
354 key->set_group(std::string(
group));
355 msg->set_data(std::string(bytes.begin(), bytes.end()));
357 this->inner().template publish<Base::to_portal_group_>(
msg);
// Forwarder subscribe: (1) subscribes `f` on the inner transporter for `group`,
// (2) builds a SerializationSubscription<Data, scheme> whose handler republishes
// received data on the inner transporter, and (3) publishes a handler object of
// type SerializationHandlerBase<> on Base::to_portal_group_.
// NOTE(review): several lines are not visible (part of the SerializationSubscription
// constructor arguments and the argument of the final publish), so how `subscriber`
// is used cannot be stated from this view.
360 template <
typename Data,
int scheme>
361 void _subscribe(std::function<
void(std::shared_ptr<const Data> d)> f,
const Group&
group,
362 const Subscriber<Data>& subscriber)
// Local delivery through the inner transporter.
364 this->inner().template subscribe_dynamic<Data, scheme>(f,
group);
// Handler used by the forwarded subscription: republishes each datum on the inner layer.
367 auto inner_publication_lambda = [=](std::shared_ptr<const Data> d) {
368 this->inner().template publish_dynamic<Data, scheme>(d,
group);
371 auto subscription = std::make_shared<SerializationSubscription<Data, scheme>>(
372 inner_publication_lambda,
group,
// Group function that ignores the datum and always yields the subscribed `group`.
374 [=](
const Data& d) {
return group; }));
376 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
// Forwarder unsubscribe: removes the inner-transporter subscription for
// (Data, scheme, group, subscriber), then creates a
// SerializationUnSubscription<Data, scheme> for `group` held as
// std::shared_ptr<SerializationHandlerBase<>> and publishes a
// SerializationHandlerBase<> on Base::to_portal_group_.
// NOTE(review): the argument of the final publish call is on a line not visible
// here; presumably `unsubscription` - confirm.
380 template <
typename Data,
int scheme>
381 void _unsubscribe(
const Group&
group,
const Subscriber<Data>& subscriber)
383 this->inner().template unsubscribe_dynamic<Data, scheme>(
group, subscriber);
385 auto unsubscription = std::shared_ptr<SerializationHandlerBase<>>(
386 new SerializationUnSubscription<Data, scheme>(
group));
388 this->inner().template publish<Base::to_portal_group_, SerializationHandlerBase<>>(
// Drops all subscriptions of this forwarder: clears the locally held regex
// subscriptions and publishes a SerializationUnSubscribeAll on Base::to_portal_group_.
// NOTE(review): at least one body line is not visible in this view, so any
// additional work done here (e.g. on the inner transporter) cannot be confirmed.
392 void _unsubscribe_all()
394 regex_subscriptions_.clear();
395 auto all = std::make_shared<SerializationUnSubscribeAll>();
396 this->inner().template publish<Base::to_portal_group_, SerializationUnSubscribeAll>(all);
// Forwarder regex subscription. Creates two SerializationSubscriptionRegex objects
// with the same (schemes, type_regex, group_regex):
//   * portal_subscription - published on Base::to_portal_group_; its handler repacks
//     matching data into a SerializerTransporterMessage and publishes it on the
//     inner transporter under Base::regex_group_.
//   * local_subscription - wraps the caller's callback `f`, is stored in
//     regex_subscriptions_, and is returned to the caller.
// Both regex parameters default to ".*".
// NOTE(review): the line naming the callback parameter and the line initializing
// `forwarded_data` are not visible in this view.
399 std::shared_ptr<SerializationSubscriptionRegex>
400 _subscribe_regex(std::function<
void(
const std::vector<unsigned char>&,
int scheme,
401 const std::string&
type,
const Group&
group)>
403 const std::set<int>& schemes,
const std::string& type_regex =
".*",
404 const std::string& group_regex =
".*")
// Handler for the forwarded subscription: copy scheme/type/group and the raw bytes
// into a message and publish it on the regex group of the inner transporter.
406 auto inner_publication_lambda = [=](
const std::vector<unsigned char>& data,
int scheme,
407 const std::string&
type,
const Group&
group) {
408 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
410 forwarded_data->mutable_key()->set_marshalling_scheme(
scheme);
411 forwarded_data->mutable_key()->set_type(
type);
412 forwarded_data->mutable_key()->set_group(
group);
413 forwarded_data->set_data(std::string(data.begin(), data.end()));
414 this->inner().template publish<Base::regex_group_>(forwarded_data);
417 auto portal_subscription = std::make_shared<SerializationSubscriptionRegex>(
418 inner_publication_lambda, schemes, type_regex, group_regex);
419 this->inner().template publish<Base::to_portal_group_, SerializationSubscriptionRegex>(
420 portal_subscription);
// Keep the caller-facing subscription so _receive_regex_data_forwarded can post to it.
422 auto local_subscription = std::shared_ptr<SerializationSubscriptionRegex>(
423 new SerializationSubscriptionRegex(f, schemes, type_regex, group_regex));
424 regex_subscriptions_.insert(local_subscription);
425 return local_subscription;
// Delivers one forwarded regex message to every locally held regex subscription:
// each entry of regex_subscriptions_ gets post() called with the message's data byte
// range plus the key's marshalling scheme, type and group.
// No filtering is done in this function; whether a subscription acts on the message
// is up to post().
428 void _receive_regex_data_forwarded(
429 std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>
msg)
431 const auto& bytes =
msg->data();
432 for (
auto& sub : regex_subscriptions_)
433 sub->post(bytes.begin(), bytes.end(),
msg->key().marshalling_scheme(),
434 msg->key().type(),
msg->key().group());
// Forwarder poll hook; this is the signature the base class's _poll() calls via
// static_cast<Derived*>(this).
// NOTE(review): the body is not visible in this view, so the returned value cannot
// be documented here.
437 int _poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
// Regex subscriptions held locally by this forwarder: inserted by _subscribe_regex(),
// iterated by _receive_regex_data_forwarded(), cleared by _unsubscribe_all().
443 std::set<std::shared_ptr<const SerializationSubscriptionRegex>> regex_subscriptions_;
446 template <
typename Derived,
typename InnerTransporter>
461 this->
inner().template subscribe<Base::to_portal_group_, SerializerTransporterMessage>(
462 [
this](std::shared_ptr<const SerializerTransporterMessage> d) {
463 static_cast<Derived*
>(
this)->_receive_publication_forwarded(*d);
466 this->
inner().template subscribe<Base::to_portal_group_, SerializationHandlerBase<>>(
467 [
this](std::shared_ptr<const middleware::SerializationHandlerBase<>> s) {
468 static_cast<Derived*
>(
this)->_receive_subscription_forwarded(s);
471 this->
inner().template subscribe<Base::to_portal_group_, SerializationSubscriptionRegex>(
472 [
this](std::shared_ptr<const middleware::SerializationSubscriptionRegex> s) {
473 static_cast<Derived*
>(
this)->_receive_regex_subscription_forwarded(s);
476 this->
inner().template subscribe<Base::to_portal_group_, SerializationUnSubscribeAll>(
477 [
this](std::shared_ptr<const middleware::SerializationUnSubscribeAll> s) {
478 static_cast<Derived*
>(
this)->_unsubscribe_all(s->subscriber_id());