Goby3  3.1.5a
2024.05.23
driver_thread.h
Go to the documentation of this file.
1 // Copyright 2019-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
25 #define GOBY_MIDDLEWARE_TRANSPORT_INTERVEHICLE_DRIVER_THREAD_H
26 
27 #include <algorithm>
28 #include <chrono>
29 #include <cstddef>
30 #include <map>
31 #include <memory>
32 #include <ostream>
33 #include <set>
34 #include <string>
35 #include <vector>
36 
37 #include <boost/units/quantity.hpp>
38 
43 #include "goby/middleware/group.h"
51 #include "goby/time/convert.h"
52 #include "goby/time/steady_clock.h"
53 #include "goby/time/system_clock.h"
54 #include "goby/time/types.h"
56 
57 namespace goby
58 {
59 namespace acomms
60 {
61 namespace protobuf
62 {
63 class ModemTransmission;
64 } // namespace protobuf
65 } // namespace acomms
66 
67 namespace middleware
68 {
69 template <typename Data> class Publisher;
70 
71 namespace protobuf
72 {
73 inline size_t data_size(const SerializerTransporterMessage& msg) { return msg.data().size(); }
74 
76 {
77  return (a.key().serialize_time() == b.key().serialize_time() &&
79  a.key().type() == b.key().type() && a.key().group() == b.key().group() &&
80  a.data() == b.data());
81 }
82 
84 {
85  if (a.key().serialize_time() != b.key().serialize_time())
86  return a.key().serialize_time() < b.key().serialize_time();
87  else if (a.key().marshalling_scheme() != b.key().marshalling_scheme())
88  return a.key().marshalling_scheme() < b.key().marshalling_scheme();
89  else if (a.key().type() != b.key().type())
90  return a.key().type() < b.key().type();
91  else if (a.key().group() != b.key().group())
92  return a.key().group() < b.key().group();
93  else
94  return a.data() < b.data();
95 }
96 
97 } // namespace protobuf
98 
99 namespace intervehicle
100 {
101 namespace protobuf
102 {
103 inline bool operator==(const TransporterConfig& a, const TransporterConfig& b)
104 {
105  return a.SerializeAsString() == b.SerializeAsString();
106 }
107 
108 } // namespace protobuf
109 
110 template <typename Data>
111 std::shared_ptr<goby::middleware::protobuf::SerializerTransporterMessage>
112 serialize_publication(const Data& d, const Group& group, const Publisher<Data>& publisher)
113 {
115  auto* sbytes = new std::string(bytes.begin(), bytes.end());
116  auto msg = std::make_shared<goby::middleware::protobuf::SerializerTransporterMessage>();
117 
118  auto* key = msg->mutable_key();
119  key->set_marshalling_scheme(MarshallingScheme::DCCL);
121  key->set_group(std::string(group));
122  key->set_group_numeric(group.numeric());
123  auto now = goby::time::SystemClock::now<goby::time::MicroTime>();
124  key->set_serialize_time_with_units(now);
125  *key->mutable_cfg() = publisher.cfg();
126  msg->set_allocated_data(sbytes);
127  return msg;
128 }
129 
131  : public goby::middleware::Thread<intervehicle::protobuf::PortalConfig::LinkConfig,
132  InterProcessForwarder<InterThreadTransporter>>
133 {
134  public:
138 
140  void loop() override;
141  int tx_queue_size() { return buffer_.size(); }
142 
143  private:
144  void _data_request(goby::acomms::protobuf::ModemTransmission* msg);
145  void _buffer_message(
146  const std::shared_ptr<const goby::middleware::protobuf::SerializerTransporterMessage>& msg);
147  void _receive(const goby::acomms::protobuf::ModemTransmission& rx_msg);
148  void _forward_subscription(intervehicle::protobuf::Subscription subscription);
149  void _accept_subscription(const intervehicle::protobuf::Subscription& subscription);
150  void _expire_value(const goby::time::SteadyClock::time_point now,
153 
154  subbuffer_id_type _create_buffer_id(unsigned dccl_id, unsigned group);
155 
157  _create_buffer_id(const goby::middleware::protobuf::SerializerTransporterKey& key)
158  {
159  return _create_buffer_id(detail::DCCLSerializerParserHelperBase::id(key.type()),
160  key.group_numeric());
161  }
162 
163  subbuffer_id_type _create_buffer_id(const intervehicle::protobuf::Subscription& subscription)
164  {
165  return _create_buffer_id(subscription.dccl_id(), subscription.group());
166  }
167 
168  void _try_create_or_update_buffer(modem_id_type dest_id, const subbuffer_id_type& buffer_id);
169 
170  modem_id_type _broadcast_id() { return cfg().modem_id() & cfg().subnet_mask(); }
171 
172  // id within subnet
173  modem_id_type _id_within_subnet(modem_id_type id) { return id - _broadcast_id(); }
174 
175  // full id
176  modem_id_type _full_id(modem_id_type id_in_subnet) { return id_in_subnet + _broadcast_id(); }
177 
178  bool _dest_is_in_subnet(modem_id_type dest_id)
179  {
180  bool dest_in_subnet =
181  (dest_id & cfg().subnet_mask()) == (cfg().modem_id() & cfg().subnet_mask());
182  if (!dest_in_subnet)
184  << "Dest: " << dest_id
185  << " is not in subnet (our id: " << cfg().modem_id()
186  << ", mask: " << cfg().subnet_mask() << ")" << std::endl;
187 
188  return dest_in_subnet;
189  }
190 
191  void _publish_subscription_report(const intervehicle::protobuf::Subscription& changed);
192 
193  private:
194  std::unique_ptr<InterThreadTransporter> interthread_;
195  std::unique_ptr<InterProcessForwarder<InterThreadTransporter>> interprocess_;
196 
197  std::multimap<subbuffer_id_type, goby::middleware::protobuf::SerializerTransporterKey>
198  publisher_buffer_cfg_;
199 
200  std::map<modem_id_type, std::multimap<subbuffer_id_type, intervehicle::protobuf::Subscription>>
201  subscriber_buffer_cfg_;
202 
203  std::map<subbuffer_id_type, std::set<modem_id_type>> subbuffers_created_;
204 
206  std::set<modem_id_type> subscription_subbuffers_;
207 
209 
210  using frame_type = int;
211  std::map<frame_type, std::vector<goby::acomms::DynamicBuffer<buffer_data_type>::Value>>
212  pending_ack_;
213 
214  std::unique_ptr<goby::acomms::ModemDriverBase> driver_;
216 
217  std::string glog_group_;
218 
219  static std::map<std::string, void*> driver_plugins_;
220 
221  goby::time::SteadyClock::time_point next_modem_report_time_;
222  const goby::time::SteadyClock::duration modem_report_interval_;
223 };
224 
225 } // namespace intervehicle
226 } // namespace middleware
227 } // namespace goby
228 
229 #endif
goby::acomms::protobuf::ModemTransmission
Definition: modem_message.pb.h:166
goby::middleware::protobuf::data_size
size_t data_size(const SerializerTransporterMessage &msg)
Definition: driver_thread.h:73
goby::middleware::protobuf::SerializerTransporterKey::serialize_time
::google::protobuf::uint64 serialize_time() const
Definition: serializer_transporter.pb.h:1057
goby::middleware::intervehicle::protobuf::TransporterConfig
Definition: intervehicle_transporter_config.pb.h:77
goby::middleware::intervehicle::protobuf::Subscription
Definition: intervehicle.pb.h:822
goby::time::SteadyClock::duration
std::chrono::microseconds duration
Duration type.
Definition: steady_clock.h:42
goby::acomms::DynamicBuffer::Value
Definition: dynamic_buffer.h:371
system_clock.h
goby::middleware::intervehicle::ModemDriverThread
Definition: driver_thread.h:130
goby::middleware::protobuf::SerializerTransporterMessage
Definition: serializer_transporter.pb.h:462
interface.h
types.h
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::middleware::SerializerParserHelper
Class for parsing and serializing a given marshalling scheme. Must be specialized for a particular sc...
Definition: interface.h:97
goby::middleware::intervehicle::protobuf::Subscription::dccl_id
::google::protobuf::uint32 dccl_id() const
Definition: intervehicle.pb.h:3433
intervehicle.pb.h
group.h
goby::middleware::Thread< intervehicle::protobuf::PortalConfig::LinkConfig, InterProcessForwarder< InterThreadTransporter > >::cfg
const intervehicle::protobuf::PortalConfig::LinkConfig & cfg() const
Definition: thread.h:201
goby::util::FlexOstream::is_debug3
bool is_debug3()
Definition: flex_ostream.h:86
goby::middleware::protobuf::SerializerTransporterKey::type
const ::std::string & type() const
Definition: serializer_transporter.pb.h:901
goby::middleware::protobuf::SerializerTransporterKey
Definition: serializer_transporter.pb.h:244
dynamic_buffer.h
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::Publisher
Class that holds additional metadata and callback functions related to a publication (and is optional...
Definition: driver_thread.h:69
goby::acomms::MACManager
provides an API to the goby-acomms MAC library. MACManager is essentially a std::list<protobuf::Modem...
Definition: mac_manager.h:50
goby::middleware::intervehicle::protobuf::Subscription::group
::google::protobuf::uint32 group() const
Definition: intervehicle.pb.h:3457
driver_base.h
goby::middleware::protobuf::SerializerTransporterKey::group
const ::std::string & group() const
Definition: serializer_transporter.pb.h:967
goby::middleware::protobuf::operator<
bool operator<(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
Definition: tcp_server_interface.h:60
goby::middleware::intervehicle::ModemDriverThread::tx_queue_size
int tx_queue_size()
Definition: driver_thread.h:141
goby::time::SteadyClock::time_point
std::chrono::time_point< SteadyClock > time_point
Definition: steady_clock.h:45
goby::middleware::protobuf::SerializerTransporterMessage::data
const ::std::string & data() const
Definition: serializer_transporter.pb.h:1255
goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig::subnet_mask
::google::protobuf::uint32 subnet_mask() const
Definition: intervehicle.pb.h:2808
goby::middleware::intervehicle::protobuf::ExpireData_ExpireReason
ExpireData_ExpireReason
Definition: intervehicle.pb.h:167
mac_manager.h
interthread.h
goby::middleware::Thread
Represents a thread of execution within the Goby middleware, interleaving periodic events (loop()) wi...
Definition: thread.h:60
dccl.h
steady_clock.h
goby::middleware::protobuf::operator==
bool operator==(const TCPEndPoint &ep_a, const TCPEndPoint &ep_b)
Definition: tcp_server_interface.h:65
goby::middleware::intervehicle::ModemDriverThread::modem_id_type
goby::acomms::DynamicBuffer< buffer_data_type >::modem_id_type modem_id_type
Definition: driver_thread.h:136
goby::middleware::Publisher::cfg
const goby::middleware::protobuf::TransporterConfig & cfg() const
Returns the metadata configuration.
Definition: publisher.h:81
interprocess.h
flex_ostream.h
goby::msg
extern ::google::protobuf::internal::ExtensionIdentifier< ::google::protobuf::MessageOptions, ::google::protobuf::internal::MessageTypeTraits< ::goby::GobyMessageOptions >, 11, false > msg
Definition: option_extensions.pb.h:1327
goby::middleware::protobuf::SerializerTransporterKey::marshalling_scheme
::google::protobuf::int32 marshalling_scheme() const
Definition: serializer_transporter.pb.h:877
convert.h
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:59
goby::acomms::DynamicBuffer
Represents a time-dependent priority queue for several groups of messages (multiple DynamicSubBuffers...
Definition: dynamic_buffer.h:356
serializer_transporter.pb.h
goby::middleware::MarshallingScheme::DCCL
@ DCCL
Definition: interface.h:54
goby::acomms::DynamicBuffer::subbuffer_id_type
std::string subbuffer_id_type
Definition: dynamic_buffer.h:367
goby::middleware::detail::DCCLSerializerParserHelperBase::id
static unsigned id(CharIterator begin, CharIterator end)
Definition: dccl_serializer_parser.h:134
goby::glog
util::FlexOstream glog
Access the Goby logger through this object.
google::protobuf::MessageLite::SerializeAsString
string SerializeAsString() const
goby::middleware::intervehicle::ModemDriverThread::ModemDriverThread
ModemDriverThread(const intervehicle::protobuf::PortalConfig::LinkConfig &cfg)
goby::middleware::intervehicle::protobuf::operator==
bool operator==(const TransporterConfig &a, const TransporterConfig &b)
Definition: driver_thread.h:103
thread.h
goby::middleware::protobuf::SerializerTransporterMessage::key
const ::goby::middleware::protobuf::SerializerTransporterKey & key() const
Definition: serializer_transporter.pb.h:1200
intervehicle_transporter_config.pb.h
goby::middleware::intervehicle::serialize_publication
std::shared_ptr< goby::middleware::protobuf::SerializerTransporterMessage > serialize_publication(const Data &d, const Group &group, const Publisher< Data > &publisher)
Definition: driver_thread.h:112
goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig::modem_id
::google::protobuf::uint32 modem_id() const
Definition: intervehicle.pb.h:2784
goby::middleware::intervehicle::ModemDriverThread::subbuffer_id_type
goby::acomms::DynamicBuffer< buffer_data_type >::subbuffer_id_type subbuffer_id_type
Definition: driver_thread.h:137
goby::middleware::intervehicle::protobuf::PortalConfig_LinkConfig
Definition: intervehicle.pb.h:208
goby::middleware::intervehicle::ModemDriverThread::loop
void loop() override
int