25 #ifndef GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
26 #define GOBY_MIDDLEWARE_APPLICATION_MULTI_THREAD_H
28 #include <boost/core/demangle.hpp>
29 #include <boost/units/systems/si.hpp>
63 :
public Thread<boost::units::quantity<boost::units::si::frequency>, InterThreadTransporter>,
75 TimerThread(
const boost::units::quantity<boost::units::si::frequency>& freq)
82 void loop()
override { interthread_.template publish_empty<expire_group>(); }
84 InterThreadTransporter& interthread() {
return interthread_; }
87 InterThreadTransporter interthread_;
96 template <
class Config,
class Transporter>
101 struct ThreadManagement
103 ThreadManagement() =
default;
115 std::atomic<bool>
alive{
true};
118 std::unique_ptr<std::thread> thread;
121 static std::exception_ptr thread_exception_;
123 std::map<std::type_index, std::map<int, ThreadManagement>> threads_;
125 int running_thread_count_{0};
131 _launch_thread<ThreadType, Config, false, true>(-1, this->
app_cfg());
135 _launch_thread<ThreadType, Config, true, true>(
index, this->
app_cfg());
138 template <
typename ThreadType,
typename ThreadConfig>
141 _launch_thread<ThreadType, ThreadConfig, false, true>(-1,
cfg);
143 template <
typename ThreadType,
typename ThreadConfig>
146 _launch_thread<ThreadType, ThreadConfig, true, true>(
index,
cfg);
151 _launch_thread<ThreadType, Config, false, false>(-1, this->
app_cfg());
155 _launch_thread<ThreadType, Config, true, false>(
index, this->
app_cfg());
161 auto type_i = std::type_index(
typeid(ThreadType));
167 void launch_timer(boost::units::quantity<boost::units::si::frequency> freq,
168 std::function<
void()> on_expire)
170 launch_thread<goby::middleware::TimerThread<i>>(freq);
172 .template subscribe_empty<goby::middleware::TimerThread<i>::expire_group>(on_expire);
175 template <
int i>
void join_timer() { join_thread<goby::middleware::TimerThread<i>>(); }
189 interthread_.template subscribe<MainThreadBase::joinable_group_>(
191 { _join_thread(joinable.
type_i, joinable.
index); });
199 std::map<std::type_index, std::map<int, ThreadManagement>>&
threads() {
return threads_; }
203 if (running_thread_count_ > 0)
206 goby::glog <<
"Requesting that all remaining threads shutdown cleanly..."
214 while (running_thread_count_ > 0)
217 << running_thread_count_
218 <<
" threads." << std::endl;
235 catch (std::exception&
e)
238 goby::glog <<
"MultiThreadApplicationBase:: uncaught exception: " <<
e.what()
244 template <
typename ThreadType,
typename ThreadConfig,
bool has_index,
bool has_config>
245 void _launch_thread(
int index,
const ThreadConfig&
cfg);
247 void _join_thread(
const std::type_index& type_i,
int index);
257 Config, InterVehicleForwarder<InterProcessPortal<InterThreadTransporter>>>,
262 InterProcessPortal<InterThreadTransporter> interprocess_;
285 :
Base(loop_freq, &intervehicle_),
288 intervehicle_(interprocess_)
295 this->
interprocess().template subscribe<goby::middleware::groups::datum_update>(
302 this->
interprocess().template publish<goby::middleware::groups::configuration>(
305 if (this->
app_cfg().app().health_cfg().run_health_monitor_thread())
306 this->
template launch_thread_without_cfg<HealthMonitorThread>();
313 InterProcessPortal<InterThreadTransporter>&
interprocess() {
return interprocess_; }
316 return intervehicle_;
329 void preseed_hook(std::shared_ptr<protobuf::ProcessHealth>&
health_response)
override
332 for (
const auto& type_map_p : this->
threads())
334 for (
const auto& index_manager_p : type_map_p.second)
336 const auto& thread_manager = index_manager_p.second;
350 template <
class Config>
391 boost::units::quantity<boost::units::si::frequency> loop_freq = 0 * boost::units::si::hertz)
405 template <
class Config,
class Transporter>
409 template <
class Config,
class Transporter>
410 template <
typename ThreadType,
typename ThreadConfig,
bool has_index,
bool has_config>
412 int index,
const ThreadConfig& cfg)
414 std::type_index type_i = std::type_index(
typeid(ThreadType));
416 if (threads_[type_i].count(index) && threads_[type_i][index].alive)
417 throw(Exception(std::string(
"Thread of type: ") + type_i.name() +
" and index " +
420 auto& thread_manager = threads_[type_i][index];
421 thread_manager.alive =
true;
422 thread_manager.name = boost::core::demangle(
typeid(ThreadType).name());
425 thread_manager.uid = thread_uid_++;
428 auto thread_lambda = [
this, type_i, index, cfg, &thread_manager]()
432 pthread_setname_np(thread_manager.name.c_str());
436 std::shared_ptr<ThreadType> goby_thread(
437 detail::ThreadTypeSelector<ThreadType, ThreadConfig, has_index, has_config>::thread(
440 goby_thread->set_name(thread_manager.name);
441 goby_thread->set_type_index(type_i);
442 goby_thread->set_uid(thread_manager.uid);
443 goby_thread->run(thread_manager.alive);
447 thread_exception_ = std::current_exception();
450 interthread_.
publish<MainThreadBase::joinable_group_>(ThreadIdentifier{type_i, index});
453 thread_manager.thread = std::unique_ptr<std::thread>(
new std::thread(thread_lambda));
457 pthread_setname_np(thread_manager.thread->native_handle(), thread_manager.name.c_str());
460 ++running_thread_count_;
463 template <
class Config,
class Transporter>
465 const std::type_index& type_i,
int index)
467 if (!threads_.count(type_i) || !threads_[type_i].count(index))
468 throw(Exception(std::string(
"No thread of type: ") + type_i.name() +
" and index " +
471 if (threads_[type_i][index].thread)
474 goby::glog <<
"Joining thread: " << type_i.name() <<
" index " << index << std::endl;
476 threads_[type_i][index].alive =
false;
477 threads_[type_i][index].thread->join();
478 threads_[type_i][index].thread.reset();
479 --running_thread_count_;
482 goby::glog <<
"Joined thread: " << type_i.name() <<
" index " << index << std::endl;
484 if (thread_exception_)
487 goby::glog <<
"Thread type: " << type_i.name() <<
", index: " << index
488 <<
" had an uncaught exception" << std::endl;
489 std::rethrow_exception(thread_exception_);
495 goby::glog <<
"Already joined thread: " << type_i.name() <<
" index " << index