diff --git a/Framework/Core/src/RateLimiter.cxx b/Framework/Core/src/RateLimiter.cxx index 5f6ff24adfd7c..0023fb1322958 100644 --- a/Framework/Core/src/RateLimiter.cxx +++ b/Framework/Core/src/RateLimiter.cxx @@ -33,32 +33,36 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) auto device = ctx.services().get().device(); auto& deviceState = ctx.services().get(); if (maxInFlight && device->GetChannels().count("metric-feedback")) { - int waitMessage = 0; - int recvTimeot = 0; auto& dtc = ctx.services().get(); const auto& device = ctx.services().get().device(); const auto& deviceContext = ctx.services().get(); - int timeout = deviceContext.exitTransitionTimeout; + bool timeout = deviceContext.exitTransitionTimeout; + bool timeoutForMessage = dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS; + bool waitMessage = false; + int recvTimeout = 0; + auto startTime = std::chrono::system_clock::now(); + static constexpr float MESSAGE_DELAY_TIME = 15.f; while ((mSentTimeframes - mConsumedTimeframes) >= maxInFlight) { - if (recvTimeot != 0 && waitMessage == 0) { + if (recvTimeout != 0 && !waitMessage && (timeoutForMessage == false || std::chrono::duration_cast>(std::chrono::system_clock::now() - startTime).count() > MESSAGE_DELAY_TIME)) { if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) { LOG(alarm) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting"; } else { LOG(info) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting"; } - waitMessage = 1; + waitMessage = true; + timeoutForMessage = false; } auto msg = device->NewMessageFor("metric-feedback", 0, 0); int64_t count = 0; do { - count = device->Receive(msg, "metric-feedback", 0, recvTimeot); + count = device->Receive(msg, "metric-feedback", 0, recvTimeout); if (timeout && count <= 0 && device->NewStatePending()) { return 1; } - } while (count <= 0 && recvTimeot > 0); + } while (count <= 0 && recvTimeout > 0 && !timeoutForMessage); if (count <= 0) { - recvTimeot = timeout ? -1 : 1000; + recvTimeout = timeout || timeoutForMessage ? -1 : 1000; continue; } assert(msg->GetSize() == 8); @@ -68,7 +72,7 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM) if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) { LOG(important) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish"; } else { - LOG(important) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish"; + LOG(info) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish"; } }