Note: Goby version 1 (shown here) is now considered obsolete. Please use version 2 for new projects, and consider upgrading old projects.

Goby Underwater Autonomy Project  Series: 1.1, revision: 163, released on 2013-02-06 14:23:27 -0500
acomms/libqueue/queue_manager.h
00001 // copyright 2009 t. schneider tes@mit.edu 
00002 //
00003 // this file is part of the Queue Library (libqueue),
00004 // the goby-acomms message queue manager. goby-acomms is a collection of 
00005 // libraries for acoustic underwater networking
00006 //
00007 // This program is free software: you can redistribute it and/or modify
00008 // it under the terms of the GNU General Public License as published by
00009 // the Free Software Foundation, either version 3 of the License, or
00010 // (at your option) any later version.
00011 //
00012 // This software is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License
00018 // along with this software.  If not, see <http://www.gnu.org/licenses/>.
00019 
00020 
00021 #ifndef QueueManager20091204_H
00022 #define QueueManager20091204_H
00023 
00024 #include <limits>
00025 #include <set>
00026 #include <boost/bind.hpp>
00027 #include <boost/signal.hpp>
00028 
00029 #include "goby/acomms/dccl.h"
00030 #include "goby/protobuf/queue.pb.h"
00031 #include "goby/protobuf/acomms_proto_helpers.h"
00032 
00033 #include <map>
00034 #include <deque>
00035 
00036 #include "queue_exception.h"
00037 #include "queue.h"
00038 
00039 
00040 namespace goby
00041 {
00042     namespace util
00043     {
00044         class FlexOstream;
00045     }
00046         
00047     
00048     namespace acomms
00049     {
00054         class QueueManager
00055         {
00056           public:
00058 
00059 
00060 
00061 
00062             QueueManager(std::ostream* log = 0);
00063 
00065             ~QueueManager() { }
00066         
00068         
00072 
00073 
00075             void set_cfg(const protobuf::QueueManagerConfig& cfg);
00076 
00078             void merge_cfg(const protobuf::QueueManagerConfig& cfg);
00079 
00081             
00083 
00084 
00086             static void add_flex_groups(util::FlexOstream* tout);
00087             
00088             
00090 
00095 
00096 
00101             void push_message(const protobuf::ModemDataTransmission& new_message);        
00102 
00106             void flush_queue(const protobuf::QueueFlush& flush);
00108         
00112 
00113 
00120             void handle_modem_data_request(const protobuf::ModemDataRequest& msg_request, protobuf::ModemDataTransmission* msg_data);
00121 
00126             void handle_modem_receive(const protobuf::ModemDataTransmission& message);
00127         
00132             void handle_modem_ack(const protobuf::ModemDataAck& message);
00133         
00135 
00136 
00140 
00141 
00142             void do_work();
00143 
00145         
00148 
00149 
00151             std::string summary() const;
00152             const ManipulatorManager& manip_manager() const { return manip_manager_; }
00153 
00154             
00155             
00157         
00159 
00160 
00161 
00162 
00163             boost::signal<void (const protobuf::ModemDataAck& ack_msg)> signal_ack;
00167             boost::signal<void (const protobuf::ModemDataTransmission& msg)> signal_receive;
00168 
00172             boost::signal<void (const protobuf::ModemDataTransmission& msg)> signal_receive_ccl;
00173             
00177             boost::signal<void (const protobuf::ModemDataExpire& expire_msg)> signal_expire;
00178             
00183             boost::signal<void (const protobuf::ModemDataRequest& request_msg,
00184                                 protobuf::ModemDataTransmission* data_msg)> signal_data_on_demand;
00185 
00189             boost::signal<void (protobuf::QueueSize size)> signal_queue_size_change;
00191 
00196 
00197 
00199 
00200 
00201             
00202           private:
00203             friend class Queue;
00204             static int modem_id_;
00205             
00206 
00210             std::set<unsigned> add_xml_queue_file(const std::string& xml_file);
00211 
00215             void add_queue(const protobuf::QueueConfig& cfg);
00216         
00217         
00218             
00219             void qsize(Queue* q);
00220         
00221             // finds the %queue with the highest priority
00222             Queue* find_next_sender(const protobuf::ModemDataRequest& message, const protobuf::ModemDataTransmission& data_msg, bool first_user_frame);
00223         
00224             // combine multiple "user" frames into a single "modem" frame
00225             bool stitch_recursive(const protobuf::ModemDataRequest& request_msg, protobuf::ModemDataTransmission* data_msg, Queue* winning_var);
00226             bool unstitch_recursive(std::string* data, protobuf::ModemDataTransmission* message);
00227 
00228             void replace_header(bool is_last_user_frame, protobuf::ModemDataTransmission* data_msg, const protobuf::ModemDataTransmission& next_data_msg, const std::string& new_data);
00229        
00230             // clears the destination and ack values for the packet to reset for next $CADRQ
00231             void clear_packet();
00232             
00233             // slave function to receive_incoming_modem_data that actually writes a piece of the message (called for each user-frame)
00234             bool publish_incoming_piece(protobuf::ModemDataTransmission* message, const unsigned incoming_var_id);
00235             
00236         
00237             void process_cfg();
00238             
00239           private:
00240             std::map<goby::acomms::protobuf::QueueKey, Queue> queues_;
00241             
00242             std::ostream* log_;
00243 
00244             // map frame number onto %queue pointer that contains
00245             // the data for this ack
00246             std::multimap<unsigned, Queue*> waiting_for_ack_;
00247 
00248             // the first *user* frame sets the tone (dest & ack) for the entire packet (all %modem frames)
00249             unsigned packet_ack_;
00250             int packet_dest_;
00251  
00252             protobuf::QueueManagerConfig cfg_;
00253             
00254             ManipulatorManager manip_manager_;
00255         };
00256 
00258         std::ostream& operator<< (std::ostream& out, const QueueManager& d);
00259 
00260         
00261     }
00262 
00263 }
00264 
00265 
00266 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends