Note: Goby version 1 (shown here) is now considered obsolete. Please use version 2 for new projects, and consider upgrading old projects.

Goby Underwater Autonomy Project  Series: 1.1, revision: 163, released on 2013-02-06 14:23:27 -0500
acomms/libqueue/queue.cpp
00001 // copyright 2008, 2009 t. schneider tes@mit.edu
00002 //
00003 // this file is part of the Queue Library (libqueue),
00004 // the goby-acomms message queue manager. goby-acomms is a collection of 
00005 // libraries for acoustic underwater networking
00006 //
00007 // This program is free software: you can redistribute it and/or modify
00008 // it under the terms of the GNU General Public License as published by
00009 // the Free Software Foundation, either version 3 of the License, or
00010 // (at your option) any later version.
00011 //
00012 // This software is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this software.  If not, see <http://www.gnu.org/licenses/>.
00019 
00020 #include "goby/acomms/acomms_constants.h"
00021 #include "goby/util/logger.h"
00022 
00023 #include "queue.h"
00024 #include "queue_manager.h"
00025 
00026 using goby::util::goby_time;
00027 
00028 goby::acomms::Queue::Queue(const protobuf::QueueConfig cfg /* = 0 */,
00029                            std::ostream* log /* = 0 */,
00030                            int modem_id /* = 0 */)
00031     : cfg_(cfg),
00032       last_send_time_(goby_time()),
00033       log_(log)
00034 {}
00035 
00036 
00037 // add a new message
00038 bool goby::acomms::Queue::push_message(const protobuf::ModemDataTransmission& data_msg)
00039 {
00040     if(data_msg.data().empty())
00041     {
00042         if(log_) *log_ << group("q_out") << warn
00043                        << "empty message attempted to be pushed to queue "
00044                        << cfg_.name() << std::endl;
00045         return false;
00046     }
00047     else if(cfg_.key().type() == protobuf::QUEUE_CCL && data_msg.data()[0] != char(cfg_.key().id()))
00048     {
00049         if(log_) *log_ << group("q_out") << warn
00050                        << "CCL message attempted to be pushed to queue "
00051                        << cfg_.name() << " that doesn't have proper CCL byte. " 
00052                        << "expecting: " << cfg_.key().id() << ", given: " << int(data_msg.data()[0])
00053                        << std::endl;
00054         return false;
00055     }
00056     
00057     
00058     
00059     messages_.push_back(data_msg);
00060     
00061     protobuf::ModemDataTransmission* new_data_msg = &messages_.back();
00062 
00063     if(!new_data_msg->has_ack_requested())
00064         new_data_msg->set_ack_requested(cfg_.ack());
00065     
00066     // needed for CCL messages
00067     new_data_msg->mutable_base()->set_src(QueueManager::modem_id_);
00068     
00069     // pop messages off the stack if the queue is full
00070     if(cfg_.max_queue() && messages_.size() > cfg_.max_queue())
00071     {        
00072         messages_it it_to_erase =
00073             cfg_.newest_first() ? messages_.begin() : messages_.end();
00074 
00075         // want "back" iterator not "end"
00076         if(it_to_erase == messages_.end()) --it_to_erase;
00077         
00078         // if we were waiting for an ack for this, erase that too
00079         waiting_for_ack_it it = find_ack_value(it_to_erase);
00080         if(it != waiting_for_ack_.end()) waiting_for_ack_.erase(it);        
00081         
00082         if(log_) *log_ << group("pop") << "queue exceeded for " << cfg_.name() <<
00083                      ". removing: " << *it_to_erase << std::endl;
00084 
00085         messages_.erase(it_to_erase);
00086     }
00087     
00088     if(log_) *log_ << group("push") << "pushing" << " to send stack "
00089                    << cfg_.name() << " (qsize " << size() <<  "/"
00090                    << cfg_.max_queue() << "): " << data_msg << std::endl;
00091     
00092     return true;     
00093 }
00094 
00095 messages_it goby::acomms::Queue::next_message_it()
00096 {
00097     messages_it it_to_give =
00098         cfg_.newest_first() ? messages_.end() : messages_.begin();
00099     if(it_to_give == messages_.end()) --it_to_give; // want "back" iterator not "end"    
00100     
00101     // find a value that isn't already waiting to be acknowledged
00102     while(find_ack_value(it_to_give) != waiting_for_ack_.end())
00103         cfg_.newest_first() ? --it_to_give : ++it_to_give;
00104 
00105     return it_to_give;
00106 }
00107 
00108 goby::acomms::protobuf::ModemDataTransmission goby::acomms::Queue::give_data(const protobuf::ModemDataRequest& request_msg)
00109 {
00110     messages_it it_to_give = next_message_it();
00111 
00112     bool ack = it_to_give->ack_requested();
00113     // broadcast cannot acknowledge
00114     if(it_to_give->base().dest() == BROADCAST_ID && ack == true)
00115     {
00116         if(log_) *log_ << group("pop") << warn << "overriding ack request and setting ack = false because dest = BROADCAST (0) cannot acknowledge messages" << std::endl;
00117         ack = false;
00118     }
00119 
00120     it_to_give->set_ack_requested(ack);
00121 
00122     if(ack)
00123         waiting_for_ack_.insert(std::pair<unsigned, messages_it>(request_msg.frame(),
00124                                                                  it_to_give));
00125 
00126     last_send_time_ = goby_time();    
00127     
00128     return *it_to_give;
00129 }
00130 
00131 
00132 // gives priority values. returns false if in blackout interval or if no data or if messages of wrong size, true if not in blackout
00133 bool goby::acomms::Queue::priority_values(double& priority,
00134                                           boost::posix_time::ptime& last_send_time,
00135                                           const protobuf::ModemDataRequest& request_msg,
00136                                           const protobuf::ModemDataTransmission& data_msg)
00137 {
00138     priority = util::time_duration2double((goby_time()-last_send_time_))/cfg_.ttl()*cfg_.value_base();
00139     
00140     last_send_time = last_send_time_;
00141 
00142     // no messages left to send
00143     if(messages_.size() <= waiting_for_ack_.size()) return false;
00144     
00145     messages_it next_msg_it = next_message_it();
00146 
00147     // for followup user-frames, destination must be either zero (broadcast)
00148     // or the same as the first user-frame
00149 
00150     if (last_send_time_ + boost::posix_time::seconds(cfg_.blackout_time()) > goby_time())
00151     {
00152         if(log_) *log_<< group("priority") << "\t" << cfg_.name() << " is in blackout" << std::endl;
00153         return false;
00154     }
00155     // wrong size
00156     else if(request_msg.has_max_bytes() &&
00157             (next_msg_it->data().size() > (request_msg.max_bytes() - data_msg.data().size())))
00158     {
00159         if(log_) *log_<< group("priority") << "\t" << cfg_.name() << " next message is too large {" << next_msg_it->data().size() << "}" << std::endl;
00160         return false;
00161     }
00162     // wrong destination
00163     else if((data_msg.base().has_dest()
00164              && data_msg.base().dest() != QUERY_DESTINATION_ID
00165              && next_msg_it->base().dest() != BROADCAST_ID
00166              && data_msg.base().dest() != next_msg_it->base().dest()))
00167     {
00168         if(log_) *log_<< group("priority") << "\t" <<  cfg_.name() << " next message has wrong destination  (must be BROADCAST (0) or same as first user-frame)" << std::endl;
00169         return false; 
00170     }
00171     // wrong ack value UNLESS message can be broadcast
00172     else if((data_msg.has_ack_requested() && !data_msg.ack_requested() &&
00173              next_msg_it->ack_requested() && data_msg.base().dest() != acomms::BROADCAST_ID))
00174     {
00175         if(log_) *log_<< group("priority") << "\t" <<  cfg_.name() << " next message requires ACK and the packet does not" << std::endl;
00176         return false; 
00177     }
00178     else // ok!
00179     {
00180         if(log_) *log_<< group("priority") << "\t" << cfg().name()
00181                       << " (" << next_msg_it->data().size()
00182                       << "B) has priority value"
00183                       << ": " << priority << std::endl;
00184         return true;
00185     }
00186     
00187 }
00188 
00189 bool goby::acomms::Queue::pop_message(unsigned frame)
00190 {   
00191     if (cfg_.newest_first() && !messages_.back().ack_requested())
00192     {
00193         stream_for_pop(messages_.back());
00194         messages_.pop_back();
00195     }
00196     else if(!cfg_.newest_first() && !messages_.front().ack_requested())
00197     {
00198         stream_for_pop(messages_.front());
00199         messages_.pop_front();
00200     }
00201     else
00202     {
00203         return false;
00204     }    
00205 
00206     return true;
00207 }
00208 
00209 bool goby::acomms::Queue::pop_message_ack(unsigned frame, protobuf::ModemDataTransmission* data_msg)
00210 {
00211     // pop message from the ack stack
00212     if(waiting_for_ack_.count(frame))
00213     {
00214         // remove a messages in this frame that needs ack
00215         waiting_for_ack_it it = waiting_for_ack_.find(frame);
00216         *data_msg = *(it->second);
00217 
00218         stream_for_pop(*data_msg);
00219 
00220         // remove the message
00221         messages_.erase(it->second);
00222         // clear the acknowledgement map entry for this message
00223         waiting_for_ack_.erase(it);
00224         
00225     }
00226     else
00227     {
00228         return false;
00229     }
00230     
00231     return true;    
00232 }
00233 
00234 void goby::acomms::Queue::stream_for_pop(const protobuf::ModemDataTransmission& data_msg)
00235 {
00236     if(log_) *log_ << group("pop") <<  "popping" << " from send stack "
00237                    << cfg_.name() << " (qsize " << size()-1
00238                    <<  "/" << cfg_.max_queue() << "): "  << data_msg << std::endl;
00239 }
00240 
00241 std::vector<goby::acomms::protobuf::ModemDataTransmission> goby::acomms::Queue::expire()
00242 {
00243     std::vector<protobuf::ModemDataTransmission> expired_msgs;
00244     
00245     while(!messages_.empty())
00246     {
00247         if((goby::util::as<boost::posix_time::ptime>(messages_.front().base().time())
00248             + boost::posix_time::seconds(cfg_.ttl())) < goby_time())
00249         {
00250             expired_msgs.push_back(messages_.front());
00251             if(log_) *log_ << group("pop") <<  "expiring" << " from send stack "
00252                            << cfg_.name() << " (qsize " << size()-1
00253                            <<  "/" << cfg_.max_queue() << "): "  << messages_.front() << std::endl;
00254             // if we were waiting for an ack for this, erase that too
00255             waiting_for_ack_it it = find_ack_value(messages_.begin());
00256             if(it != waiting_for_ack_.end()) waiting_for_ack_.erase(it);
00257             
00258             messages_.pop_front();
00259         }
00260         else
00261         {
00262             return expired_msgs;
00263         } 
00264     }
00265 
00266     return expired_msgs;
00267 }
00268 
00269 waiting_for_ack_it goby::acomms::Queue::find_ack_value(messages_it it_to_find)
00270 {
00271     waiting_for_ack_it n = waiting_for_ack_.end();
00272     for(waiting_for_ack_it it = waiting_for_ack_.begin(); it != n; ++it)
00273     {
00274         if(it->second == it_to_find)
00275             return it;
00276     }
00277     return n;
00278 }
00279 
00280 
00281 std::string goby::acomms::Queue::summary() const 
00282 {
00283     std::stringstream ss;
00284     ss << cfg_;
00285     return ss.str();
00286 }
00287 
00288 
00289 void goby::acomms::Queue::flush()
00290 {
00291     if(log_) *log_ << group("pop") << "flushing stack " << cfg_.name() << " (qsize 0)" << std::endl;
00292     messages_.clear();
00293     waiting_for_ack_.clear();
00294 }        
00295 
00296 
00297 std::ostream& goby::acomms::operator<< (std::ostream& os, const goby::acomms::Queue& oq)
00298 {
00299     os << oq.summary();
00300     return os;
00301 }
00302 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends