Skip to content

Commit

Permalink
[occ] Try to make plugin unsubscribe from FairMQ at right time
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Nov 10, 2023
1 parent 3e8163f commit 9a20ea9
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
5 changes: 4 additions & 1 deletion occ/plugin/OccFMQCommon.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ std::tuple<OccLite::nopb::TransitionResponse, ::grpc::Status> doTransition(fair:

m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange);
DEFER({
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
if ( !newStates.empty() && newStates.back() != "EXITING") {
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
}
});

try {
Expand Down Expand Up @@ -220,6 +222,7 @@ std::tuple<OccLite::nopb::TransitionResponse, ::grpc::Status> doTransition(fair:
}

if (newStates.back() == "EXITING") {
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
m_pluginServices->ReleaseDeviceControl(FMQ_CONTROLLER_NAME);
OLOG(debug) << "releasing device control";
}
Expand Down
6 changes: 5 additions & 1 deletion occ/plugin/OccLiteServer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ OccLite::Service::EventStream(::grpc::ServerContext* context, const OccLite::nop
std::mutex writer_mu;
std::condition_variable finished;
std::mutex finished_mu;
std::string last_known_state;

auto onDeviceStateChange = [&](fair::mq::PluginServices::DeviceState reachedState) {
std::lock_guard<std::mutex> lock(writer_mu);
auto state = fair::mq::PluginServices::ToStr(reachedState);
last_known_state = state;

OLOG(debug) << "[EventStream] new state: " << state;

Expand All @@ -147,7 +149,9 @@ OccLite::Service::EventStream(::grpc::ServerContext* context, const OccLite::nop

m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange);
DEFER({
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
if (last_known_state == "EXITING") {
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
}
});

{
Expand Down
12 changes: 10 additions & 2 deletions occ/plugin/OccPluginServer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ OccPluginServer::EventStream(grpc::ServerContext* context,
std::mutex writer_mu;
std::condition_variable finished;
std::mutex finished_mu;
std::string last_known_state;

auto onDeviceStateChange = [&](fair::mq::PluginServices::DeviceState reachedState) {
std::lock_guard<std::mutex> lock(writer_mu);
auto state = fair::mq::PluginServices::ToStr(reachedState);
last_known_state = state

OLOG(debug) << "[EventStream] new state: " << state;

Expand All @@ -83,7 +85,9 @@ OccPluginServer::EventStream(grpc::ServerContext* context,

m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange);
DEFER({
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
if (last_known_state == "EXITING") {
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
}
});

{
Expand All @@ -106,10 +110,12 @@ OccPluginServer::StateStream(grpc::ServerContext* context,
std::mutex writer_mu;
std::condition_variable finished;
std::mutex finished_mu;
std::string last_known_state;

auto onDeviceStateChange = [&](fair::mq::PluginServices::DeviceState reachedState) {
std::lock_guard<std::mutex> lock(writer_mu);
auto state = fair::mq::PluginServices::ToStr(reachedState);
last_known_state = state
pb::StateType sType = isIntermediateFMQState(state) ? pb::STATE_INTERMEDIATE : pb::STATE_STABLE;

pb::StateStreamReply response;
Expand All @@ -132,7 +138,9 @@ OccPluginServer::StateStream(grpc::ServerContext* context,

m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange);
DEFER({
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
if (last_known_state == "EXITING") {
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
}
});

{
Expand Down

0 comments on commit 9a20ea9

Please sign in to comment.