24 #ifndef GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
25 #define GOBY_ACOMMS_BUFFER_DYNAMIC_BUFFER_H
28 #include <type_traits>
29 #include <unordered_map>
/// \brief Size of a datum as reported by the container itself.
///
/// \tparam Container any type exposing a `size()` member
/// \param c container to measure
/// \return the value of `c.size()`, converted to size_t
template <typename Container>
size_t data_size(const Container& c)
{
    const auto element_count = c.size();
    return element_count;
}
// Size type used by this class; aliases the size_type of std::deque<T>.
68 using size_type =
typename std::deque<T>::size_type;
99 void update(
const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
103 throw(
goby::Exception(
"Configuration vector must not be empty for DynamicSubBuffer"));
109 std::result_of<decltype (&DynamicBufferConfig::ttl)(DynamicBufferConfig)>::
type;
110 using value_base_type =
111 std::result_of<decltype (&DynamicBufferConfig::value_base)(DynamicBufferConfig)>::
type;
113 ttl_type ttl_sum = 0;
114 ttl_type ttl_divisor = 0;
115 value_base_type value_base_sum = 0;
116 value_base_type value_base_divisor = 0;
118 for (
const auto&
cfg : cfgs)
140 ++value_base_divisor;
145 cfg_.
set_ttl(ttl_sum / ttl_divisor);
146 if (value_base_divisor > 0)
163 Value&
top(
typename Clock::time_point reference = Clock::now(),
164 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
166 for (
auto& datum_pair : data_)
168 auto& datum_last_access = datum_pair.first;
169 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
171 last_access_ = reference;
172 datum_last_access = last_access_;
173 return datum_pair.second;
180 size_t top_size(
typename Clock::time_point reference = Clock::now(),
181 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
183 for (
const auto& datum_pair : data_)
185 const auto& datum_last_access = datum_pair.first;
186 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
187 return data_size(datum_pair.second.data);
195 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
197 for (
const auto& datum_pair : data_)
199 const auto& datum_last_access = datum_pair.first;
200 if (datum_last_access == zero_point_ || datum_last_access + ack_timeout < reference)
221 std::pair<double, ValueResult>
222 top_value(
typename Clock::time_point reference = Clock::now(),
223 size_type max_bytes = std::numeric_limits<size_type>::max(),
224 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
const
230 return std::make_pair(-std::numeric_limits<double>::infinity(),
233 return std::make_pair(-std::numeric_limits<double>::infinity(),
238 return std::make_pair(-std::numeric_limits<double>::infinity(),
241 using Duration = std::chrono::microseconds;
243 double dt = std::chrono::duration_cast<Duration>(reference - last_access_).count();
244 double ttl = goby::time::convert_duration<Duration>(cfg_.
ttl_with_units()).count();
247 double v = v_b * dt / ttl;
254 bool in_blackout(
typename Clock::time_point reference = Clock::now())
const
259 return reference <= (last_access_ + blackout);
262 bool empty()
const {
return data_.empty(); }
268 void pop() { data_.pop_front(); }
275 std::vector<Value>
push(
const T& t,
typename Clock::time_point reference = Clock::now())
277 std::vector<Value> exceeded;
280 data_.push_front(std::make_pair(zero_point_, Value({reference, t})));
282 data_.push_back(std::make_pair(zero_point_, Value({reference, t})));
286 exceeded.push_back(data_.back().second);
295 std::vector<Value>
expire(
typename Clock::time_point reference = Clock::now())
297 std::vector<Value> expired;
299 auto ttl = goby::time::convert_duration<typename Clock::duration>(cfg_.
ttl_with_units());
302 while (!data_.empty() && reference > (data_.back().second.push_time + ttl))
304 expired.push_back(data_.back().second);
310 while (!data_.empty() && reference > (data_.front().second.push_time + ttl))
312 expired.push_back(data_.front().second);
327 for (
auto it = data_.begin(), end = data_.end(); it != end; ++it)
329 const auto& datum_pair = it->second;
330 if (datum_pair == value)
337 if (cfg_.
newest_first() && datum_pair.push_time < value.push_time)
339 else if (!cfg_.
newest_first() && datum_pair.push_time > value.push_time)
// Stored data. Each entry pairs the time the datum was last returned by top()
// (zero_point_ while it has never been returned) with the stored Value.
349 std::deque<std::pair<typename Clock::time_point, Value>> data_;
// Reference time of the most recent top() call that returned a datum;
// default-initialised to Clock::now().
350 typename Clock::time_point last_access_{Clock::now()};
// Sentinel time point (zero duration since the clock's epoch) that marks a
// datum as not yet accessed; push() stores it alongside each new datum.
352 typename Clock::time_point zero_point_{std::chrono::seconds(0)};
356 template <
typename T,
typename Clock = goby::time::SteadyClock>
class DynamicBuffer
362 glog_priority_group_ =
"goby::acomms::buffer::priority::" +
std::to_string(
id);
388 create(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
398 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
400 if (sub_.count(dest_id) && sub_.at(dest_id).count(sub_id))
401 throw(
goby::Exception(
"Subbuffer ID: " + sub_id +
" already exists."));
414 replace(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
423 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
426 create(dest_id, sub_id, cfgs);
437 update(dest_id, sub_id, std::vector<goby::acomms::protobuf::DynamicBufferConfig>(1, cfg));
446 const std::vector<goby::acomms::protobuf::DynamicBufferConfig>& cfgs)
448 auto it = sub_[dest_id].find(sub_id);
449 if (it != sub_[dest_id].end())
450 it->second.update(cfgs);
452 create(dest_id, sub_id, cfgs);
461 sub_[dest_id].erase(sub_id);
469 std::vector<Value>
push(
const Value& fvt)
471 std::vector<Value> exceeded;
472 auto sub_exceeded =
sub(fvt.modem_id, fvt.subbuffer_id).push(fvt.data, fvt.push_time);
473 for (
const auto&
e : sub_exceeded)
474 exceeded.push_back({fvt.modem_id, fvt.subbuffer_id,
e.push_time,
e.data});
481 for (
const auto& sub_id_p : sub_)
483 for (
const auto& sub_p : sub_id_p.second)
485 if (!sub_p.second.empty())
497 for (
const auto& sub_id_p : sub_)
499 for (
const auto& sub_p : sub_id_p.second)
size += sub_p.second.size();
511 size_type max_bytes = std::numeric_limits<size_type>::max(),
512 typename Clock::duration ack_timeout = std::chrono::microseconds(0))
517 glog <<
group(glog_priority_group_) <<
"Starting priority contest (dest: "
520 <<
", max_bytes: " << max_bytes <<
"):" << std::endl;
522 typename std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>::iterator
524 double winning_value = -std::numeric_limits<double>::infinity();
526 auto now = Clock::now();
530 "DynamicBuffer::top() has no queues with this destination"));
534 : sub_.find(dest_id),
537 : ++sub_.find(dest_id);
538 sub_id_it != sub_id_end; ++sub_id_it)
540 for (
auto sub_it = sub_id_it->second.begin(), sub_end = sub_id_it->second.end();
541 sub_it != sub_end; ++sub_it)
545 std::tie(value, result) = sub_it->second.top_value(now, max_bytes, ack_timeout);
547 std::string value_or_reason;
555 value_or_reason =
"empty";
559 value_or_reason =
"blackout";
563 value_or_reason =
"too large";
567 value_or_reason =
"ack wait";
572 <<
" [dest: " << sub_id_it->first
573 <<
", n: " << sub_it->second.size()
574 <<
"]: " << value_or_reason << std::endl;
576 if (value > winning_value)
578 winning_value = value;
579 winning_sub = sub_it;
580 dest_id = sub_id_it->first;
585 if (winning_value == -std::numeric_limits<double>::infinity())
587 "DynamicBuffer::top() has no queue with a winning value"));
589 const auto& top_p = winning_sub->second.top(now, ack_timeout);
591 <<
" (" <<
data_size(top_p.data) <<
"B)" << std::endl;
593 return {dest_id, winning_sub->first, top_p.push_time, top_p.data};
603 return sub(value.modem_id, value.subbuffer_id).erase({value.push_time, value.data});
611 auto now = Clock::now();
612 std::vector<Value> expired;
613 for (
auto& sub_id_p : sub_)
615 for (
auto& sub_p : sub_id_p.second)
617 auto sub_expired = sub_p.second.expire(now);
618 for (
const auto&
e : sub_expired)
619 expired.push_back({sub_id_p.first, sub_p.first,
e.push_time,
e.data});
630 if (!sub_.count(dest_id) || !sub_.at(dest_id).count(sub_id))
632 " does not exist, must call create(...) first."));
633 return sub_.at(dest_id).at(sub_id);
// Sub-buffers, keyed first by destination modem id and then by sub-buffer id.
638 std::map<modem_id_type, std::unordered_map<subbuffer_id_type, DynamicSubBuffer<T, Clock>>> sub_;
// Name of the glog group used when logging the priority contest.
640 std::string glog_priority_group_;
// Counter shared by all instances of one DynamicBuffer<T, Clock> specialisation;
// its definition (initialised to 0) follows the class.
// NOTE(review): presumably the source of the id appended to glog_priority_group_ - confirm in the constructor.
641 static std::atomic<int> count_;
645 template <
typename T,
typename Clock> std::atomic<int> DynamicBuffer<T, Clock>::count_(0);