Goby3  3.1.5a
2024.05.23
udp_one_to_many.h
Go to the documentation of this file.
1 // Copyright 2019-2023:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_IO_UDP_ONE_TO_MANY_H
25 #define GOBY_MIDDLEWARE_IO_UDP_ONE_TO_MANY_H
26 
27 #include <array> // for array
28 #include <boost/asio/buffer.hpp> // for buffer
29 #include <boost/asio/ip/udp.hpp>
30 #include <boost/asio/ip/udp.hpp> // for udp, udp::endpoint
31 #include <boost/asio/socket_base.hpp> // for socket_base
32 #include <boost/system/error_code.hpp> // for error_code
33 #include <cstddef> // for size_t
34 #include <memory> // for shared_ptr, __s...
35 #include <string> // for string, to_string
36 
37 #include "goby/exception.h" // for Exception
38 #include "goby/middleware/io/detail/io_interface.h" // for PubSubLayer
39 #include "goby/middleware/protobuf/io.pb.h" // for IOData, UDPEndP...
40 #include "goby/middleware/protobuf/udp_config.pb.h" // for UDPOneToManyConfig
41 
42 namespace goby
43 {
44 namespace middleware
45 {
46 class Group;
47 }
48 } // namespace goby
49 
50 namespace goby
51 {
52 namespace middleware
53 {
54 namespace io
55 {
56 template <const goby::middleware::Group& line_in_group,
57  const goby::middleware::Group& line_out_group,
58  // by default publish all incoming traffic to interprocess for logging
59  PubSubLayer publish_layer = PubSubLayer::INTERPROCESS,
60  // but only subscribe on interthread for outgoing traffic
61  PubSubLayer subscribe_layer = PubSubLayer::INTERTHREAD,
63  template <class> class ThreadType = goby::middleware::SimpleThread,
64  bool use_indexed_groups = false>
66  : public detail::IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
67  boost::asio::ip::udp::socket, ThreadType, use_indexed_groups>
68 {
69  using Base =
70  detail::IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
71  boost::asio::ip::udp::socket, ThreadType, use_indexed_groups>;
72 
73  public:
76  UDPOneToManyThread(const Config& config, int index = -1, bool is_final = true)
77  : Base(config, index, std::string("udp: ") + std::to_string(config.bind_port()))
78  {
79  if (is_final)
80  {
82  this->interthread().template publish<line_in_group>(ready);
83  }
84  }
85 
86  ~UDPOneToManyThread() override {}
87 
88  protected:
90  virtual void async_read() override;
91 
93  virtual void
94  async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) override;
95 
96  private:
98  void open_socket() override;
99 
100  private:
101  static constexpr int max_udp_size{65507};
102  std::array<char, max_udp_size> rx_message_;
103  boost::asio::ip::udp::endpoint sender_endpoint_;
104  boost::asio::ip::udp::endpoint local_endpoint_;
105 };
106 } // namespace io
107 } // namespace middleware
108 } // namespace goby
109 
110 template <const goby::middleware::Group& line_in_group,
111  const goby::middleware::Group& line_out_group,
112  goby::middleware::io::PubSubLayer publish_layer,
113  goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
114  template <class> class ThreadType, bool use_indexed_groups>
115 void goby::middleware::io::UDPOneToManyThread<line_in_group, line_out_group, publish_layer,
116  subscribe_layer, Config, ThreadType,
117  use_indexed_groups>::open_socket()
118 {
119  auto protocol = this->cfg().ipv6() ? boost::asio::ip::udp::v6() : boost::asio::ip::udp::v4();
120  this->mutable_socket().open(protocol);
121 
122  if (this->cfg().set_reuseaddr())
123  {
124  // SO_REUSEADDR
125  boost::asio::socket_base::reuse_address option(true);
126  this->mutable_socket().set_option(option);
127  }
128 
129  if (this->cfg().set_broadcast())
130  {
131  // SO_BROADCAST
132  this->mutable_socket().set_option(boost::asio::socket_base::broadcast(true));
133  }
134 
135  this->mutable_socket().bind(boost::asio::ip::udp::endpoint(protocol, this->cfg().bind_port()));
136  local_endpoint_ = this->mutable_socket().local_endpoint();
137 }
138 
139 template <const goby::middleware::Group& line_in_group,
140  const goby::middleware::Group& line_out_group,
141  goby::middleware::io::PubSubLayer publish_layer,
142  goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
143  template <class> class ThreadType, bool use_indexed_groups>
144 void goby::middleware::io::UDPOneToManyThread<line_in_group, line_out_group, publish_layer,
145  subscribe_layer, Config, ThreadType,
146  use_indexed_groups>::async_read()
147 {
148  this->mutable_socket().async_receive_from(
149  boost::asio::buffer(rx_message_), sender_endpoint_,
150  [this](const boost::system::error_code& ec, size_t bytes_transferred)
151  {
152  if (!ec && bytes_transferred > 0)
153  {
154  auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
155  *io_msg->mutable_data() =
156  std::string(rx_message_.begin(), rx_message_.begin() + bytes_transferred);
157 
158  *io_msg->mutable_udp_src() =
159  detail::endpoint_convert<protobuf::UDPEndPoint>(sender_endpoint_);
160  *io_msg->mutable_udp_dest() =
161  detail::endpoint_convert<protobuf::UDPEndPoint>(local_endpoint_);
162 
163  this->handle_read_success(bytes_transferred, io_msg);
164  this->async_read();
165  }
166  else
167  {
168  this->handle_read_error(ec);
169  }
170  });
171 }
172 
173 template <const goby::middleware::Group& line_in_group,
174  const goby::middleware::Group& line_out_group,
175  goby::middleware::io::PubSubLayer publish_layer,
176  goby::middleware::io::PubSubLayer subscribe_layer, typename Config,
177  template <class> class ThreadType, bool use_indexed_groups>
179  line_in_group, line_out_group, publish_layer, subscribe_layer, Config, ThreadType,
180  use_indexed_groups>::async_write(std::shared_ptr<const goby::middleware::protobuf::IOData>
181  io_msg)
182 {
183  if (!io_msg->has_udp_dest())
184  throw(goby::Exception("UDPOneToManyThread requires 'udp_dest' field to be set in IOData"));
185 
186  boost::asio::ip::udp::resolver resolver(this->mutable_io());
187  boost::asio::ip::udp::endpoint remote_endpoint =
188  *resolver.resolve({io_msg->udp_dest().addr(), std::to_string(io_msg->udp_dest().port()),
189  boost::asio::ip::resolver_query_base::numeric_service});
190 
191  this->mutable_socket().async_send_to(
192  boost::asio::buffer(io_msg->data()), remote_endpoint,
193  [this, io_msg](const boost::system::error_code& ec, std::size_t bytes_transferred)
194  {
195  if (!ec && bytes_transferred > 0)
196  {
197  this->handle_write_success(bytes_transferred);
198  }
199  else
200  {
201  this->handle_write_error(ec);
202  }
203  });
204 }
205 
206 #endif
goby::middleware::io::UDPOneToManyThread
Definition: udp_one_to_many.h:65
io.pb.h
goby::middleware::protobuf::UDPOneToManyConfig
Definition: udp_config.pb.h:80
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::acomms::abc::protobuf::config
extern ::google::protobuf::internal::ExtensionIdentifier< ::goby::acomms::protobuf::DriverConfig, ::google::protobuf::internal::MessageTypeTraits< ::goby::acomms::abc::protobuf::Config >, 11, false > config
Definition: abc_driver.pb.h:203
goby::middleware::Thread< goby::middleware::protobuf::UDPOneToManyConfig, InterVehicleForwarder< InterProcessForwarder< InterThreadTransporter > > >::index
int index() const
Definition: thread.h:145
goby::middleware::io::UDPOneToManyThread::UDPOneToManyThread
UDPOneToManyThread(const Config &config, int index=-1, bool is_final=true)
Constructs the thread.
Definition: udp_one_to_many.h:76
boost
Definition: udp_driver.h:41
goby::middleware::protobuf::IOData::udp_dest
const ::goby::middleware::protobuf::UDPEndPoint & udp_dest() const
Definition: io.pb.h:2028
goby::middleware::SimpleThread
Implements Thread for a three layer middleware setup ([ intervehicle [ interprocess [ interthread ] ]...
Definition: simple_thread.h:43
goby::middleware::io::PubSubLayer
PubSubLayer
Definition: io_transporters.h:38
goby::middleware::protobuf::IOData::has_udp_dest
bool has_udp_dest() const
Definition: io.pb.h:2002
goby::middleware::io::UDPOneToManyThread::async_write
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg) override
Starts an asynchronous write from data published.
Definition: udp_one_to_many.h:180
goby::middleware::protobuf::IOData::data
const ::std::string & data() const
Definition: io.pb.h:2103
goby::middleware::io::UDPOneToManyThread::~UDPOneToManyThread
~UDPOneToManyThread() override
Definition: udp_one_to_many.h:86
goby::middleware::protobuf::UDPEndPoint::port
::google::protobuf::uint32 port() const
Definition: io.pb.h:1757
to_string
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: json.hpp:24301
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:59
goby::middleware::SimpleThread< goby::middleware::protobuf::UDPOneToManyConfig >::interthread
InterThreadTransporter & interthread()
Access the transporter on the interthread layer (this is the innermost transporter)
Definition: simple_thread.h:96
io_interface.h
goby::middleware::io::detail::IOThread
Definition: io_interface.h:79
goby::middleware::io::PubSubLayer::INTERPROCESS
@ INTERPROCESS
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
exception.h
goby::middleware::io::ThreadState::SUBSCRIPTIONS_COMPLETE
@ SUBSCRIPTIONS_COMPLETE
goby::middleware::to_string
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition: common.h:44
goby::middleware::io::UDPOneToManyThread::async_read
virtual void async_read() override
Starts an asynchronous read on the udp socket.
Definition: udp_one_to_many.h:146
goby::middleware::io::PubSubLayer::INTERTHREAD
@ INTERTHREAD
udp_config.pb.h