From 9a20ea91ca2ce7e1bdc0e1cb60dc5e306a219470 Mon Sep 17 00:00:00 2001 From: Teo Mrnjavac Date: Fri, 10 Nov 2023 13:37:44 +0100 Subject: [PATCH] [occ] Try to make plugin unsubscribe from FairMQ at right time --- occ/plugin/OccFMQCommon.cxx | 5 ++++- occ/plugin/OccLiteServer.cxx | 6 +++++- occ/plugin/OccPluginServer.cxx | 12 ++++++++++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/occ/plugin/OccFMQCommon.cxx b/occ/plugin/OccFMQCommon.cxx index 5e3f6d15f..ea457a950 100644 --- a/occ/plugin/OccFMQCommon.cxx +++ b/occ/plugin/OccFMQCommon.cxx @@ -148,7 +148,9 @@ std::tuple doTransition(fair: m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange); DEFER({ - m_pluginServices->UnsubscribeFromDeviceStateChange(id); + if ( !newStates.empty() && newStates.back() != "EXITING") { + m_pluginServices->UnsubscribeFromDeviceStateChange(id); + } }); try { @@ -220,6 +222,7 @@ std::tuple doTransition(fair: } if (newStates.back() == "EXITING") { + m_pluginServices->UnsubscribeFromDeviceStateChange(id); m_pluginServices->ReleaseDeviceControl(FMQ_CONTROLLER_NAME); OLOG(debug) << "releasing device control"; } diff --git a/occ/plugin/OccLiteServer.cxx b/occ/plugin/OccLiteServer.cxx index 418bfddf7..11a0bdb9e 100644 --- a/occ/plugin/OccLiteServer.cxx +++ b/occ/plugin/OccLiteServer.cxx @@ -122,10 +122,12 @@ OccLite::Service::EventStream(::grpc::ServerContext* context, const OccLite::nop std::mutex writer_mu; std::condition_variable finished; std::mutex finished_mu; + std::string last_known_state; auto onDeviceStateChange = [&](fair::mq::PluginServices::DeviceState reachedState) { std::lock_guard lock(writer_mu); auto state = fair::mq::PluginServices::ToStr(reachedState); + last_known_state = state; OLOG(debug) << "[EventStream] new state: " << state; @@ -147,7 +149,9 @@ OccLite::Service::EventStream(::grpc::ServerContext* context, const OccLite::nop m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange); DEFER({ - m_pluginServices->UnsubscribeFromDeviceStateChange(id); + if (last_known_state == "EXITING") { + m_pluginServices->UnsubscribeFromDeviceStateChange(id); + } }); { diff --git a/occ/plugin/OccPluginServer.cxx b/occ/plugin/OccPluginServer.cxx index 11b8c03d0..998146626 100644 --- a/occ/plugin/OccPluginServer.cxx +++ b/occ/plugin/OccPluginServer.cxx @@ -58,10 +58,12 @@ OccPluginServer::EventStream(grpc::ServerContext* context, std::mutex writer_mu; std::condition_variable finished; std::mutex finished_mu; + std::string last_known_state; auto onDeviceStateChange = [&](fair::mq::PluginServices::DeviceState reachedState) { std::lock_guard lock(writer_mu); auto state = fair::mq::PluginServices::ToStr(reachedState); + last_known_state = state OLOG(debug) << "[EventStream] new state: " << state; @@ -83,7 +85,9 @@ OccPluginServer::EventStream(grpc::ServerContext* context, m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange); DEFER({ - m_pluginServices->UnsubscribeFromDeviceStateChange(id); + if (last_known_state == "EXITING") { + m_pluginServices->UnsubscribeFromDeviceStateChange(id); + } }); { @@ -106,10 +110,12 @@ OccPluginServer::StateStream(grpc::ServerContext* context, std::mutex writer_mu; std::condition_variable finished; std::mutex finished_mu; + std::string last_known_state; auto onDeviceStateChange = [&](fair::mq::PluginServices::DeviceState reachedState) { std::lock_guard lock(writer_mu); auto state = fair::mq::PluginServices::ToStr(reachedState); + last_known_state = state pb::StateType sType = isIntermediateFMQState(state) ? pb::STATE_INTERMEDIATE : pb::STATE_STABLE; pb::StateStreamReply response; @@ -132,7 +138,9 @@ OccPluginServer::StateStream(grpc::ServerContext* context, m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange); DEFER({ - m_pluginServices->UnsubscribeFromDeviceStateChange(id); + if (last_known_state == "EXITING") { + m_pluginServices->UnsubscribeFromDeviceStateChange(id); + } }); {