Goby Underwater Autonomy Project
Series: 1.1, revision: 163, released on 2013-02-06 14:23:27 -0500
|
00001 // copyright 2008, 2009 t. schneider tes@mit.edu 00002 // 00003 // this file is part of the Queue Library (libqueue), 00004 // the goby-acomms message queue manager. goby-acomms is a collection of 00005 // libraries for acoustic underwater networking 00006 // 00007 // This program is free software: you can redistribute it and/or modify 00008 // it under the terms of the GNU General Public License as published by 00009 // the Free Software Foundation, either version 3 of the License, or 00010 // (at your option) any later version. 00011 // 00012 // This software is distributed in the hope that it will be useful, 00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 // GNU General Public License for more details. 00016 // 00017 // You should have received a copy of the GNU General Public License 00018 // along with this software. If not, see <http://www.gnu.org/licenses/>. 00019 00020 #include "goby/acomms/acomms_constants.h" 00021 #include "goby/util/logger.h" 00022 00023 #include "queue.h" 00024 #include "queue_manager.h" 00025 00026 using goby::util::goby_time; 00027 00028 goby::acomms::Queue::Queue(const protobuf::QueueConfig cfg /* = 0 */, 00029 std::ostream* log /* = 0 */, 00030 int modem_id /* = 0 */) 00031 : cfg_(cfg), 00032 last_send_time_(goby_time()), 00033 log_(log) 00034 {} 00035 00036 00037 // add a new message 00038 bool goby::acomms::Queue::push_message(const protobuf::ModemDataTransmission& data_msg) 00039 { 00040 if(data_msg.data().empty()) 00041 { 00042 if(log_) *log_ << group("q_out") << warn 00043 << "empty message attempted to be pushed to queue " 00044 << cfg_.name() << std::endl; 00045 return false; 00046 } 00047 else if(cfg_.key().type() == protobuf::QUEUE_CCL && data_msg.data()[0] != char(cfg_.key().id())) 00048 { 00049 if(log_) *log_ << group("q_out") << warn 00050 << "CCL message attempted to be pushed to queue " 00051 << cfg_.name() << " that doesn't have proper CCL byte. " 00052 << "expecting: " << cfg_.key().id() << ", given: " << int(data_msg.data()[0]) 00053 << std::endl; 00054 return false; 00055 } 00056 00057 00058 00059 messages_.push_back(data_msg); 00060 00061 protobuf::ModemDataTransmission* new_data_msg = &messages_.back(); 00062 00063 if(!new_data_msg->has_ack_requested()) 00064 new_data_msg->set_ack_requested(cfg_.ack()); 00065 00066 // needed for CCL messages 00067 new_data_msg->mutable_base()->set_src(QueueManager::modem_id_); 00068 00069 // pop messages off the stack if the queue is full 00070 if(cfg_.max_queue() && messages_.size() > cfg_.max_queue()) 00071 { 00072 messages_it it_to_erase = 00073 cfg_.newest_first() ? messages_.begin() : messages_.end(); 00074 00075 // want "back" iterator not "end" 00076 if(it_to_erase == messages_.end()) --it_to_erase; 00077 00078 // if we were waiting for an ack for this, erase that too 00079 waiting_for_ack_it it = find_ack_value(it_to_erase); 00080 if(it != waiting_for_ack_.end()) waiting_for_ack_.erase(it); 00081 00082 if(log_) *log_ << group("pop") << "queue exceeded for " << cfg_.name() << 00083 ". removing: " << *it_to_erase << std::endl; 00084 00085 messages_.erase(it_to_erase); 00086 } 00087 00088 if(log_) *log_ << group("push") << "pushing" << " to send stack " 00089 << cfg_.name() << " (qsize " << size() << "/" 00090 << cfg_.max_queue() << "): " << data_msg << std::endl; 00091 00092 return true; 00093 } 00094 00095 messages_it goby::acomms::Queue::next_message_it() 00096 { 00097 messages_it it_to_give = 00098 cfg_.newest_first() ? messages_.end() : messages_.begin(); 00099 if(it_to_give == messages_.end()) --it_to_give; // want "back" iterator not "end" 00100 00101 // find a value that isn't already waiting to be acknowledged 00102 while(find_ack_value(it_to_give) != waiting_for_ack_.end()) 00103 cfg_.newest_first() ? --it_to_give : ++it_to_give; 00104 00105 return it_to_give; 00106 } 00107 00108 goby::acomms::protobuf::ModemDataTransmission goby::acomms::Queue::give_data(const protobuf::ModemDataRequest& request_msg) 00109 { 00110 messages_it it_to_give = next_message_it(); 00111 00112 bool ack = it_to_give->ack_requested(); 00113 // broadcast cannot acknowledge 00114 if(it_to_give->base().dest() == BROADCAST_ID && ack == true) 00115 { 00116 if(log_) *log_ << group("pop") << warn << "overriding ack request and setting ack = false because dest = BROADCAST (0) cannot acknowledge messages" << std::endl; 00117 ack = false; 00118 } 00119 00120 it_to_give->set_ack_requested(ack); 00121 00122 if(ack) 00123 waiting_for_ack_.insert(std::pair<unsigned, messages_it>(request_msg.frame(), 00124 it_to_give)); 00125 00126 last_send_time_ = goby_time(); 00127 00128 return *it_to_give; 00129 } 00130 00131 00132 // gives priority values. returns false if in blackout interval or if no data or if messages of wrong size, true if not in blackout 00133 bool goby::acomms::Queue::priority_values(double& priority, 00134 boost::posix_time::ptime& last_send_time, 00135 const protobuf::ModemDataRequest& request_msg, 00136 const protobuf::ModemDataTransmission& data_msg) 00137 { 00138 priority = util::time_duration2double((goby_time()-last_send_time_))/cfg_.ttl()*cfg_.value_base(); 00139 00140 last_send_time = last_send_time_; 00141 00142 // no messages left to send 00143 if(messages_.size() <= waiting_for_ack_.size()) return false; 00144 00145 messages_it next_msg_it = next_message_it(); 00146 00147 // for followup user-frames, destination must be either zero (broadcast) 00148 // or the same as the first user-frame 00149 00150 if (last_send_time_ + boost::posix_time::seconds(cfg_.blackout_time()) > goby_time()) 00151 { 00152 if(log_) *log_<< group("priority") << "\t" << cfg_.name() << " is in blackout" << std::endl; 00153 return false; 00154 } 00155 // wrong size 00156 else if(request_msg.has_max_bytes() && 00157 (next_msg_it->data().size() > (request_msg.max_bytes() - data_msg.data().size()))) 00158 { 00159 if(log_) *log_<< group("priority") << "\t" << cfg_.name() << " next message is too large {" << next_msg_it->data().size() << "}" << std::endl; 00160 return false; 00161 } 00162 // wrong destination 00163 else if((data_msg.base().has_dest() 00164 && data_msg.base().dest() != QUERY_DESTINATION_ID 00165 && next_msg_it->base().dest() != BROADCAST_ID 00166 && data_msg.base().dest() != next_msg_it->base().dest())) 00167 { 00168 if(log_) *log_<< group("priority") << "\t" << cfg_.name() << " next message has wrong destination (must be BROADCAST (0) or same as first user-frame)" << std::endl; 00169 return false; 00170 } 00171 // wrong ack value UNLESS message can be broadcast 00172 else if((data_msg.has_ack_requested() && !data_msg.ack_requested() && 00173 next_msg_it->ack_requested() && data_msg.base().dest() != acomms::BROADCAST_ID)) 00174 { 00175 if(log_) *log_<< group("priority") << "\t" << cfg_.name() << " next message requires ACK and the packet does not" << std::endl; 00176 return false; 00177 } 00178 else // ok! 00179 { 00180 if(log_) *log_<< group("priority") << "\t" << cfg().name() 00181 << " (" << next_msg_it->data().size() 00182 << "B) has priority value" 00183 << ": " << priority << std::endl; 00184 return true; 00185 } 00186 00187 } 00188 00189 bool goby::acomms::Queue::pop_message(unsigned frame) 00190 { 00191 if (cfg_.newest_first() && !messages_.back().ack_requested()) 00192 { 00193 stream_for_pop(messages_.back()); 00194 messages_.pop_back(); 00195 } 00196 else if(!cfg_.newest_first() && !messages_.front().ack_requested()) 00197 { 00198 stream_for_pop(messages_.front()); 00199 messages_.pop_front(); 00200 } 00201 else 00202 { 00203 return false; 00204 } 00205 00206 return true; 00207 } 00208 00209 bool goby::acomms::Queue::pop_message_ack(unsigned frame, protobuf::ModemDataTransmission* data_msg) 00210 { 00211 // pop message from the ack stack 00212 if(waiting_for_ack_.count(frame)) 00213 { 00214 // remove a messages in this frame that needs ack 00215 waiting_for_ack_it it = waiting_for_ack_.find(frame); 00216 *data_msg = *(it->second); 00217 00218 stream_for_pop(*data_msg); 00219 00220 // remove the message 00221 messages_.erase(it->second); 00222 // clear the acknowledgement map entry for this message 00223 waiting_for_ack_.erase(it); 00224 00225 } 00226 else 00227 { 00228 return false; 00229 } 00230 00231 return true; 00232 } 00233 00234 void goby::acomms::Queue::stream_for_pop(const protobuf::ModemDataTransmission& data_msg) 00235 { 00236 if(log_) *log_ << group("pop") << "popping" << " from send stack " 00237 << cfg_.name() << " (qsize " << size()-1 00238 << "/" << cfg_.max_queue() << "): " << data_msg << std::endl; 00239 } 00240 00241 std::vector<goby::acomms::protobuf::ModemDataTransmission> goby::acomms::Queue::expire() 00242 { 00243 std::vector<protobuf::ModemDataTransmission> expired_msgs; 00244 00245 while(!messages_.empty()) 00246 { 00247 if((goby::util::as<boost::posix_time::ptime>(messages_.front().base().time()) 00248 + boost::posix_time::seconds(cfg_.ttl())) < goby_time()) 00249 { 00250 expired_msgs.push_back(messages_.front()); 00251 if(log_) *log_ << group("pop") << "expiring" << " from send stack " 00252 << cfg_.name() << " (qsize " << size()-1 00253 << "/" << cfg_.max_queue() << "): " << messages_.front() << std::endl; 00254 // if we were waiting for an ack for this, erase that too 00255 waiting_for_ack_it it = find_ack_value(messages_.begin()); 00256 if(it != waiting_for_ack_.end()) waiting_for_ack_.erase(it); 00257 00258 messages_.pop_front(); 00259 } 00260 else 00261 { 00262 return expired_msgs; 00263 } 00264 } 00265 00266 return expired_msgs; 00267 } 00268 00269 waiting_for_ack_it goby::acomms::Queue::find_ack_value(messages_it it_to_find) 00270 { 00271 waiting_for_ack_it n = waiting_for_ack_.end(); 00272 for(waiting_for_ack_it it = waiting_for_ack_.begin(); it != n; ++it) 00273 { 00274 if(it->second == it_to_find) 00275 return it; 00276 } 00277 return n; 00278 } 00279 00280 00281 std::string goby::acomms::Queue::summary() const 00282 { 00283 std::stringstream ss; 00284 ss << cfg_; 00285 return ss.str(); 00286 } 00287 00288 00289 void goby::acomms::Queue::flush() 00290 { 00291 if(log_) *log_ << group("pop") << "flushing stack " << cfg_.name() << " (qsize 0)" << std::endl; 00292 messages_.clear(); 00293 waiting_for_ack_.clear(); 00294 } 00295 00296 00297 std::ostream& goby::acomms::operator<< (std::ostream& os, const goby::acomms::Queue& oq) 00298 { 00299 os << oq.summary(); 00300 return os; 00301 } 00302