24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_DETAIL_SUBSCRIPTION_STORE_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_DETAIL_SUBSCRIPTION_STORE_H
27 #include <condition_variable>
32 #include <shared_mutex>
35 #include <unordered_map>
// One thread's collection of stores, keyed by std::type_index.
51 using StoresMap = std::unordered_map<std::type_index, std::shared_ptr<SubscriptionStoreBase>>;
// Registry holding a StoresMap for each thread id.
52 static std::unordered_map<std::thread::id, StoresMap> stores_;
// Mutex for stores_: the visible code takes it shared (std::shared_lock) before iterating
// and exclusively (std::lock_guard) before inserting.
53 static std::shared_timed_mutex stores_mutex_;
// NOTE(review): the function header and the lines declaring `stores` and `poll_items`
// are not part of this listing; only the visible statements are described.
// Last parameter: the caller's lock holder, taken by reference and handed on to each store.
61 std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
// Hold stores_mutex_ in shared (reader) mode.
67 std::shared_lock<std::shared_timed_mutex> stores_lock(stores_mutex_);
// Add up what each store's poll(thread_id, lock) returns.
73 for (
auto const& s : stores) poll_items += s.second->poll(
thread_id,
lock);
// NOTE(review): the enclosing function header is not part of this listing.
// Hold stores_mutex_ in shared (reader) mode.
79 std::shared_lock<std::shared_timed_mutex> stores_lock(stores_mutex_);
// Call unsubscribe_all_groups(thread_id) on every store registered for thread_id.
// stores_.at() throws std::out_of_range when thread_id has no entry in stores_.
82 for (
auto const& s : stores_.at(
thread_id)) s.second->unsubscribe_all_groups(
thread_id);
// Exclusive lock on stores_mutex_; decltype keeps the guard's mutex type in step with the
// member's declared type.
// NOTE(review): the statements this lock protects are not part of this listing.
88 std::lock_guard<decltype(stores_mutex_)>
lock(stores_mutex_);
// NOTE(review): the header is not visible; presumably this is the body of the
// insert<StoreType>(thread_id) helper that SubscriptionStore<Data> invokes as
// SubscriptionStoreBase::insert<SubscriptionStore<Data>>(thread_id) -- confirm.
// Exclusive lock on stores_mutex_.
96 std::lock_guard<decltype(stores_mutex_)>
lock(stores_mutex_);
// Make sure thread_id has a StoresMap; insert() leaves an already-present entry untouched.
99 stores_.insert(std::make_pair(
thread_id, StoresMap()));
// Key identifying the concrete store type.
101 auto index = std::type_index(
typeid(StoreType));
// (type index, default-constructed store) pair; the call consuming it is not visible here.
104 std::make_pair(index, std::shared_ptr<StoreType>(
new StoreType)));
// Tail of a pure-virtual declaration (`= 0`) whose last parameter is the caller's lock
// holder, passed by reference.
// NOTE(review): the name and leading parameters are not visible; the parameter type matches
// the poll(thread_id, lock) call and the `override` seen elsewhere in this file -- confirm.
109 std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock) = 0;
// Constructor taking shared handles to a mutex, a condition variable and a timed mutex.
// NOTE(review): the member-initializer list / body is not part of this listing. Other code
// reads members named data_mutex, poller_cv and poller_mutex, presumably initialised from
// dm, pcv and pm respectively -- confirm.
115 DataProtection(std::shared_ptr<std::mutex> dm, std::shared_ptr<std::condition_variable_any> pcv,
116 std::shared_ptr<std::timed_mutex> pm)
// NOTE(review): the start of the signature (function name and the declarations of `group`
// and `func`) and the block braces are not part of this listing.
// Remaining parameters: the subscriber's thread id plus a mutex, a condition variable and a
// timed mutex, all held through shared_ptr.
131 std::thread::id
thread_id, std::shared_ptr<std::mutex> data_mutex,
132 std::shared_ptr<std::condition_variable_any> cv,
133 std::shared_ptr<std::timed_mutex> poller_mutex)
// Exclusive lock on subscription_mutex_.
136 std::lock_guard<std::shared_timed_mutex>
lock(subscription_mutex_);
// Record the callback under the subscribing thread's id.
140 subscription_callbacks_.insert(std::make_pair(
thread_id, Callback(
group, func)));
// Index the group to `it`; subscription_groups_ maps a Group to an iterator into
// subscription_callbacks_. NOTE(review): the declaration of `it` is not visible here.
142 subscription_groups_.insert(std::make_pair(
group, it));
// No queue found for this thread: insert an empty DataQueue and keep the iterator to it
// (insert() returns an (iterator, bool) pair, so .first is the iterator).
146 if (queue_it == data_.end())
148 auto bool_it_pair = data_.insert(std::make_pair(
thread_id, DataQueue()));
149 queue_it = bool_it_pair.first;
// Give the thread's queue an entry for `group` (create() keeps an existing entry).
151 queue_it->second.create(
group);
// NOTE(review): the arguments of this insert are cut off in the listing.
155 data_protection_.insert(std::make_pair(
// Hand thread_id to the base class's insert<> for this store type.
160 SubscriptionStoreBase::insert<SubscriptionStore<Data>>(
thread_id);
// NOTE(review): the function header, braces and the conditions between these statements
// are not part of this listing.
// Exclusive lock on subscription_mutex_.
166 std::lock_guard<std::shared_timed_mutex>
lock(subscription_mutex_);
// All subscription entries registered for `group`.
169 auto range = subscription_groups_.equal_range(
group);
// No increment in the loop header: erase() below yields the next iterator.
// NOTE(review): the branch that advances `it` without erasing is not visible.
170 for (
auto it = range.first; it != range.second;)
// Thread id under which this subscription's callback is stored.
172 auto sub_thread_id = it->second->first;
// Remove the callback entry first, then the group entry that referred to it.
176 subscription_callbacks_.erase(it->second);
177 it = subscription_groups_.erase(it);
// Drop the group's entry from a data queue (queue_it is obtained on a line not shown).
187 queue_it->second.remove(
group);
// NOTE(review): the function header, braces and some statements are not part of this
// listing; only the visible statements are described.
// DataProtection entries gathered in the first loop and notified in the second.
196 std::vector<detail::DataProtection> cv_to_notify;
// Shared (reader) lock on subscription_mutex_.
198 std::shared_lock<std::shared_timed_mutex>
lock(subscription_mutex_);
// All subscription entries registered for `group`.
200 auto range = subscription_groups_.equal_range(
group);
201 for (
auto it = range.first; it != range.second; ++it)
// Thread id under which this subscription's callback is stored.
203 std::thread::id
thread_id = it->second->first;
// Lock that thread's data mutex; at() throws std::out_of_range if the thread has no
// data_protection_ entry.
// NOTE(review): reuses the name `lock` of the shared_lock above -- presumably declared in
// a nested scope.
209 std::unique_lock<std::mutex>
lock(*(data_protection_.at(
thread_id).data_mutex));
// Append the datum to the thread's queue for this group.
211 queue_it->second.insert(
group, data);
// Remember this thread's DataProtection (copied) for the notification loop.
212 cv_to_notify.push_back(data_protection_.at(
thread_id));
218 for (
const auto& data_protection : cv_to_notify)
// Unnamed temporary guard: poller_mutex is acquired and released again by the end of this
// statement, so nothing is held while notify_all() runs.
225 std::lock_guard<std::timed_mutex>(*data_protection.poller_mutex);
// Wake every waiter on this entry's condition variable.
227 data_protection.poller_cv->notify_all();
// NOTE(review): the function name, its first parameter, the braces and several statements
// are not part of this listing; only the visible statements are described.
// Last parameter: the caller's lock holder, passed by reference.
233 std::unique_ptr<std::unique_lock<std::timed_mutex>>&
lock)
override
// Type of the (callback, datum) pair list filled below and invoked in the final loop.
// NOTE(review): the variable name is on a line not shown; it is used as data_callbacks.
235 std::vector<std::pair<std::shared_ptr<typename Callback::CallbackType>,
236 std::shared_ptr<const Data>>>
// Value returned to the caller; the statements that update it are not visible here.
238 int poll_items_count = 0;
// Shared (reader) lock on subscription_mutex_.
241 std::shared_lock<std::shared_timed_mutex> sub_lock(subscription_mutex_);
// Case where no data queue was found (the branch body is not visible).
244 if (queue_it == data_.end())
// Lock the thread's data mutex. find() is dereferenced without an end() check, so
// thread_id must already have a data_protection_ entry.
247 std::unique_lock<std::mutex> data_lock(
248 *(data_protection_.find(
thread_id)->second.data_mutex));
// Visit every (group, data vector) entry of the queue.
251 for (
auto data_it = queue_it->second.cbegin(), end = queue_it->second.cend();
252 data_it != end; ++data_it)
// Subscription entries registered for `group`.
255 auto group_range = subscription_groups_.equal_range(
group);
257 for (
auto group_it = group_range.first; group_it != group_range.second; ++group_it)
// Test for a subscription stored under a different thread id (the branch body is not visible).
259 if (group_it->second->first !=
thread_id)
// Each datum queued in this entry.
263 for (
auto& datum : data_it->second)
// Pair the subscription's callback with the datum for invocation later.
269 data_callbacks.push_back(
270 std::make_pair(group_it->second->second.callback, datum));
// Empty the queued data for `group`, keeping the group's entry.
273 queue_it->second.clear(
group);
// Invoke each collected callback with its datum. callback_datum_pair is a const reference,
// so std::move yields a const rvalue and the shared_ptr is copied rather than moved.
278 for (
const auto& callback_datum_pair : data_callbacks)
279 (*callback_datum_pair.first)(std::move(callback_datum_pair.second));
281 return poll_items_count;
// Override that walks every subscription entry and erases entries from both tables.
// NOTE(review): the braces and the condition using sub_thread_id are not part of this
// listing; presumably only entries whose thread id equals thread_id are erased -- confirm.
284 void unsubscribe_all_groups(std::thread::id
thread_id)
override
// Exclusive lock on subscription_mutex_.
287 std::lock_guard<std::shared_timed_mutex>
lock(subscription_mutex_);
// No increment in the loop header: erase() below yields the next iterator.
289 for (
auto it = subscription_groups_.begin(); it != subscription_groups_.end();)
// Thread id under which this subscription's callback is stored.
291 auto sub_thread_id = it->second->first;
// Remove the callback entry first, then the group entry that referred to it.
295 subscription_callbacks_.erase(it->second);
296 it = subscription_groups_.erase(it);
// Callable type stored per subscription: takes the datum as std::shared_ptr<const Data>.
312 using CallbackType = std::function<
void(std::shared_ptr<const Data>)>;
// Stores the group and a heap-allocated copy of `c`, owned by the `callback` shared_ptr.
313 Callback(
const Group& g,
const std::function<
void(std::shared_ptr<const Data>)>& c)
314 :
group(g), callback(new CallbackType(c))
// Shared handle to the stored callable.
318 std::shared_ptr<CallbackType> callback;
// Queued data per group: each Group maps to a vector of shared_ptr<const Data>.
324 std::unordered_map<Group, std::vector<std::shared_ptr<const Data>>> data_;
// Ensures group `g` has an entry in data_: an empty vector is inserted only when the group
// is absent, so data already queued for an existing group is kept.
327 void create(
const Group& g)
329 auto it = data_.find(g);
330 if (it == data_.end())
331 data_.insert(std::make_pair(g, std::vector<std::shared_ptr<const Data>>()));
333 void remove(
const Group& g) { data_.erase(g); }
// Appends `datum` to the vector queued for group `g`.
// NOTE(review): find() is dereferenced without an end() check, so `g` must already have an
// entry (see create()); calling this for an unknown group is undefined behaviour.
335 void insert(
const Group& g, std::shared_ptr<const Data> datum)
337 data_.find(g)->second.push_back(datum);
339 void clear(
const Group& g) { data_.find(g)->second.clear(); }
340 bool empty() {
return data_.empty(); }
341 typename decltype(data_)::const_iterator cbegin() {
return data_.begin(); }
342 typename decltype(data_)::const_iterator cend() {
return data_.end(); }
// Callback entries keyed by thread id; a multimap, so one thread may hold several.
346 static std::unordered_multimap<std::thread::id, Callback> subscription_callbacks_;
// Group -> iterator to an entry of subscription_callbacks_; a multimap, so one group may
// have several entries.
348 static std::unordered_multimap<Group,
349 typename decltype(subscription_callbacks_)::const_iterator>
350 subscription_groups_;
// DataProtection entry for each thread id.
352 static std::unordered_map<std::thread::id, detail::DataProtection> data_protection_;
// NOTE(review): the declarator is cut off in this listing; an out-of-class definition of
// subscription_mutex_ with this type appears later in the file.
354 static std::shared_timed_mutex
// DataQueue for each thread id.
358 static std::unordered_map<std::thread::id, DataQueue> data_;
// Out-of-class definitions of SubscriptionStore<Data>'s static data members; as members of
// a class template, each Data instantiation gets its own set.
361 template <
typename Data>
362 std::unordered_multimap<std::thread::id, typename SubscriptionStore<Data>::Callback>
363 SubscriptionStore<Data>::subscription_callbacks_;
364 template <
typename Data>
365 std::unordered_map<std::thread::id, typename SubscriptionStore<Data>::DataQueue>
366 SubscriptionStore<Data>::data_;
// NOTE(review): the first lines of the subscription_groups_ type are missing from this
// listing; only the tail of the declaration is visible.
367 template <
typename Data>
370 SubscriptionStore<Data>::subscription_callbacks_)::const_iterator>
371 SubscriptionStore<Data>::subscription_groups_;
372 template <
typename Data>
373 std::unordered_map<std::thread::id, detail::DataProtection>
374 SubscriptionStore<Data>::data_protection_;
376 template <
typename Data> std::shared_timed_mutex SubscriptionStore<Data>::subscription_mutex_;