diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index 96057369..7db910dc 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -107,7 +107,16 @@ namespace OpenWifi { NewMessage.partition(0); NewMessage.payload(Msg->Payload()); Producer.produce(NewMessage); - Producer.flush(); + if (Queue_.size() < 100) { + // use flush when internal queue is lightly loaded, i.e. flush after each + // message + Producer.flush(); + } + else { + // use poll when internal queue is loaded to allow messages to be sent in + // batches + Producer.poll((std::chrono::milliseconds) 0); + } } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, @@ -324,4 +333,4 @@ namespace OpenWifi { partitions.front().get_partition())); } -} // namespace OpenWifi \ No newline at end of file +} // namespace OpenWifi