Skip to content

Commit

Permalink
Merge branch 'master' into v3.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
i-chvets committed Jul 12, 2024
2 parents 1d2e943 + 5d89107 commit ea90cf4
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions src/framework/KafkaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,16 @@ namespace OpenWifi {
NewMessage.partition(0);
NewMessage.payload(Msg->Payload());
Producer.produce(NewMessage);
Producer.flush();
if (Queue_.size() < 100) {
// use flush when internal queue is lightly loaded, i.e. flush after each
// message
Producer.flush();
}
else {
// use poll when internal queue is loaded to allow messages to be sent in
// batches
Producer.poll((std::chrono::milliseconds) 0);
}
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
Expand All @@ -117,8 +126,13 @@ namespace OpenWifi {
} catch (...) {
poco_error(Logger_, "std::exception");
}
if (Queue_.size() == 0) {
// message queue is empty, flush all previously sent messages
Producer.flush();
}
Note = Queue_.waitDequeueNotification();
}
Producer.flush();
poco_information(Logger_, "Stopped...");
}

Expand Down Expand Up @@ -324,4 +338,4 @@ namespace OpenWifi {
partitions.front().get_partition()));
}

} // namespace OpenWifi
} // namespace OpenWifi

0 comments on commit ea90cf4

Please sign in to comment.