24 #ifndef GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
25 #define GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
36 #include <boost/asio/write.hpp>
37 #include <boost/system/error_code.hpp>
// Forward declaration only; no definition of InterThreadTransporter appears in the
// visible lines of this header excerpt.
55 class InterThreadTransporter;
// Fragment of a conversion helper template: builds a ProtobufEndpoint (`pb_ep`) from
// `asio_ep` by copying its address (rendered as a string) and its port.
// The function's name, signature and return statement are not visible in this excerpt.
// NOTE(review): the template parameter names below appear to have been split across
// lines ("ProtobufEndpo" / "int", "ASIOEndpo" / "int"); presumably they read
// ProtobufEndpoint and ASIOEndpoint -- confirm against the original header.
66 template <
typename ProtobufEndpo
int,
typename ASIOEndpo
int>
// Destination object, filled in field by field below.
69 ProtobufEndpoint pb_ep;
// address() is converted with to_string() before being stored via set_addr().
70 pb_ep.set_addr(asio_ep.address().to_string());
// The port is passed through unchanged.
71 pb_ep.set_port(asio_ep.port());
// Tail of a template parameter list followed by two template-argument lists that each
// take the full IOThread<...> specialization as their first argument.
// NOTE(review): this is presumably the IOThread class template header and its
// base-class list; the `class IOThread` line and the names of the two outer templates
// are not visible in this excerpt -- confirm against the original header.
77 PubSubLayer subscribe_layer,
typename IOConfig,
typename SocketType,
// ThreadType is a template template parameter taking a single type argument.
78 template <
class>
class ThreadType,
// use_indexed_groups defaults to false when not supplied.
bool use_indexed_groups =
false>
// First argument list: IOThread itself plus line_in_group / publish_layer.
81 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
82 IOConfig, SocketType, ThreadType, use_indexed_groups>,
83 line_in_group, publish_layer, use_indexed_groups>,
// Second argument list: IOThread itself plus line_out_group / subscribe_layer.
85 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
86 IOConfig, SocketType, ThreadType, use_indexed_groups>,
87 line_out_group, subscribe_layer, use_indexed_groups>
// Constructor fragment: member-initializer list followed by parts of the body.
// The constructor's own signature line is not visible in this excerpt.
//
// ThreadType<IOConfig> is initialized with the configuration, the value of
// loop_max_frequency(), and the index.
95 : ThreadType<IOConfig>(
config, this->loop_max_frequency(), index),
// The next two initializers (outer template names not visible) each receive `index`;
// the first is parameterized on line_in_group / publish_layer ...
97 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
98 SocketType, ThreadType, use_indexed_groups>,
99 line_in_group, publish_layer, use_indexed_groups>(index),
// ... and the second on line_out_group / subscribe_layer.
101 IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
102 SocketType, ThreadType, use_indexed_groups>,
103 line_out_group, subscribe_layer, use_indexed_groups>(index),
// Callback for IOData messages; captures `this` and receives a shared pointer to a
// const message.
107 auto data_out_callback =
108 [
this](std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) {
// The guarded action (not visible here) runs when the message carries no index, or
// when its index equals this thread's index.
109 if (!io_msg->has_index() || io_msg->index() == this->index())
// Register the callback through subscribe_out for IOData messages.
115 this->
template subscribe_out<goby::middleware::protobuf::IOData>(data_out_callback);
// The statements guarded by this check are not visible in this excerpt; the flag is set
// to true afterwards, so the check fails the next time it is evaluated.
117 if (!glog_group_added_)
120 glog_group_added_ =
true;
// Fragment that starts a helper std::thread and stores it in
// incoming_mail_notify_thread_. The enclosing function's signature is not visible in
// this excerpt (presumably the thread's initialization hook -- confirm).
127 incoming_mail_notify_thread_.reset(
new std::thread([
this]() {
// The helper thread keeps looping for as long as alive() returns true.
128 while (this->alive())
// Each iteration takes incoming_mail_notify_mutex_ and blocks on the condition
// variable returned by interthread().cv(). What is done after the wait returns is not
// visible in this excerpt.
130 std::unique_lock<std::mutex>
lock(incoming_mail_notify_mutex_);
131 this->interthread().cv()->wait(
lock);
// Passes thread_name_ to set_name().
137 this->set_name(thread_name_);
// Fragment that wakes and then joins the helper thread. The enclosing function's
// signature is not visible in this excerpt (presumably the shutdown hook -- confirm).
//
// notify_all() is issued while holding incoming_mail_notify_mutex_, the same mutex the
// helper thread locks before waiting on this condition variable.
144 std::lock_guard<std::mutex> l(incoming_mail_notify_mutex_);
145 this->interthread().cv()->notify_all();
// Wait for the helper thread to exit, then release the std::thread object.
// NOTE(review): no null check on incoming_mail_notify_thread_ before join(), unlike the
// detach() site elsewhere in this file -- confirm the pointer is always set when this runs.
147 incoming_mail_notify_thread_->join();
148 incoming_mail_notify_thread_.reset();
// Teardown fragment; the enclosing function's signature is not visible in this excerpt
// (presumably the destructor -- confirm).
//
// If the helper thread object still exists at this point it is detached rather than
// joined.
156 if (incoming_mail_notify_thread_)
157 incoming_mail_notify_thread_->detach();
// A fresh IOStatus message is allocated; how it is filled in and what is done with it
// is not visible in this excerpt.
159 auto status = std::make_shared<protobuf::IOStatus>();
// Removes the IOData subscription via unsubscribe_out (the counterpart of the
// subscribe_out call made in the constructor).
163 this->
template unsubscribe_out<goby::middleware::protobuf::IOData>();
// Tail of a templated function declaration, parameterized on IOThreadImplementation,
// whose last parameter is a shared pointer to a const IOData message.
// NOTE(review): the declared name and leading parameters are not visible here;
// presumably this declares the free async-write helper defined later in the file
// (possibly as a friend) -- confirm against the original header.
166 template <
class IOThreadImplementation>
168 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg);
// Handles an outgoing IOData message. Only fragments of the body are visible in this
// excerpt; the actions taken inside each branch below are not shown.
//
// io_msg: shared pointer to the (const) message whose data() is to be written.
171 void write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
// Part of a stream-output expression: an index of -1 is rendered as an empty string,
// any other index as its decimal text.
175 << ((this->index() == -1) ? std::string() :
std::to_string(this->index()))
// Special-cases a message whose payload is empty.
177 if (io_msg->
data().empty())
// Special-cases a missing socket object or a socket that is not open.
179 if (!socket_ || !socket_->is_open())
// Fragment that wraps raw `bytes` in a newly allocated IOData message by copying them
// into its data field. The enclosing function's signature and what is subsequently done
// with io_msg are not visible in this excerpt.
187 auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
188 *io_msg->mutable_data() = bytes;
// Fragment of a function taking a mutable IOData message and publishing it with
// publish_in(). The function's name line is not visible in this excerpt.
194 std::shared_ptr<goby::middleware::protobuf::IOData> io_msg)
// Index -1 is treated specially: the guarded statement (not visible here) only runs for
// other index values.
196 if (this->index() != -1)
// Part of a stream-output expression: index -1 is rendered as an empty string,
// otherwise as decimal text.
201 << ((this->index() == -1) ? std::string() :
std::to_string(this->index()))
// Hand the message to publish_in().
204 this->publish_in(io_msg);
// Pure virtual: concrete subclasses must implement the write for the given IOData
// message (shared pointer to const).
232 virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) = 0;
// Declaration of loop(), overriding a virtual from a base class; the definition appears
// out of line later in this file.
241 void loop()
override;
// Data members (only some are visible in this excerpt).
//
// Owned socket object; code elsewhere in this file tests it for null and calls
// is_open() on it before use.
245 std::unique_ptr<SocketType> socket_;
// Owned helper thread object; started, joined and detached by other code in this file.
253 std::unique_ptr<std::thread> incoming_mail_notify_thread_;
// String members; thread_name_ is passed to set_name() elsewhere in this file. How
// glog_group_ is used is not visible in this excerpt.
255 std::string glog_group_;
256 std::string thread_name_;
// Starts out false; set to true in the constructor after a step guarded by this flag.
257 bool glog_group_added_{
false};
// Fragment of a function template, parameterized on IOThreadImplementation, that starts
// a boost::asio::async_write on this_thread's socket. The function's name and its
// leading parameter(s) are not visible in this excerpt.
260 template <
class IOThreadImplementation>
262 std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
// The buffer is created over io_msg->data().
264 boost::asio::async_write(
265 this_thread->mutable_socket(), boost::asio::buffer(io_msg->
data()),
// The completion handler captures io_msg by value, so the handler holds its own
// reference to the message whose data() the buffer was created from.
267 [this_thread, io_msg](
const boost::system::error_code& ec, std::size_t bytes_transferred) {
// Success requires both no error code and a non-zero byte count.
268 if (!ec && bytes_transferred > 0)
270 this_thread->handle_write_success(bytes_transferred);
// Error path; the branch structure connecting it to the check above (presumably an
// `else`) is not visible in this excerpt.
274 this_thread->handle_write_error(ec);
// IOThread<...>::try_open(): creates a new socket object and records when the next open
// attempt may occur; on a std::exception it builds an error report and increases the
// retry backoff. Only fragments of the body are visible in this excerpt (the `try`
// keyword, the source of `now`, and several statements are not shown).
288 template <
class>
class ThreadType,
bool use_indexed_groups>
290 subscribe_layer, IOConfig, SocketType, ThreadType,
291 use_indexed_groups>::try_open()
// Replace any existing socket with a new SocketType constructed from io_.
295 socket_.reset(
new SocketType(io_));
// Non-exception path: the backoff is reset to its minimum value.
305 backoff_interval_ = min_backoff_interval_;
// Status message; the index is only set when it is not -1.
307 auto status = std::make_shared<protobuf::IOStatus>();
308 if (this->index() != -1)
309 status->set_index(this->index());
319 next_open_attempt_ = now + backoff_interval_;
// Failure path: any std::exception raised in the (not fully visible) try body.
321 catch (
const std::exception&
e)
// A separate status message is built for the failure, with the same index handling.
323 auto status = std::make_shared<protobuf::IOStatus>();
324 if (this->index() != -1)
325 status->set_index(this->index());
// Error text is the exception message followed by the configuration's
// ShortDebugString() in parentheses.
330 error.set_text(
e.what() + std::string(
": config (") + this->cfg().ShortDebugString() +
")");
334 <<
"Failed to open/configure socket/serial_port: "
335 <<
error.ShortDebugString() << std::endl;
// Double the backoff while it is still below the maximum.
// NOTE(review): the comparison is made before doubling, so the resulting interval can
// end up above max_backoff_interval_ -- confirm this is intended.
337 if (backoff_interval_ < max_backoff_interval_)
338 backoff_interval_ *= 2.0;
// Schedule the next attempt using the (possibly increased) backoff.
341 next_open_attempt_ = now + backoff_interval_;
// Presumably backoff_interval_ is a std::chrono duration, so dividing by one second
// yields the interval as a number of seconds for logging -- TODO confirm.
344 << backoff_interval_ / std::chrono::seconds(1)
345 <<
" seconds" << std::endl;
// IOThread<...>::loop(): only the two branch conditions are visible in this excerpt;
// the statements they guard (and how the branches relate) are not shown.
354 template <
class>
class ThreadType,
bool use_indexed_groups>
356 subscribe_layer, IOConfig, SocketType, ThreadType,
357 use_indexed_groups>::loop()
// Case: a socket object exists and reports itself open.
359 if (socket_ && socket_->is_open())
// Case: the time recorded in next_open_attempt_ has passed (strictly later than it).
// Presumably this is where a reopen is attempted -- confirm against the full source.
369 if (now > next_open_attempt_)
// IOThread<...>::handle_read_error(ec): builds an IOStatus carrying the error code's
// message and writes a "Failed to read" log line. Only fragments are visible; what is
// done with `status` afterwards is not shown in this excerpt.
//
// ec: the boost::system::error_code describing the failed read.
386 template <
class>
class ThreadType,
bool use_indexed_groups>
388 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
389 use_indexed_groups>::handle_read_error(
const boost::system::error_code& ec)
// The index is only set on the status message when it is not -1.
391 auto status = std::make_shared<protobuf::IOStatus>();
392 if (this->index() != -1)
393 status->set_index(this->index());
// The error text is taken from ec.message().
398 error.set_text(ec.message());
402 <<
"Failed to read from the socket/serial_port: "
403 << error.ShortDebugString() << std::endl;
// IOThread<...>::handle_write_error(ec): builds an IOStatus carrying the error code's
// message and writes a "Failed to write" log line. Only fragments are visible; what is
// done with `status` afterwards is not shown in this excerpt.
//
// ec: the boost::system::error_code describing the failed write.
412 template <
class>
class ThreadType,
bool use_indexed_groups>
414 line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
415 use_indexed_groups>::handle_write_error(
const boost::system::error_code& ec)
// The index is only set on the status message when it is not -1.
417 auto status = std::make_shared<protobuf::IOStatus>();
418 if (this->index() != -1)
419 status->set_index(this->index());
// The error text is taken from ec.message().
424 error.set_text(ec.message());
428 <<
"Failed to write to the socket/serial_port: "
429 << error.ShortDebugString() << std::endl;