From 4ae0b99f55e3859d6e93e75b9752b266b26582ff Mon Sep 17 00:00:00 2001 From: Ivan Chvets Date: Wed, 19 Jun 2024 16:26:56 -0400 Subject: [PATCH] fix: modified code to use flush() when internal queue is not loaded https://telecominfraproject.atlassian.net/browse/WIFI-13597 NOTE: This is port of https://github.com/Telecominfraproject/wlan-cloud-ucentralgw/pull/362 Signed-off-by: Ivan Chvets --- src/framework/KafkaManager.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index c398368..c72b295 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -107,7 +107,16 @@ namespace OpenWifi { NewMessage.partition(0); NewMessage.payload(Msg->Payload()); Producer.produce(NewMessage); - Producer.poll((std::chrono::milliseconds) 0); + if (Queue_.size() < 100) { + // use flush when internal queue is lightly loaded, i.e. flush after each + // message + Producer.flush(); + } + else { + // use poll when internal queue is loaded to allow messages to be sent in + // batches + Producer.poll((std::chrono::milliseconds) 0); + } } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_,