24 #ifndef GOBY_MIDDLEWARE_IO_DETAIL_TCP_SERVER_INTERFACE_H
25 #define GOBY_MIDDLEWARE_IO_DETAIL_TCP_SERVER_INTERFACE_H
33 #include <boost/asio/buffer.hpp>
34 #include <boost/asio/error.hpp>
35 #include <boost/asio/ip/tcp.hpp>
36 #include <boost/asio/write.hpp>
37 #include <boost/system/error_code.hpp>
// Session object for one TCP connection served by a server thread of type
// TCPServerThreadType. It derives from std::enable_shared_from_this so that it can hand out
// shared_ptr handles to itself (used below when registering with the server and when
// starting asynchronous writes).
// NOTE(review): the integer at the start of many lines in this excerpt looks like an
// original-file line number carried into the text by extraction, and gaps in that numbering
// suggest that braces, some function headers and some statements are missing here — confirm
// against the upstream header.
74 template <
typename TCPServerThreadType>
75 class TCPSession :
public std::enable_shared_from_this<TCPSession<TCPServerThreadType>>
// Constructor: takes the connected socket by value and moves it into socket_, and takes a
// reference to the server. The remaining member initializers are not visible in this excerpt.
78 TCPSession(boost::asio::ip::tcp::socket socket, TCPServerThreadType& server)
79 : socket_(std::move(socket)),
// Builds a TCPServerEvent and publishes it through the server: the index is set only when
// server_.index() is not -1, the local/remote endpoints are converted to
// protobuf::TCPEndPoint, and the client count is read from server_.clients_ at this moment.
// NOTE(review): the header of the function containing this sequence is not visible here.
88 auto event = std::make_shared<goby::middleware::protobuf::TCPServerEvent>();
89 if (server_.index() != -1)
90 event->set_index(server_.index());
92 *
event->mutable_local_endpoint() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
93 *
event->mutable_remote_endpoint() =
94 endpoint_convert<protobuf::TCPEndPoint>(remote_endpoint_);
95 event->set_number_of_clients(server_.clients_.size());
// Tail of a stream-output statement (its beginning is not visible) that prints the event's
// ShortDebugString().
97 <<
"Event: " <<
event->ShortDebugString() << std::endl;
98 server_.publish_in(event);
// Registers this session in the server's clients_ container, then publishes a second
// TCPServerEvent built the same way as above; because the insert happens first, the
// reported client count includes this session.
// NOTE(review): the header of the function containing this sequence is not visible here.
103 server_.clients_.insert(this->shared_from_this());
105 auto event = std::make_shared<goby::middleware::protobuf::TCPServerEvent>();
106 if (server_.index() != -1)
107 event->set_index(server_.index());
109 *
event->mutable_local_endpoint() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
110 *
event->mutable_remote_endpoint() =
111 endpoint_convert<protobuf::TCPEndPoint>(remote_endpoint_);
112 event->set_number_of_clients(server_.clients_.size());
114 <<
"Event: " <<
event->ShortDebugString() << std::endl;
115 server_.publish_in(event);
// Accessor: returns the stored local endpoint by const reference.
120 const boost::asio::ip::tcp::endpoint&
local_endpoint() {
return local_endpoint_; }
// Accessor: forwards to the server's glog_group().
122 const std::string&
glog_group() {
return server_.glog_group(); }
// Starts an asynchronous write of io_msg->data() on socket_. The completion handler captures
// `self` (a shared_ptr to this session) and io_msg by value, so the handler itself holds
// both the session and the message whose data backs the buffer. The handler calls
// handle_write_success(bytes_transferred) or handle_write_error(ec); the condition that
// selects between them is not visible in this excerpt.
125 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
127 auto self(this->shared_from_this());
128 boost::asio::async_write(
129 socket_, boost::asio::buffer(io_msg->
data()),
130 [
this,
self, io_msg](boost::system::error_code ec, std::size_t bytes_transferred) {
133 this->handle_write_success(bytes_transferred);
137 this->handle_write_error(ec);
// Forwards the number of bytes written to the server.
// NOTE(review): enclosing function header not visible; presumably the session's
// handle_write_success called from the handler above — confirm.
145 server_.handle_write_success(bytes_transferred);
// Removes this session from the server's clients_ container.
// NOTE(review): enclosing function header not visible; presumably the session's
// handle_write_error — confirm.
150 server_.clients_.erase(this->shared_from_this());
// Tail of a parameter list (the function name and first parameters are not visible). The
// body stamps the message with this session's addressing — source is the remote endpoint,
// destination is the local endpoint — and then passes it to the server's
// handle_read_success together with bytes_transferred.
154 std::shared_ptr<goby::middleware::protobuf::IOData> io_msg)
156 *io_msg->
mutable_tcp_src() = endpoint_convert<protobuf::TCPEndPoint>(remote_endpoint_);
157 *io_msg->
mutable_tcp_dest() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
159 server_.handle_read_success(bytes_transferred, io_msg);
// Error path: end-of-file is tested separately from other error codes (the statement
// guarded by this check is not visible here), and the session is removed from the server's
// clients_ container.
164 if (ec != boost::asio::error::eof)
167 server_.clients_.erase(this->shared_from_this());
// Accessor: forwards to the server's cfg(); the return type is the server's ConfigType.
170 const typename TCPServerThreadType::ConfigType&
cfg() {
return server_.
cfg(); }
// Pure virtual: concrete session types supply the asynchronous read.
175 virtual void async_read() = 0;
// Connected socket for this session (moved in by the constructor).
178 boost::asio::ip::tcp::socket socket_;
// Server this session reports to; held by reference.
179 TCPServerThreadType& server_;
// Endpoints used when filling events and the tcp_src/tcp_dest fields of messages.
180 boost::asio::ip::tcp::endpoint remote_endpoint_;
181 boost::asio::ip::tcp::endpoint local_endpoint_;
// Tail of the template parameter list and the body of the TCP server thread class template.
// NOTE(review): the opening of the template header and the class-name line are not visible in
// this excerpt; the name TCPServerThread is taken from the client-set declaration at the end
// of this block. The integer at the start of many lines looks like an original-file line
// number carried in by extraction, and gaps in that numbering suggest missing lines — confirm
// against the upstream header.
186 PubSubLayer subscribe_layer,
typename Config,
template <
class>
class ThreadType,
187 bool use_indexed_groups =
false>
// Base class: IOThread instantiated with the same groups/layers/config and with
// boost::asio::ip::tcp::acceptor as its socket-type argument.
189 :
public IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
190 boost::asio::ip::tcp::acceptor, ThreadType, use_indexed_groups>
// Shorthand for the base class type.
192 using Base =
IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, Config,
193 boost::asio::ip::tcp::acceptor, ThreadType, use_indexed_groups>;
// Exposes the configuration type; TCPSession::cfg() names it as
// TCPServerThreadType::ConfigType.
195 using ConfigType = Config;
// Member initializer: tcp_socket_ is constructed from this thread's mutable_io().
// NOTE(review): the constructor header is not visible in this excerpt.
202 tcp_socket_(this->mutable_io())
// Publishes `ready` on the interthread layer in line_in_group.
// NOTE(review): the declaration of `ready` is not visible in this excerpt.
205 this->interthread().template publish<line_in_group>(ready);
// Grants every TCPSession instantiation friend access; TCPSession reads and modifies
// members of the server (for example clients_) directly.
210 template <
typename TCPServerThreadType>
friend class TCPSession;
// Read hook: implemented by starting an asynchronous accept.
214 void async_read()
override { async_accept(); }
// Write hook: declared here, defined out of class.
218 void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
override;
// Socket-open hook: forwards to open_acceptor().
221 void open_socket()
override { open_acceptor(); }
222 void open_acceptor();
// Pure virtual: derived classes receive the accepted socket (async_accept moves
// tcp_socket_ into this call) and create the session for it.
224 virtual void start_session(boost::asio::ip::tcp::socket tcp_socket) = 0;
// Endpoint members; local_endpoint_ is assigned from the acceptor in open_acceptor().
227 boost::asio::ip::tcp::endpoint remote_endpoint_;
228 boost::asio::ip::tcp::endpoint local_endpoint_;
// Socket handed to the acceptor's async_accept to receive the next connection.
230 boost::asio::ip::tcp::socket tcp_socket_;
// Set of shared_ptr handles to the sessions of this server type.
// NOTE(review): the member name is not visible in this excerpt; presumably clients_, the
// name TCPSession uses — confirm.
232 std::set<std::shared_ptr<
233 TCPSession<TCPServerThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
234 Config, ThreadType, use_indexed_groups>>>>
// Out-of-class definition of open_acceptor(): opens the acceptor for the configured IP
// version, optionally enables address reuse, binds to the configured port, records the
// resulting local endpoint and publishes a TCPServerEvent carrying it.
// NOTE(review): the start of the template header, the braces and the beginning of the logging
// statements are not visible in this excerpt (the leading integers look like original-file
// line numbers with gaps) — confirm against the upstream header.
246 template <
class>
class ThreadType,
bool use_indexed_groups>
248 subscribe_layer, Config, ThreadType,
249 use_indexed_groups>::open_acceptor()
// The acceptor is the object returned by the thread's mutable_socket().
251 auto& acceptor = this->mutable_socket();
// IPv6 when cfg().ipv6() is true, IPv4 otherwise.
252 auto protocol = this->cfg().ipv6() ? boost::asio::ip::tcp::v6() :
boost::asio::ip::tcp::v4();
253 acceptor.open(protocol);
// Address reuse is enabled only when the configuration asks for it.
255 if (this->cfg().set_reuseaddr())
258 boost::asio::socket_base::reuse_address option(
true);
259 acceptor.set_option(option);
// Bind to the configured bind_port for the chosen protocol.
262 acceptor.bind(boost::asio::ip::tcp::endpoint(protocol, this->cfg().bind_port()));
// Tail of a logging statement reporting the bound port.
// NOTE(review): the message says listening has begun, but the call that starts listening is
// not visible in this excerpt.
267 <<
"Successfully bound acceptor to port: " << this->cfg().bind_port()
268 <<
" and began listening" << std::endl;
// Record the endpoint the acceptor reports for itself.
270 local_endpoint_ = acceptor.local_endpoint();
// Build and publish the event: the index is set only when this->index() is not -1, and the
// local endpoint is converted to protobuf::TCPEndPoint.
272 auto event = std::make_shared<goby::middleware::protobuf::TCPServerEvent>();
273 if (this->index() != -1)
274 event->set_index(this->index());
276 *
event->mutable_local_endpoint() = endpoint_convert<protobuf::TCPEndPoint>(local_endpoint_);
278 <<
"Event: " <<
event->ShortDebugString() << std::endl;
279 this->publish_in(event);
// Out-of-class definition of async_accept(): starts an asynchronous accept into tcp_socket_.
// The completion handler logs the remote endpoint, moves tcp_socket_ into start_session(),
// and calls async_accept() again to wait for the next connection; it also contains a call to
// handle_read_error(ec).
// NOTE(review): the condition separating the success path from handle_read_error, the braces
// and the start of the template header are not visible in this excerpt (the leading integers
// look like original-file line numbers with gaps) — confirm against the upstream header.
286 template <
class>
class ThreadType,
bool use_indexed_groups>
288 subscribe_layer, Config, ThreadType,
289 use_indexed_groups>::async_accept()
291 auto& acceptor = this->mutable_socket();
// The handler captures only `this`.
292 acceptor.async_accept(tcp_socket_, [
this](boost::system::error_code ec) {
// Tail of a logging statement that prints the remote endpoint of the accepted socket.
296 <<
"Received connection from: "
297 << tcp_socket_.remote_endpoint() << std::endl;
// Hand the accepted socket to the derived class by move.
299 start_session(std::move(tcp_socket_));
// Re-arm: wait for the next incoming connection.
301 this->async_accept();
305 this->handle_read_error(ec);
314 template <
class>
class ThreadType,
bool use_indexed_groups>
316 line_in_group, line_out_group, publish_layer, subscribe_layer, Config, ThreadType,
317 use_indexed_groups>::async_write(std::shared_ptr<const goby::middleware::protobuf::IOData>
321 throw(
goby::Exception(
"TCPServerThread requires 'tcp_dest' field to be set in IOData"));
324 throw(
goby::Exception(
"TCPServerThread requires 'tcp_dest' field to have 'addr'/'port' set "
325 "or all_clients=true in IOData"));
327 for (
auto& client : clients_)
331 endpoint_convert<protobuf::TCPEndPoint>(client->remote_endpoint())))
333 client->async_write(io_msg);