Goby Underwater Autonomy Project
Series: 1.1, revision: 163, released on 2013-02-06 14:23:27 -0500
|
00001 // copyright 2009 t. schneider tes@mit.edu 00002 // 00003 // this file is part of the Queue Library (libqueue), 00004 // the goby-acomms message queue manager. goby-acomms is a collection of 00005 // libraries for acoustic underwater networking 00006 // 00007 // This program is free software: you can redistribute it and/or modify 00008 // it under the terms of the GNU General Public License as published by 00009 // the Free Software Foundation, either version 3 of the License, or 00010 // (at your option) any later version. 00011 // 00012 // This software is distributed in the hope that it will be useful, 00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 // GNU General Public License for more details. 00016 // 00017 // You should have received a copy of the GNU General Public License 00018 // along with this software. If not, see <http://www.gnu.org/licenses/>. 00019 00020 #include <boost/foreach.hpp> 00021 00022 #include "goby/acomms/xml/xml_parser.h" 00023 #include "goby/util/logger.h" 00024 #include "goby/util/time.h" 00025 #include "goby/util/binary.h" 00026 00027 #include "goby/acomms/xml/xml_parser.h" 00028 #include "queue_manager.h" 00029 #include "queue_constants.h" 00030 #include "queue_xml_callbacks.h" 00031 00032 int goby::acomms::QueueManager::modem_id_ = 0; 00033 00034 goby::acomms::QueueManager::QueueManager(std::ostream* log /* =0 */) 00035 : log_(log), 00036 packet_ack_(0), 00037 packet_dest_(BROADCAST_ID) 00038 {} 00039 00040 void goby::acomms::QueueManager::add_queue(const protobuf::QueueConfig& cfg) 00041 { 00042 Queue q(cfg, log_, modem_id_); 00043 00044 if(queues_.count(cfg.key())) 00045 { 00046 std::stringstream ss; 00047 ss << "Queue: duplicate key specified for key: " << cfg.key(); 00048 throw queue_exception(ss.str()); 00049 } 00050 else if((q.cfg().key().id() > MAX_ID || q.cfg().key().id() < MIN_ID) && q.cfg().key().type() == protobuf::QUEUE_DCCL) 00051 { 00052 std::stringstream ss; 00053 ss << "Queue: key (" << cfg.key() << ") is out of bounds for use with libqueue. Use a id from " << MIN_ID << " to " << MAX_ID; 00054 throw queue_exception(ss.str()); 00055 } 00056 else 00057 { 00058 std::pair<std::map<goby::acomms::protobuf::QueueKey, Queue>::iterator,bool> new_q_pair = 00059 queues_.insert(std::make_pair(cfg.key(), q)); 00060 00061 qsize(&((new_q_pair.first)->second)); 00062 } 00063 00064 00065 00066 if(log_) *log_<< group("q_out") << "added new queue: \n" << q << std::endl; 00067 00068 } 00069 00070 std::set<unsigned> goby::acomms::QueueManager::add_xml_queue_file(const std::string& xml_file) 00071 { 00072 std::vector<protobuf::QueueConfig> cfgs; 00073 00074 // Register handlers for XML parsing 00075 QueueContentHandler content(cfgs); 00076 QueueErrorHandler error; 00077 // instantiate a parser for the xml message files 00078 XMLParser parser(content, error); 00079 std::set<unsigned> added_ids; 00080 00081 parser.parse(xml_file); 00082 BOOST_FOREACH(const protobuf::QueueConfig& c, cfgs) 00083 { 00084 add_queue(c); 00085 added_ids.insert(c.key().id()); 00086 } 00087 00088 return added_ids; 00089 } 00090 00091 void goby::acomms::QueueManager::do_work() 00092 { 00093 typedef std::pair<const protobuf::QueueKey, Queue> P; 00094 for(std::map<protobuf::QueueKey, Queue>::iterator it = queues_.begin(), n = queues_.end(); it != n; ++it) 00095 { 00096 std::vector<protobuf::ModemDataTransmission> expired_msgs = it->second.expire(); 00097 00098 protobuf::ModemDataExpire expire; 00099 protobuf::ModemDataTransmission* data_msg = expire.mutable_orig_msg(); 00100 BOOST_FOREACH(*data_msg, expired_msgs) 00101 { 00102 //expire.mutable_orig_msg()->mutable_queue_key()->CopyFrom(it->first); 00103 signal_expire(expire); 00104 } 00105 00106 } 00107 00108 } 00109 00110 void goby::acomms::QueueManager::push_message(const protobuf::ModemDataTransmission& data_msg) 00111 { 00112 // message is to us, auto-loopback 00113 if(data_msg.base().dest() == modem_id_) 00114 { 00115 if(log_) *log_<< group("q_out") << "outgoing message is for us: using loopback, not physical interface" << std::endl; 00116 00117 handle_modem_receive(data_msg); 00118 } 00119 // we have a queue with this key, so push message for sending 00120 else if(queues_.count(data_msg.queue_key())) 00121 { 00122 if(data_msg.queue_key().type() == protobuf::QUEUE_DCCL && manip_manager_.has(data_msg.queue_key().id(), protobuf::MessageFile::NO_QUEUE)) 00123 { 00124 if(log_) *log_ << group("q_out") << "not queuing DCCL ID: " << data_msg.queue_key().id() << "; NO_QUEUE manipulator is set" << std::endl; 00125 } 00126 else 00127 { 00128 queues_[data_msg.queue_key()].push_message(data_msg); 00129 qsize(&queues_[data_msg.queue_key()]); 00130 } 00131 00132 if(data_msg.queue_key().type() == protobuf::QUEUE_DCCL && manip_manager_.has(data_msg.queue_key().id(), protobuf::MessageFile::LOOPBACK)) 00133 { 00134 if(log_) *log_ << group("q_out") << data_msg.queue_key() << " LOOPBACK manipulator set, sending back to decoder" << std::endl; 00135 handle_modem_receive(data_msg); 00136 } 00137 } 00138 else 00139 { 00140 std::stringstream ss; 00141 ss << "no queue for key: " << data_msg.queue_key(); 00142 throw queue_exception(ss.str()); 00143 } 00144 } 00145 00146 00147 void goby::acomms::QueueManager::flush_queue(const protobuf::QueueFlush& flush) 00148 { 00149 std::map<goby::acomms::protobuf::QueueKey, Queue>::iterator it = queues_.find(flush.key()); 00150 00151 if(it != queues_.end()) 00152 { 00153 it->second.flush(); 00154 if(log_) *log_ << group("q_out") << " flushed queue: " << flush << std::endl; 00155 qsize(&it->second); 00156 } 00157 else 00158 { 00159 if(log_) *log_ << group("q_out") << warn << " cannot find queue to flush: " << flush << std::endl; 00160 } 00161 } 00162 00163 00164 std::string goby::acomms::QueueManager::summary() const 00165 { 00166 std::string s; 00167 typedef std::pair<const protobuf::QueueKey, Queue> P; 00168 BOOST_FOREACH(const P& p, queues_) 00169 s += p.second.summary(); 00170 00171 return s; 00172 } 00173 00174 00175 std::ostream& goby::acomms::operator<< (std::ostream& out, const QueueManager& d) 00176 { 00177 out << d.summary(); 00178 return out; 00179 } 00180 00181 // finds and publishes outgoing data for the modem driver 00182 // first query every Queue for its priority data using 00183 // priority_values(priority, last_send_time) 00184 // priority_values returns false if that object has no data to give 00185 // (either no data at all, or in blackout interval) 00186 // thus, from all the priority values that return true, pick the one with the lowest 00187 // priority value, or given a tie, pick the one with the oldest last_send_time 00188 void goby::acomms::QueueManager::handle_modem_data_request(const protobuf::ModemDataRequest& request_msg, protobuf::ModemDataTransmission* data_msg) 00189 { 00190 data_msg->mutable_base()->set_src(modem_id_); 00191 if(request_msg.frame() == 0) 00192 { 00193 clear_packet(); 00194 data_msg->mutable_base()->set_dest(request_msg.base().dest()); 00195 } 00196 else 00197 { 00198 if(packet_ack_) 00199 data_msg->set_ack_requested(packet_ack_); 00200 00201 data_msg->mutable_base()->set_dest(packet_dest_); 00202 } 00203 // first (0th) user-frame 00204 Queue* winning_queue = find_next_sender(request_msg, *data_msg, true); 00205 00206 // no data at all for this frame ... :( 00207 if(!winning_queue) 00208 { 00209 data_msg->set_ack_requested(packet_ack_); 00210 data_msg->mutable_base()->set_dest(packet_dest_); 00211 if(log_) *log_<< group("q_out") << "no data found. sending blank to firmware" 00212 << ": " << *data_msg << std::endl; 00213 } 00214 else 00215 { 00216 stitch_recursive(request_msg, data_msg, winning_queue); 00217 } 00218 } 00219 00220 bool goby::acomms::QueueManager::stitch_recursive(const protobuf::ModemDataRequest& request_msg, protobuf::ModemDataTransmission* complete_data_msg, Queue* winning_queue) 00221 { 00222 const unsigned CCL_ID_BYTES = HEAD_CCL_ID_SIZE / BITS_IN_BYTE; 00223 00224 // new user frame (e.g. 32B) 00225 protobuf::ModemDataTransmission next_data_msg = winning_queue->give_data(request_msg); 00226 00227 if(log_) *log_<< group("q_out") << "sending data to firmware from: " 00228 << winning_queue->cfg().name() 00229 << ": " << next_data_msg << std::endl; 00230 00231 if(next_data_msg.queue_key().type() == protobuf::QUEUE_DCCL && 00232 manip_manager_.has(next_data_msg.queue_key().id(), 00233 protobuf::MessageFile::LOOPBACK_AS_SENT)) 00234 { 00235 if(log_) *log_ << group("q_out") << next_data_msg.queue_key() << " LOOPBACK_AS_SENT manipulator set, sending back to decoder" << std::endl; 00236 handle_modem_receive(next_data_msg); 00237 } 00238 00239 00240 // 00241 // ACK 00242 // 00243 // insert ack if desired 00244 if(next_data_msg.ack_requested()) 00245 waiting_for_ack_.insert(std::pair<unsigned, Queue*>(request_msg.frame(), winning_queue)); 00246 else 00247 { 00248 winning_queue->pop_message(request_msg.frame()); 00249 qsize(winning_queue); // notify change in queue size 00250 } 00251 00252 // if an ack been set, do not unset these 00253 if (packet_ack_ == false) packet_ack_ = next_data_msg.ack_requested(); 00254 00255 00256 // 00257 // DEST 00258 // 00259 // update destination address 00260 if(request_msg.frame() == 0) 00261 { 00262 // discipline the destination of the packet if initially unset 00263 if(complete_data_msg->base().dest() == QUERY_DESTINATION_ID) 00264 complete_data_msg->mutable_base()->set_dest(next_data_msg.base().dest()); 00265 00266 if(packet_dest_ == BROADCAST_ID) 00267 packet_dest_ = complete_data_msg->base().dest(); 00268 } 00269 else 00270 { 00271 complete_data_msg->mutable_base()->set_dest(packet_dest_); 00272 } 00273 00274 // 00275 // DCCL 00276 // 00277 if(winning_queue->cfg().key().type() == protobuf::QUEUE_DCCL) 00278 { 00279 // e.g. 32B 00280 std::string new_data = next_data_msg.data(); 00281 00282 // insert the size of the next field (e.g. 32B) right after the header 00283 std::string frame_size(USER_FRAME_NEXT_SIZE_BYTES, 00284 static_cast<char>((next_data_msg.data().size()-DCCL_NUM_HEADER_BYTES))); 00285 new_data.insert(DCCL_NUM_HEADER_BYTES, frame_size); 00286 00287 // append without the CCL ID (old size + 31B) 00288 *(complete_data_msg->mutable_data()) += new_data.substr(CCL_ID_BYTES); 00289 00290 bool is_last_user_frame = true; 00291 // if remaining bytes is greater than the DCCL header, we have a chance of adding another user-frame 00292 if((request_msg.max_bytes() - complete_data_msg->data().size()) > DCCL_NUM_HEADER_BYTES 00293 && winning_queue->cfg().key().type() != protobuf::QUEUE_CCL) 00294 { 00295 // fetch the next candidate 00296 winning_queue = find_next_sender(request_msg, *complete_data_msg, false); 00297 if(winning_queue) is_last_user_frame = false; 00298 } 00299 00300 if(!is_last_user_frame) 00301 { 00302 replace_header(false, complete_data_msg, next_data_msg, new_data); 00303 return stitch_recursive(request_msg, complete_data_msg, winning_queue); 00304 } 00305 else 00306 { 00307 replace_header(true, complete_data_msg, next_data_msg, new_data); 00308 // add the CCL ID back on to the message (e.g. 33B) 00309 complete_data_msg->mutable_data()->insert(0, std::string(1, DCCL_CCL_HEADER)); 00310 // remove the size of the next field from the last user-frame (e.g. 32B). 00311 complete_data_msg->mutable_data()->erase(complete_data_msg->data().size()-new_data.size()+DCCL_NUM_HEADER_BYTES, USER_FRAME_NEXT_SIZE_BYTES); 00312 // set the ack to conform to the entire message 00313 complete_data_msg->set_ack_requested(packet_ack_); 00314 00315 return true; 00316 } 00317 } 00318 // 00319 // CCL 00320 // 00321 else 00322 { 00323 // not DCCL, copy the msg and we are done 00324 complete_data_msg->CopyFrom(next_data_msg); 00325 return true; 00326 } 00327 } 00328 00329 void goby::acomms::QueueManager::replace_header(bool is_last_user_frame, protobuf::ModemDataTransmission* data_msg, const protobuf::ModemDataTransmission& next_data_msg, const std::string& new_data) 00330 { 00331 // decode the header so that we can modify the flags 00332 DCCLHeaderDecoder head_decoder(new_data); 00333 00334 // don't put the multimessage flag on the last user-frame 00335 head_decoder[HEAD_MULTIMESSAGE_FLAG] = 00336 (!is_last_user_frame) ? true : false; 00337 // put the broadcast flag on if needed 00338 head_decoder[HEAD_BROADCAST_FLAG] = 00339 (next_data_msg.base().dest() == BROADCAST_ID) ? true : false; 00340 00341 // re-encode the header 00342 DCCLHeaderEncoder head_encoder(head_decoder.get()); 00343 00344 // replace the header without the CCL ID 00345 const unsigned CCL_ID_BYTES = HEAD_CCL_ID_SIZE / BITS_IN_BYTE; 00346 data_msg->mutable_data()->replace(data_msg->data().size()-new_data.size()+CCL_ID_BYTES, 00347 head_encoder.str().size()-CCL_ID_BYTES, 00348 head_encoder.str().substr(CCL_ID_BYTES)); 00349 } 00350 00351 00352 void goby::acomms::QueueManager::clear_packet() 00353 { 00354 typedef std::pair<unsigned, Queue*> P; 00355 BOOST_FOREACH(const P& p, waiting_for_ack_) 00356 p.second->clear_ack_queue(); 00357 00358 waiting_for_ack_.clear(); 00359 00360 packet_ack_ = false; 00361 packet_dest_ = BROADCAST_ID; 00362 } 00363 00364 00365 00366 goby::acomms::Queue* goby::acomms::QueueManager::find_next_sender(const protobuf::ModemDataRequest& request_msg, const protobuf::ModemDataTransmission& data_msg, bool first_user_frame) 00367 { 00368 // competition between variable about who gets to send 00369 double winning_priority; 00370 boost::posix_time::ptime winning_last_send_time; 00371 00372 Queue* winning_queue = 0; 00373 00374 if(log_) *log_<< group("priority") << "starting priority contest\n" 00375 << "requesting: " << request_msg << "\n" 00376 << "have " << data_msg.data().size() << "/" << request_msg.max_bytes() << "B: " << data_msg << std::endl; 00377 00378 00379 for(std::map<protobuf::QueueKey, Queue>::iterator it = queues_.begin(), n = queues_.end(); it != n; ++it) 00380 { 00381 Queue& oq = it->second; 00382 00383 // encode on demand 00384 if(oq.cfg().key().type() == protobuf::QUEUE_DCCL && 00385 manip_manager_.has(oq.cfg().key().id(), protobuf::MessageFile::ON_DEMAND) && 00386 (!oq.size() || oq.newest_msg_time() + ON_DEMAND_SKEW < util::goby_time())) 00387 { 00388 protobuf::ModemDataTransmission data_msg; 00389 data_msg.mutable_queue_key()->CopyFrom(it->first); 00390 signal_data_on_demand(request_msg, &data_msg); 00391 push_message(data_msg); 00392 } 00393 00394 double priority; 00395 boost::posix_time::ptime last_send_time; 00396 if(oq.priority_values(priority, last_send_time, request_msg, data_msg)) 00397 { 00398 // no winner, better winner, or equal & older winner 00399 // AND not CCL when not the first user-frame 00400 if((!winning_queue || priority > winning_priority || 00401 (priority == winning_priority && last_send_time < winning_last_send_time)) 00402 && !(oq.cfg().key().type() == protobuf::QUEUE_CCL && !first_user_frame)) 00403 { 00404 winning_priority = priority; 00405 winning_last_send_time = last_send_time; 00406 winning_queue = &oq; 00407 } 00408 } 00409 } 00410 00411 if(log_) *log_<< group("priority") << "\t" 00412 << "all other queues have no messages" << std::endl; 00413 00414 if(winning_queue) 00415 { 00416 if(log_) *log_<< group("priority") << winning_queue->cfg().name() 00417 << " has highest priority." << std::endl; 00418 } 00419 00420 return winning_queue; 00421 } 00422 00423 00424 void goby::acomms::QueueManager::handle_modem_ack(const protobuf::ModemDataAck& orig_ack_msg) 00425 { 00426 protobuf::ModemDataAck ack_msg(orig_ack_msg); 00427 00428 if(ack_msg.base().dest() != modem_id_) 00429 { 00430 if(log_) *log_<< group("q_in") << warn 00431 << "ignoring ack for modem_id = " << ack_msg.base().dest() << std::endl; 00432 return; 00433 } 00434 else if(!waiting_for_ack_.count(ack_msg.frame())) 00435 { 00436 if(log_) *log_<< group("q_in") 00437 << "got ack but we were not expecting one" << std::endl; 00438 return; 00439 } 00440 else 00441 { 00442 00443 // got an ack, let's pop this! 00444 if(log_) *log_<< group("q_in") << "received ack for this id" << std::endl; 00445 00446 00447 std::multimap<unsigned, Queue *>::iterator it = waiting_for_ack_.find(ack_msg.frame()); 00448 while(it != waiting_for_ack_.end()) 00449 { 00450 Queue* oq = it->second; 00451 00452 protobuf::ModemDataTransmission* removed_msg = ack_msg.mutable_orig_msg(); 00453 if(!oq->pop_message_ack(ack_msg.frame(), removed_msg)) 00454 { 00455 if(log_) *log_<< group("q_in") << warn 00456 << "failed to pop message from " 00457 << oq->cfg().name() << std::endl; 00458 } 00459 else 00460 { 00461 qsize(oq); 00462 //ack_msg.mutable_orig_msg()->mutable_queue_key()->CopyFrom(oq->cfg().key()); 00463 signal_ack(ack_msg); 00464 00465 } 00466 00467 if(log_) *log_<< group("q_in") << ack_msg << std::endl; 00468 00469 waiting_for_ack_.erase(it); 00470 00471 it = waiting_for_ack_.find(ack_msg.frame()); 00472 } 00473 } 00474 00475 return; 00476 } 00477 00478 00479 // parses and publishes incoming data 00480 // by matching the variableID field with the variable specified 00481 // in a "receive = " line of the configuration file 00482 void goby::acomms::QueueManager::handle_modem_receive(const protobuf::ModemDataTransmission& m) 00483 { 00484 // copy so we can modify in various ways 00485 protobuf::ModemDataTransmission message = m; 00486 00487 if(log_) *log_<< group("q_in") << "received message" 00488 << ": " << message << std::endl; 00489 00490 std::string data = message.data(); 00491 if(data.size() < DCCL_NUM_HEADER_BYTES) 00492 return; 00493 00494 DCCLHeaderDecoder head_decoder(data); 00495 int ccl_id = head_decoder[HEAD_CCL_ID]; 00496 00497 // check for queue_dccl type 00498 if(ccl_id == DCCL_CCL_HEADER) 00499 { 00500 unstitch_recursive(&data, &message); 00501 } 00502 // check for ccl type 00503 else 00504 { 00505 protobuf::QueueKey key; 00506 key.set_type(protobuf::QUEUE_CCL); 00507 key.set_id(ccl_id); 00508 00509 std::map<protobuf::QueueKey, Queue>::iterator it = queues_.find(key); 00510 00511 if (it != queues_.end()) 00512 { 00513 message.mutable_queue_key()->CopyFrom(key); 00514 signal_receive_ccl(message); 00515 } 00516 else 00517 { 00518 if(log_) *log_<< group("q_in") << warn << "incoming data string is not for us (not DCCL or known CCL)." << std::endl; 00519 } 00520 } 00521 } 00522 00523 bool goby::acomms::QueueManager::unstitch_recursive(std::string* data, protobuf::ModemDataTransmission* data_msg) 00524 { 00525 unsigned original_dest = data_msg->base().dest(); 00526 DCCLHeaderDecoder head_decoder(*data); 00527 bool multimessage_flag = head_decoder[HEAD_MULTIMESSAGE_FLAG]; 00528 bool broadcast_flag = head_decoder[HEAD_BROADCAST_FLAG]; 00529 unsigned dccl_id = head_decoder[HEAD_DCCL_ID]; 00530 00531 // test multimessage bit 00532 if(multimessage_flag) 00533 { 00534 // extract frame_size 00535 // TODO (tes): Make this work properly for strings larger than one byte 00536 unsigned frame_size = data->substr(DCCL_NUM_HEADER_BYTES, USER_FRAME_NEXT_SIZE_BYTES)[0]; 00537 00538 // erase the frame size byte 00539 data->erase(DCCL_NUM_HEADER_BYTES, USER_FRAME_NEXT_SIZE_BYTES); 00540 00541 // extract the data for this user-frame 00542 data_msg->set_data(data->substr(0, (frame_size + DCCL_NUM_HEADER_BYTES))); 00543 00544 data->erase(USER_FRAME_NEXT_SIZE_BYTES, 00545 (frame_size + DCCL_NUM_HEADER_BYTES-USER_FRAME_NEXT_SIZE_BYTES)); 00546 } 00547 else 00548 { 00549 data_msg->set_data(*data); 00550 } 00551 00552 // reset these flags 00553 head_decoder[HEAD_MULTIMESSAGE_FLAG] = false; 00554 head_decoder[HEAD_BROADCAST_FLAG] = false; 00555 00556 DCCLHeaderEncoder head_encoder(head_decoder.get()); 00557 data_msg->mutable_data()->replace(0, DCCL_NUM_HEADER_BYTES, head_encoder.str()); 00558 // overwrite destination as BROADCAST if broadcast bit is set 00559 data_msg->mutable_base()->set_dest(broadcast_flag ? BROADCAST_ID : original_dest); 00560 publish_incoming_piece(data_msg, dccl_id); 00561 00562 // put the destination back 00563 data_msg->mutable_base()->set_dest(original_dest); 00564 00565 // unstitch until the multimessage flag is no longer set 00566 return multimessage_flag ? unstitch_recursive(data, data_msg) : true; 00567 } 00568 00569 bool goby::acomms::QueueManager::publish_incoming_piece(protobuf::ModemDataTransmission* message, const unsigned incoming_var_id) 00570 { 00571 if(message->base().dest() != BROADCAST_ID && message->base().dest() != modem_id_ && 00572 !manip_manager_.has(incoming_var_id, protobuf::MessageFile::PROMISCUOUS)) 00573 { 00574 if(log_) *log_<< group("q_in") << warn << "ignoring message for modem_id = " 00575 << message->base().dest() << std::endl; 00576 return false; 00577 } 00578 00579 protobuf::QueueKey dccl_key; 00580 dccl_key.set_type(protobuf::QUEUE_DCCL); 00581 dccl_key.set_id(incoming_var_id); 00582 00583 std::map<protobuf::QueueKey, Queue>::iterator it_dccl = queues_.find(dccl_key); 00584 00585 if(it_dccl == queues_.end()) 00586 { 00587 if(log_) *log_<< group("q_in") << warn << "no mapping for this variable ID: " 00588 << incoming_var_id << std::endl; 00589 return false; 00590 } 00591 00592 message->mutable_queue_key()->CopyFrom(dccl_key); 00593 signal_receive(*message); 00594 00595 return true; 00596 } 00597 00598 void goby::acomms::QueueManager::add_flex_groups(util::FlexOstream* tout) 00599 { 00600 tout->add_group("push", util::Colors::lt_cyan, "stack push - outgoing messages (goby_queue)"); 00601 tout->add_group("pop", util::Colors::lt_green, "stack pop - outgoing messages (goby_queue)"); 00602 tout->add_group("priority", util::Colors::yellow, "priority contest (goby_queue)"); 00603 tout->add_group("q_out", util::Colors::cyan, "outgoing queuing messages (goby_queue)"); 00604 tout->add_group("q_in", util::Colors::green, "incoming queuing messages (goby_queue)"); 00605 } 00606 00607 00608 void goby::acomms::QueueManager::set_cfg(const protobuf::QueueManagerConfig& cfg) 00609 { 00610 cfg_ = cfg; 00611 process_cfg(); 00612 } 00613 00614 void goby::acomms::QueueManager::merge_cfg(const protobuf::QueueManagerConfig& cfg) 00615 { 00616 cfg_.MergeFrom(cfg); 00617 process_cfg(); 00618 } 00619 00620 00621 void goby::acomms::QueueManager::process_cfg() 00622 { 00623 queues_.clear(); 00624 waiting_for_ack_.clear(); 00625 manip_manager_.clear(); 00626 00627 for(int i = 0, n = cfg_.message_file_size(); i < n; ++i) 00628 { 00629 std::set<unsigned> new_ids = add_xml_queue_file(cfg_.message_file(i).path()); 00630 BOOST_FOREACH(unsigned new_id, new_ids) 00631 { 00632 for(int j = 0, o = cfg_.message_file(i).manipulator_size(); j < o; ++j) 00633 manip_manager_.add(new_id, cfg_.message_file(i).manipulator(j)); 00634 } 00635 } 00636 00637 for(int i = 0, n = cfg_.queue_size(); i < n; ++i) 00638 add_queue(cfg_.queue(i)); 00639 00640 modem_id_ = cfg_.modem_id(); 00641 } 00642 00643 void goby::acomms::QueueManager::qsize(Queue* q) 00644 { 00645 protobuf::QueueSize size; 00646 size.mutable_key()->CopyFrom(q->cfg().key()); 00647 size.set_size(q->size()); 00648 signal_queue_size_change(size); 00649 }