Skip to content

Commit

Permalink
https://telecominfraproject.atlassian.net/browse/WIFI-12954
Browse files Browse the repository at this point in the history
Signed-off-by: stephb9959 <[email protected]>
  • Loading branch information
stephb9959 committed Sep 23, 2023
1 parent 7d9d5b4 commit 42d44b0
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
@@ -1 +1 @@
9
10
26 changes: 17 additions & 9 deletions src/framework/KafkaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,12 @@ namespace OpenWifi {
try {
auto Msg = dynamic_cast<KafkaMessage *>(Note.get());
if (Msg != nullptr) {
Producer.produce(cppkafka::MessageBuilder(Msg->Topic())
.key(Msg->Key())
.payload(Msg->Payload()));
auto NewMessage = cppkafka::MessageBuilder(Msg->Topic());
NewMessage.key(Msg->Key());
NewMessage.partition(0);
NewMessage.payload(Msg->Payload());
Producer.produce(NewMessage);
Producer.flush();
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
Expand Down Expand Up @@ -157,17 +160,19 @@ namespace OpenWifi {
});

bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false);
auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 20);
auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100);

Types::StringVec Topics;
KafkaManager()->Topics(Topics);
Consumer.subscribe(Topics);

Running_ = true;
std::vector<cppkafka::Message> MsgVec;
while (Running_) {
try {
std::vector<cppkafka::Message> MsgVec =
Consumer.poll_batch(BatchSize, std::chrono::milliseconds(100));
MsgVec.clear();
MsgVec.reserve(BatchSize);
MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(1000));
for (auto const &Msg : MsgVec) {
if (!Msg)
continue;
Expand All @@ -177,12 +182,12 @@ namespace OpenWifi {
fmt::format("Error: {}", Msg.get_error().to_string()));
}
if (!AutoCommit)
Consumer.async_commit(Msg);
Consumer.commit(Msg);
continue;
}
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload());
if (!AutoCommit)
Consumer.async_commit(Msg);
Consumer.commit(Msg);
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
Expand Down Expand Up @@ -355,7 +360,10 @@ namespace OpenWifi {
}

[[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) {
return SystemInfoWrapper_ + PayLoad + "}";
return fmt::format( R"lit({{ "system" : {{ "id" : {},
"host" : "{}" }},
"payload" : {} }})lit", MicroServiceID(),
MicroServicePrivateEndPoint(), PayLoad ) ;
}

uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic,
Expand Down

0 comments on commit 42d44b0

Please sign in to comment.