Goby3  3.1.4
2024.02.22
interface.h
Go to the documentation of this file.
1 // Copyright 2017-2021:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_INTERFACE_H
26 
27 #include <chrono>
28 #include <condition_variable>
29 #include <memory>
30 #include <mutex>
31 
32 #include "goby/middleware/group.h"
34 
35 #include "goby/exception.h"
42 #include "goby/util/debug_logger.h"
43 
44 namespace goby
45 {
46 namespace middleware
47 {
48 class NullTransporter;
49 
56 template <typename Transporter, typename InnerTransporter, typename Enable = void>
58 {
59  public:
61  using InnerTransporterType = InnerTransporter;
63  InnerTransporter& inner()
64  {
65  static_assert(std::is_void<Enable>::value, "InnerTransporterInterface must be specialized");
66  }
67  auto innermost()
68  {
69  static_assert(std::is_void<Enable>::value, "InnerTransporterInterface must be specialized");
70  }
71 };
72 
74 template <typename Transporter, typename InnerTransporter>
76  Transporter, InnerTransporter,
77  typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
78  !std::is_same<InnerTransporter, NullTransporter>::value>>
79 {
81  Transporter, InnerTransporter,
82  typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
83  !std::is_same<InnerTransporter, NullTransporter>::value>>;
84 
85  public:
87  InnerTransporter& inner() { return inner_; }
88  auto& innermost() { return inner_.innermost(); }
89 
91  using InnerTransporterType = InnerTransporter;
92 
93  protected:
95  InnerTransporterInterface(InnerTransporter& inner) : inner_(inner) {}
97  InnerTransporterInterface() : own_inner_(new InnerTransporter), inner_(*own_inner_) {}
98 
99  private:
100  std::shared_ptr<InnerTransporter> own_inner_;
101  InnerTransporter& inner_;
102 };
103 
105 template <typename Transporter, typename InnerTransporter>
107  Transporter, InnerTransporter,
108  typename std::enable_if_t<!std::is_same<Transporter, NullTransporter>::value &&
109  std::is_same<InnerTransporter, NullTransporter>::value>>
110 {
111  public:
113  using InnerTransporterType = InnerTransporter;
115  InnerTransporter& inner() { return inner_; }
116  Transporter& innermost() { return *static_cast<Transporter*>(this); }
117 
118  protected:
120  InnerTransporterInterface(InnerTransporter& inner) : inner_(inner) {}
122  InnerTransporterInterface() : own_inner_(new InnerTransporter), inner_(*own_inner_) {}
123 
124  private:
125  std::shared_ptr<InnerTransporter> own_inner_;
126  InnerTransporter& inner_;
127 };
128 
130 template <typename Transporter, typename InnerTransporter>
132  Transporter, InnerTransporter,
133  typename std::enable_if_t<std::is_same<Transporter, NullTransporter>::value &&
134  std::is_same<InnerTransporter, NullTransporter>::value>>
135 {
136 };
137 
140 {
141  public:
146  template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
147  int poll(const std::chrono::time_point<Clock, Duration>& timeout =
148  std::chrono::time_point<Clock, Duration>::max());
149 
154  template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
155  int poll(Duration wait_for);
156 
160  std::shared_ptr<std::timed_mutex> poll_mutex() { return poll_mutex_; }
161 
166  std::shared_ptr<std::condition_variable_any> cv() { return cv_; }
167 
168  protected:
169  PollerInterface(std::shared_ptr<std::timed_mutex> poll_mutex,
170  std::shared_ptr<std::condition_variable_any> cv)
171  : poll_mutex_(poll_mutex), cv_(cv)
172  {
173  }
174 
175  private:
176  template <typename Transporter> friend class Poller;
177  // poll the transporter for data
178  virtual int _transporter_poll(std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock) = 0;
179 
180  private:
181  // poll all the transporters for data, including a timeout (only called by the outside-most Poller)
182  template <class Clock = std::chrono::system_clock, class Duration = typename Clock::duration>
183  int _poll_all(const std::chrono::time_point<Clock, Duration>& timeout);
184 
185  std::shared_ptr<std::timed_mutex> poll_mutex_;
186  // signaled when there's no data for this thread to read during _poll()
187  std::shared_ptr<std::condition_variable_any> cv_;
188 };
189 
191 enum class Necessity
192 {
193  REQUIRED,
194  RECOMMENDED,
195  OPTIONAL
196 };
197 
202 template <typename Transporter, typename InnerTransporter>
203 class StaticTransporterInterface : public InnerTransporterInterface<Transporter, InnerTransporter>
204 {
205  public:
213  template <const Group& group, typename Data,
214  int scheme = transporter_scheme<Data, Transporter>()>
215  void publish(const Data& data, const Publisher<Data>& publisher = Publisher<Data>())
216  {
217  static_cast<Transporter*>(this)->template check_validity<group>();
218  static_cast<Transporter*>(this)->template publish_dynamic<Data, scheme>(data, group,
219  publisher);
220  }
221 
232  template <const Group& group, typename Data,
233  int scheme = transporter_scheme<Data, Transporter>()>
234  void publish(std::shared_ptr<const Data> data,
235  const Publisher<Data>& publisher = Publisher<Data>())
236  {
237  static_cast<Transporter*>(this)->template check_validity<group>();
238  static_cast<Transporter*>(this)->template publish_dynamic<Data, scheme>(data, group,
239  publisher);
240  }
241 
252  template <const Group& group, typename Data,
253  int scheme = transporter_scheme<Data, Transporter>()>
254  void publish(std::shared_ptr<Data> data, const Publisher<Data>& publisher = Publisher<Data>())
255  {
256  publish<group, Data, scheme>(std::shared_ptr<const Data>(data), publisher);
257  }
258 
267  template <const Group& group, typename Data,
268  int scheme = transporter_scheme<Data, Transporter>(),
269  Necessity necessity = Necessity::OPTIONAL>
270  void subscribe(std::function<void(const Data&)> f,
271  const Subscriber<Data>& subscriber = Subscriber<Data>())
272  {
273  static_cast<Transporter*>(this)->template check_validity<group>();
274  static_cast<Transporter*>(this)->template subscribe_dynamic<Data, scheme>(f, group,
275  subscriber);
276  }
277 
286  template <const Group& group, typename Data,
287  int scheme = transporter_scheme<Data, Transporter>(),
288  Necessity necessity = Necessity::OPTIONAL>
289  void subscribe(std::function<void(std::shared_ptr<const Data>)> f,
290  const Subscriber<Data>& subscriber = Subscriber<Data>())
291  {
292  static_cast<Transporter*>(this)->template check_validity<group>();
293  static_cast<Transporter*>(this)->template subscribe_dynamic<Data, scheme>(f, group,
294  subscriber);
295  }
296 
306  template <const Group& group, Necessity necessity = Necessity::OPTIONAL, typename Func>
307  void subscribe(Func f)
308  {
309  // we want to grab the first argument of "f" and then capture "Data" from "const Data& data" and "std::shared_ptr<const Data>"
310  using Data = typename detail::primitive_type<
311  typename std::decay<detail::first_argument<Func>>::type>::type;
312 
313  subscribe<group, Data, transporter_scheme<Data, Transporter>(), necessity>(f);
314  }
315 
321  template <const Group& group, typename Data,
322  int scheme = transporter_scheme<Data, Transporter>()>
323  void unsubscribe(const Subscriber<Data>& subscriber = Subscriber<Data>())
324  {
325  static_cast<Transporter*>(this)->template check_validity<group>();
326  static_cast<Transporter*>(this)->template unsubscribe_dynamic<Data, scheme>(group,
327  subscriber);
328  }
329 
331  void unsubscribe_all() { static_cast<Transporter*>(this)->template unsubscribe_all(); }
332 
333  protected:
335  : InnerTransporterInterface<Transporter, InnerTransporter>(inner)
336  {
337  }
339 };
340 
341 } // namespace middleware
342 } // namespace goby
343 
344 template <class Clock, class Duration>
345 int goby::middleware::PollerInterface::poll(const std::chrono::time_point<Clock, Duration>& timeout)
346 {
347  return _poll_all(timeout);
348 }
349 
350 template <class Clock, class Duration>
352 {
353  if (wait_for == Duration::max())
354  return poll();
355  else
356  return poll(Clock::now() + wait_for);
357 }
358 
359 template <class Clock, class Duration>
360 int goby::middleware::PollerInterface::_poll_all(
361  const std::chrono::time_point<Clock, Duration>& timeout)
362 {
363  // hold this lock until either we find a polled item or we wait on the condition variable
364  std::unique_ptr<std::unique_lock<std::timed_mutex>> lock(
365  new std::unique_lock<std::timed_mutex>(*poll_mutex_));
366  // std::cout << std::this_thread::get_id() << " _poll_all locking: " << poll_mutex_.get() << std::endl;
367 
368  int poll_items = _transporter_poll(lock);
369  while (poll_items == 0)
370  {
371  if (!lock)
372  throw(goby::Exception(
373  "Poller lock was released by poll() but no poll items were returned"));
374 
375  if (timeout == Clock::time_point::max())
376  {
377  cv_->wait(*lock); // wait_until doesn't work well with time_point::max()
378  poll_items = _transporter_poll(lock);
379 
380  // TODO: fix this message now that zeromq::InterProcessPortal can have
381  // a condition_variable trigger for REQUEST_HOLD_STATE
382  //if (poll_items == 0)
383  // goby::glog.is(goby::util::logger::DEBUG3) &&
384  // goby::glog << "PollerInterface condition_variable: no data (spurious?) wakeup"
385  // << std::endl;
386  }
387  else
388  {
389  if (cv_->wait_until(*lock, timeout) == std::cv_status::no_timeout)
390  poll_items = _transporter_poll(lock);
391  else
392  return poll_items;
393  }
394  }
395 
396  return poll_items;
397 }
398 
399 #endif
goby::middleware::StaticTransporterInterface::StaticTransporterInterface
StaticTransporterInterface()
Definition: interface.h:338
goby::middleware::PollerInterface::PollerInterface
PollerInterface(std::shared_ptr< std::timed_mutex > poll_mutex, std::shared_ptr< std::condition_variable_any > cv)
Definition: interface.h:169
goby::middleware::StaticTransporterInterface
Defines the common interface for publishing and subscribing data using static (constexpr) groups on G...
Definition: interface.h:203
interface.h
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&!std::is_same< InnerTransporter, NullTransporter >::value > >::innermost
auto & innermost()
Definition: interface.h:88
goby::middleware::StaticTransporterInterface::subscribe
void subscribe(std::function< void(const Data &)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (const reference variant)
Definition: interface.h:270
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&!std::is_same< InnerTransporter, NullTransporter >::value > >::InnerTransporterType
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
Definition: interface.h:91
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&std::is_same< InnerTransporter, NullTransporter >::value > >::InnerTransporterInterface
InnerTransporterInterface()
Generate a local instantiation of the inner transporter.
Definition: interface.h:122
goby::middleware::Subscriber
Class that holds additional metadata and callback functions related to a subscription (and is optiona...
Definition: subscriber.h:36
intervehicle.pb.h
group.h
goby::middleware::StaticTransporterInterface::subscribe
void subscribe(Func f)
Simplified version of subscribe() that can deduce Data from the first argument of the function (lambd...
Definition: interface.h:307
goby::middleware::Poller
Utility class for allowing the various Goby middleware transporters to poll the underlying transport ...
Definition: poller.h:37
goby::middleware::Necessity::REQUIRED
@ REQUIRED
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::Publisher
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: driver_thread.h:69
subscriber.h
goby::middleware::detail::primitive_type
Definition: primitive_type.h:36
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&std::is_same< InnerTransporter, NullTransporter >::value > >::innermost
Transporter & innermost()
Definition: interface.h:116
goby::middleware::StaticTransporterInterface::StaticTransporterInterface
StaticTransporterInterface(InnerTransporter &inner)
Definition: interface.h:334
transporter_config.pb.h
goby::util::logger_lock::lock
@ lock
Definition: flex_ostreambuf.h:62
goby::middleware::InnerTransporterInterface
Recursive inner layer transporter storage or generator.
Definition: interface.h:57
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&!std::is_same< InnerTransporter, NullTransporter >::value > >
Real transporter that has a real inner transporter.
Definition: interface.h:75
goby::middleware::InnerTransporterInterface< InterVehicleTransporterBase< InterVehicleForwarder< InnerTransporter >, InnerTransporter >, InnerTransporter >::InnerTransporterType
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
Definition: interface.h:61
goby::middleware::StaticTransporterInterface::unsubscribe_all
void unsubscribe_all()
Unsubscribe to all messages that this transporter has subscribed to.
Definition: interface.h:331
goby::middleware::StaticTransporterInterface::publish
void publish(const Data &data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (const reference variant)
Definition: interface.h:215
goby::middleware::PollerInterface::poll
int poll(const std::chrono::time_point< Clock, Duration > &timeout=std::chrono::time_point< Clock, Duration >::max())
poll for data. Blocks until a data event occurs or a timeout when a particular time has been reached
Definition: interface.h:345
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&!std::is_same< InnerTransporter, NullTransporter >::value > >::InnerTransporterInterface
InnerTransporterInterface()
Generate a local instantiation of the inner transporter.
Definition: interface.h:97
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&std::is_same< InnerTransporter, NullTransporter >::value > >::InnerTransporterInterface
InnerTransporterInterface(InnerTransporter &inner)
Pass in an external inner transporter for use.
Definition: interface.h:120
publisher.h
jwt::json::type
type
Generic JSON types used in JWTs.
Definition: jwt.h:2071
goby::middleware::InnerTransporterInterface::inner
InnerTransporter & inner()
Definition: interface.h:63
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&std::is_same< InnerTransporter, NullTransporter >::value > >::InnerTransporterType
InnerTransporter InnerTransporterType
the InnerTransporter type (accessible for other uses)
Definition: interface.h:113
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&!std::is_same< InnerTransporter, NullTransporter >::value > >::inner
InnerTransporter & inner()
Definition: interface.h:87
goby::middleware::StaticTransporterInterface::unsubscribe
void unsubscribe(const Subscriber< Data > &subscriber=Subscriber< Data >())
Unsubscribe to a specific group and data type.
Definition: interface.h:323
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:58
goby::middleware::Necessity::RECOMMENDED
@ RECOMMENDED
goby::middleware::PollerInterface::poll_mutex
std::shared_ptr< std::timed_mutex > poll_mutex()
access the mutex used for poll synchronization
Definition: interface.h:160
goby::middleware::StaticTransporterInterface::subscribe
void subscribe(std::function< void(std::shared_ptr< const Data >)> f, const Subscriber< Data > &subscriber=Subscriber< Data >())
Subscribe to a specific group and data type (shared pointer variant)
Definition: interface.h:289
goby::middleware::Necessity::OPTIONAL
@ OPTIONAL
debug_logger.h
goby::middleware::PollerInterface::cv
std::shared_ptr< std::condition_variable_any > cv()
access the condition variable used for poll synchronization
Definition: interface.h:166
goby::middleware::StaticTransporterInterface::publish
void publish(std::shared_ptr< Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to mutable data variant)
Definition: interface.h:254
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&std::is_same< InnerTransporter, NullTransporter >::value > >::inner
InnerTransporter & inner()
Definition: interface.h:115
goby::middleware::Necessity
Necessity
Used to tag subscriptions based on their necessity (e.g. required for correct functioning,...
Definition: interface.h:191
goby::middleware::scheme
constexpr int scheme()
Placeholder to provide an interface for the scheme() function family.
Definition: cstr.h:65
goby::middleware::PollerInterface
Defines the common interface for polling for data on Goby transporters.
Definition: interface.h:139
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
primitive_type.h
goby::middleware::StaticTransporterInterface::publish
void publish(std::shared_ptr< const Data > data, const Publisher< Data > &publisher=Publisher< Data >())
Publish a message (shared pointer to const data variant)
Definition: interface.h:234
exception.h
goby::middleware::InnerTransporterInterface< Transporter, InnerTransporter, typename std::enable_if_t<!std::is_same< Transporter, NullTransporter >::value &&!std::is_same< InnerTransporter, NullTransporter >::value > >::InnerTransporterInterface
InnerTransporterInterface(InnerTransporter &inner)
Pass in an external inner transporter for use.
Definition: interface.h:95
type_helpers.h
goby::middleware::InnerTransporterInterface::innermost
auto innermost()
Definition: interface.h:67
detail::enable_if_t
typename std::enable_if< B, T >::type enable_if_t
Definition: json.hpp:3079