24 #include <boost/filesystem.hpp> 26 #include "goby/acomms/acomms_constants.h" 27 #include "goby/acomms/protobuf/store_server.pb.h" 28 #include "goby/common/time.h" 29 #include "goby/common/zeromq_application_base.h" 30 #include "goby/pb/protobuf_node.h" 31 #include "goby/util/binary.h" 32 #include "goby_store_server_config.pb.h" 56 void check(
int rc,
const std::string& error_prefix);
64 std::map<int, uint64> last_request_time_;
// Program entry point: forwards the command-line arguments and the address of
// the configuration object `cfg` to goby::run, instantiated for
// goby::acomms::GobyStoreServer.
// NOTE(review): the declaration of `cfg` and the function's braces are not
// present in this listing -- confirm against the complete file.
71 int main(
int argc,
char* argv[])
74 goby::run<goby::acomms::GobyStoreServer>(argc, argv, &cfg);
// Constructor fragment (the signature line is not present in this listing).
// Visible work, in order: initialize the ZeroMQApplicationBase and
// StaticProtobufNode bases with zeromq_service_, validate the database
// directory, open the SQLite database into db_, make sure the
// ModemTransmission table exists, subscribe handle_request to incoming
// StoreServerRequest messages, and hand the reply socket to the ZeroMQ service.
78 : ZeroMQApplicationBase(&zeromq_service_, cfg), StaticProtobufNode(&zeromq_service_),
// The configured database directory must already exist; this code throws
// rather than creating it.
82 if (!boost::filesystem::exists(cfg_.db_file_dir()))
83 throw(
goby::Exception(
"db_file_dir does not exist: " + cfg_.db_file_dir()));
// Database path is "<db_file_dir>/" followed by the configured file name, when
// one is set.
// NOTE(review): lines following the has_db_file_name() branch are missing from
// this listing; presumably a default name is generated there -- confirm.
85 std::string full_db_name = cfg_.db_file_dir() +
"/";
86 if (cfg_.has_db_file_name())
87 full_db_name += cfg_.db_file_name();
// Open the database file; the connection handle is stored in db_.
92 rc = sqlite3_open(full_db_name.c_str(), &db_);
// Failure is reported with SQLite's own message for db_ (the condition
// guarding this throw is not shown in this listing).
// NOTE(review): no sqlite3_close(db_) is visible on this error path -- confirm
// the handle is released elsewhere.
94 throw(
goby::Exception(
"Can't open database: " + std::string(sqlite3_errmsg(db_))));
// Create the storage table on first use: an autoincrementing integer id, the
// integer columns src, dest and microtime, and the message itself as a BLOB
// in `bytes`.
98 rc = sqlite3_exec(db_,
99 "CREATE TABLE IF NOT EXISTS ModemTransmission (id INTEGER PRIMARY KEY ASC " 100 "AUTOINCREMENT, src INTEGER, dest INTEGER, microtime INTEGER, bytes BLOB);",
// Copy the error text into a std::string before releasing the buffer with
// sqlite3_free.
105 std::string error(errmsg);
106 sqlite3_free(errmsg);
// Deliver every StoreServerRequest received on the reply socket to
// handle_request on this object.
112 on_receipt<protobuf::StoreServerRequest>(cfg_.reply_socket().socket_id(),
113 &GobyStoreServer::handle_request,
this);
// Add the reply socket to the service configuration and apply it to the
// ZeroMQ service.
117 service_cfg.add_socket()->CopyFrom(cfg_.reply_socket());
118 zeromq_service_.set_cfg(service_cfg);
// loop() has an empty body: no work is performed here.
// NOTE(review): presumably this is the periodic callback required by the
// application base class, left empty because requests are handled through the
// on_receipt registration of handle_request -- confirm.
121 void goby::acomms::GobyStoreServer::loop() {}
// Request handler fragment (the signature line and several interior lines are
// not present in this listing). Visible work, in order: log the request, echo
// the requester's modem id into the response, insert every outbox message
// into ModemTransmission, select the stored messages that fall inside the
// requester's time window into the response inbox, remember the new window
// start, and send the response on the reply socket.
125 glog.is(DEBUG1) &&
glog <<
"Got request: " << request.DebugString() << std::endl;
130 response.set_modem_id(request.modem_id());
// One table row is written per message in the request's outbox.
133 for (
int i = 0, n = request.outbox_size(); i < n; ++i)
135 glog.is(DEBUG1) &&
glog <<
"Trying to insert (size: " << request.outbox(i).ByteSize()
136 <<
"): " << request.outbox(i).DebugString() << std::endl;
// Parameterized INSERT: the four `?` placeholders are bound by position below
// (1 = src, 2 = dest, 3 = microtime, 4 = bytes). The prepare call itself and
// the microtime bind call are not shown in this listing.
138 sqlite3_stmt* insert;
143 "INSERT INTO ModemTransmission (src, dest, microtime, bytes) VALUES (?, ?, ?, ?);",
145 "Insert statement preparation failed");
146 check(sqlite3_bind_int(insert, 1, request.outbox(i).src()),
"Insert `src` binding failed");
147 check(sqlite3_bind_int(insert, 2, request.outbox(i).dest()),
148 "Insert `dest` binding failed");
150 "Insert `microtime` binding failed");
// The complete outbox message is serialized and stored as the row's blob.
153 request.outbox(i).SerializeToString(&bytes);
// NOTE(review): SQLITE_STATIC means SQLite does not copy the buffer, so
// `bytes` must remain alive and unmodified until the statement has been
// stepped and finalized below -- its declaration is not shown; confirm scope.
157 check(sqlite3_bind_blob(insert, 4, bytes.data(), bytes.size(), SQLITE_STATIC),
158 "Insert `bytes` binding failed");
// check() accepts SQLITE_DONE, which is what a completed INSERT step returns.
160 check(sqlite3_step(insert),
"Insert step failed");
161 check(sqlite3_finalize(insert),
"Insert statement finalize failed");
163 glog.is(DEBUG1) &&
glog <<
"Insert successful." << std::endl;
167 glog.is(DEBUG1) &&
glog <<
"Trying to select for dest: " << request.modem_id() << std::endl;
// First request seen from this modem id: start its window at 0 so the lower
// bound of the query below excludes nothing with a positive microtime.
169 if (!last_request_time_.count(request.modem_id()))
170 last_request_time_.insert(std::make_pair(request.modem_id(), 0));
// Select every stored message whose src differs from the requester and whose
// microtime lies in (last request time, this request time].
// NOTE(review): the query filters on src only; it does not restrict on dest
// even though the log message above says "for dest" -- confirm this is
// intended.
// NOTE(review): sqlite3_prepare is the legacy interface; the SQLite
// documentation recommends sqlite3_prepare_v2 -- consider switching.
172 sqlite3_stmt* select;
173 check(sqlite3_prepare(db_,
174 "SELECT bytes FROM ModemTransmission WHERE src != ?1 AND (microtime > ?2 " 175 "AND microtime <= ?3 );",
177 "Select statement preparation failed");
179 check(sqlite3_bind_int(select, 1, request.modem_id()),
180 "Select request modem_id binding failed");
181 check(sqlite3_bind_int64(select, 2, last_request_time_[request.modem_id()]),
182 "Select `microtime` last time binding failed");
183 check(sqlite3_bind_int64(select, 3, request_time),
184 "Select `microtime` this time binding failed");
// Step through the result set; each SQLITE_ROW yields one stored message.
186 int rc = sqlite3_step(select);
187 while (rc == SQLITE_ROW)
// NOTE(review): the `bytes` column is declared BLOB in the CREATE TABLE
// statement but is read here with sqlite3_column_text; sqlite3_column_blob is
// the accessor that matches the column type -- confirm and consider switching.
193 const unsigned char* bytes = sqlite3_column_text(select, 0);
194 int num_bytes = sqlite3_column_bytes(select, 0);
// Parse the row into a newly added inbox entry of the response.
// NOTE(review): the return value of ParseFromArray is not checked here.
200 response.add_inbox()->ParseFromArray(bytes, num_bytes);
201 glog.is(DEBUG1) &&
glog <<
"Got message for inbox (size: " << num_bytes <<
"): " 202 << response.inbox(response.inbox_size() - 1).DebugString()
204 rc = sqlite3_step(select);
// Default branch of a switch whose other cases are not shown in this listing:
// the step result goes to check(), which throws unless it is SQLITE_OK or
// SQLITE_DONE.
208 default: check(rc,
"Select step failed");
break;
212 check(sqlite3_finalize(select),
"Select statement finalize failed");
213 glog.is(DEBUG1) &&
glog <<
"Select successful." << std::endl;
// Advance the window so the next request from this modem id only selects
// rows newer than this request's time.
215 last_request_time_[request.modem_id()] = request_time;
// Send the assembled response on the reply socket.
217 send(response, cfg_.reply_socket().socket_id());
220 void goby::acomms::GobyStoreServer::check(
int rc,
const std::string& error_prefix)
222 if (rc != SQLITE_OK && rc != SQLITE_DONE)
223 throw(
goby::Exception(error_prefix +
": " + std::string(sqlite3_errmsg(db_))));
uint64 goby_time< uint64 >()
Returns current UTC time as integer microseconds since 1970-01-01 00:00:00.
ReturnType goby_time()
Returns current UTC time as a boost::posix_time::ptime.
std::string goby_file_timestamp()
ISO string representation of goby_time()
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.
google::protobuf::uint64 uint64
An unsigned 64-bit integer.
Simple exception class for Goby applications.