23 #include "goby/common/logger.h" 24 #include "goby/util/as.h" 25 #include "goby/util/binary.h" 27 #include "goby/common/exception.h" 28 #include "zeromq_packet.h" 29 #include "zeromq_service.h" 33 using goby::util::hex_encode;
37 #if ZMQ_VERSION_MAJOR == 2 38 #define zmq_msg_send(msg, sock, opt) zmq_send(sock, msg, opt) 39 #define zmq_msg_recv(msg, sock, opt) zmq_recv(sock, msg, opt) 40 #define ZMQ_POLL_DIVISOR 1 // zmq_poll is usec 41 #define more_t int64_t 44 #define ZMQ_POLL_DIVISOR 1000 // zmq_poll is msec 47 goby::common::ZeroMQService::ZeroMQService(boost::shared_ptr<zmq::context_t> context)
53 goby::common::ZeroMQService::ZeroMQService() : context_(new zmq::context_t(2)) { init(); }
55 void goby::common::ZeroMQService::init()
61 void goby::common::ZeroMQService::process_cfg(
const protobuf::ZeroMQServiceConfig& cfg)
63 for (
int i = 0, n = cfg.socket_size(); i < n; ++i)
65 if (!sockets_.count(cfg.socket(i).socket_id()))
68 boost::shared_ptr<zmq::socket_t> new_socket(
69 new zmq::socket_t(*context_, socket_type(cfg.socket(i).socket_type())));
71 sockets_.insert(std::make_pair(cfg.socket(i).socket_id(), ZeroMQSocket(new_socket)));
74 zmq::pollitem_t item = {(
void*)*new_socket, 0, ZMQ_POLLIN, 0};
77 if (cfg.socket(i).socket_type() != protobuf::ZeroMQServiceConfig::Socket::PUBLISH)
79 register_poll_item(item, boost::bind(&goby::common::ZeroMQService::handle_receive,
80 this, _1, _2, _3, cfg.socket(i).socket_id()));
84 boost::shared_ptr<zmq::socket_t> this_socket =
85 socket_from_id(cfg.socket(i).socket_id()).socket();
87 if (cfg.socket(i).connect_or_bind() == protobuf::ZeroMQServiceConfig::Socket::CONNECT)
90 switch (cfg.socket(i).transport())
92 case protobuf::ZeroMQServiceConfig::Socket::INPROC:
93 endpoint =
"inproc://" + cfg.socket(i).socket_name();
96 case protobuf::ZeroMQServiceConfig::Socket::IPC:
97 endpoint =
"ipc://" + cfg.socket(i).socket_name();
100 case protobuf::ZeroMQServiceConfig::Socket::TCP:
101 endpoint =
"tcp://" + cfg.socket(i).ethernet_address() +
":" +
102 as<std::string>(cfg.socket(i).ethernet_port());
105 case protobuf::ZeroMQServiceConfig::Socket::PGM:
106 endpoint =
"pgm://" + cfg.socket(i).ethernet_address() +
";" +
107 cfg.socket(i).multicast_address() +
":" +
108 as<std::string>(cfg.socket(i).ethernet_port());
111 case protobuf::ZeroMQServiceConfig::Socket::EPGM:
112 endpoint =
"epgm://" + cfg.socket(i).ethernet_address() +
";" +
113 cfg.socket(i).multicast_address() +
":" +
114 as<std::string>(cfg.socket(i).ethernet_port());
120 this_socket->connect(endpoint.c_str());
121 glog.is(DEBUG1) &&
glog << group(glog_out_group())
122 << cfg.socket(i).ShortDebugString()
123 <<
" connected to endpoint - " << endpoint << std::endl;
125 catch (std::exception& e)
127 std::stringstream ess;
128 ess <<
"cannot connect to: " << endpoint <<
": " << e.what();
132 else if (cfg.socket(i).connect_or_bind() == protobuf::ZeroMQServiceConfig::Socket::BIND)
134 std::string endpoint;
135 switch (cfg.socket(i).transport())
137 case protobuf::ZeroMQServiceConfig::Socket::INPROC:
138 endpoint =
"inproc://" + cfg.socket(i).socket_name();
141 case protobuf::ZeroMQServiceConfig::Socket::IPC:
142 endpoint =
"ipc://" + cfg.socket(i).socket_name();
145 case protobuf::ZeroMQServiceConfig::Socket::TCP:
146 endpoint =
"tcp://*:" + as<std::string>(cfg.socket(i).ethernet_port());
149 case protobuf::ZeroMQServiceConfig::Socket::PGM:
153 case protobuf::ZeroMQServiceConfig::Socket::EPGM:
160 this_socket->bind(endpoint.c_str());
162 glog << group(glog_out_group()) <<
"bound to endpoint - " << endpoint
163 <<
", Socket: " << cfg.socket(i).ShortDebugString() << std::endl;
165 catch (std::exception& e)
167 std::stringstream ess;
168 ess <<
"cannot bind to: " << endpoint <<
": " << e.what();
175 goby::common::ZeroMQService::~ZeroMQService()
181 int goby::common::ZeroMQService::socket_type(protobuf::ZeroMQServiceConfig::Socket::SocketType type)
185 case protobuf::ZeroMQServiceConfig::Socket::PUBLISH:
return ZMQ_PUB;
186 case protobuf::ZeroMQServiceConfig::Socket::SUBSCRIBE:
return ZMQ_SUB;
187 case protobuf::ZeroMQServiceConfig::Socket::REPLY:
return ZMQ_REP;
188 case protobuf::ZeroMQServiceConfig::Socket::REQUEST:
200 std::map<int, ZeroMQSocket>::iterator it = sockets_.find(socket_id);
201 if (it != sockets_.end())
204 throw(
goby::Exception(
"Attempted to access socket_id " + as<std::string>(socket_id) +
205 " which does not exist"));
208 void goby::common::ZeroMQService::subscribe_all(
int socket_id)
210 socket_from_id(socket_id).socket()->setsockopt(ZMQ_SUBSCRIBE, 0, 0);
213 void goby::common::ZeroMQService::unsubscribe_all(
int socket_id)
215 socket_from_id(socket_id).socket()->setsockopt(ZMQ_UNSUBSCRIBE, 0, 0);
218 void goby::common::ZeroMQService::subscribe(MarshallingScheme marshalling_scheme,
219 const std::string& identifier,
int socket_id)
221 pre_subscribe_hooks(marshalling_scheme, identifier, socket_id);
223 std::string zmq_filter = zeromq_packet_make_header(marshalling_scheme, identifier);
224 int NULL_TERMINATOR_SIZE = 1;
225 zmq_filter.resize(zmq_filter.size() - NULL_TERMINATOR_SIZE);
226 socket_from_id(socket_id).socket()->setsockopt(ZMQ_SUBSCRIBE, zmq_filter.c_str(),
229 glog.is(DEBUG1) &&
glog << group(glog_in_group()) <<
"subscribed for marshalling " 230 << marshalling_scheme <<
" with identifier: [" << identifier
231 <<
"] using zmq_filter: " << goby::util::hex_encode(zmq_filter)
234 post_subscribe_hooks(marshalling_scheme, identifier, socket_id);
237 void goby::common::ZeroMQService::unsubscribe(MarshallingScheme marshalling_scheme,
238 const std::string& identifier,
int socket_id)
240 std::string zmq_filter = zeromq_packet_make_header(marshalling_scheme, identifier);
241 int NULL_TERMINATOR_SIZE = 1;
242 zmq_filter.resize(zmq_filter.size() - NULL_TERMINATOR_SIZE);
243 socket_from_id(socket_id).socket()->setsockopt(ZMQ_UNSUBSCRIBE, zmq_filter.c_str(),
246 glog.is(DEBUG1) &&
glog << group(glog_in_group()) <<
"unsubscribed for marshalling " 247 << marshalling_scheme <<
" with identifier: [" << identifier
248 <<
"] using zmq_filter: " << goby::util::hex_encode(zmq_filter)
252 void goby::common::ZeroMQService::send(MarshallingScheme marshalling_scheme,
253 const std::string& identifier,
const std::string& body,
256 pre_send_hooks(marshalling_scheme, identifier, socket_id);
261 zmq::message_t msg(raw.size());
262 memcpy(msg.data(), raw.c_str(), raw.size());
265 glog << group(glog_out_group()) <<
"Sent message (hex): " 266 << hex_encode(std::string(static_cast<const char*>(msg.data()), msg.size()))
268 socket_from_id(socket_id).socket()->send(msg);
270 post_send_hooks(marshalling_scheme, identifier, socket_id);
273 void goby::common::ZeroMQService::handle_receive(
const void* data,
int size,
int message_part,
276 std::string bytes(static_cast<const char*>(data), size);
278 glog.is(DEBUG3) &&
glog << group(glog_in_group())
279 <<
"Received message (hex): " << goby::util::hex_encode(bytes)
282 MarshallingScheme marshalling_scheme = MARSHALLING_UNKNOWN;
283 std::string identifier;
286 switch (message_part)
292 glog.is(DEBUG3) &&
glog << group(glog_in_group()) <<
"Received message of type: [" 293 << identifier <<
"]" << std::endl;
295 glog.is(DEBUG3) &&
glog << group(glog_in_group()) <<
"Body [" 296 << goby::util::hex_encode(body) <<
"]" << std::endl;
298 if (socket_from_id(socket_id).check_blackout(marshalling_scheme, identifier))
300 inbox_signal_(marshalling_scheme, identifier, body, socket_id);
306 throw(std::runtime_error(
307 "Got more parts to the message than expecting (expecting only 1)"));
312 bool goby::common::ZeroMQService::poll(
long timeout )
314 boost::mutex::scoped_lock slock(poll_mutex_);
317 bool had_events =
false;
318 zmq::poll(&poll_items_[0], poll_items_.size(), timeout / ZMQ_POLL_DIVISOR);
319 for (
int i = 0, n = poll_items_.size(); i < n; ++i)
321 if (poll_items_[i].revents & ZMQ_POLLIN)
323 int message_part = 0;
325 size_t more_size =
sizeof(more_t);
330 int rc = zmq_msg_init(&part);
334 glog.is(DEBUG1) &&
glog << warn <<
"zmq_msg_init failed" << std::endl;
339 rc = zmq_msg_recv(&part, poll_items_[i].socket, 0);
340 glog.is(DEBUG3) &&
glog << group(glog_in_group()) <<
"Had event for poll item " << i
342 poll_callbacks_[i](zmq_msg_data(&part), zmq_msg_size(&part), message_part);
346 glog.is(DEBUG1) &&
glog << warn <<
"zmq_recv failed" << std::endl;
351 rc = zmq_getsockopt(poll_items_[i].socket, ZMQ_RCVMORE, &more, &more_size);
355 glog.is(DEBUG1) &&
glog << warn <<
"zmq_getsocketopt failed" << std::endl;
359 zmq_msg_close(&part);
368 void goby::common::ZeroMQSocket::set_global_blackout(boost::posix_time::time_duration duration)
370 glog.is(DEBUG2) &&
glog << group(ZeroMQService::glog_in_group()) <<
"Global blackout set to " 371 << duration << std::endl;
372 global_blackout_ = duration;
373 global_blackout_set_ =
true;
376 void goby::common::ZeroMQSocket::set_blackout(MarshallingScheme marshalling_scheme,
377 const std::string& identifier,
378 boost::posix_time::time_duration duration)
380 glog.is(DEBUG2) &&
glog << group(ZeroMQService::glog_in_group())
381 <<
"Blackout for marshalling scheme: " << marshalling_scheme
382 <<
" and identifier " << identifier <<
" set to " << duration
384 blackout_info_[std::make_pair(marshalling_scheme, identifier)] = BlackoutInfo(duration);
385 local_blackout_set_ =
true;
388 bool goby::common::ZeroMQSocket::check_blackout(MarshallingScheme marshalling_scheme,
389 const std::string& identifier)
391 if (!local_blackout_set_ && !global_blackout_set_)
399 BlackoutInfo& blackout_info =
400 blackout_info_[std::make_pair(marshalling_scheme, identifier)];
402 const boost::posix_time::ptime& last_post_time = blackout_info.last_post_time;
404 if ((local_blackout_set_ &&
405 this_time - last_post_time > blackout_info.blackout_interval))
407 blackout_info.last_post_time = this_time;
410 else if ((global_blackout_set_ &&
411 this_time - last_post_time > global_blackout_))
413 blackout_info.last_post_time = this_time;
418 glog.is(DEBUG3) &&
glog << group(ZeroMQService::glog_in_group())
419 <<
"Message (marshalling scheme: " << marshalling_scheme
420 <<
", identifier: " << identifier <<
")" 421 <<
" is in blackout: this time:" << this_time
422 <<
", last time: " << last_post_time
423 <<
", global blackout: " << global_blackout_
424 <<
", local blackout: " << blackout_info.blackout_interval
425 <<
", difference last and this: " << this_time - last_post_time
void zeromq_packet_encode(std::string *raw, MarshallingScheme marshalling_scheme, const std::string &identifier, const std::string &body)
Encodes a packet for Goby over ZeroMQ.
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides related manipulator for categorizing log messages...
common::FlexOstream glog
Access the Goby logger through this object.
simple exception class for goby applications
void zeromq_packet_decode(const std::string &raw, MarshallingScheme *marshalling_scheme, std::string *identifier, std::string *body)
Decodes a packet for Goby over ZeroMQ.