Goby Underwater Autonomy Project
Series: 1.1, revision: 163, released on 2013-02-06 14:23:27 -0500
|
00001 // copyright 2009 t. schneider tes@mit.edu 00002 // 00003 // this file is part of the Queue Library (libqueue), 00004 // the goby-acomms message queue manager. goby-acomms is a collection of 00005 // libraries for acoustic underwater networking 00006 // 00007 // This program is free software: you can redistribute it and/or modify 00008 // it under the terms of the GNU General Public License as published by 00009 // the Free Software Foundation, either version 3 of the License, or 00010 // (at your option) any later version. 00011 // 00012 // This software is distributed in the hope that it will be useful, 00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 // GNU General Public License for more details. 00016 // 00017 // You should have received a copy of the GNU General Public License 00018 // along with this software. If not, see <http://www.gnu.org/licenses/>. 00019 00020 00021 #ifndef QueueManager20091204_H 00022 #define QueueManager20091204_H 00023 00024 #include <limits> 00025 #include <set> 00026 #include <boost/bind.hpp> 00027 #include <boost/signal.hpp> 00028 00029 #include "goby/acomms/dccl.h" 00030 #include "goby/protobuf/queue.pb.h" 00031 #include "goby/protobuf/acomms_proto_helpers.h" 00032 00033 #include <map> 00034 #include <deque> 00035 00036 #include "queue_exception.h" 00037 #include "queue.h" 00038 00039 00040 namespace goby 00041 { 00042 namespace util 00043 { 00044 class FlexOstream; 00045 } 00046 00047 00048 namespace acomms 00049 { 00054 class QueueManager 00055 { 00056 public: 00058 00059 00060 00061 00062 QueueManager(std::ostream* log = 0); 00063 00065 ~QueueManager() { } 00066 00068 00072 00073 00075 void set_cfg(const protobuf::QueueManagerConfig& cfg); 00076 00078 void merge_cfg(const protobuf::QueueManagerConfig& cfg); 00079 00081 00083 00084 00086 static void add_flex_groups(util::FlexOstream* tout); 00087 00088 00090 00095 00096 00101 void push_message(const protobuf::ModemDataTransmission& new_message); 00102 00106 void flush_queue(const protobuf::QueueFlush& flush); 00108 00112 00113 00120 void handle_modem_data_request(const protobuf::ModemDataRequest& msg_request, protobuf::ModemDataTransmission* msg_data); 00121 00126 void handle_modem_receive(const protobuf::ModemDataTransmission& message); 00127 00132 void handle_modem_ack(const protobuf::ModemDataAck& message); 00133 00135 00136 00140 00141 00142 void do_work(); 00143 00145 00148 00149 00151 std::string summary() const; 00152 const ManipulatorManager& manip_manager() const { return manip_manager_; } 00153 00154 00155 00157 00159 00160 00161 00162 00163 boost::signal<void (const protobuf::ModemDataAck& ack_msg)> signal_ack; 00167 boost::signal<void (const protobuf::ModemDataTransmission& msg)> signal_receive; 00168 00172 boost::signal<void (const protobuf::ModemDataTransmission& msg)> signal_receive_ccl; 00173 00177 boost::signal<void (const protobuf::ModemDataExpire& expire_msg)> signal_expire; 00178 00183 boost::signal<void (const protobuf::ModemDataRequest& request_msg, 00184 protobuf::ModemDataTransmission* data_msg)> signal_data_on_demand; 00185 00189 boost::signal<void (protobuf::QueueSize size)> signal_queue_size_change; 00191 00196 00197 00199 00200 00201 00202 private: 00203 friend class Queue; 00204 static int modem_id_; 00205 00206 00210 std::set<unsigned> add_xml_queue_file(const std::string& xml_file); 00211 00215 void add_queue(const protobuf::QueueConfig& cfg); 00216 00217 00218 00219 void qsize(Queue* q); 00220 00221 // finds the %queue with the highest priority 00222 Queue* find_next_sender(const protobuf::ModemDataRequest& message, const protobuf::ModemDataTransmission& data_msg, bool first_user_frame); 00223 00224 // combine multiple "user" frames into a single "modem" frame 00225 bool stitch_recursive(const protobuf::ModemDataRequest& request_msg, protobuf::ModemDataTransmission* data_msg, Queue* winning_var); 00226 bool unstitch_recursive(std::string* data, protobuf::ModemDataTransmission* message); 00227 00228 void replace_header(bool is_last_user_frame, protobuf::ModemDataTransmission* data_msg, const protobuf::ModemDataTransmission& next_data_msg, const std::string& new_data); 00229 00230 // clears the destination and ack values for the packet to reset for next $CADRQ 00231 void clear_packet(); 00232 00233 // slave function to receive_incoming_modem_data that actually writes a piece of the message (called for each user-frame) 00234 bool publish_incoming_piece(protobuf::ModemDataTransmission* message, const unsigned incoming_var_id); 00235 00236 00237 void process_cfg(); 00238 00239 private: 00240 std::map<goby::acomms::protobuf::QueueKey, Queue> queues_; 00241 00242 std::ostream* log_; 00243 00244 // map frame number onto %queue pointer that contains 00245 // the data for this ack 00246 std::multimap<unsigned, Queue*> waiting_for_ack_; 00247 00248 // the first *user* frame sets the tone (dest & ack) for the entire packet (all %modem frames) 00249 unsigned packet_ack_; 00250 int packet_dest_; 00251 00252 protobuf::QueueManagerConfig cfg_; 00253 00254 ManipulatorManager manip_manager_; 00255 }; 00256 00258 std::ostream& operator<< (std::ostream& out, const QueueManager& d); 00259 00260 00261 } 00262 00263 } 00264 00265 00266 #endif