24 #ifndef GOBY_MIDDLEWARE_IO_UDP_ONE_TO_MANY_H
25 #define GOBY_MIDDLEWARE_IO_UDP_ONE_TO_MANY_H
28 #include <boost/asio/buffer.hpp>
29 #include <boost/asio/ip/udp.hpp>
30 #include <boost/asio/ip/udp.hpp>
31 #include <boost/asio/socket_base.hpp>
32 #include <boost/system/error_code.hpp>
64 bool use_indexed_groups =
false>
66 :
public detail::IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
67 boost::asio::ip::udp::socket, ThreadType, use_indexed_groups>
70 detail::IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
71 boost::asio::ip::udp::socket, ThreadType, use_indexed_groups>;
// Publish `ready` on line_in_group via the interthread layer.
// NOTE(review): the definition of `ready` and any enclosing condition are not
// visible in this excerpt.
82 this->
interthread().template publish<line_in_group>(ready);
// Override that starts an asynchronous send of `io_msg`; the out-of-class
// definition sends io_msg->data() with async_send_to().
94 async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
override;
// Override that opens, configures and binds the UDP socket (see the
// out-of-class definition).
98 void open_socket()
override;
// Size in bytes of the receive buffer below.
// 65507 = 65535 - 20 (IPv4 header) - 8 (UDP header); presumably chosen as the
// largest UDP payload over IPv4 -- TODO confirm intent.
101 static constexpr
int max_udp_size{65507};
// Receive buffer handed to async_receive_from() in async_read().
102 std::array<char, max_udp_size> rx_message_;
// Endpoint argument passed to async_receive_from(); converted into udp_src
// of each received IOData.
103 boost::asio::ip::udp::endpoint sender_endpoint_;
// Local endpoint captured after bind() in open_socket(); converted into
// udp_dest of each received IOData.
104 boost::asio::ip::udp::endpoint local_endpoint_;
114 template <
class>
class ThreadType,
bool use_indexed_groups>
116 subscribe_layer, Config, ThreadType,
117 use_indexed_groups>::open_socket()
// open_socket(): opens the UDP socket for the configured IP version, applies
// the optional reuse-address and broadcast options, binds to the configured
// port, and records the resulting local endpoint.
//
// Choose the protocol family from the configuration: IPv6 when cfg().ipv6()
// is true, IPv4 otherwise.
119 auto protocol = this->cfg().ipv6() ? boost::asio::ip::udp::v6() :
boost::asio::ip::udp::v4();
120 this->mutable_socket().open(protocol);
// Optional: set the reuse_address socket option. This is done before the
// bind() call below.
122 if (this->cfg().set_reuseaddr())
125 boost::asio::socket_base::reuse_address option(
true);
126 this->mutable_socket().set_option(option);
// Optional: set the broadcast socket option on this socket.
129 if (this->cfg().set_broadcast())
132 this->mutable_socket().set_option(boost::asio::socket_base::broadcast(
true));
// Bind to cfg().bind_port(). An endpoint built from (protocol, port) carries
// no specific address, i.e. the "any" address of that protocol family.
135 this->mutable_socket().bind(boost::asio::ip::udp::endpoint(protocol, this->cfg().bind_port()));
// Remember the bound endpoint; async_read() reports it as udp_dest.
136 local_endpoint_ = this->mutable_socket().local_endpoint();
143 template <
class>
class ThreadType,
bool use_indexed_groups>
145 subscribe_layer, Config, ThreadType,
146 use_indexed_groups>::async_read()
// async_read(): starts one asynchronous datagram receive into rx_message_.
// sender_endpoint_ is passed as the endpoint argument of the receive, and the
// completion handler below turns the result into an IOData message.
148 this->mutable_socket().async_receive_from(
149 boost::asio::buffer(rx_message_), sender_endpoint_,
// Completion handler. It captures `this`, so it reads the member buffer and
// endpoints when it runs.
150 [
this](
const boost::system::error_code& ec,
size_t bytes_transferred)
// Success path: no error and at least one byte received.
// NOTE(review): a zero-length datagram received without error does not take
// this branch; presumably it reaches handle_read_error(ec) below with a
// non-error code -- the branch structure is not visible here, confirm.
152 if (!ec && bytes_transferred > 0)
154 auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
// Copy only the first bytes_transferred bytes of the receive buffer.
155 *io_msg->mutable_data() =
156 std::string(rx_message_.begin(), rx_message_.begin() + bytes_transferred);
// Record the sender endpoint as udp_src.
158 *io_msg->mutable_udp_src() =
159 detail::endpoint_convert<protobuf::UDPEndPoint>(sender_endpoint_);
// Record the local endpoint captured in open_socket() as udp_dest.
160 *io_msg->mutable_udp_dest() =
161 detail::endpoint_convert<protobuf::UDPEndPoint>(local_endpoint_);
// Hand the populated message to the base-class success handler.
163 this->handle_read_success(bytes_transferred, io_msg);
// Failure path (presumably the else branch; the surrounding lines are not
// visible in this excerpt).
168 this->handle_read_error(ec);
177 template <
class>
class ThreadType,
bool use_indexed_groups>
179 line_in_group, line_out_group, publish_layer, subscribe_layer, Config, ThreadType,
180 use_indexed_groups>::async_write(std::shared_ptr<const goby::middleware::protobuf::IOData>
// async_write(): sends io_msg->data() as a datagram to a resolved remote
// endpoint with async_send_to().
//
// Throws goby::Exception with the message below.
// NOTE(review): the guarding condition is not visible in this excerpt;
// judging by the message it fires when io_msg has no udp_dest -- confirm.
184 throw(
goby::Exception(
"UDPOneToManyThread requires 'udp_dest' field to be set in IOData"));
// Resolve the destination into a UDP endpoint using a resolver on this
// thread's io context. The numeric_service flag asks the resolver to treat
// the service as a numeric port string.
// NOTE(review): the host/port arguments of the resolve call are not visible
// in this excerpt.
186 boost::asio::ip::udp::resolver resolver(this->mutable_io());
187 boost::asio::ip::udp::endpoint remote_endpoint =
189 boost::asio::ip::resolver_query_base::numeric_service});
// Start the asynchronous send. The buffer refers to io_msg's data in place
// rather than copying it.
191 this->mutable_socket().async_send_to(
192 boost::asio::buffer(io_msg->
data()), remote_endpoint,
// Completion handler. io_msg is captured by value, so the handler holds a
// shared_ptr to the message whose data the send buffer refers to.
193 [
this, io_msg](
const boost::system::error_code& ec, std::size_t bytes_transferred)
// Success path: no error and at least one byte sent.
// NOTE(review): a zero-byte send without error does not take this branch;
// presumably it reaches handle_write_error(ec) with a non-error code --
// confirm whether empty datagrams can occur.
195 if (!ec && bytes_transferred > 0)
197 this->handle_write_success(bytes_transferred);
// Failure path (presumably the else branch; the surrounding lines are not
// visible in this excerpt).
201 this->handle_write_error(ec);