Skip to content

Commit

Permalink
https://telecominfraproject.atlassian.net/browse/WIFI-13450
Browse files Browse the repository at this point in the history
Signed-off-by: stephb9959 <[email protected]>
  • Loading branch information
stephb9959 committed Feb 29, 2024
1 parent 2bf60db commit 3d8f7c1
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
@@ -1 +1 @@
86
87
14 changes: 12 additions & 2 deletions src/GWKafkaEvents.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,22 @@ namespace OpenWifi {
class DeviceConfigurationChangeKafkaEvent : public GWKafkaEvents {
public:
DeviceConfigurationChangeKafkaEvent(std::uint64_t serialNumber,
std::uint64_t timestamp, const Poco::JSON::Object::Ptr config)
std::uint64_t timestamp,
const Poco::JSON::Object::Ptr config)
: GWKafkaEvents(serialNumber, "unit.configuration_change", timestamp), config_(config) {
}

~DeviceConfigurationChangeKafkaEvent() {
payload_->set("configuration", *config_);
if(config_!= nullptr) {
std::ostringstream os;
config_->stringify(os);
if(os.str().size()> KafkaManager()->KafkaManagerMaximumPayloadSize()) {
payload_->set("configuration", "{}");
payload_->set("configurationTooBig", true);
} else {
payload_->set("configuration", *config_);
}
}
Send();
}

Expand Down
7 changes: 5 additions & 2 deletions src/framework/KafkaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ namespace OpenWifi {
Utils::SetThreadName("Kafka:Prod");
cppkafka::Configuration Config(
{{"client.id", MicroServiceConfigGetString("openwifi.kafka.client.id", "")},
{"metadata.broker.list",
MicroServiceConfigGetString("openwifi.kafka.brokerlist", "")}});
{"metadata.broker.list",MicroServiceConfigGetString("openwifi.kafka.brokerlist", "")},
{"send.buffer.bytes", KafkaManager()->KafkaManagerMaximumPayloadSize() }
}
);

AddKafkaSecurity(Config);

Expand Down Expand Up @@ -275,6 +277,7 @@ namespace OpenWifi {
int KafkaManager::Start() {
if (!KafkaEnabled_)
return 0;
MaxPayloadSize_ = MicroServiceConfigGetInt("openwifi.kafka.max.payload", 2500000);
ConsumerThr_.Start();
ProducerThr_.Start();
return 0;
Expand Down
3 changes: 3 additions & 0 deletions src/framework/KafkaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,14 @@ namespace OpenWifi {
return ConsumerThr_.UnregisterTopicWatcher(Topic,Id);
}

std::uint64_t KafkaManagerMaximumPayloadSize() const { return MaxPayloadSize_; }

private:
bool KafkaEnabled_ = false;
std::string SystemInfoWrapper_;
KafkaProducer ProducerThr_;
KafkaConsumer ConsumerThr_;
std::uint64_t MaxPayloadSize_ = 2500000;

void PartitionAssignment(const cppkafka::TopicPartitionList &partitions);
void PartitionRevocation(const cppkafka::TopicPartitionList &partitions);
Expand Down

0 comments on commit 3d8f7c1

Please sign in to comment.