Goby Underwater Autonomy Project
Series: 1.1, revision: 163, released on 2013-02-06 14:23:27 -0500
|
00001 // t. schneider tes@mit.edu 06.05.08 00002 // ocean engineering graudate student - mit / whoi joint program 00003 // massachusetts institute of technology (mit) 00004 // laboratory for autonomous marine sensing systems (lamss) 00005 // 00006 // this is pAcommsHandler.cpp, part of pAcommsHandler 00007 // 00008 // see the readme file within this directory for information 00009 // pertaining to usage and purpose of this script. 00010 // 00011 // This program is free software: you can redistribute it and/or modify 00012 // it under the terms of the GNU General Public License as published by 00013 // the Free Software Foundation, either version 3 of the License, or 00014 // (at your option) any later version. 00015 // 00016 // This software is distributed in the hope that it will be useful, 00017 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 // GNU General Public License for more details. 00020 // 00021 // You should have received a copy of the GNU General Public License 00022 // along with this software. If not, see <http://www.gnu.org/licenses/>. 00023 00024 #include <cctype> 00025 00026 #include <boost/lexical_cast.hpp> 00027 #include <boost/algorithm/string.hpp> 00028 #include <boost/date_time/posix_time/posix_time.hpp> 00029 #include <boost/numeric/conversion/cast.hpp> 00030 #include <boost/foreach.hpp> 00031 #include <boost/math/special_functions/fpclassify.hpp> 00032 00033 #include "pAcommsHandler.h" 00034 #include "goby/util/sci.h" 00035 00036 using namespace goby::util::tcolor; 00037 using goby::acomms::operator<<; 00038 using goby::util::glogger; 00039 using goby::util::as; 00040 using google::protobuf::uint32; 00041 using goby::acomms::NaN; 00042 using goby::acomms::DCCLMessageVal; 00043 00044 pAcommsHandlerConfig CpAcommsHandler::cfg_; 00045 CpAcommsHandler* CpAcommsHandler::inst_ = 0; 00046 00047 CpAcommsHandler* CpAcommsHandler::get_instance() 00048 { 00049 if(!inst_) 00050 inst_ = new CpAcommsHandler(); 00051 return inst_; 00052 } 00053 00054 CpAcommsHandler::CpAcommsHandler() 00055 : TesMoosApp(&cfg_), 00056 dccl_(&glogger()), 00057 queue_manager_(&glogger()), 00058 driver_(0), 00059 mac_(&glogger()) 00060 { 00061 goby::acomms::connect(&queue_manager_.signal_receive, 00062 this, &CpAcommsHandler::queue_incoming_data); 00063 goby::acomms::connect(&queue_manager_.signal_receive_ccl, 00064 this, &CpAcommsHandler::queue_incoming_data); 00065 goby::acomms::connect(&queue_manager_.signal_ack, 00066 this, &CpAcommsHandler::queue_ack); 00067 goby::acomms::connect(&queue_manager_.signal_data_on_demand, 00068 this, &CpAcommsHandler::queue_on_demand); 00069 goby::acomms::connect(&queue_manager_.signal_queue_size_change, 00070 this, &CpAcommsHandler::queue_qsize); 00071 goby::acomms::connect(&queue_manager_.signal_expire, 00072 this, &CpAcommsHandler::queue_expire); 00073 00074 process_configuration(); 00075 00076 // bind the lower level pieces of goby-acomms together 00077 if(driver_) 00078 { 00079 goby::acomms::bind(*driver_, queue_manager_); 00080 goby::acomms::bind(mac_, *driver_); 00081 00082 // bind our methods to the rest of the goby-acomms signals 00083 goby::acomms::connect(&driver_->signal_all_outgoing, 00084 this, &CpAcommsHandler::modem_raw_out); 00085 goby::acomms::connect(&driver_->signal_all_incoming, 00086 this, &CpAcommsHandler::modem_raw_in); 00087 goby::acomms::connect(&driver_->signal_range_reply, 00088 this, &CpAcommsHandler::modem_range_reply); 00089 } 00090 00091 00092 do_subscriptions(); 00093 } 00094 00095 CpAcommsHandler::~CpAcommsHandler() 00096 {} 00097 00098 void CpAcommsHandler::loop() 00099 { 00100 dccl_loop(); 00101 if(driver_) driver_->do_work(); 00102 mac_.do_work(); 00103 queue_manager_.do_work(); 00104 } 00105 00106 00107 void CpAcommsHandler::dccl_loop() 00108 { 00109 std::set<unsigned> ids; 00110 if(dccl_.is_time_trigger(ids)) 00111 { 00112 BOOST_FOREACH(unsigned id, ids) 00113 { 00114 goby::acomms::protobuf::ModemDataTransmission mm; 00115 pack(id, &mm); 00116 } 00117 } 00118 if(cfg_.tcp_share_enable() && tcp_share_server_) 00119 { 00120 std::string s; 00121 while(tcp_share_server_->readline(&s)) 00122 { 00123 goby::acomms::protobuf::ModemDataTransmission msg; 00124 parse_for_moos(s, &msg); 00125 00126 glogger() << group("tcp") << "incoming: " << msg << std::endl; 00127 00128 // post for others to use as needed in MOOS 00129 00130 goby::acomms::DCCLHeaderDecoder decoder(msg.data()); 00131 unsigned dccl_id = decoder[goby::acomms::HEAD_DCCL_ID]; 00132 std::string in_var = dccl_.get_incoming_hex_var(dccl_id); 00133 00134 publish(in_var,s); 00135 unpack(msg); 00136 } 00137 } 00138 00139 } 00140 00141 00142 void CpAcommsHandler::do_subscriptions() 00143 // RegisterVariables: register for variables we want to get mail for 00144 { 00145 // non dccl 00146 typedef std::pair<std::string, goby::acomms::protobuf::QueueKey> P; 00147 BOOST_FOREACH(const P& p, out_moos_var2queue_) 00148 { 00149 if(p.second.type() != goby::acomms::protobuf::QUEUE_DCCL) 00150 subscribe(p.first, &CpAcommsHandler::handle_message_push, this); 00151 } 00152 00153 00154 00155 // ping 00156 subscribe(MOOS_VAR_COMMAND_RANGING, &CpAcommsHandler::handle_ranging_request, this); 00157 00158 // update comms cycle 00159 subscribe(MOOS_VAR_CYCLE_UPDATE, &CpAcommsHandler::handle_mac_cycle_update, this); 00160 subscribe(MOOS_VAR_POLLER_UPDATE, &CpAcommsHandler::handle_mac_cycle_update, this); 00161 00162 subscribe(MOOS_VAR_FLUSH_QUEUE, &CpAcommsHandler::handle_flush_queue, this); 00163 00164 00165 std::set<std::string> enc_vars, dec_vars; 00166 00167 BOOST_FOREACH(unsigned id, dccl_.all_message_ids()) 00168 { 00169 std::set<std::string> vars = dccl_.get_pubsub_encode_vars(id); 00170 enc_vars.insert(vars.begin(), vars.end()); 00171 00172 vars = dccl_.get_pubsub_decode_vars(id); 00173 dec_vars.insert(vars.begin(), vars.end()); 00174 } 00175 BOOST_FOREACH(const std::string& s, dec_vars) 00176 { 00177 if(!s.empty()) 00178 { 00179 subscribe(s, &CpAcommsHandler::dccl_inbox, this); 00180 00181 glogger() << group("dccl_dec") 00182 << "registering (dynamic) for decoding related hex var: {" 00183 << s << "}" << std::endl; 00184 } 00185 } 00186 BOOST_FOREACH(const std::string& s, enc_vars) 00187 { 00188 if(!s.empty()) 00189 { 00190 subscribe(s, &CpAcommsHandler::dccl_inbox, this); 00191 00192 glogger() << group("dccl_dec") << "registering (dynamic) for encoding related hex var: {" << s << "}" << std::endl; 00193 } 00194 } 00195 00196 glogger() << group("dccl_enc") << dccl_.brief_summary() << std::endl; 00197 } 00198 00199 00200 // 00201 // Mail Handlers 00202 // 00203 00204 00205 void CpAcommsHandler::dccl_inbox(const CMOOSMsg& msg) 00206 { 00207 const std::string& key = msg.GetKey(); 00208 bool is_str = msg.IsString(); 00209 const std::string& sval = msg.GetString(); 00210 00211 std::set<unsigned> ids; 00212 unsigned id; 00213 if(dccl_.is_publish_trigger(ids, key, sval)) 00214 { 00215 BOOST_FOREACH(unsigned id, ids) 00216 { 00217 goby::acomms::protobuf::ModemDataTransmission mm; 00218 pack(id, &mm); 00219 } 00220 } 00221 else if(dccl_.is_incoming(id, key) && is_str && 00222 !(msg.GetSource() == GetAppName() && msg.GetCommunity() == goby::util::as<std::string>(cfg_.common().community()))) 00223 { 00224 goby::acomms::protobuf::ModemDataTransmission mm; 00225 parse_for_moos(sval, &mm); 00226 unpack(mm); 00227 } 00228 } 00229 00230 void CpAcommsHandler::handle_ranging_request(const CMOOSMsg& msg) 00231 { 00232 goby::acomms::protobuf::ModemRangingRequest request_msg; 00233 parse_for_moos(msg.GetString(), &request_msg); 00234 glogger() << "ranging request: " << request_msg << std::endl; 00235 if(driver_) driver_->handle_initiate_ranging(&request_msg); 00236 } 00237 00238 void CpAcommsHandler::handle_flush_queue(const CMOOSMsg& msg) 00239 { 00240 goby::acomms::protobuf::QueueFlush flush; 00241 std::string moos_string = boost::trim_copy(msg.GetString()); 00242 // if contains no spaces, assume it is a "key=value," string 00243 if(moos_string.find(" ") == std::string::npos) 00244 from_moos_comma_equals_string(&flush, moos_string); 00245 // assume it is a TextFormat protobuf message 00246 else 00247 parse_for_moos(moos_string, &flush); 00248 00249 glogger() << "queue flush request: " << flush << std::endl; 00250 queue_manager_.flush_queue(flush); 00251 } 00252 00253 00254 void CpAcommsHandler::handle_message_push(const CMOOSMsg& msg) 00255 { 00256 goby::acomms::protobuf::ModemDataTransmission new_message; 00257 parse_for_moos(msg.GetString(), &new_message); 00258 00259 if(!new_message.base().has_time()) 00260 new_message.mutable_base()->set_time(as<std::string>(goby::util::goby_time())); 00261 00262 goby::acomms::protobuf::QueueKey& qk = out_moos_var2queue_[msg.GetKey()]; 00263 00264 if(new_message.data().empty()) 00265 glogger() << warn << "message is either empty or contains invalid data" << std::endl; 00266 else if(!(qk.type() == goby::acomms::protobuf::QUEUE_DCCL)) 00267 { 00268 new_message.mutable_queue_key()->CopyFrom(out_moos_var2queue_[msg.GetKey()]); 00269 00270 std::string serialized; 00271 serialize_for_moos(&serialized, new_message); 00272 publish(MOOS_VAR_OUTGOING_DATA, serialized); 00273 queue_manager_.push_message(new_message); 00274 } 00275 } 00276 00277 00278 void CpAcommsHandler::handle_mac_cycle_update(const CMOOSMsg& msg) 00279 { 00280 goby::acomms::protobuf::MACUpdate update_msg; 00281 parse_for_moos(msg.GetString(), &update_msg); 00282 00283 glogger() << "got update for MAC: " << update_msg << std::endl; 00284 00285 if(!update_msg.dest() == cfg_.modem_id()) 00286 { 00287 glogger() << "update not for us" << std::endl; 00288 return; 00289 } 00290 00291 switch(update_msg.update_type()) 00292 { 00293 case goby::acomms::protobuf::MACUpdate::REPLACE: 00294 mac_.clear_all_slots(); 00295 // fall through intentional 00296 case goby::acomms::protobuf::MACUpdate::ADD: 00297 BOOST_FOREACH(const goby::acomms::protobuf::Slot& slot, update_msg.slot()) 00298 mac_.add_slot(slot); 00299 break; 00300 00301 case goby::acomms::protobuf::MACUpdate::REMOVE: 00302 BOOST_FOREACH(const goby::acomms::protobuf::Slot& slot, update_msg.slot()) 00303 mac_.remove_slot(slot); 00304 break; 00305 } 00306 } 00307 00308 // 00309 // Callbacks from goby libraries 00310 // 00311 void CpAcommsHandler::queue_qsize(const goby::acomms::protobuf::QueueSize& size) 00312 { 00313 glogger() << "New qsize: " << size << std::endl; 00314 00315 std::string serialized; 00316 serialize_for_moos(&serialized, size); 00317 00318 publish(MOOS_VAR_QSIZE, serialized); 00319 } 00320 void CpAcommsHandler::queue_expire(const goby::acomms::protobuf::ModemDataExpire& message) 00321 { 00322 std::string serialized; 00323 serialize_for_moos(&serialized, message); 00324 00325 publish(MOOS_VAR_EXPIRE, serialized); 00326 } 00327 00328 void CpAcommsHandler::queue_ack(const goby::acomms::protobuf::ModemDataAck& message) 00329 { 00330 std::string serialized; 00331 serialize_for_moos(&serialized, message); 00332 00333 publish(MOOS_VAR_ACK, serialized); 00334 } 00335 00336 00337 00338 void CpAcommsHandler::queue_incoming_data(const goby::acomms::protobuf::ModemDataTransmission& message) 00339 { 00340 std::string serialized; 00341 serialize_for_moos(&serialized, message); 00342 CMOOSMsg m(MOOS_NOTIFY, MOOS_VAR_INCOMING_DATA, serialized, -1); 00343 publish(m); 00344 00345 // we know what this type is 00346 if(in_queue2moos_var_.count(message.queue_key())) 00347 { 00348 CMOOSMsg m_specific(MOOS_NOTIFY, in_queue2moos_var_[message.queue_key()], serialized, -1); 00349 00350 publish(m_specific); 00351 00352 glogger() << group("q_in") << "published received data to " 00353 << in_queue2moos_var_[message.queue_key()] << ": " << message << std::endl; 00354 00355 if(message.queue_key().type() == goby::acomms::protobuf::QUEUE_DCCL) 00356 unpack(message); 00357 } 00358 } 00359 00360 00361 void CpAcommsHandler::queue_on_demand(const goby::acomms::protobuf::ModemDataRequest& request_msg, goby::acomms::protobuf::ModemDataTransmission* data_msg) 00362 { 00363 pack(data_msg->queue_key().id(), data_msg); 00364 } 00365 00366 00367 00368 void CpAcommsHandler::modem_raw_in(const goby::acomms::protobuf::ModemMsgBase& base_msg) 00369 { 00370 if(base_msg.raw().length() < MAX_MOOS_PACKET) 00371 publish(MOOS_VAR_NMEA_IN, base_msg.raw()); 00372 } 00373 00374 void CpAcommsHandler::modem_raw_out(const goby::acomms::protobuf::ModemMsgBase& base_msg) 00375 { 00376 std::string out; 00377 serialize_for_moos(&out, base_msg); 00378 00379 if(base_msg.raw().length() < MAX_MOOS_PACKET) 00380 publish(MOOS_VAR_NMEA_OUT, base_msg.raw()); 00381 00382 } 00383 00384 void CpAcommsHandler::modem_range_reply(const goby::acomms::protobuf::ModemRangingReply& message) 00385 { 00386 glogger() << "got range reply: " << message << std::endl; 00387 std::string serialized; 00388 serialize_for_moos(&serialized, message); 00389 publish(MOOS_VAR_RANGING, serialized); 00390 } 00391 00392 00393 // 00394 // READ CONFIGURATION 00395 // 00396 00397 void CpAcommsHandler::process_configuration() 00398 { 00399 // create driver object 00400 switch(cfg_.driver_type()) 00401 { 00402 case pAcommsHandlerConfig::DRIVER_WHOI_MICROMODEM: 00403 driver_ = new goby::acomms::MMDriver(&glogger()); 00404 break; 00405 00406 case pAcommsHandlerConfig::DRIVER_ABC_EXAMPLE_MODEM: 00407 driver_ = new goby::acomms::ABCDriver(&glogger()); 00408 break; 00409 00410 case pAcommsHandlerConfig::DRIVER_NONE: break; 00411 } 00412 00413 // add groups to flexostream human terminal output 00414 mac_.add_flex_groups(&glogger()); 00415 dccl_.add_flex_groups(&glogger()); 00416 queue_manager_.add_flex_groups(&glogger()); 00417 if(driver_) driver_->add_flex_groups(&glogger()); 00418 glogger().add_group("tcp", goby::util::Colors::green, "tcp share"); 00419 00420 if(cfg_.has_modem_id_lookup_path()) 00421 glogger() << modem_lookup_.read_lookup_file(cfg_.modem_id_lookup_path()) << std::endl; 00422 else 00423 glogger() << warn << "no modem_id_lookup_path in moos file. this is required for conversions between modem_id and vehicle name / type." << std::endl; 00424 00425 glogger() << "reading in geodesy information: " << std::endl; 00426 if (!cfg_.common().has_lat_origin() || !cfg_.common().has_lon_origin()) 00427 { 00428 glogger() << die << "no lat_origin or lon_origin specified in configuration. this is required for geodesic conversion" << std::endl; 00429 } 00430 else 00431 { 00432 if(geodesy_.Initialise(cfg_.common().lat_origin(), cfg_.common().lon_origin())) 00433 glogger() << "success!" << std::endl; 00434 else 00435 glogger() << die << "could not initialize geodesy" << std::endl; 00436 } 00437 00438 // check and propogate modem id 00439 if(cfg_.modem_id() == goby::acomms::BROADCAST_ID) 00440 glogger() << die << "modem_id = " << goby::acomms::BROADCAST_ID << " is reserved for broadcast messages. You must specify a modem_id != " << goby::acomms::BROADCAST_ID << " for this vehicle." << std::endl; 00441 00442 publish("MODEM_ID", cfg_.modem_id()); 00443 publish("VEHICLE_ID", cfg_.modem_id()); 00444 00445 cfg_.mutable_queue_cfg()->set_modem_id(cfg_.modem_id()); 00446 cfg_.mutable_mac_cfg()->set_modem_id(cfg_.modem_id()); 00447 cfg_.mutable_dccl_cfg()->set_modem_id(cfg_.modem_id()); 00448 cfg_.mutable_driver_cfg()->set_modem_id(cfg_.modem_id()); 00449 00450 // do a unique merge of the message files for dccl and queue 00451 for(int i = 0, n = cfg_.dccl_cfg().message_file_size(); i < n; ++i) 00452 { 00453 bool exists = false; 00454 for(int j = 0, o = cfg_.queue_cfg().message_file_size(); j < o; ++j) 00455 { 00456 if(cfg_.queue_cfg().message_file(j).path() == cfg_.dccl_cfg().message_file(i).path()) 00457 exists = true; 00458 } 00459 00460 if(!exists) 00461 cfg_.mutable_queue_cfg()->add_message_file()->CopyFrom(cfg_.dccl_cfg().message_file(i)); 00462 00463 } 00464 00465 for(int i = 0, n = cfg_.queue_cfg().message_file_size(); i < n; ++i) 00466 { 00467 bool exists = false; 00468 for(int j = 0, o = cfg_.dccl_cfg().message_file_size(); j < o; ++j) 00469 { 00470 if(cfg_.dccl_cfg().message_file(j).path() == cfg_.queue_cfg().message_file(i).path()) 00471 exists = true; 00472 } 00473 00474 if(!exists) 00475 cfg_.mutable_dccl_cfg()->add_message_file()->CopyFrom(cfg_.queue_cfg().message_file(i)); 00476 00477 } 00478 00479 // start goby-acomms classes 00480 if(driver_) driver_->startup(cfg_.driver_cfg()); 00481 mac_.startup(cfg_.mac_cfg()); 00482 queue_manager_.set_cfg(cfg_.queue_cfg()); 00483 dccl_.set_cfg(cfg_.dccl_cfg()); 00484 00485 // initialize maps between incoming / outgoing MOOS variables and DCCL ids 00486 std::set<unsigned> ids = dccl_.all_message_ids(); 00487 BOOST_FOREACH(unsigned id, ids) 00488 { 00489 goby::acomms::protobuf::QueueKey key; 00490 key.set_type(goby::acomms::protobuf::QUEUE_DCCL); 00491 key.set_id(id); 00492 out_moos_var2queue_[dccl_.get_outgoing_hex_var(id)] = key; 00493 in_queue2moos_var_[key] = dccl_.get_incoming_hex_var(id); 00494 } 00495 00496 for(int i = 0, n = cfg_.queue_cfg().queue_size(); i < n; ++i) 00497 { 00498 in_queue2moos_var_[cfg_.queue_cfg().queue(i).key()] = cfg_.queue_cfg().queue(i).in_pubsub_var(); 00499 out_moos_var2queue_[cfg_.queue_cfg().queue(i).out_pubsub_var()] = 00500 cfg_.queue_cfg().queue(i).key(); 00501 00502 } 00503 00504 00505 // tcp share 00506 if(cfg_.tcp_share_enable()) 00507 { 00508 glogger() << group("tcp") << "tcp_share_port is: " << cfg_.tcp_share_port() << std::endl; 00509 00510 tcp_share_server_ = new goby::util::TCPServer(cfg_.tcp_share_port()); 00511 tcp_share_server_->start(); 00512 glogger() << group("tcp") << "starting TCP server on: " << cfg_.tcp_share_port() << std::endl; 00513 } 00514 00515 BOOST_FOREACH(const std::string& full_ip, cfg_.tcp_share_to_ip()) 00516 { 00517 if(!cfg_.tcp_share_enable()) 00518 { 00519 glogger() << group("tcp") << "tcp_share_in_address given but tcp_enable is false" << std::endl; 00520 } 00521 else 00522 { 00523 std::string ip; 00524 unsigned port = cfg_.tcp_share_port(); 00525 00526 std::string::size_type colon_pos = full_ip.find(":"); 00527 if(colon_pos == std::string::npos) 00528 ip = full_ip; 00529 else 00530 { 00531 ip = full_ip.substr(0, colon_pos); 00532 port = boost::lexical_cast<unsigned>(full_ip.substr(colon_pos+1)); 00533 } 00534 IP ip_and_port(ip, port); 00535 tcp_share_map_[ip_and_port] = new goby::util::TCPClient(ip, port); 00536 tcp_share_map_[ip_and_port]->start(); 00537 glogger() << group("tcp") << "starting TCP client to "<< ip << ":" << port << std::endl; 00538 } 00539 } 00540 00541 // set up algorithms 00542 dccl_.add_algorithm("power_to_dB", boost::bind(&CpAcommsHandler::alg_power_to_dB, this, _1)); 00543 dccl_.add_algorithm("dB_to_power", boost::bind(&CpAcommsHandler::alg_dB_to_power, this, _1)); 00544 00545 dccl_.add_adv_algorithm("TSD_to_soundspeed", boost::bind(&CpAcommsHandler::alg_TSD_to_soundspeed, this, _1, _2)); 00546 dccl_.add_adv_algorithm("subtract", boost::bind(&CpAcommsHandler::alg_subtract, this, _1, _2)); 00547 dccl_.add_adv_algorithm("add", boost::bind(&CpAcommsHandler::alg_add, this, _1, _2)); 00548 dccl_.add_algorithm("abs", boost::bind(&CpAcommsHandler::alg_abs, this, _1)); 00549 00550 dccl_.add_algorithm("to_lower", boost::bind(&CpAcommsHandler::alg_to_lower, this, _1)); 00551 dccl_.add_algorithm("to_upper", boost::bind(&CpAcommsHandler::alg_to_upper, this, _1)); 00552 dccl_.add_algorithm("angle_0_360", boost::bind(&CpAcommsHandler::alg_angle_0_360, this, _1)); 00553 dccl_.add_algorithm("angle_-180_180", boost::bind(&CpAcommsHandler::alg_angle_n180_180, this, _1)); 00554 00555 dccl_.add_adv_algorithm("lat2utm_y", boost::bind(&CpAcommsHandler::alg_lat2utm_y, this, _1, _2)); 00556 dccl_.add_adv_algorithm("lon2utm_x", boost::bind(&CpAcommsHandler::alg_lon2utm_x, this, _1, _2)); 00557 dccl_.add_adv_algorithm("utm_x2lon", boost::bind(&CpAcommsHandler::alg_utm_x2lon, this, _1, _2)); 00558 dccl_.add_adv_algorithm("utm_y2lat", boost::bind(&CpAcommsHandler::alg_utm_y2lat, this, _1, _2)); 00559 dccl_.add_algorithm("modem_id2name", boost::bind(&CpAcommsHandler::alg_modem_id2name, this, _1)); 00560 dccl_.add_algorithm("modem_id2type", boost::bind(&CpAcommsHandler::alg_modem_id2type, this, _1)); 00561 dccl_.add_algorithm("name2modem_id", boost::bind(&CpAcommsHandler::alg_name2modem_id, this, _1)); 00562 00563 dccl_.add_algorithm("lat2hemisphere_initial", boost::bind(&CpAcommsHandler::alg_lat2hemisphere_initial, this, _1)); 00564 dccl_.add_algorithm("lon2hemisphere_initial", boost::bind(&CpAcommsHandler::alg_lon2hemisphere_initial, this, _1)); 00565 00566 dccl_.add_algorithm("lat2nmea_lat", boost::bind(&CpAcommsHandler::alg_lat2nmea_lat, this, _1)); 00567 dccl_.add_algorithm("lon2nmea_lon", boost::bind(&CpAcommsHandler::alg_lon2nmea_lon, this, _1)); 00568 00569 dccl_.add_algorithm("unix_time2nmea_time", boost::bind(&CpAcommsHandler::alg_unix_time2nmea_time, this, _1)); 00570 00571 } 00572 00573 00574 void CpAcommsHandler::pack(unsigned dccl_id, goby::acomms::protobuf::ModemDataTransmission* modem_message) 00575 { 00576 // don't bother packing if we can't encode this 00577 if(dccl_.manip_manager().has(dccl_id, goby::acomms::protobuf::MessageFile::NO_ENCODE)) 00578 return; 00579 00580 // encode the message and notify the MOOSDB 00581 std::map<std::string, std::vector<DCCLMessageVal> >& in = repeat_buffer_[dccl_id]; 00582 00583 // first entry 00584 if(!repeat_count_.count(dccl_id)) 00585 repeat_count_[dccl_id] = 0; 00586 00587 ++repeat_count_[dccl_id]; 00588 00589 BOOST_FOREACH(const std::string& moos_var, dccl_.get_pubsub_src_vars(dccl_id)) 00590 { 00591 const CMOOSMsg& moos_msg = dynamic_vars()[moos_var]; 00592 00593 bool is_dbl = valid(moos_msg) ? moos_msg.IsDouble() : false; 00594 bool is_str = valid(moos_msg) ? moos_msg.IsString() : false; 00595 double dval = valid(moos_msg) ? moos_msg.GetDouble() : NaN; 00596 std::string sval = valid(moos_msg) ? moos_msg.GetString() : ""; 00597 00598 if(is_str) 00599 in[moos_var].push_back(sval); 00600 else if(is_dbl) 00601 in[moos_var].push_back(dval); 00602 else 00603 in[moos_var].push_back(DCCLMessageVal()); 00604 } 00605 00606 // send this message out the door 00607 if(repeat_count_[dccl_id] == dccl_.get_repeat(dccl_id)) 00608 { 00609 try 00610 { 00611 // encode 00612 dccl_.pubsub_encode(dccl_id, modem_message, in); 00613 00614 std::string out_var = dccl_.get_outgoing_hex_var(dccl_id); 00615 00616 std::string serialized; 00617 serialize_for_moos(&serialized, *modem_message); 00618 publish(MOOS_VAR_OUTGOING_DATA, serialized); 00619 publish(out_var, serialized); 00620 00621 modem_message->mutable_queue_key()->set_type(goby::acomms::protobuf::QUEUE_DCCL); 00622 modem_message->mutable_queue_key()->set_id(dccl_id); 00623 queue_manager_.push_message(*modem_message); 00624 00625 handle_tcp_share(modem_message); 00626 } 00627 catch(std::exception& e) 00628 { 00629 glogger() << group("dccl_enc") << warn << "could not encode message: " << *modem_message << ", reason: " << e.what() << std::endl; 00630 } 00631 00632 // flush buffer 00633 repeat_buffer_[dccl_id].clear(); 00634 // reset counter 00635 repeat_count_[dccl_id] = 0; 00636 } 00637 else 00638 { 00639 glogger() << group("dccl_enc") << "finished buffering part " << repeat_count_[dccl_id] << " of " << dccl_.get_repeat(dccl_id) << " for repeated message: " << dccl_.id2name(dccl_id) << ". Nothing has been sent yet." << std::endl; 00640 } 00641 00642 } 00643 00644 void CpAcommsHandler::unpack(goby::acomms::protobuf::ModemDataTransmission modem_message) 00645 { 00646 handle_tcp_share(&modem_message); 00647 00648 try 00649 { 00650 if(modem_message.base().dest() != cfg_.modem_id() && modem_message.base().dest() != goby::acomms::BROADCAST_ID && !dccl_.manip_manager().has(modem_message.queue_key().id(), goby::acomms::protobuf::MessageFile::PROMISCUOUS)) 00651 { 00652 glogger() << group("dccl_dec") << "ignoring message for modem_id " << modem_message.base().dest() << std::endl; 00653 return; 00654 } 00655 00656 std::multimap<std::string, DCCLMessageVal> out; 00657 00658 dccl_.pubsub_decode(modem_message, &out); 00659 00660 typedef std::pair<std::string, DCCLMessageVal> P; 00661 BOOST_FOREACH(const P& p, out) 00662 { 00663 if(p.second.type() == goby::acomms::cpp_double) 00664 { 00665 double dval = p.second; 00666 CMOOSMsg m(MOOS_NOTIFY, p.first, dval, -1); 00667 publish(m); 00668 } 00669 else 00670 { 00671 std::string sval = p.second; 00672 CMOOSMsg m(MOOS_NOTIFY, p.first, sval, -1); 00673 publish(m); 00674 } 00675 } 00676 } 00677 catch(std::exception& e) 00678 { 00679 glogger() << group("dccl_dec") << warn << "could not decode message: " << modem_message << ", reason: " << e.what() << std::endl; 00680 } 00681 } 00682 00683 00684 00685 void CpAcommsHandler::handle_tcp_share(goby::acomms::protobuf::ModemDataTransmission* modem_message) 00686 { 00687 goby::acomms::DCCLHeaderDecoder decoder(modem_message->data()); 00688 unsigned dccl_id = decoder[goby::acomms::HEAD_DCCL_ID]; 00689 00690 if(cfg_.tcp_share_enable() && dccl_.manip_manager().has(dccl_id, goby::acomms::protobuf::MessageFile::TCP_SHARE_IN)) 00691 { 00692 typedef std::pair<IP, goby::util::TCPClient*> P; 00693 BOOST_FOREACH(const P&p, tcp_share_map_) 00694 { 00695 if(p.second->active()) 00696 { 00697 std::stringstream ss; 00698 ss << p.second->local_endpoint() << ":" << cfg_.tcp_share_port(); 00699 modem_message->AddExtension(pAcommsHandlerExtensions::seen_ip, ss.str()); 00700 00701 std::string serialized; 00702 serialize_for_moos(&serialized, *modem_message); 00703 00704 bool ip_seen = false; 00705 for(int i = 0, n = modem_message->ExtensionSize(pAcommsHandlerExtensions::seen_ip); i < n; ++i) 00706 { 00707 if(modem_message->GetExtension(pAcommsHandlerExtensions::seen_ip, i) == p.first.ip_and_port()) 00708 ip_seen = true; 00709 } 00710 00711 00712 if(!ip_seen) 00713 { 00714 glogger() << group("tcp") << "dccl_id: " << dccl_id << ": " << *modem_message << " to " << p.first.ip << ":" << p.first.port << std::endl; 00715 p.second->write(serialized + "\r\n"); 00716 } 00717 else 00718 { 00719 glogger() << group("tcp") << p.first.ip << ":" << p.first.port << " has already seen this message, so not sending." << std::endl; 00720 } 00721 } 00722 else 00723 glogger() << group("tcp") << warn << p.first.ip << ":" << p.first.port << " is not connected." << std::endl; 00724 } 00725 } 00726 } 00727 00728 00729 // 00730 // DCCL ALGORITHMS 00731 // 00732 00733 00734 void CpAcommsHandler::alg_power_to_dB(DCCLMessageVal& val_to_mod) 00735 { 00736 val_to_mod = 10*log10(double(val_to_mod)); 00737 } 00738 00739 void CpAcommsHandler::alg_dB_to_power(DCCLMessageVal& val_to_mod) 00740 { 00741 val_to_mod = pow(10.0, double(val_to_mod)/10.0); 00742 } 00743 00744 // applied to "T" (temperature), references are "S" (salinity), then "D" (depth) 00745 void CpAcommsHandler::alg_TSD_to_soundspeed(DCCLMessageVal& val, 00746 const std::vector<DCCLMessageVal>& ref_vals) 00747 { 00748 val.set(goby::util::mackenzie_soundspeed(val, ref_vals[0], ref_vals[1]), 3); 00749 } 00750 00751 // operator-= 00752 void CpAcommsHandler::alg_subtract(DCCLMessageVal& val, 00753 const std::vector<DCCLMessageVal>& ref_vals) 00754 { 00755 double d = val; 00756 BOOST_FOREACH(const::DCCLMessageVal& mv, ref_vals) 00757 d -= double(mv); 00758 00759 val.set(d, val.precision()); 00760 } 00761 00762 // operator+= 00763 void CpAcommsHandler::alg_add(DCCLMessageVal& val, 00764 const std::vector<DCCLMessageVal>& ref_vals) 00765 { 00766 double d = val; 00767 BOOST_FOREACH(const::DCCLMessageVal& mv, ref_vals) 00768 d += double(mv); 00769 val.set(d, val.precision()); 00770 } 00771 00772 00773 00774 void CpAcommsHandler::alg_to_upper(DCCLMessageVal& val_to_mod) 00775 { 00776 val_to_mod = boost::algorithm::to_upper_copy(std::string(val_to_mod)); 00777 } 00778 00779 void CpAcommsHandler::alg_to_lower(DCCLMessageVal& val_to_mod) 00780 { 00781 val_to_mod = boost::algorithm::to_lower_copy(std::string(val_to_mod)); 00782 } 00783 00784 void CpAcommsHandler::alg_angle_0_360(DCCLMessageVal& angle) 00785 { 00786 double a = angle; 00787 while (a < 0) 00788 a += 360; 00789 while (a >=360) 00790 a -= 360; 00791 angle = a; 00792 } 00793 00794 void CpAcommsHandler::alg_angle_n180_180(DCCLMessageVal& angle) 00795 { 00796 double a = angle; 00797 while (a < -180) 00798 a += 360; 00799 while (a >=180) 00800 a -= 360; 00801 angle = a; 00802 } 00803 00804 void CpAcommsHandler::alg_lat2utm_y(DCCLMessageVal& mv, 00805 const std::vector<DCCLMessageVal>& ref_vals) 00806 { 00807 double lat = mv; 00808 double lon = ref_vals[0]; 00809 double x = NaN; 00810 double y = NaN; 00811 00812 if(!(boost::math::isnan)(lat) && !(boost::math::isnan)(lon)) geodesy_.LatLong2LocalUTM(lat, lon, y, x); 00813 mv = y; 00814 } 00815 00816 void CpAcommsHandler::alg_lon2utm_x(DCCLMessageVal& mv, 00817 const std::vector<DCCLMessageVal>& ref_vals) 00818 { 00819 double lon = mv; 00820 double lat = ref_vals[0]; 00821 double x = NaN; 00822 double y = NaN; 00823 00824 if(!(boost::math::isnan)(lat) && !(boost::math::isnan)(lon)) geodesy_.LatLong2LocalUTM(lat, lon, y, x); 00825 mv = x; 00826 } 00827 00828 00829 void CpAcommsHandler::alg_utm_x2lon(DCCLMessageVal& mv, 00830 const std::vector<DCCLMessageVal>& ref_vals) 00831 { 00832 double x = mv; 00833 double y = ref_vals[0]; 00834 00835 double lat = NaN; 00836 double lon = NaN; 00837 if(!(boost::math::isnan)(y) && !(boost::math::isnan)(x)) geodesy_.UTM2LatLong(x, y, lat, lon); 00838 mv = lon; 00839 } 00840 00841 void CpAcommsHandler::alg_utm_y2lat(DCCLMessageVal& mv, 00842 const std::vector<DCCLMessageVal>& ref_vals) 00843 { 00844 double y = mv; 00845 double x = ref_vals[0]; 00846 00847 double lat = NaN; 00848 double lon = NaN; 00849 if(!(boost::math::isnan)(x) && !(boost::math::isnan)(y)) geodesy_.UTM2LatLong(x, y, lat, lon); 00850 mv = lat; 00851 } 00852 00853 00854 void CpAcommsHandler::alg_modem_id2name(DCCLMessageVal& in) 00855 { 00856 bool is_numeric = true; 00857 BOOST_FOREACH(const char c, std::string(in)) 00858 { 00859 if(!isdigit(c)) 00860 { 00861 is_numeric = false; 00862 break; 00863 } 00864 } 00865 if(is_numeric) in = modem_lookup_.get_name_from_id(boost::lexical_cast<unsigned>(std::string(in))); 00866 } 00867 00868 void CpAcommsHandler::alg_modem_id2type(DCCLMessageVal& in) 00869 { 00870 bool is_numeric = true; 00871 BOOST_FOREACH(const char c, std::string(in)) 00872 { 00873 if(!isdigit(c)) 00874 { 00875 is_numeric = false; 00876 break; 00877 } 00878 } 00879 if(is_numeric) in = modem_lookup_.get_type_from_id(boost::lexical_cast<unsigned>(std::string(in))); 00880 00881 } 00882 00883 void CpAcommsHandler::alg_name2modem_id(DCCLMessageVal& in) 00884 { 00885 std::stringstream ss; 00886 ss << modem_lookup_.get_id_from_name(std::string(in)); 00887 in = ss.str(); 00888 } 00889 00890 void CpAcommsHandler::alg_lat2hemisphere_initial(goby::acomms::DCCLMessageVal& val_to_mod) 00891 { 00892 double lat = val_to_mod; 00893 if(lat < 0) 00894 val_to_mod = "S"; 00895 else 00896 val_to_mod = "N"; 00897 } 00898 00899 void CpAcommsHandler::alg_lon2hemisphere_initial(goby::acomms::DCCLMessageVal& val_to_mod) 00900 { 00901 double lon = val_to_mod; 00902 if(lon < 0) 00903 val_to_mod = "W"; 00904 else 00905 val_to_mod = "E"; 00906 } 00907 00908 void CpAcommsHandler::alg_abs(goby::acomms::DCCLMessageVal& val_to_mod) 00909 { 00910 val_to_mod = std::abs(double(val_to_mod)); 00911 } 00912 00913 void CpAcommsHandler::alg_unix_time2nmea_time(goby::acomms::DCCLMessageVal& val_to_mod) 00914 { 00915 double unix_time = val_to_mod; 00916 boost::posix_time::ptime ptime = goby::util::unix_double2ptime(unix_time); 00917 00918 // HHMMSS.SSSSSS 00919 boost::format f("%02d%02d%02d.%06d"); 00920 f % ptime.time_of_day().hours() % ptime.time_of_day().minutes() % ptime.time_of_day().seconds() % (ptime.time_of_day().fractional_seconds() * 1000000 / boost::posix_time::time_duration::ticks_per_second()); 00921 00922 val_to_mod = f.str(); 00923 } 00924 00925 00926 void CpAcommsHandler::alg_lat2nmea_lat(goby::acomms::DCCLMessageVal& val_to_mod) 00927 { 00928 double lat = val_to_mod; 00929 00930 // DDMM.MM 00931 boost::format f("%02d%02d.%04d"); 00932 00933 int degrees = std::floor(lat); 00934 int minutes = std::floor((lat - degrees) * 60); 00935 int ten_thousandth_minutes = std::floor(((lat - degrees) * 60 - minutes) * 10000); 00936 00937 f % degrees % minutes % ten_thousandth_minutes; 00938 00939 val_to_mod = f.str(); 00940 } 00941 00942 00943 00944 void CpAcommsHandler::alg_lon2nmea_lon(goby::acomms::DCCLMessageVal& val_to_mod) 00945 { 00946 double lon = val_to_mod; 00947 00948 // DDDMM.MM 00949 boost::format f("%03d%02d.%04d"); 00950 00951 int degrees = std::floor(lon); 00952 int minutes = std::floor((lon - degrees) * 60); 00953 int ten_thousandth_minutes = std::floor(((lon - degrees) * 60 - minutes) * 10000); 00954 00955 f % degrees % minutes % ten_thousandth_minutes; 00956 00957 val_to_mod = f.str(); 00958 } 00959