diff --git a/build b/build index 8bfa2f5e..eaf7a13d 100644 --- a/build +++ b/build @@ -1 +1 @@ -86 \ No newline at end of file +87 \ No newline at end of file diff --git a/src/GWKafkaEvents.h b/src/GWKafkaEvents.h index 02075d93..5b74d766 100644 --- a/src/GWKafkaEvents.h +++ b/src/GWKafkaEvents.h @@ -50,12 +50,22 @@ namespace OpenWifi { class DeviceConfigurationChangeKafkaEvent : public GWKafkaEvents { public: DeviceConfigurationChangeKafkaEvent(std::uint64_t serialNumber, - std::uint64_t timestamp, const Poco::JSON::Object::Ptr config) + std::uint64_t timestamp, + const Poco::JSON::Object::Ptr config) : GWKafkaEvents(serialNumber, "unit.configuration_change", timestamp), config_(config) { } ~DeviceConfigurationChangeKafkaEvent() { - payload_->set("configuration", *config_); + if(config_!= nullptr) { + std::ostringstream os; + config_->stringify(os); + if(os.str().size()> KafkaManager()->KafkaManagerMaximumPayloadSize()) { + payload_->set("configuration", "{}"); + payload_->set("configurationTooBig", true); + } else { + payload_->set("configuration", *config_); + } + } Send(); } diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index f2936e7d..0d6aa7f7 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -79,8 +79,10 @@ namespace OpenWifi { Utils::SetThreadName("Kafka:Prod"); cppkafka::Configuration Config( {{"client.id", MicroServiceConfigGetString("openwifi.kafka.client.id", "")}, - {"metadata.broker.list", - MicroServiceConfigGetString("openwifi.kafka.brokerlist", "")}}); + {"metadata.broker.list",MicroServiceConfigGetString("openwifi.kafka.brokerlist", "")}, + {"send.buffer.bytes", KafkaManager()->KafkaManagerMaximumPayloadSize() } + } + ); AddKafkaSecurity(Config); @@ -275,6 +277,7 @@ namespace OpenWifi { int KafkaManager::Start() { if (!KafkaEnabled_) return 0; + MaxPayloadSize_ = MicroServiceConfigGetInt("openwifi.kafka.max.payload", 2500000); ConsumerThr_.Start(); ProducerThr_.Start(); return 0; diff --git a/src/framework/KafkaManager.h b/src/framework/KafkaManager.h index e6272d33..ba41a305 100644 --- a/src/framework/KafkaManager.h +++ b/src/framework/KafkaManager.h @@ -94,11 +94,14 @@ namespace OpenWifi { return ConsumerThr_.UnregisterTopicWatcher(Topic,Id); } + std::uint64_t KafkaManagerMaximumPayloadSize() const { return MaxPayloadSize_; } + private: bool KafkaEnabled_ = false; std::string SystemInfoWrapper_; KafkaProducer ProducerThr_; KafkaConsumer ConsumerThr_; + std::uint64_t MaxPayloadSize_ = 2500000; void PartitionAssignment(const cppkafka::TopicPartitionList &partitions); void PartitionRevocation(const cppkafka::TopicPartitionList &partitions);