22 #include "goby/common/logger.h" 23 #include "goby/common/logger/term_color.h" 24 #include "goby/common/pubsub_node_wrapper.h" 25 #include "goby/common/zeromq_application_base.h" 26 #include "goby/moos/moos_node.h" 28 #include "goby/pb/protobuf_node.h" 29 #include "goby/pb/protobuf_pubsub_node_wrapper.h" 31 #include "moos_gateway_config.pb.h" 35 bool MOOSGateway_OnConnect(
void* pParam);
36 bool MOOSGateway_OnDisconnect(
void* pParam);
50 friend bool ::MOOSGateway_OnConnect(
void* pParam);
51 friend bool ::MOOSGateway_OnDisconnect(
void* pParam);
57 void check_for_new_moos_variables();
58 bool clears_subscribe_filters(
const std::string& moos_variable);
60 void pb_inbox(boost::shared_ptr<google::protobuf::Message> msg,
const std::string& group);
67 CMOOSCommClient moos_client_;
72 MAX_CONNECTION_TIMEOUT = 10
75 std::set<std::string> subscribed_vars_;
78 std::multimap<std::string, std::string> moos2pb_;
81 std::multimap<std::string, std::string> pb2moos_;
83 bool moos_resubscribe_required_;
// Program entry point: forwards the command-line arguments and the address of
// `cfg` to goby::run, instantiated for goby::moos::MOOSGateway.
// NOTE(review): the declaration of `cfg` and the return statement are not
// visible in this excerpt.
90 int main(
int argc,
char* argv[])
93 goby::run<goby::moos::MOOSGateway>(argc, argv, &cfg);
99 : ZeroMQApplicationBase(&zeromq_service_, cfg),
101 goby_moos_pubsub_client_(
this, cfg_.pb_convert()
103 : cfg_.base().pubsub_config()),
104 goby_pb_pubsub_client_(
this, cfg_.pb_convert()
105 ? cfg_.base().pubsub_config()
107 moos_resubscribe_required_(
false)
109 goby::util::DynamicProtobufManager::enable_compilation();
112 for (
int i = 0, n = cfg_.load_shared_library_size(); i < n; ++i)
114 glog.is(VERBOSE) &&
glog <<
"Loading shared library: " << cfg_.load_shared_library(i)
117 void* handle = dlopen(cfg_.load_shared_library(i).c_str(), RTLD_LAZY);
120 glog << die <<
"Failed ... check path provided or add to /etc/ld.so.conf " 121 <<
"or LD_LIBRARY_PATH" << std::endl;
126 for (
int i = 0, n = cfg_.load_proto_file_size(); i < n; ++i)
128 glog.is(VERBOSE) &&
glog <<
"Loading protobuf file: " << cfg_.load_proto_file(i)
131 if (!goby::util::DynamicProtobufManager::find_descriptor(cfg_.load_proto_file(i)))
132 glog.is(DIE) &&
glog <<
"Failed to load file." << std::endl;
135 moos_client_.SetOnConnectCallBack(MOOSGateway_OnConnect,
this);
136 moos_client_.SetOnDisconnectCallBack(MOOSGateway_OnDisconnect,
this);
137 moos_client_.Run(cfg_.moos_server_host().c_str(), cfg_.moos_server_port(),
138 cfg_.base().app_name().c_str(), cfg_.moos_comm_tick());
140 glog.is(VERBOSE) &&
glog <<
"Waiting to connect to MOOSDB ... " << std::endl;
143 while (!moos_client_.IsConnected())
146 if (i > MAX_CONNECTION_TIMEOUT)
148 glog.is(DIE) &&
glog <<
"Failed to connect to MOOSDB in " << MAX_CONNECTION_TIMEOUT
149 <<
" seconds. Check `moos_server_host` and `moos_server_port`" 156 for (
int i = 0, n = cfg_.goby_subscribe_filter_size(); i < n; ++i)
158 if (!cfg_.pb_convert())
159 goby_moos_pubsub_client_.subscribe(cfg_.goby_subscribe_filter(i));
161 goby_pb_pubsub_client_.subscribe(cfg_.goby_subscribe_filter(i));
164 for (
int i = 0, n = cfg_.pb_pair_size(); i < n; ++i)
166 if (cfg_.pb_pair(i).direction() ==
167 protobuf::MOOSGatewayConfig::ProtobufMOOSBridgePair::PB_TO_MOOS)
169 pb2moos_.insert(std::make_pair(cfg_.pb_pair(i).pb_group(), cfg_.pb_pair(i).moos_var()));
170 goby::pb::DynamicProtobufNode::subscribe(
171 goby::common::PubSubNodeWrapperBase::SOCKET_SUBSCRIBE,
172 boost::bind(&MOOSGateway::pb_inbox,
this, _1, cfg_.pb_pair(i).pb_group()),
173 cfg_.pb_pair(i).pb_group());
175 else if (cfg_.pb_pair(i).direction() ==
176 protobuf::MOOSGatewayConfig::ProtobufMOOSBridgePair::MOOS_TO_PB)
178 moos2pb_.insert(std::make_pair(cfg_.pb_pair(i).moos_var(), cfg_.pb_pair(i).pb_group()));
179 if (!subscribed_vars_.count(cfg_.pb_pair(i).moos_var()))
181 moos_client_.Register(cfg_.pb_pair(i).moos_var(), 0);
182 subscribed_vars_.insert(cfg_.pb_pair(i).moos_var());
187 glog.
add_group(
"from_moos", common::Colors::lt_magenta,
"MOOS -> Goby");
188 glog.
add_group(
"to_moos", common::Colors::lt_green,
"Goby -> MOOS");
191 goby::moos::MOOSGateway::~MOOSGateway() {}
// Handles one CMOOSMsg: tags its source-aux field, logs it under the
// "to_moos" logger group and posts it through moos_client_.
//
// msg: the message to forward; it is modified in place (source-aux is rewritten).
193 void goby::moos::MOOSGateway::moos_inbox(
CMOOSMsg& msg)
// Tests whether the source-aux field already contains this application's name.
// NOTE(review): the statement guarded by this test is not visible in this
// excerpt; presumably it skips messages this gateway has already handled -- confirm.
196 if (msg.GetSourceAux().find(application_name()) != std::string::npos)
// Appends to the existing source-aux value, inserting "/" as a separator only
// when that value is non-empty. The appended operand is cut off in this excerpt.
200 msg.SetSourceAux(msg.GetSourceAux() + (msg.GetSourceAux().size() ?
"/" :
"") +
// Logged only when the VERBOSE level is enabled.
203 glog.is(VERBOSE) &&
glog << group(
"to_moos") << msg << std::endl;
// Hands the (re-tagged) message to the MOOS client.
205 moos_client_.Post(msg);
// One iteration of the gateway: checks for new MOOS variables, fetches the
// pending MOOS mail and republishes each message on the Goby side, either as
// a raw CMOOSMsg or as a parsed protobuf message depending on cfg_.pb_convert().
209 void goby::moos::MOOSGateway::loop()
// May register for MOOS variables that have appeared since the last check.
211 check_for_new_moos_variables();
212 std::list<CMOOSMsg> moos_msgs;
// Fills moos_msgs with the mail currently held by the MOOS client.
214 moos_client_.Fetch(moos_msgs);
216 BOOST_FOREACH (
CMOOSMsg& msg, moos_msgs)
// Tests whether the source-aux field already contains this application's name.
// NOTE(review): the statement guarded by this test is not visible in this
// excerpt; presumably it skips such messages -- confirm.
219 if (msg.GetSourceAux().find(application_name()) != std::string::npos)
// Appends to the existing source-aux value, inserting "/" as a separator only
// when that value is non-empty. The appended operand is cut off in this excerpt.
222 msg.SetSourceAux(msg.GetSourceAux() + (msg.GetSourceAux().size() ?
"/" :
"") +
// Logged under the "from_moos" group only when the VERBOSE level is enabled.
225 glog.is(VERBOSE) &&
glog << group(
"from_moos") << msg << std::endl;
// Without protobuf conversion the CMOOSMsg itself is published.
227 if (!cfg_.pb_convert())
228 goby_moos_pubsub_client_.publish(msg);
// With protobuf conversion, only keys present in moos2pb_ are bridged.
230 if (cfg_.pb_convert() && moos2pb_.count(msg.GetKey()))
// The string payload of the MOOS message is parsed into a protobuf message.
// NOTE(review): pbmsg is dereferenced below; any null check on the parse
// result is not visible in this excerpt -- confirm one exists.
232 boost::shared_ptr<google::protobuf::Message> pbmsg =
233 dynamic_parse_for_moos(msg.GetString());
236 typedef std::multimap<std::string, std::string>::iterator It;
// moos2pb_ is a multimap, so one MOOS key can map to several Goby groups;
// the parsed message is published once per mapped group.
237 std::pair<It, It> it_range = moos2pb_.equal_range(msg.GetKey());
238 for (It it = it_range.first; it != it_range.second; ++it)
239 goby_pb_pubsub_client_.publish(*pbmsg, it->second);
// Asks the MOOS server for its "VAR_SUMMARY" and registers for every listed
// variable that is not yet in subscribed_vars_ and passes
// clears_subscribe_filters(). The request is only attempted when more than
// DEFAULT_WILDCARD_TIME has elapsed on the MOOSTime() clock since the last check.
247 void goby::moos::MOOSGateway::check_for_new_moos_variables()
// Minimum spacing between checks, in MOOSTime() units (presumably seconds -- confirm).
249 const double DEFAULT_WILDCARD_TIME = 1.0;
// Function-local static, so the timestamp of the last check persists across calls.
250 static double dfLastWildCardTime = -1.0;
251 if (MOOSTime() - dfLastWildCardTime > DEFAULT_WILDCARD_TIME)
// NOTE(review): the declaration of InMail is not visible in this excerpt.
// The 2.0 and false arguments are presumably a timeout and a flag of
// ServerRequest -- confirm against the MOOS client API.
254 if (moos_client_.ServerRequest(
"VAR_SUMMARY", InMail, 2.0,
false))
// Exactly one reply message is expected; anything else is reported as a warning.
// NOTE(review): what follows the warning (e.g. an early return) is not visible here.
256 if (InMail.size() != 1)
258 glog.is(WARN) &&
glog <<
"ServerRequest for VAR_SUMMARY returned incorrect mail " 259 "size (should be one)";
263 std::vector<std::string> all_var;
// The first reply's string payload is split on commas into variable names.
264 std::string var_str(InMail.begin()->GetString());
265 boost::split(all_var, var_str, boost::is_any_of(
","));
267 BOOST_FOREACH (
const std::string& s, all_var)
// Skip variables already registered.
269 if (!subscribed_vars_.count(s))
// Only variables accepted by the configured subscribe filters are registered.
271 if (clears_subscribe_filters(s))
273 glog.is(VERBOSE) &&
glog <<
"moos_client_.Register for " << s << std::endl;
// Register with the MOOS client and remember the name so it is not registered again.
274 moos_client_.Register(s, 0);
275 subscribed_vars_.insert(s);
// Record the time of this check for the elapsed-time test above.
280 dfLastWildCardTime = MOOSTime();
// Tests a MOOS variable name against each configured moos_subscribe_filter entry.
//
// moos_variable: the variable name to test.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably true is returned on the first match and false otherwise -- confirm.
285 bool goby::moos::MOOSGateway::clears_subscribe_filters(
const std::string& moos_variable)
287 for (
int i = 0, n = cfg_.moos_subscribe_filter_size(); i < n; ++i)
// Prefix test: compares the leading filter-length characters of moos_variable
// with the filter, so the condition holds when the name begins with the filter.
289 if (moos_variable.compare(0, cfg_.moos_subscribe_filter(i).size(),
290 cfg_.moos_subscribe_filter(i)) == 0)
296 void goby::moos::MOOSGateway::pb_inbox(boost::shared_ptr<google::protobuf::Message> msg,
297 const std::string& group)
299 glog.is(DEBUG2) &&
glog <<
"PB --> MOOS: Group: " << group
300 <<
", msg type: " << msg->GetDescriptor()->full_name() << std::endl;
302 std::string serialized;
303 serialize_for_moos(&serialized, *msg);
305 typedef std::multimap<std::string, std::string>::iterator It;
306 std::pair<It, It> it_range = pb2moos_.equal_range(group);
308 for (It it = it_range.first; it != it_range.second; ++it)
309 moos_client_.Notify(it->second, serialized);
// Connect callback; the gateway installs it on moos_client_ with
// SetOnConnectCallBack, passing `this` as the parameter.
//
// pParam: opaque callback parameter.
// NOTE(review): the conversion of pParam to pApp and the return statement are
// not visible in this excerpt.
312 bool MOOSGateway_OnConnect(
void* pParam)
// Re-registration is done only when the flag has been raised (it is set by
// MOOSGateway_OnDisconnect).
// NOTE(review): whether the flag is cleared afterwards is not visible here.
318 if (pApp->moos_resubscribe_required_)
// Walks every variable name recorded in subscribed_vars_.
// NOTE(review): the remainder of the loop header is not visible in this excerpt.
320 for (std::set<std::string>::const_iterator it = pApp->subscribed_vars_.begin(),
321 end = pApp->subscribed_vars_.end();
// Register again for the variable with the MOOS client.
324 pApp->moos_client_.Register(*it, 0);
325 glog.is(DEBUG2) &&
glog <<
"Resubscribing for: " << *it << std::endl;
// Disconnect callback; the gateway installs it on moos_client_ with
// SetOnDisconnectCallBack, passing `this` as the parameter.
//
// pParam: opaque callback parameter.
// NOTE(review): the conversion of pParam to pApp and the return statement are
// not visible in this excerpt.
337 bool MOOSGateway_OnDisconnect(
void* pParam)
// Raise the flag that MOOSGateway_OnConnect checks before re-registering the
// variables in subscribed_vars_.
342 pApp->moos_resubscribe_required_ =
true;
Helpers for MOOS applications for serializing and parsing Google Protocol Buffers messages.
void add_group(const std::string &name, Colors::Color color=Colors::nocolor, const std::string &description="")
Add another group to the logger. A group provides a related manipulator for categorizing log messages...
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.