Goby3  3.1.5a
2024.05.23
subscription_store.h
Go to the documentation of this file.
1 // Copyright 2016-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_SUBSCRIPTION_STORE_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_SUBSCRIPTION_STORE_H
26 
27 #include <condition_variable>
28 #include <functional>
29 #include <memory>
30 #include <mutex>
31 #include <set>
32 #include <shared_mutex>
33 #include <thread>
34 #include <typeindex>
35 #include <unordered_map>
36 #include <vector>
37 
39 
40 namespace goby
41 {
42 namespace middleware
43 {
44 namespace detail
45 {
48 {
49  private:
50  // for each thread, stores a map of Datas to SubscriptionStores so that can call poll() on all the stores
51  using StoresMap = std::unordered_map<std::type_index, std::shared_ptr<SubscriptionStoreBase>>;
52  static std::unordered_map<std::thread::id, StoresMap> stores_;
53  static std::shared_timed_mutex stores_mutex_;
54 
55  public:
56  SubscriptionStoreBase() = default;
57  virtual ~SubscriptionStoreBase() = default;
58 
59  // returns number of data items posted to callbacks
60  static int poll_all(std::thread::id thread_id,
61  std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock)
62  {
63  // make a copy so that other threads can subscribe if
64  // necessary in their callbacks
65  StoresMap stores;
66  {
67  std::shared_lock<std::shared_timed_mutex> stores_lock(stores_mutex_);
68  if (stores_.count(thread_id))
69  stores = stores_.at(thread_id);
70  }
71 
72  int poll_items = 0;
73  for (auto const& s : stores) poll_items += s.second->poll(thread_id, lock);
74  return poll_items;
75  }
76 
77  static void unsubscribe_all(std::thread::id thread_id)
78  {
79  std::shared_lock<std::shared_timed_mutex> stores_lock(stores_mutex_);
80  if (stores_.count(thread_id))
81  {
82  for (auto const& s : stores_.at(thread_id)) s.second->unsubscribe_all_groups(thread_id);
83  }
84  }
85 
86  static void remove(std::thread::id thread_id)
87  {
88  std::lock_guard<decltype(stores_mutex_)> lock(stores_mutex_);
89  stores_.erase(thread_id);
90  }
91 
92  protected:
93  template <typename StoreType> static void insert(std::thread::id thread_id)
94  {
95  // check the store, and if there isn't one for this type, create one
96  std::lock_guard<decltype(stores_mutex_)> lock(stores_mutex_);
97 
98  if (!stores_.count(thread_id))
99  stores_.insert(std::make_pair(thread_id, StoresMap()));
100 
101  auto index = std::type_index(typeid(StoreType));
102  if (!stores_.at(thread_id).count(index))
103  stores_.at(thread_id).insert(
104  std::make_pair(index, std::shared_ptr<StoreType>(new StoreType)));
105  }
106 
107  protected:
108  virtual int poll(std::thread::id thread_id,
109  std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock) = 0;
110  virtual void unsubscribe_all_groups(std::thread::id thread_id) = 0;
111 };
112 
114 {
115  DataProtection(std::shared_ptr<std::mutex> dm, std::shared_ptr<std::condition_variable_any> pcv,
116  std::shared_ptr<std::timed_mutex> pm)
117  : data_mutex(dm), poller_cv(pcv), poller_mutex(pm)
118  {
119  }
120 
121  std::shared_ptr<std::mutex> data_mutex;
122  std::shared_ptr<std::condition_variable_any> poller_cv;
123  std::shared_ptr<std::timed_mutex> poller_mutex;
124 };
125 
127 template <typename Data> class SubscriptionStore : public SubscriptionStoreBase
128 {
129  public:
130  static void subscribe(std::function<void(std::shared_ptr<const Data>)> func, const Group& group,
131  std::thread::id thread_id, std::shared_ptr<std::mutex> data_mutex,
132  std::shared_ptr<std::condition_variable_any> cv,
133  std::shared_ptr<std::timed_mutex> poller_mutex)
134  {
135  {
136  std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
137 
138  // insert callback
139  auto it =
140  subscription_callbacks_.insert(std::make_pair(thread_id, Callback(group, func)));
141  // insert group with iterator to callback
142  subscription_groups_.insert(std::make_pair(group, it));
143 
144  // if necessary, create a DataQueue for this thread
145  auto queue_it = data_.find(thread_id);
146  if (queue_it == data_.end())
147  {
148  auto bool_it_pair = data_.insert(std::make_pair(thread_id, DataQueue()));
149  queue_it = bool_it_pair.first;
150  }
151  queue_it->second.create(group);
152 
153  // if we don't have a condition variable already for this thread, store it
154  if (!data_protection_.count(thread_id))
155  data_protection_.insert(std::make_pair(
156  thread_id, detail::DataProtection(data_mutex, cv, poller_mutex)));
157  }
158 
159  // try inserting a copy of this templated class via the base class for SubscriptionStoreBase::poll_all to use
160  SubscriptionStoreBase::insert<SubscriptionStore<Data>>(thread_id);
161  }
162 
163  static void unsubscribe(const Group& group, std::thread::id thread_id)
164  {
165  {
166  std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
167 
168  // iterate over subscriptions for this group, and erase the ones belonging to this thread_id
169  auto range = subscription_groups_.equal_range(group);
170  for (auto it = range.first; it != range.second;)
171  {
172  auto sub_thread_id = it->second->first;
173 
174  if (sub_thread_id == thread_id)
175  {
176  subscription_callbacks_.erase(it->second);
177  it = subscription_groups_.erase(it);
178  }
179  else
180  {
181  ++it;
182  }
183  }
184 
185  // remove the dataqueue for this group
186  auto queue_it = data_.find(thread_id);
187  queue_it->second.remove(group);
188  }
189  }
190 
191  static void publish(std::shared_ptr<const Data> data, const Group& group,
192  const Publisher<Data>& publisher)
193  {
194  // push new data
195  // build up local vector of relevant condition variables while locked
196  std::vector<detail::DataProtection> cv_to_notify;
197  {
198  std::shared_lock<std::shared_timed_mutex> lock(subscription_mutex_);
199 
200  auto range = subscription_groups_.equal_range(group);
201  for (auto it = range.first; it != range.second; ++it)
202  {
203  std::thread::id thread_id = it->second->first;
204 
205  // don't store a copy if publisher == subscriber, and echo is false
206  if (thread_id != std::this_thread::get_id() || publisher.cfg().echo())
207  {
208  // protect the DataQueue we are writing to
209  std::unique_lock<std::mutex> lock(*(data_protection_.at(thread_id).data_mutex));
210  auto queue_it = data_.find(thread_id);
211  queue_it->second.insert(group, data);
212  cv_to_notify.push_back(data_protection_.at(thread_id));
213  }
214  }
215  }
216 
217  // unlock and notify condition variables from local vector
218  for (const auto& data_protection : cv_to_notify)
219  {
220  {
221  // lock to ensure the other thread isn't in the limbo region
222  // between _poll_all() and wait(), where the condition variable
223  // signal would be lost
224 
225  std::lock_guard<std::timed_mutex>(*data_protection.poller_mutex);
226  }
227  data_protection.poller_cv->notify_all();
228  }
229  }
230 
231  private:
232  int poll(std::thread::id thread_id,
233  std::unique_ptr<std::unique_lock<std::timed_mutex>>& lock) override
234  {
235  std::vector<std::pair<std::shared_ptr<typename Callback::CallbackType>,
236  std::shared_ptr<const Data>>>
237  data_callbacks;
238  int poll_items_count = 0;
239 
240  {
241  std::shared_lock<std::shared_timed_mutex> sub_lock(subscription_mutex_);
242 
243  auto queue_it = data_.find(thread_id);
244  if (queue_it == data_.end())
245  return 0; // no subscriptions
246 
247  std::unique_lock<std::mutex> data_lock(
248  *(data_protection_.find(thread_id)->second.data_mutex));
249 
250  // loop over all Groups stored in this DataQueue
251  for (auto data_it = queue_it->second.cbegin(), end = queue_it->second.cend();
252  data_it != end; ++data_it)
253  {
254  const Group& group = data_it->first;
255  auto group_range = subscription_groups_.equal_range(group);
256  // For a given Group, loop over all subscriptions to this Group
257  for (auto group_it = group_range.first; group_it != group_range.second; ++group_it)
258  {
259  if (group_it->second->first != thread_id)
260  continue;
261 
262  // store the callback function and datum for all the elements queued
263  for (auto& datum : data_it->second)
264  {
265  ++poll_items_count;
266  // we have data, no need to keep this lock any longer
267  if (lock)
268  lock.reset();
269  data_callbacks.push_back(
270  std::make_pair(group_it->second->second.callback, datum));
271  }
272  }
273  queue_it->second.clear(group);
274  }
275  }
276 
277  // now that we're no longer blocking the subscription or data mutex, actually run the callbacks
278  for (const auto& callback_datum_pair : data_callbacks)
279  (*callback_datum_pair.first)(std::move(callback_datum_pair.second));
280 
281  return poll_items_count;
282  }
283 
284  void unsubscribe_all_groups(std::thread::id thread_id) override
285  {
286  {
287  std::lock_guard<std::shared_timed_mutex> lock(subscription_mutex_);
288 
289  for (auto it = subscription_groups_.begin(); it != subscription_groups_.end();)
290  {
291  auto sub_thread_id = it->second->first;
292 
293  if (sub_thread_id == thread_id)
294  {
295  subscription_callbacks_.erase(it->second);
296  it = subscription_groups_.erase(it);
297  }
298  else
299  {
300  ++it;
301  }
302  }
303 
304  data_.erase(thread_id);
305  data_protection_.erase(thread_id);
306  }
307  }
308 
309  private:
310  struct Callback
311  {
312  using CallbackType = std::function<void(std::shared_ptr<const Data>)>;
313  Callback(const Group& g, const std::function<void(std::shared_ptr<const Data>)>& c)
314  : group(g), callback(new CallbackType(c))
315  {
316  }
317  Group group;
318  std::shared_ptr<CallbackType> callback;
319  };
320 
321  class DataQueue
322  {
323  private:
324  std::unordered_map<Group, std::vector<std::shared_ptr<const Data>>> data_;
325 
326  public:
327  void create(const Group& g)
328  {
329  auto it = data_.find(g);
330  if (it == data_.end())
331  data_.insert(std::make_pair(g, std::vector<std::shared_ptr<const Data>>()));
332  }
333  void remove(const Group& g) { data_.erase(g); }
334 
335  void insert(const Group& g, std::shared_ptr<const Data> datum)
336  {
337  data_.find(g)->second.push_back(datum);
338  }
339  void clear(const Group& g) { data_.find(g)->second.clear(); }
340  bool empty() { return data_.empty(); }
341  typename decltype(data_)::const_iterator cbegin() { return data_.begin(); }
342  typename decltype(data_)::const_iterator cend() { return data_.end(); }
343  };
344 
345  // subscriptions for a given thread
346  static std::unordered_multimap<std::thread::id, Callback> subscription_callbacks_;
347  // threads that are subscribed to a given group
348  static std::unordered_multimap<Group,
349  typename decltype(subscription_callbacks_)::const_iterator>
350  subscription_groups_;
351  // condition variable to use for data
352  static std::unordered_map<std::thread::id, detail::DataProtection> data_protection_;
353 
354  static std::shared_timed_mutex
355  subscription_mutex_; // protects subscription_callbacks, subscription_groups, data_protection, and the overarching data_ map (but not the DataQueues within it, which are protected by the mutexes stored in data_protection_))
356 
357  // data for a given thread
358  static std::unordered_map<std::thread::id, DataQueue> data_;
359 };
360 
361 template <typename Data>
362 std::unordered_multimap<std::thread::id, typename SubscriptionStore<Data>::Callback>
363  SubscriptionStore<Data>::subscription_callbacks_;
364 template <typename Data>
365 std::unordered_map<std::thread::id, typename SubscriptionStore<Data>::DataQueue>
366  SubscriptionStore<Data>::data_;
367 template <typename Data>
368 std::unordered_multimap<goby::middleware::Group,
369  typename decltype(
370  SubscriptionStore<Data>::subscription_callbacks_)::const_iterator>
371  SubscriptionStore<Data>::subscription_groups_;
372 template <typename Data>
373 std::unordered_map<std::thread::id, detail::DataProtection>
374  SubscriptionStore<Data>::data_protection_;
375 
376 template <typename Data> std::shared_timed_mutex SubscriptionStore<Data>::subscription_mutex_;
377 
378 } // namespace detail
379 } // namespace middleware
380 } // namespace goby
381 
382 #endif
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::detail::SubscriptionStoreBase::SubscriptionStoreBase
SubscriptionStoreBase()=default
detail
detail namespace with internal helper functions
Definition: json.hpp:246
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::Publisher
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: driver_thread.h:69
goby::middleware::detail::SubscriptionStore
Storage class for a specific interthread subscription (and related data). Used by InterThreadTranspor...
Definition: subscription_store.h:127
goby::middleware::detail::SubscriptionStoreBase
Base class for interthread subscription information. Non-template so it can be stored in a single con...
Definition: subscription_store.h:47
detail::void
j template void())
Definition: json.hpp:4822
goby::middleware::thread_id
std::string thread_id(std::thread::id i=std::this_thread::get_id())
Definition: common.h:53
goby::util::logger_lock::lock
@ lock
Definition: flex_ostreambuf.h:62
goby::middleware::detail::SubscriptionStore::unsubscribe
static void unsubscribe(const Group &group, std::thread::id thread_id)
Definition: subscription_store.h:163
goby::middleware::detail::SubscriptionStoreBase::~SubscriptionStoreBase
virtual ~SubscriptionStoreBase()=default
goby::middleware::protobuf::TransporterConfig::echo
bool echo() const
Definition: transporter_config.pb.h:226
goby::middleware::detail::SubscriptionStore::subscribe
static void subscribe(std::function< void(std::shared_ptr< const Data >)> func, const Group &group, std::thread::id thread_id, std::shared_ptr< std::mutex > data_mutex, std::shared_ptr< std::condition_variable_any > cv, std::shared_ptr< std::timed_mutex > poller_mutex)
Definition: subscription_store.h:130
goby::middleware::detail::SubscriptionStore::publish
static void publish(std::shared_ptr< const Data > data, const Group &group, const Publisher< Data > &publisher)
Definition: subscription_store.h:191
goby::middleware::detail::SubscriptionStoreBase::unsubscribe_all
static void unsubscribe_all(std::thread::id thread_id)
Definition: subscription_store.h:77
goby::middleware::detail::SubscriptionStoreBase::poll
virtual int poll(std::thread::id thread_id, std::unique_ptr< std::unique_lock< std::timed_mutex >> &lock)=0
goby::middleware::detail::DataProtection::poller_mutex
std::shared_ptr< std::timed_mutex > poller_mutex
Definition: subscription_store.h:123
goby::middleware::detail::DataProtection::poller_cv
std::shared_ptr< std::condition_variable_any > poller_cv
Definition: subscription_store.h:122
goby::middleware::detail::DataProtection::DataProtection
DataProtection(std::shared_ptr< std::mutex > dm, std::shared_ptr< std::condition_variable_any > pcv, std::shared_ptr< std::timed_mutex > pm)
Definition: subscription_store.h:115
goby::middleware::Publisher::cfg
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition: publisher.h:81
goby::middleware::detail::DataProtection::data_mutex
std::shared_ptr< std::mutex > data_mutex
Definition: subscription_store.h:121
publisher.h
goby::middleware::detail::DataProtection
Definition: subscription_store.h:113
goby::middleware::detail::SubscriptionStoreBase::poll_all
static int poll_all(std::thread::id thread_id, std::unique_ptr< std::unique_lock< std::timed_mutex >> &lock)
Definition: subscription_store.h:60
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:59
goby::middleware::detail::SubscriptionStoreBase::insert
static void insert(std::thread::id thread_id)
Definition: subscription_store.h:93
goby::middleware::detail::SubscriptionStoreBase::unsubscribe_all_groups
virtual void unsubscribe_all_groups(std::thread::id thread_id)=0
jwt::create
builder< json_traits > create()
Definition: jwt.h:4168
goby::middleware::detail::SubscriptionStoreBase::remove
static void remove(std::thread::id thread_id)
Definition: subscription_store.h:86