Goby3  3.1.5a
2024.05.23
queue.h
Go to the documentation of this file.
1 // Copyright 2009-2021:
2 // GobySoft, LLC (2013-)
3 // Massachusetts Institute of Technology (2007-2014)
4 // Community contributors (see AUTHORS file)
5 // File authors:
6 // Toby Schneider <toby@gobysoft.org>
7 //
8 //
9 // This file is part of the Goby Underwater Autonomy Project Libraries
10 // ("The Goby Libraries").
11 //
12 // The Goby Libraries are free software: you can redistribute them and/or modify
13 // them under the terms of the GNU Lesser General Public License as published by
14 // the Free Software Foundation, either version 2.1 of the License, or
15 // (at your option) any later version.
16 //
17 // The Goby Libraries are distributed in the hope that they will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public License
23 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
24 
25 #ifndef GOBY_ACOMMS_QUEUE_QUEUE_H
26 #define GOBY_ACOMMS_QUEUE_QUEUE_H
27 
28 #include <cstddef> // for size_t
29 #include <iostream> // for ostream
30 #include <list> // for list
31 #include <map> // for multimap
32 #include <memory> // for shared_ptr
33 #include <string> // for string
34 #include <vector> // for vector
35 
36 #include <boost/any.hpp> // for any
37 #include <boost/date_time/posix_time/posix_time_config.hpp> // for time_dur...
38 #include <boost/date_time/posix_time/ptime.hpp> // for ptime
39 #include <boost/units/quantity.hpp> // for quantity
40 #include <google/protobuf/descriptor.h> // for Descriptor
41 
42 #include "goby/acomms/dccl/dccl.h" // for DCCLCodec
43 #include "goby/acomms/protobuf/queue.pb.h" // for QueuedMe...
44 #include "goby/time/convert.h" // for convert
45 
46 namespace google
47 {
48 namespace protobuf
49 {
50 class Message;
51 } // namespace protobuf
52 } // namespace google
53 
54 namespace goby
55 {
56 namespace acomms
57 {
58 class QueueManager;
59 namespace protobuf
60 {
61 class ModemTransmission;
62 } // namespace protobuf
63 
65 {
66  std::shared_ptr<google::protobuf::Message> dccl_msg;
68 };
69 
70 typedef std::list<QueuedMessage>::iterator messages_it;
71 using waiting_for_ack_it = std::multimap<unsigned int, messages_it>::iterator;
72 
73 class Queue
74 {
75  public:
76  Queue(const google::protobuf::Descriptor* desc, QueueManager* parent,
78 
79  bool push_message(const std::shared_ptr<google::protobuf::Message>& dccl_msg);
80  bool push_message(const std::shared_ptr<google::protobuf::Message>& dccl_msg,
82 
84 
85  boost::any find_queue_field(const std::string& field_name,
87 
88  goby::acomms::QueuedMessage give_data(unsigned frame);
89  bool pop_message(unsigned frame);
90  bool pop_message_ack(unsigned frame, std::shared_ptr<google::protobuf::Message>& removed_msg);
91  void stream_for_pop(const QueuedMessage& queued_msg);
92 
93  std::vector<std::shared_ptr<google::protobuf::Message> > expire();
94 
95  bool get_priority_values(double* priority, boost::posix_time::ptime* last_send_time,
96  const protobuf::ModemTransmission& request_msg,
97  const std::string& data);
98 
99  // returns true if empty
100  bool clear_ack_queue(unsigned start_frame);
101 
102  void flush();
103 
104  size_t size() const { return messages_.size(); }
105 
106  boost::posix_time::ptime last_send_time() const { return last_send_time_; }
107 
108  boost::posix_time::ptime newest_msg_time() const
109  {
110  return size() ? goby::time::convert<boost::posix_time::ptime>(
111  messages_.back().meta.time_with_units())
112  : boost::posix_time::ptime();
113  }
114 
115  void info(std::ostream* os) const;
116 
117  std::string name() const { return desc_->full_name(); }
118 
120  {
121  cfg_ = cfg;
122  process_cfg();
123  }
124  void process_cfg();
125 
127 
128  const google::protobuf::Descriptor* descriptor() const { return desc_; }
129 
130  int id() { return goby::acomms::DCCLCodec::get()->id(desc_); }
131 
132  private:
133  waiting_for_ack_it find_ack_value(messages_it it_to_find);
134  messages_it next_message_it();
135 
136  void set_latest_metadata(const google::protobuf::FieldDescriptor* field,
137  const boost::any& field_value, const boost::any& wire_value);
138 
139  double time_duration2double(const boost::posix_time::time_duration& time_of_day);
140 
141  private:
142  Queue& operator=(const Queue&);
143  Queue(const Queue&);
144 
145  const google::protobuf::Descriptor* desc_;
146  QueueManager* parent_;
148 
149  // maps role onto FieldDescriptor::full_name() or empty string if static role
150  std::map<protobuf::QueuedMessageEntry::RoleType, std::string> roles_;
151 
152  boost::posix_time::ptime last_send_time_;
153 
154  std::list<QueuedMessage> messages_;
155 
156  // map frame number onto messages list iterator
157  // can have multiples in the same frame now
158  std::multimap<unsigned, messages_it> waiting_for_ack_;
159 
160  protobuf::QueuedMessageMeta static_meta_;
161 };
// Stream-insertion operator for a Queue; declared here only, with no
// definition in this header.
162 std::ostream& operator<<(std::ostream& os, const Queue& oq);
163 } // namespace acomms
164 
165 } // namespace goby
166 #endif
goby::acomms::protobuf::ModemTransmission
Definition: modem_message.pb.h:166
goby::acomms::Queue::pop_message
bool pop_message(unsigned frame)
goby::acomms::Queue::queue_message_options
const protobuf::QueuedMessageEntry & queue_message_options()
Definition: queue.h:126
goby::acomms::QueuedMessage
Definition: queue.h:64
queue.pb.h
goby::acomms::protobuf::QueuedMessageMeta
Definition: queue.pb.h:1126
goby::acomms::Queue
Definition: queue.h:73
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::acomms::Queue::last_send_time
boost::posix_time::ptime last_send_time() const
Definition: queue.h:106
goby::acomms::QueuedMessage::dccl_msg
std::shared_ptr< google::protobuf::Message > dccl_msg
Definition: queue.h:66
goby::acomms::Queue::size
size_t size() const
Definition: queue.h:104
goby::acomms::DCCLCodec::id
unsigned id() const
Definition: dccl.h:169
goby::acomms::QueuedMessage::meta
protobuf::QueuedMessageMeta meta
Definition: queue.h:67
goby::acomms::Queue::set_cfg
void set_cfg(const protobuf::QueuedMessageEntry &cfg)
Definition: queue.h:119
goby::acomms::Queue::info
void info(std::ostream *os) const
goby::acomms::Queue::stream_for_pop
void stream_for_pop(const QueuedMessage &queued_msg)
goby::acomms::DCCLCodec::get
static DCCLCodec * get()
DCCLCodec is a singleton class; use this to get a pointer to the class.
Definition: dccl.h:132
goby::acomms::waiting_for_ack_it
std::multimap< unsigned int, messages_it >::iterator waiting_for_ack_it
Definition: queue.h:71
goby::acomms::operator<<
std::ostream & operator<<(std::ostream &os, const MACManager &mac)
goby::field
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::FieldOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyFieldOptions >, 11, false > field
Definition: option_extensions.pb.h:1323
dccl.h
goby::acomms::Queue::get_priority_values
bool get_priority_values(double *priority, boost::posix_time::ptime *last_send_time, const protobuf::ModemTransmission &request_msg, const std::string &data)
goby::acomms::Queue::meta_from_msg
protobuf::QueuedMessageMeta meta_from_msg(const google::protobuf::Message &dccl_msg)
goby::acomms::Queue::Queue
Queue(const google::protobuf::Descriptor *desc, QueueManager *parent, protobuf::QueuedMessageEntry cfg=protobuf::QueuedMessageEntry())
goby::msg
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Definition: option_extensions.pb.h:1327
goby::acomms::Queue::process_cfg
void process_cfg()
goby::acomms::Queue::flush
void flush()
goby::acomms::Queue::give_data
goby::acomms::QueuedMessage give_data(unsigned frame)
goby::acomms::Queue::expire
std::vector< std::shared_ptr< google::protobuf::Message > > expire()
convert.h
google::protobuf::Message
Definition: message.h:189
goby::acomms::QueueManager
provides an API to the goby-acomms Queuing Library.
Definition: queue_manager.h:66
goby::acomms::Queue::pop_message_ack
bool pop_message_ack(unsigned frame, std::shared_ptr< google::protobuf::Message > &removed_msg)
goby::acomms::messages_it
std::list< QueuedMessage >::iterator messages_it
Definition: queue.h:70
goby::acomms::Queue::name
std::string name() const
Definition: queue.h:117
goby::acomms::protobuf::QueuedMessageEntry
Definition: queue.pb.h:319
goby::acomms::Queue::clear_ack_queue
bool clear_ack_queue(unsigned start_frame)
goby::acomms::Queue::id
int id()
Definition: queue.h:130
goby::acomms::Queue::find_queue_field
boost::any find_queue_field(const std::string &field_name, const google::protobuf::Message &msg)
goby::acomms::Queue::push_message
bool push_message(const std::shared_ptr< google::protobuf::Message > &dccl_msg)
google
Definition: dccl.h:57
goby::acomms::Queue::descriptor
const google::protobuf::Descriptor * descriptor() const
Definition: queue.h:128
goby::acomms::Queue::newest_msg_time
boost::posix_time::ptime newest_msg_time() const
Definition: queue.h:108