From c7ed7fb2641e9ba86f0db3d19dd3b5f1aaae6bf6 Mon Sep 17 00:00:00 2001 From: Ivan Chvets Date: Tue, 11 Jun 2024 11:15:42 -0400 Subject: [PATCH 1/3] fix: modified kafka manager to use poll in producer https://telecominfraproject.atlassian.net/browse/WIFI-13597 Summary of changes: - Modified code in KafkaManager to use poll instead of flush for every messages sent. flush is used only on empty internal notification queue in idle times. Signed-off-by: Ivan Chvets --- src/framework/KafkaManager.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index 96057369..38cdcb12 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -107,7 +107,7 @@ namespace OpenWifi { NewMessage.partition(0); NewMessage.payload(Msg->Payload()); Producer.produce(NewMessage); - Producer.flush(); + Producer.poll((std::chrono::milliseconds) 0); } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, @@ -117,6 +117,10 @@ namespace OpenWifi { } catch (...) { poco_error(Logger_, "std::exception"); } + if (Queue_.size() == 0) { + // message queue is empty, flush all previously sent messages + Producer.flush(); + } Note = Queue_.waitDequeueNotification(); } poco_information(Logger_, "Stopped..."); From b118dcbcec87a735505504decf97d3b2001201b2 Mon Sep 17 00:00:00 2001 From: Ivan Chvets Date: Fri, 14 Jun 2024 16:30:23 -0400 Subject: [PATCH 2/3] fix: added flush() for proper shutdown https://telecominfraproject.atlassian.net/browse/WIFI-13597 Signed-off-by: Ivan Chvets --- src/framework/KafkaManager.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index 38cdcb12..c3983684 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -123,6 +123,7 @@ namespace OpenWifi { } Note = Queue_.waitDequeueNotification(); } + Producer.flush(); poco_information(Logger_, "Stopped..."); } @@ -328,4 +329,4 @@ namespace OpenWifi { partitions.front().get_partition())); } -} // namespace OpenWifi \ No newline at end of file +} // namespace OpenWifi From 3c15c6dc4fdc590807e097500a0bd7a3618ce45b Mon Sep 17 00:00:00 2001 From: Ivan Chvets Date: Wed, 19 Jun 2024 16:29:24 -0400 Subject: [PATCH 3/3] fix: modified code to use flush() when internal queue is not loaded https://telecominfraproject.atlassian.net/browse/WIFI-13597 Signed-off-by: Ivan Chvets --- src/framework/KafkaManager.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index c3983684..c72b2956 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -107,7 +107,16 @@ namespace OpenWifi { NewMessage.partition(0); NewMessage.payload(Msg->Payload()); Producer.produce(NewMessage); - Producer.poll((std::chrono::milliseconds) 0); + if (Queue_.size() < 100) { + // use flush when internal queue is lightly loaded, i.e. flush after each + // message + Producer.flush(); + } + else { + // use poll when internal queue is loaded to allow messages to be sent in + // batches + Producer.poll((std::chrono::milliseconds) 0); + } } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_,