Goby3  3.1.5a
2024.05.23
io_interface.h
Go to the documentation of this file.
1 // Copyright 2019-2022:
2 // GobySoft, LLC (2013-)
3 // Community contributors (see AUTHORS file)
4 // File authors:
5 // Toby Schneider <toby@gobysoft.org>
6 //
7 //
8 // This file is part of the Goby Underwater Autonomy Project Libraries
9 // ("The Goby Libraries").
10 //
11 // The Goby Libraries are free software: you can redistribute them and/or modify
12 // them under the terms of the GNU Lesser General Public License as published by
13 // the Free Software Foundation, either version 2.1 of the License, or
14 // (at your option) any later version.
15 //
16 // The Goby Libraries are distributed in the hope that they will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public License
22 // along with Goby. If not, see <http://www.gnu.org/licenses/>.
23 
24 #ifndef GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
25 #define GOBY_MIDDLEWARE_IO_DETAIL_IO_INTERFACE_H
26 
27 #include <chrono> // for seconds
28 #include <exception> // for exception
29 #include <memory> // for shared_ptr
30 #include <mutex> // for mutex, lock_...
31 #include <ostream> // for endl, size_t
32 #include <string> // for string, oper...
33 #include <thread> // for thread
34 #include <unistd.h> // for usleep
35 
36 #include <boost/asio/write.hpp> // for async_write
37 #include <boost/system/error_code.hpp> // for error_code
38 
39 #include "goby/exception.h" // for Exception
40 #include "goby/middleware/application/multi_thread.h" // for SimpleThread
41 #include "goby/middleware/common.h" // for thread_id
42 #include "goby/middleware/io/groups.h" // for status
43 #include "goby/middleware/protobuf/io.pb.h" // for IOError, IOS...
44 #include "goby/time/steady_clock.h" // for SteadyClock
45 #include "goby/util/asio_compat.h"
46 #include "goby/util/debug_logger.h" // for glog
47 
48 #include "io_transporters.h"
49 
50 namespace goby
51 {
52 namespace middleware
53 {
54 class Group;
55 class InterThreadTransporter;
56 template <typename InnerTransporter> class InterProcessForwarder;
57 namespace io
58 {
59 enum class ThreadState
60 {
62 };
63 
64 namespace detail
65 {
66 template <typename ProtobufEndpoint, typename ASIOEndpoint>
67 ProtobufEndpoint endpoint_convert(const ASIOEndpoint& asio_ep)
68 {
69  ProtobufEndpoint pb_ep;
70  pb_ep.set_addr(asio_ep.address().to_string());
71  pb_ep.set_port(asio_ep.port());
72  return pb_ep;
73 }
74 
75 template <const goby::middleware::Group& line_in_group,
76  const goby::middleware::Group& line_out_group, PubSubLayer publish_layer,
77  PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
78  template <class> class ThreadType, bool use_indexed_groups = false>
79 class IOThread : public ThreadType<IOConfig>,
80  public IOPublishTransporter<
81  IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
82  IOConfig, SocketType, ThreadType, use_indexed_groups>,
83  line_in_group, publish_layer, use_indexed_groups>,
85  IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer,
86  IOConfig, SocketType, ThreadType, use_indexed_groups>,
87  line_out_group, subscribe_layer, use_indexed_groups>
88 {
89  public:
94  IOThread(const IOConfig& config, int index, std::string glog_group = "i/o")
95  : ThreadType<IOConfig>(config, this->loop_max_frequency(), index),
97  IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
98  SocketType, ThreadType, use_indexed_groups>,
99  line_in_group, publish_layer, use_indexed_groups>(index),
101  IOThread<line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig,
102  SocketType, ThreadType, use_indexed_groups>,
103  line_out_group, subscribe_layer, use_indexed_groups>(index),
104  glog_group_(glog_group + " / t" + std::to_string(goby::middleware::gettid())),
105  thread_name_(glog_group)
106  {
107  auto data_out_callback =
108  [this](std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) {
109  if (!io_msg->has_index() || io_msg->index() == this->index())
110  {
111  write(io_msg);
112  }
113  };
114 
115  this->template subscribe_out<goby::middleware::protobuf::IOData>(data_out_callback);
116 
117  if (!glog_group_added_)
118  {
120  glog_group_added_ = true;
121  }
122  }
123 
124  void initialize() override
125  {
126  // thread to handle synchonization between boost::asio and goby condition_variable signaling
127  incoming_mail_notify_thread_.reset(new std::thread([this]() {
128  while (this->alive())
129  {
130  std::unique_lock<std::mutex> lock(incoming_mail_notify_mutex_);
131  this->interthread().cv()->wait(lock);
132  // post empty handler to cause loop() to return and allow incoming mail to be handled
133  io_.post([]() {});
134  }
135  }));
136 
137  this->set_name(thread_name_);
138  }
139 
140  void finalize() override
141  {
142  // join incoming mail thread
143  {
144  std::lock_guard<std::mutex> l(incoming_mail_notify_mutex_);
145  this->interthread().cv()->notify_all();
146  }
147  incoming_mail_notify_thread_->join();
148  incoming_mail_notify_thread_.reset();
149  }
150 
151  virtual ~IOThread()
152  {
153  socket_.reset();
154 
155  // for non clean shutdown, avoid abort
156  if (incoming_mail_notify_thread_)
157  incoming_mail_notify_thread_->detach();
158 
159  auto status = std::make_shared<protobuf::IOStatus>();
160  status->set_state(protobuf::IO__LINK_CLOSED);
161 
162  this->publish_in(status);
163  this->template unsubscribe_out<goby::middleware::protobuf::IOData>();
164  }
165 
166  template <class IOThreadImplementation>
167  friend void basic_async_write(IOThreadImplementation* this_thread,
168  std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg);
169 
170  protected:
171  void write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
172  {
173  goby::glog.is_debug2() &&
174  goby::glog << group(glog_group_) << "(" << io_msg->data().size() << "B) <"
175  << ((this->index() == -1) ? std::string() : std::to_string(this->index()))
176  << " " << io_msg->ShortDebugString() << std::endl;
177  if (io_msg->data().empty())
178  return;
179  if (!socket_ || !socket_->is_open())
180  return;
181 
182  this->async_write(io_msg);
183  }
184 
185  void handle_read_success(std::size_t bytes_transferred, const std::string& bytes)
186  {
187  auto io_msg = std::make_shared<goby::middleware::protobuf::IOData>();
188  *io_msg->mutable_data() = bytes;
189 
190  handle_read_success(bytes_transferred, io_msg);
191  }
192 
193  void handle_read_success(std::size_t bytes_transferred,
194  std::shared_ptr<goby::middleware::protobuf::IOData> io_msg)
195  {
196  if (this->index() != -1)
197  io_msg->set_index(this->index());
198 
199  goby::glog.is_debug2() &&
200  goby::glog << group(glog_group_) << "(" << bytes_transferred << "B) >"
201  << ((this->index() == -1) ? std::string() : std::to_string(this->index()))
202  << " " << io_msg->ShortDebugString() << std::endl;
203 
204  this->publish_in(io_msg);
205  }
206 
207  void handle_write_success(std::size_t bytes_transferred) {}
208  void handle_read_error(const boost::system::error_code& ec);
209  void handle_write_error(const boost::system::error_code& ec);
210 
212  SocketType& mutable_socket()
213  {
214  if (socket_)
215  return *socket_;
216  else
217  throw goby::Exception("Attempted to access null socket/serial_port");
218  }
219 
221 
223  bool socket_is_open() { return socket_ && socket_->is_open(); }
224 
226  virtual void open_socket() = 0;
227 
229  virtual void async_read() = 0;
230 
232  virtual void async_write(std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg) = 0;
233 
234  const std::string& glog_group() { return glog_group_; }
235 
236  private:
238  void try_open();
239 
241  void loop() override;
242 
243  private:
245  std::unique_ptr<SocketType> socket_;
246 
247  const goby::time::SteadyClock::duration min_backoff_interval_{std::chrono::seconds(1)};
248  const goby::time::SteadyClock::duration max_backoff_interval_{std::chrono::seconds(128)};
249  goby::time::SteadyClock::duration backoff_interval_{min_backoff_interval_};
251 
252  std::mutex incoming_mail_notify_mutex_;
253  std::unique_ptr<std::thread> incoming_mail_notify_thread_;
254 
255  std::string glog_group_;
256  std::string thread_name_;
257  bool glog_group_added_{false};
258 };
259 
260 template <class IOThreadImplementation>
261 void basic_async_write(IOThreadImplementation* this_thread,
262  std::shared_ptr<const goby::middleware::protobuf::IOData> io_msg)
263 {
264  boost::asio::async_write(
265  this_thread->mutable_socket(), boost::asio::buffer(io_msg->data()),
266  // capture io_msg in callback to ensure write buffer exists until async_write is done
267  [this_thread, io_msg](const boost::system::error_code& ec, std::size_t bytes_transferred) {
268  if (!ec && bytes_transferred > 0)
269  {
270  this_thread->handle_write_success(bytes_transferred);
271  }
272  else
273  {
274  this_thread->handle_write_error(ec);
275  }
276  });
277 }
278 
279 } // namespace detail
280 } // namespace io
281 } // namespace middleware
282 } // namespace goby
283 
284 template <const goby::middleware::Group& line_in_group,
285  const goby::middleware::Group& line_out_group,
286  goby::middleware::io::PubSubLayer publish_layer,
287  goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
288  template <class> class ThreadType, bool use_indexed_groups>
289 void goby::middleware::io::detail::IOThread<line_in_group, line_out_group, publish_layer,
290  subscribe_layer, IOConfig, SocketType, ThreadType,
291  use_indexed_groups>::try_open()
292 {
293  try
294  {
295  socket_.reset(new SocketType(io_));
296  open_socket();
297 
298  // messages read from the socket
299  this->async_read();
300 
301  // reset io_context, which ran out of work
302  io_.reset();
303 
304  // successful, reset backoff
305  backoff_interval_ = min_backoff_interval_;
306 
307  auto status = std::make_shared<protobuf::IOStatus>();
308  if (this->index() != -1)
309  status->set_index(this->index());
310 
311  status->set_state(protobuf::IO__LINK_OPEN);
312  this->publish_in(status);
313 
314  goby::glog.is_debug2() && goby::glog << group(glog_group_) << "Successfully opened socket"
315  << std::endl;
316 
317  // update to avoid thrashing on open success but read/write failure
318  decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
319  next_open_attempt_ = now + backoff_interval_;
320  }
321  catch (const std::exception& e)
322  {
323  auto status = std::make_shared<protobuf::IOStatus>();
324  if (this->index() != -1)
325  status->set_index(this->index());
326 
328  goby::middleware::protobuf::IOError& error = *status->mutable_error();
330  error.set_text(e.what() + std::string(": config (") + this->cfg().ShortDebugString() + ")");
331  this->publish_in(status);
332 
333  goby::glog.is_warn() && goby::glog << group(glog_group_)
334  << "Failed to open/configure socket/serial_port: "
335  << error.ShortDebugString() << std::endl;
336 
337  if (backoff_interval_ < max_backoff_interval_)
338  backoff_interval_ *= 2.0;
339 
340  decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
341  next_open_attempt_ = now + backoff_interval_;
342 
343  goby::glog.is_warn() && goby::glog << group(glog_group_) << "Will retry in "
344  << backoff_interval_ / std::chrono::seconds(1)
345  << " seconds" << std::endl;
346  socket_.reset();
347  }
348 }
349 
350 template <const goby::middleware::Group& line_in_group,
351  const goby::middleware::Group& line_out_group,
352  goby::middleware::io::PubSubLayer publish_layer,
353  goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
354  template <class> class ThreadType, bool use_indexed_groups>
355 void goby::middleware::io::detail::IOThread<line_in_group, line_out_group, publish_layer,
356  subscribe_layer, IOConfig, SocketType, ThreadType,
357  use_indexed_groups>::loop()
358 {
359  if (socket_ && socket_->is_open())
360  {
361  // run the io service (blocks until either we read something
362  // from the socket or a subscription is available
363  // as signaled from an empty handler in the incoming_mail_notify_thread)
364  io_.run_one();
365  }
366  else
367  {
368  decltype(next_open_attempt_) now(goby::time::SteadyClock::now());
369  if (now > next_open_attempt_)
370  {
371  try_open();
372  }
373  else
374  {
375  // poll in case we need to quit
376  io_.poll();
377  usleep(100000); // avoid pegging CPU while waiting to attempt reopening socket
378  }
379  }
380 }
381 
382 template <const goby::middleware::Group& line_in_group,
383  const goby::middleware::Group& line_out_group,
384  goby::middleware::io::PubSubLayer publish_layer,
385  goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
386  template <class> class ThreadType, bool use_indexed_groups>
388  line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
389  use_indexed_groups>::handle_read_error(const boost::system::error_code& ec)
390 {
391  auto status = std::make_shared<protobuf::IOStatus>();
392  if (this->index() != -1)
393  status->set_index(this->index());
394 
396  goby::middleware::protobuf::IOError& error = *status->mutable_error();
398  error.set_text(ec.message());
399  this->publish_in(status);
400 
401  goby::glog.is_warn() && goby::glog << group(glog_group_)
402  << "Failed to read from the socket/serial_port: "
403  << error.ShortDebugString() << std::endl;
404 
405  socket_.reset();
406 }
407 
408 template <const goby::middleware::Group& line_in_group,
409  const goby::middleware::Group& line_out_group,
410  goby::middleware::io::PubSubLayer publish_layer,
411  goby::middleware::io::PubSubLayer subscribe_layer, typename IOConfig, typename SocketType,
412  template <class> class ThreadType, bool use_indexed_groups>
414  line_in_group, line_out_group, publish_layer, subscribe_layer, IOConfig, SocketType, ThreadType,
415  use_indexed_groups>::handle_write_error(const boost::system::error_code& ec)
416 {
417  auto status = std::make_shared<protobuf::IOStatus>();
418  if (this->index() != -1)
419  status->set_index(this->index());
420 
422  goby::middleware::protobuf::IOError& error = *status->mutable_error();
424  error.set_text(ec.message());
425  this->publish_in(status);
426 
427  goby::glog.is_warn() && goby::glog << group(glog_group_)
428  << "Failed to write to the socket/serial_port: "
429  << error.ShortDebugString() << std::endl;
430  socket_.reset();
431 }
432 
433 #endif
goby::middleware::io::detail::IOThread::handle_read_success
void handle_read_success(std::size_t bytes_transferred, std::shared_ptr< goby::middleware::protobuf::IOData > io_msg)
Definition: io_interface.h:193
goby::time::SteadyClock::duration
std::chrono::microseconds duration
Duration type.
Definition: steady_clock.h:42
io_transporters.h
goby::middleware::io::detail::IOThread::handle_write_error
void handle_write_error(const boost::system::error_code &ec)
Definition: io_interface.h:415
goby::middleware::io::detail::IOThread::async_read
virtual void async_read()=0
Starts an asynchronous read on the socket.
io.pb.h
goby::middleware::io::detail::IOThread::mutable_socket
SocketType & mutable_socket()
Access the (mutable) socket (or serial_port) object.
Definition: io_interface.h:212
goby
The global namespace for the Goby project.
Definition: acomms_constants.h:33
goby::util::FlexOstream::is_warn
bool is_warn()
Definition: flex_ostream.h:82
goby::middleware::io::detail::IOThread::mutable_io
boost::asio::io_context & mutable_io()
Definition: io_interface.h:220
goby::acomms::abc::protobuf::config
extern ::google::protobuf::internal::ExtensionIdentifier< ::goby::acomms::protobuf::DriverConfig, ::google::protobuf::internal::MessageTypeTraits< ::goby::acomms::abc::protobuf::Config >, 11, false > config
Definition: abc_driver.pb.h:203
goby::util::logger::mutex
std::recursive_mutex mutex
goby::util::e
constexpr T e
Definition: constants.h:35
detail
detail namespace with internal helper functions
Definition: json.hpp:246
goby::time::SteadyClock::now
static time_point now() noexcept
Returns the current steady time unless SimulatorSettings::using_sim_time == true in which case a simu...
Definition: steady_clock.h:49
group
goby::util::logger::GroupSetter group(std::string n)
Definition: logger_manipulators.h:134
goby::middleware::protobuf::IOData::set_index
void set_index(::google::protobuf::int32 value)
Definition: io.pb.h:1907
goby::middleware::io::ThreadState
ThreadState
Definition: io_interface.h:59
goby::middleware::io::detail::IOThread::initialize
void initialize() override
Definition: io_interface.h:124
goby::util::FlexOstream::is_debug2
bool is_debug2()
Definition: flex_ostream.h:85
goby::middleware::InterProcessForwarder
Implements the forwarder concept for the interprocess layer.
Definition: io_interface.h:56
goby::util::FlexOstream::add_group
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages.
goby::util::logger_lock::lock
@ lock
Definition: flex_ostreambuf.h:62
goby::middleware::acomms::groups::status
constexpr Group status
Definition: groups.h:43
io_service
goby::middleware::io::detail::IOSubscribeTransporter
Definition: io_transporters.h:121
goby::time::SteadyClock::time_point
std::chrono::time_point< SteadyClock > time_point
Definition: steady_clock.h:45
goby::middleware::io::detail::IOThread::~IOThread
virtual ~IOThread()
Definition: io_interface.h:151
goby::middleware::protobuf::IOError
Definition: io.pb.h:1367
goby::util::Colors::red
@ red
Definition: term_color.h:114
multi_thread.h
goby::middleware::protobuf::IO__LINK_CLOSED
@ IO__LINK_CLOSED
Definition: io.pb.h:190
goby::middleware::io::PubSubLayer
PubSubLayer
Definition: io_transporters.h:38
goby::middleware::protobuf::IOData::data
const ::std::string & data() const
Definition: io.pb.h:2103
steady_clock.h
to_string
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: json.hpp:24301
goby::middleware::io::detail::IOThread::handle_read_success
void handle_read_success(std::size_t bytes_transferred, const std::string &bytes)
Definition: io_interface.h:185
goby::middleware::io::detail::IOPublishTransporter
Definition: io_transporters.h:75
goby::middleware::io::detail::IOThread::socket_is_open
bool socket_is_open()
Does the socket exist and is it open?
Definition: io_interface.h:223
goby::middleware::io::detail::IOThread::glog_group
const std::string & glog_group()
Definition: io_interface.h:234
goby::middleware::io::detail::IOThread::basic_async_write
friend void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
Definition: io_interface.h:261
asio_compat.h
groups.h
goby::middleware::io::detail::endpoint_convert
ProtobufEndpoint endpoint_convert(const ASIOEndpoint &asio_ep)
Definition: io_interface.h:67
goby::middleware::Group
Class for grouping publications in the Goby middleware. Analogous to "topics" in ROS,...
Definition: group.h:59
goby::middleware::io::detail::IOThread::IOThread
IOThread(const IOConfig &config, int index, std::string glog_group="i/o")
Constructs the thread.
Definition: io_interface.h:94
goby::middleware::protobuf::IOError::IO__INIT_FAILURE
static const ErrorCode IO__INIT_FAILURE
Definition: io.pb.h:1460
debug_logger.h
common.h
goby::middleware::protobuf::IO__CRITICAL_FAILURE
@ IO__CRITICAL_FAILURE
Definition: io.pb.h:191
goby::middleware::protobuf::IOError::IO__READ_FAILURE
static const ErrorCode IO__READ_FAILURE
Definition: io.pb.h:1462
goby::middleware::io::detail::basic_async_write
void basic_async_write(IOThreadImplementation *this_thread, std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
Definition: io_interface.h:261
goby::middleware::io::detail::IOThread::write
void write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)
Definition: io_interface.h:171
goby::middleware::protobuf::IOError::IO__WRITE_FAILURE
static const ErrorCode IO__WRITE_FAILURE
Definition: io.pb.h:1464
goby::middleware::io::detail::IOThread::finalize
void finalize() override
Definition: io_interface.h:140
goby::middleware::io::detail::IOThread
Definition: io_interface.h:79
goby::middleware::protobuf::IO__LINK_OPEN
@ IO__LINK_OPEN
Definition: io.pb.h:189
goby::Exception
simple exception class for goby applications
Definition: exception.h:34
exception.h
goby::middleware::io::detail::IOThread::handle_write_success
void handle_write_success(std::size_t bytes_transferred)
Definition: io_interface.h:207
goby::middleware::io::ThreadState::SUBSCRIPTIONS_COMPLETE
@ SUBSCRIPTIONS_COMPLETE
goby::glog
util::FlexOstream glog
Access the Goby logger through this object.
goby::middleware::to_string
std::string to_string(goby::middleware::protobuf::Layer layer)
Definition: common.h:44
detail::cbor_tag_handler_t::error
@ error
throw a parse_error exception in case of a tag
goby::middleware::io::detail::IOThread::open_socket
virtual void open_socket()=0
Opens the newly created socket/serial_port.
google::protobuf::Message::ShortDebugString
string ShortDebugString() const
goby::middleware::io::detail::IOThread::handle_read_error
void handle_read_error(const boost::system::error_code &ec)
Definition: io_interface.h:389
goby::middleware::io::detail::IOThread::async_write
virtual void async_write(std::shared_ptr< const goby::middleware::protobuf::IOData > io_msg)=0
Starts an asynchronous write from data published.