diff --git a/src/Kafka_ProvUpdater.h b/src/Kafka_ProvUpdater.h index ea26bd3..5a4204d 100644 --- a/src/Kafka_ProvUpdater.h +++ b/src/Kafka_ProvUpdater.h @@ -39,9 +39,7 @@ namespace OpenWifi { Poco::JSON::Object Payload; obj.to_json(Payload); Payload.set("ObjectType", OT); - std::ostringstream OS; - Payload.stringify(OS); - KafkaManager()->PostMessage(KafkaTopics::PROVISIONING_CHANGE, Ops[op], std::make_shared(OS.str())); + KafkaManager()->PostMessage(KafkaTopics::PROVISIONING_CHANGE, Ops[op], Payload); return true; } diff --git a/src/framework/EventBusManager.cpp b/src/framework/EventBusManager.cpp index 28d8037..ca28ad9 100644 --- a/src/framework/EventBusManager.cpp +++ b/src/framework/EventBusManager.cpp @@ -14,18 +14,18 @@ namespace OpenWifi { void EventBusManager::run() { Running_ = true; Utils::SetThreadName("fmwk:EventMgr"); - auto Msg = std::make_shared(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN)); + auto Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN)); KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg, false); while (Running_) { Poco::Thread::trySleep((unsigned long)MicroServiceDaemonBusTimer()); if (!Running_) break; - Msg = std::make_shared(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE)); + Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE)); KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg, false); } - Msg = std::make_shared(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE)); + Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE)); KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg, false); }; diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index 09527a5..d32833e 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -180,7 +180,7 @@ namespace OpenWifi { Consumer.async_commit(Msg); continue; } - KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), std::make_shared(Msg.get_payload())); + KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload()); if (!AutoCommit) Consumer.async_commit(Msg); } @@ -213,7 +213,7 @@ namespace OpenWifi { } void KafkaProducer::Produce(const char *Topic, const std::string &Key, - std::shared_ptr Payload) { + const std::string &Payload) { std::lock_guard G(Mutex_); Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload)); } @@ -276,7 +276,7 @@ namespace OpenWifi { } void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key, - const std::shared_ptr Payload) { + const std::string & Payload) { std::lock_guard G(Mutex_); auto It = Notifiers_.find(Topic); if (It != Notifiers_.end()) { @@ -333,20 +333,29 @@ namespace OpenWifi { } void KafkaManager::PostMessage(const char *topic, const std::string &key, - const std::shared_ptr PayLoad, bool WrapMessage) { + const std::string & PayLoad, bool WrapMessage) { if (KafkaEnabled_) { ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(PayLoad) : PayLoad); } } + void KafkaManager::PostMessage(const char *topic, const std::string &key, + const Poco::JSON::Object &Object, bool WrapMessage) { + if (KafkaEnabled_) { + std::ostringstream ObjectStr; + Object.stringify(ObjectStr); + ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(ObjectStr.str()) : ObjectStr.str()); + } + } + + void KafkaManager::Dispatch(const char *Topic, const std::string &Key, - const std::shared_ptr Payload) { + const std::string &Payload) { Dispatcher_.Dispatch(Topic, Key, Payload); } - [[nodiscard]] const std::shared_ptr KafkaManager::WrapSystemId(const std::shared_ptr PayLoad) { - *PayLoad = SystemInfoWrapper_ + *PayLoad + "}"; - return PayLoad; + [[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) { + return SystemInfoWrapper_ + PayLoad + "}"; } uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, diff --git a/src/framework/KafkaManager.h b/src/framework/KafkaManager.h index 3ef5940..31cf093 100644 --- a/src/framework/KafkaManager.h +++ b/src/framework/KafkaManager.h @@ -6,7 +6,7 @@ #include "Poco/Notification.h" #include "Poco/NotificationQueue.h" - +#include "Poco/JSON/Object.h" #include "framework/KafkaTopics.h" #include "framework/OpenWifiTypes.h" #include "framework/SubSystemServer.h" @@ -18,17 +18,17 @@ namespace OpenWifi { class KafkaMessage : public Poco::Notification { public: - KafkaMessage(const char * Topic, const std::string &Key, std::shared_ptr Payload) + KafkaMessage(const char * Topic, const std::string &Key, const std::string &Payload) : Topic_(Topic), Key_(Key), Payload_(Payload) {} inline const char * Topic() { return Topic_; } inline const std::string &Key() { return Key_; } - inline const std::string &Payload() { return *Payload_; } + inline const std::string &Payload() { return Payload_; } private: const char *Topic_; std::string Key_; - std::shared_ptr Payload_; + std::string Payload_; }; class KafkaProducer : public Poco::Runnable { @@ -36,7 +36,7 @@ namespace OpenWifi { void run() override; void Start(); void Stop(); - void Produce(const char *Topic, const std::string &Key, std::shared_ptr Payload); + void Produce(const char *Topic, const std::string &Key, const std::string & Payload); private: std::recursive_mutex Mutex_; @@ -63,7 +63,7 @@ namespace OpenWifi { void Stop(); auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); void UnregisterTopicWatcher(const std::string &Topic, int Id); - void Dispatch(const char *Topic, const std::string &Key, const std::shared_ptr Payload); + void Dispatch(const char *Topic, const std::string &Key, const std::string & Payload); void run() override; void Topics(std::vector &T); @@ -92,9 +92,12 @@ namespace OpenWifi { void Stop() override; void PostMessage(const char *topic, const std::string &key, - std::shared_ptr PayLoad, bool WrapMessage = true); - void Dispatch(const char *Topic, const std::string &Key, std::shared_ptr Payload); - [[nodiscard]] const std::shared_ptr WrapSystemId(std::shared_ptr PayLoad); + const std::string &PayLoad, bool WrapMessage = true); + void PostMessage(const char *topic, const std::string &key, + const Poco::JSON::Object &Object, bool WrapMessage = true); + + void Dispatch(const char *Topic, const std::string &Key, const std::string &Payload); + [[nodiscard]] std::string WrapSystemId(const std::string & PayLoad); [[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; } uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id); diff --git a/src/framework/ow_constants.h b/src/framework/ow_constants.h index eba39c2..65ed7a3 100644 --- a/src/framework/ow_constants.h +++ b/src/framework/ow_constants.h @@ -406,10 +406,16 @@ namespace OpenWifi::RESTAPI::Errors { 1172, "The venue name already exists." }; - static const struct msg DefFirmwareNameExists { 1172, "Firmware name already exists." }; - static const struct msg NotAValidECKey { 1173, "Provided key supplied is not valid." }; - static const struct msg InvalidGlobalReachAccount { 1174, "Invalid Global Reach account information (id or key)." }; - static const struct msg CannotCreateCSR { 1175, "Could not create CSR" }; + static const struct msg InvalidGlobalReachAccount { + 1173, "Invalid Global Reach account information." + }; + static const struct msg CannotCreateCSR { + 1174, "Cannot create a CSR certificate." + }; + + static const struct msg DefFirmwareNameExists { 1175, "Firmware name already exists." }; + + static const struct msg NotAValidECKey { 1176, "Not a valid Signing Key." }; static const struct msg SimulationDoesNotExist { 7000, "Simulation Instance ID does not exist."