25 #include "goby/common/logger.h" 26 #include "goby/common/logger/term_color.h" 27 #include "goby/common/zeromq_service.h" 29 #include "goby/pb/application.h" 31 #include "file_transfer_config.pb.h" 32 #include "goby/acomms/protobuf/file_transfer.pb.h" 53 int send_file(
const std::string& path);
62 std::cout <<
"Got ack for request: " << request.DebugString() << std::endl;
63 waiting_for_request_ack_ =
false;
71 MAX_FILE_TRANSFER_BYTES = 1024 * 1024
76 std::map<ModemId, std::map<int, protobuf::FileFragment> > receive_files_;
77 std::map<ModemId, protobuf::TransferRequest> requests_;
78 bool waiting_for_request_ack_;
83 int main(
int argc,
char* argv[])
86 goby::run<goby::acomms::FileTransfer>(argc, argv, &cfg);
92 : Application(cfg), cfg_(*cfg), waiting_for_request_ack_(
false)
94 glog.is(DEBUG1) &&
glog << cfg_.DebugString() << std::endl;
96 if (cfg_.action() != protobuf::FileTransferConfig::WAIT)
98 if (!cfg_.has_remote_id())
100 glog.is(WARN) &&
glog <<
"Must set remote_id modem ID for file destination." 104 if (!cfg_.has_local_file())
106 glog.is(WARN) &&
glog <<
"Must set local_file path." << std::endl;
109 if (!cfg_.has_remote_id())
111 glog.is(WARN) &&
glog <<
"Must set remote_file path." << std::endl;
115 const unsigned max_path = protobuf::TransferRequest::descriptor()
116 ->FindFieldByName(
"file")
118 .GetExtension(dccl::field)
120 if (cfg_.remote_file().size() > max_path)
122 glog.is(WARN) &&
glog <<
"remote_file full path must be less than " << max_path
123 <<
" characters." << std::endl;
128 subscribe(&FileTransfer::handle_ack,
this,
129 "QueueAckOrig" + goby::util::as<std::string>(cfg_.local_id()));
131 subscribe(&FileTransfer::handle_remote_transfer_request,
this,
132 "QueueRx" + goby::util::as<std::string>(cfg_.local_id()));
134 subscribe(&FileTransfer::handle_receive_fragment,
this,
135 "QueueRx" + goby::util::as<std::string>(cfg_.local_id()));
137 subscribe(&FileTransfer::handle_receive_response,
this,
138 "QueueRx" + goby::util::as<std::string>(cfg_.local_id()));
142 if (cfg_.action() == protobuf::FileTransferConfig::PUSH)
144 else if (cfg_.action() == protobuf::FileTransferConfig::PULL)
147 catch (protobuf::TransferResponse::ErrorCode& c)
149 glog.is(WARN) &&
glog <<
"File transfer action failed: " 150 << protobuf::TransferResponse::ErrorCode_Name(c) << std::endl;
154 catch (std::exception& e)
156 glog.is(WARN) &&
glog <<
"File transfer action failed: " << e.what() << std::endl;
162 void goby::acomms::FileTransfer::push_file()
165 request.set_src(cfg_.local_id());
166 request.set_dest(cfg_.remote_id());
167 request.set_push_or_pull(protobuf::TransferRequest::PUSH);
168 request.set_file(cfg_.remote_file());
170 publish(request,
"QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
173 waiting_for_request_ack_ =
true;
176 zeromq_service().poll(10000);
177 if (!waiting_for_request_ack_)
179 send_file(cfg_.local_file());
185 void goby::acomms::FileTransfer::pull_file()
188 request.set_src(cfg_.local_id());
189 request.set_dest(cfg_.remote_id());
190 request.set_push_or_pull(protobuf::TransferRequest::PULL);
191 request.set_file(cfg_.remote_file());
193 publish(request,
"QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
196 request.set_file(cfg_.local_file());
197 request.set_src(cfg_.remote_id());
198 request.set_dest(cfg_.local_id());
199 receive_files_[request.src()].clear();
200 requests_[request.src()] = request;
203 int goby::acomms::FileTransfer::send_file(
const std::string& path)
205 std::ifstream send_file(path.c_str(), std::ios::binary | std::ios::ate);
207 glog.is(VERBOSE) &&
glog <<
"Attempting to transfer: " << path << std::endl;
210 if (!send_file.is_open())
211 throw protobuf::TransferResponse::COULD_NOT_READ_FILE;
214 std::streampos size = send_file.tellg();
215 glog.is(VERBOSE) &&
glog <<
"File size: " << size << std::endl;
217 if (size > MAX_FILE_TRANSFER_BYTES)
219 glog.is(WARN) &&
glog <<
"File exceeds maximum supported size of " 220 << MAX_FILE_TRANSFER_BYTES <<
"B" << std::endl;
221 throw protobuf::TransferResponse::FILE_TOO_LARGE;
225 send_file.seekg(0, send_file.beg);
227 int size_acquired = 0;
230 int fragment_size = protobuf::FileFragment::descriptor()
231 ->FindFieldByName(
"data")
233 .GetExtension(dccl::field)
237 reference_fragment.set_src(cfg_.local_id());
238 reference_fragment.set_dest(cfg_.remote_id());
240 std::vector<protobuf::FileFragment> fragments(std::ceil((
double)size / fragment_size),
243 std::vector<protobuf::FileFragment>::iterator fragments_it = fragments.begin();
244 std::vector<char> buffer(fragment_size);
245 int fragment_idx = 0;
246 while (send_file.good())
249 send_file.read(&buffer[0], fragment_size);
250 int bytes_read = send_file.gcount();
251 size_acquired += bytes_read;
252 fragment.set_fragment(fragment_idx++);
253 if (size_acquired == size)
254 fragment.set_is_last_fragment(
true);
256 fragment.set_is_last_fragment(
false);
257 fragment.set_num_bytes(bytes_read);
259 fragment.set_data(std::string(buffer.begin(), buffer.begin() + bytes_read));
262 if (!send_file.eof())
263 throw protobuf::TransferResponse::ERROR_WHILE_READING;
269 for (
int i = 0, n = fragments.size(); i < n; ++i)
271 glog.is(VERBOSE) &&
glog << fragments[i].ShortDebugString() << std::endl;
272 publish(fragments[i],
"QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
278 goby::acomms::FileTransfer::~FileTransfer() {}
280 void goby::acomms::FileTransfer::handle_remote_transfer_request(
283 glog.is(VERBOSE) &&
glog <<
"Received remote transfer request: " << request.DebugString()
286 if (request.push_or_pull() == protobuf::TransferRequest::PUSH)
288 glog.is(VERBOSE) &&
glog <<
"Preparing to receive file..." << std::endl;
289 receive_files_[request.src()].clear();
291 else if (request.push_or_pull() == protobuf::TransferRequest::PULL)
294 response.set_src(request.dest());
295 response.set_dest(request.src());
298 response.set_num_fragments(send_file(request.file()));
299 response.set_transfer_successful(
true);
301 catch (protobuf::TransferResponse::ErrorCode& c)
303 glog.is(WARN) &&
glog <<
"File transfer action failed: " 304 << protobuf::TransferResponse::ErrorCode_Name(c) << std::endl;
305 response.set_transfer_successful(
false);
306 response.set_error(c);
310 catch (std::exception& e)
312 glog.is(WARN) &&
glog <<
"File transfer action failed: " << e.what() << std::endl;
316 response.set_transfer_successful(
false);
317 response.set_error(protobuf::TransferResponse::OTHER_ERROR);
319 publish(response,
"QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
321 requests_[request.src()] = request;
326 std::map<int, protobuf::FileFragment>& receive = receive_files_[fragment.src()];
328 receive.insert(std::make_pair(fragment.fragment(), fragment));
330 glog.is(VERBOSE) &&
glog <<
"Received fragment #" << fragment.fragment()
331 <<
", total received: " << receive.size() << std::endl;
333 if (receive.rbegin()->second.is_last_fragment())
335 if ((
int)receive.size() == receive.rbegin()->second.fragment() + 1)
338 response.set_src(requests_[fragment.src()].dest());
339 response.set_dest(requests_[fragment.src()].src());
343 glog.is(VERBOSE) &&
glog <<
"Received all fragments!" << std::endl;
344 glog.is(VERBOSE) &&
glog <<
"Writing to " << requests_[fragment.src()].file()
346 std::ofstream receive_file(requests_[fragment.src()].file().c_str(),
350 if (!receive_file.is_open())
351 throw(protobuf::TransferResponse::COULD_NOT_WRITE_FILE);
353 for (std::map<int, protobuf::FileFragment>::const_iterator it = receive.begin(),
356 { receive_file.write(it->second.data().c_str(), it->second.num_bytes()); }
358 receive_file.close();
359 response.set_transfer_successful(
true);
363 catch (protobuf::TransferResponse::ErrorCode& c)
365 glog.is(WARN) &&
glog <<
"File transfer action failed: " 366 << protobuf::TransferResponse::ErrorCode_Name(c) << std::endl;
367 response.set_transfer_successful(
false);
368 response.set_error(c);
373 catch (std::exception& e)
375 glog.is(WARN) &&
glog <<
"File transfer action failed: " << e.what() << std::endl;
379 response.set_transfer_successful(
false);
380 response.set_error(protobuf::TransferResponse::OTHER_ERROR);
382 publish(response,
"QueuePush" + goby::util::as<std::string>(cfg_.local_id()));
386 glog.is(VERBOSE) &&
glog <<
"Still waiting on some fragments..." << std::endl;
393 glog.is(VERBOSE) &&
glog <<
"Received response for file transfer: " << response.DebugString()
396 if (!response.transfer_successful())
397 glog.is(WARN) &&
glog <<
"Transfer failed: " 398 << protobuf::TransferResponse::ErrorCode_Name(response.error())
403 if (response.transfer_successful())
405 glog.is(VERBOSE) &&
glog <<
"File transfer completed successfully." << std::endl;
Base class provided for users to generate applications that participate in the Goby publish/subscribe...
double goby_time< double >()
Returns current UTC time as seconds and fractional seconds since 1970-01-01 00:00:00.
common::FlexOstream glog
Access the Goby logger through this object.
The global namespace for the Goby project.