Skip to content

Commit

Permalink
DPL: Delay "Maximum number of TF in flight reached" alarm by 15 secon…
Browse files Browse the repository at this point in the history
…ds in online mode
  • Loading branch information
davidrohr committed Mar 27, 2024
1 parent b75034e commit bdb41e3
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions Framework/Core/src/RateLimiter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,36 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
auto device = ctx.services().get<RawDeviceService>().device();
auto& deviceState = ctx.services().get<DeviceState>();
if (maxInFlight && device->GetChannels().count("metric-feedback")) {
int waitMessage = 0;
int recvTimeot = 0;
auto& dtc = ctx.services().get<DataTakingContext>();
const auto& device = ctx.services().get<RawDeviceService>().device();
const auto& deviceContext = ctx.services().get<DeviceContext>();
int timeout = deviceContext.exitTransitionTimeout;
bool timeout = deviceContext.exitTransitionTimeout;
bool timeoutForMessage = dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS;
bool waitMessage = false;
int recvTimeout = 0;
auto startTime = std::chrono::system_clock::now();
static constexpr float MESSAGE_DELAY_TIME = 15.f;
while ((mSentTimeframes - mConsumedTimeframes) >= maxInFlight) {
if (recvTimeot != 0 && waitMessage == 0) {
if (recvTimeout != 0 && !waitMessage && (timeoutForMessage == false || std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::system_clock::now() - startTime).count() > MESSAGE_DELAY_TIME)) {
if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
LOG(alarm) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting";
} else {
LOG(info) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting";
}
waitMessage = 1;
waitMessage = true;
timeoutForMessage = false;
}
auto msg = device->NewMessageFor("metric-feedback", 0, 0);
int64_t count = 0;
do {
count = device->Receive(msg, "metric-feedback", 0, recvTimeot);
count = device->Receive(msg, "metric-feedback", 0, recvTimeout);
if (timeout && count <= 0 && device->NewStatePending()) {
return 1;
}
} while (count <= 0 && recvTimeot > 0);
} while (count <= 0 && recvTimeout > 0 && !timeoutForMessage);

if (count <= 0) {
recvTimeot = timeout ? -1 : 1000;
recvTimeout = timeout || timeoutForMessage ? -1 : 1000;
continue;
}
assert(msg->GetSize() == 8);
Expand All @@ -68,7 +72,7 @@ int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
LOG(important) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish";
} else {
LOG(important) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish";
LOG(info) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish";
}
}

Expand Down

0 comments on commit bdb41e3

Please sign in to comment.