// Include guard (ZEROMQNODE20110413H) followed by the Boost (bind, function,
// signals2, mutex), protobuf-config, core-constants and Goby-logger includes.
// The trailing fragment is the member-initializer list of a constructor whose
// signature is not visible in this excerpt: the global blackout interval starts
// as not_a_date_time and both blackout flags start as false.
23 #ifndef ZEROMQNODE20110413H 24 #define ZEROMQNODE20110413H 26 #include <boost/bind.hpp> 27 #include <boost/function.hpp> 28 #include <boost/signals2.hpp> 29 #include <boost/thread/mutex.hpp> 33 #include "goby/common/protobuf/zero_mq_node_config.pb.h" 37 #include "core_constants.h" 38 #include "goby/common/logger.h" 48 : global_blackout_(boost::posix_time::not_a_date_time), local_blackout_set_(
false),
49 global_blackout_set_(
false)
// Member-initializer list of a second constructor (signature not visible in
// this excerpt; presumably ZeroMQSocket taking a socket pointer -- confirm).
// Stores the supplied socket; the global blackout interval starts as
// not_a_date_time and both blackout flags start as false.
54 : socket_(socket), global_blackout_(boost::posix_time::not_a_date_time),
55 local_blackout_set_(
false), global_blackout_set_(
false)
// Declaration only; the definition is not part of this excerpt.
// Presumably stores `duration` in global_blackout_ and raises
// global_blackout_set_ -- confirm against the implementation file.
59 void set_global_blackout(boost::posix_time::time_duration duration);
// Declaration only; the definition is not part of this excerpt.
// Takes a (marshalling_scheme, identifier) pair -- the same key shape used by
// blackout_info_ -- and a blackout duration for that pair.
60 void set_blackout(MarshallingScheme marshalling_scheme,
const std::string& identifier,
61 boost::posix_time::time_duration duration);
// Removes the blackout entry keyed by (marshalling_scheme, identifier) from
// blackout_info_ and then clears local_blackout_set_.
// NOTE(review): the flag is reset unconditionally, even when other entries
// remain in blackout_info_. If check_blackout() uses this flag as a fast-path
// guard, clearing one identifier would switch off the blackouts of all the
// others -- confirm against check_blackout()'s definition.
63 void clear_blackout(MarshallingScheme marshalling_scheme,
const std::string& identifier)
65 blackout_info_.erase(std::make_pair(marshalling_scheme, identifier));
66 local_blackout_set_ =
false;
// Resets the global blackout: global_blackout_ goes back to not_a_date_time
// (the same value the constructors use) and global_blackout_set_ is cleared.
68 void clear_global_blackout()
70 global_blackout_ = boost::posix_time::not_a_date_time;
71 global_blackout_set_ =
false;
// Declaration only; the definition is not part of this excerpt.
// Returns a bool for the given (marshalling_scheme, identifier) pair.
// NOTE(review): which value means "message may pass" is not visible here --
// confirm against the implementation before relying on it.
76 bool check_blackout(MarshallingScheme marshalling_scheme,
const std::string& identifier);
// Replaces the held socket pointer with `socket` (shared ownership).
78 void set_socket(boost::shared_ptr<zmq::socket_t> socket) { socket_ = socket; }
// Returns a mutable reference to the held socket pointer itself, so a caller
// can both use the socket and reseat the pointer through the returned reference.
80 boost::shared_ptr<zmq::socket_t>& socket() {
return socket_; }
// BlackoutInfo constructor and fields (the enclosing struct/class header is
// not visible in this excerpt).
// `interval` defaults to not_a_date_time; last_post_time starts at neg_infin.
85 BlackoutInfo(boost::posix_time::time_duration interval = boost::posix_time::not_a_date_time)
86 : blackout_interval(interval), last_post_time(boost::posix_time::neg_infin)
// Blackout interval for one entry.
90 boost::posix_time::time_duration blackout_interval;
// Presumably the time of the most recent post for this entry -- confirm in
// check_blackout()'s definition.
91 boost::posix_time::ptime last_post_time;
// Shared pointer to the underlying zmq socket (see set_socket()/socket()).
94 boost::shared_ptr<zmq::socket_t> socket_;
// Global blackout interval; not_a_date_time when no global blackout is set.
96 boost::posix_time::time_duration global_blackout_;
// Flags cleared by clear_blackout() and clear_global_blackout() respectively.
97 bool local_blackout_set_;
98 bool global_blackout_set_;
// Per-message blackout state, keyed by (marshalling scheme, identifier).
99 std::map<std::pair<MarshallingScheme, std::string>, BlackoutInfo> blackout_info_;
// Declarations only; definitions are not part of this excerpt.
// Both take the integer id of the socket to act on (presumably a key into
// sockets_ -- confirm).
121 void subscribe_all(
int socket_id);
122 void unsubscribe_all(
int socket_id);
// Declaration only; the definition is not part of this excerpt.
// Parameters: marshalling scheme, message identifier, message body, and the
// integer id of the socket to send on.
124 void send(MarshallingScheme marshalling_scheme,
const std::string& identifier,
125 const std::string& body,
int socket_id);
// subscribe()/unsubscribe() declarations. Both parameter lists are truncated
// in this excerpt (each ends on a trailing comma); the remaining parameter(s)
// are not visible here -- consult the full header.
127 void subscribe(MarshallingScheme marshalling_scheme,
const std::string& identifier,
130 void unsubscribe(MarshallingScheme marshalling_scheme,
const std::string& identifier,
// Member-function overload of connect_inbox_slot(). The template header and
// the declaration of `obj` are not visible in this excerpt.
// When DEBUG1 logging is enabled, logs the typeid name of `obj`; then binds
// `mem_func` to `obj` with four placeholder arguments and forwards the result
// to the boost::function overload.
134 void connect_inbox_slot(
void (C::*mem_func)(MarshallingScheme,
const std::string&,
135 const std::string&,
int),
138 goby::glog.is(goby::common::logger::DEBUG1) &&
139 goby::glog <<
"ZeroMQService: made connection for: " <<
typeid(obj).name() << std::endl;
140 connect_inbox_slot(boost::bind(mem_func, obj, _1, _2, _3, _4));
// boost::function overload of connect_inbox_slot(): connects the callable to
// inbox_signal_. The callable receives (marshalling_scheme, identifier, body,
// socket_id). The line that names the parameter (`slot`) is missing from this
// excerpt.
143 void connect_inbox_slot(
144 boost::function<
void(MarshallingScheme marshalling_scheme,
const std::string& identifier,
145 const std::string& body,
int socket_id)>
148 inbox_signal_.connect(slot);
// Declaration only; the definition is not part of this excerpt.
// `timeout` defaults to -1. NOTE(review): the unit of `timeout` and the
// meaning of the bool result are not visible here -- confirm in the
// implementation.
151 bool poll(
long timeout = -1);
// Drops every registered poll callback. The enclosing function is not visible
// in this excerpt.
156 poll_callbacks_.clear();
// Member-function overload of register_poll_item(). The template header and
// the declaration of `obj` are not visible in this excerpt.
// Binds `mem_func` to `obj` with three placeholder arguments (data pointer and
// two ints) and forwards to the boost::function overload.
162 void register_poll_item(
const zmq::pollitem_t& item,
void (C::*mem_func)(
const void*,
int,
int),
165 register_poll_item(item, boost::bind(mem_func, obj, _1, _2, _3));
// boost::function overload of register_poll_item() (the return type is on a
// line missing from this excerpt).
// Appends `item` to poll_items_ and stores `callback` in poll_callbacks_ under
// the index the item just received, so the callback can later be looked up by
// the item's position in poll_items_.
169 register_poll_item(
const zmq::pollitem_t& item,
170 boost::function<
void(
const void* data,
int size,
int message_part)> callback)
172 poll_items_.push_back(item);
// Key is the index of the element appended on the previous line.
173 poll_callbacks_.insert(std::make_pair(poll_items_.size() - 1, callback));
// Returns a copy of the shared pointer to the zmq context (shared ownership).
176 boost::shared_ptr<zmq::context_t> zmq_context() {
return context_; }
// Four signal declarations; each slot receives (marshalling_scheme,
// identifier, socket_id). The lines carrying the names of the first three
// signals are missing from this excerpt; only the last one,
// post_subscribe_hooks, is named here.
178 boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
179 const std::string& identifier,
int socket_id)>
182 boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
183 const std::string& identifier,
int socket_id)>
186 boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
187 const std::string& identifier,
int socket_id)>
190 boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
191 const std::string& identifier,
int socket_id)>
192 post_subscribe_hooks;
// Group-name strings; judging by the function names they label outbound and
// inbound traffic in the Goby logger (usage is not visible in this excerpt).
194 static std::string glog_out_group() {
return "goby::common::zmq::out"; }
195 static std::string glog_in_group() {
return "goby::common::zmq::in"; }
// Declaration only; the definition is not part of this excerpt.
// Takes a data pointer with its size, a message-part index, and the id of the
// socket involved. The first three parameters match the poll-callback
// signature used by register_poll_item().
207 void handle_receive(
const void* data,
int size,
int message_part,
int socket_id);
// Declaration only; maps the protobuf SocketType enum to an int (presumably
// the corresponding zmq socket-type constant -- confirm in the implementation).
209 int socket_type(protobuf::ZeroMQServiceConfig::Socket::SocketType type);
// Shared zmq context, exposed through zmq_context().
212 boost::shared_ptr<zmq::context_t> context_;
// Sockets keyed by an int (presumably the socket_id used across the API).
213 std::map<int, ZeroMQSocket> sockets_;
// Items registered through register_poll_item().
214 std::vector<zmq::pollitem_t> poll_items_;
// Callback map keyed by size_t; the member name is on a line missing from this
// excerpt (presumably poll_callbacks_, which register_poll_item() fills with
// poll_items_ indices).
219 std::map<size_t, boost::function<void(const void* data, int size, int message_part)> >
// Signal whose declaration is truncated in this excerpt (presumably
// inbox_signal_, the signal connect_inbox_slot() connects to).
222 boost::signals2::signal<void(MarshallingScheme marshalling_scheme,
223 const std::string& identifier,
const std::string& body,
// Mutex; its usage is not visible in this excerpt (presumably guards poll()).
226 boost::mutex poll_mutex_;
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.