Skip to content

Commit

Permalink
https://telecominfraproject.atlassian.net/browse/WIFI-7831
Browse files Browse the repository at this point in the history
Signed-off-by: stephb9959 <[email protected]>
  • Loading branch information
stephb9959 committed Sep 13, 2023
1 parent 94ce329 commit 7d995e7
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 27 deletions.
4 changes: 1 addition & 3 deletions src/Kafka_ProvUpdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ namespace OpenWifi {
Poco::JSON::Object Payload;
obj.to_json(Payload);
Payload.set("ObjectType", OT);
std::ostringstream OS;
Payload.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::PROVISIONING_CHANGE, Ops[op], std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::PROVISIONING_CHANGE, Ops[op], Payload);

return true;
}
Expand Down
6 changes: 3 additions & 3 deletions src/framework/EventBusManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ namespace OpenWifi {
void EventBusManager::run() {
Running_ = true;
Utils::SetThreadName("fmwk:EventMgr");
auto Msg = std::make_shared<std::string>(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN));
auto Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN));
KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg,
false);
while (Running_) {
Poco::Thread::trySleep((unsigned long)MicroServiceDaemonBusTimer());
if (!Running_)
break;
Msg = std::make_shared<std::string>(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE));
Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE));
KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(),
Msg, false);
}
Msg = std::make_shared<std::string>(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE));
Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE));
KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg,
false);
};
Expand Down
25 changes: 17 additions & 8 deletions src/framework/KafkaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ namespace OpenWifi {
Consumer.async_commit(Msg);
continue;
}
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), std::make_shared<std::string>(Msg.get_payload()));
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload());
if (!AutoCommit)
Consumer.async_commit(Msg);
}
Expand Down Expand Up @@ -213,7 +213,7 @@ namespace OpenWifi {
}

void KafkaProducer::Produce(const char *Topic, const std::string &Key,
std::shared_ptr<std::string> Payload) {
const std::string &Payload) {
std::lock_guard G(Mutex_);
Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload));
}
Expand Down Expand Up @@ -276,7 +276,7 @@ namespace OpenWifi {
}

void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key,
const std::shared_ptr<std::string> Payload) {
const std::string & Payload) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if (It != Notifiers_.end()) {
Expand Down Expand Up @@ -333,20 +333,29 @@ namespace OpenWifi {
}

void KafkaManager::PostMessage(const char *topic, const std::string &key,
const std::shared_ptr<std::string> PayLoad, bool WrapMessage) {
const std::string & PayLoad, bool WrapMessage) {
if (KafkaEnabled_) {
ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(PayLoad) : PayLoad);
}
}

void KafkaManager::PostMessage(const char *topic, const std::string &key,
const Poco::JSON::Object &Object, bool WrapMessage) {
if (KafkaEnabled_) {
std::ostringstream ObjectStr;
Object.stringify(ObjectStr);
ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(ObjectStr.str()) : ObjectStr.str());
}
}


void KafkaManager::Dispatch(const char *Topic, const std::string &Key,
const std::shared_ptr<std::string> Payload) {
const std::string &Payload) {
Dispatcher_.Dispatch(Topic, Key, Payload);
}

[[nodiscard]] const std::shared_ptr<std::string> KafkaManager::WrapSystemId(const std::shared_ptr<std::string> PayLoad) {
*PayLoad = SystemInfoWrapper_ + *PayLoad + "}";
return PayLoad;
[[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) {
return SystemInfoWrapper_ + PayLoad + "}";
}

uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic,
Expand Down
21 changes: 12 additions & 9 deletions src/framework/KafkaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include "Poco/Notification.h"
#include "Poco/NotificationQueue.h"

#include "Poco/JSON/Object.h"
#include "framework/KafkaTopics.h"
#include "framework/OpenWifiTypes.h"
#include "framework/SubSystemServer.h"
Expand All @@ -18,25 +18,25 @@ namespace OpenWifi {

class KafkaMessage : public Poco::Notification {
public:
KafkaMessage(const char * Topic, const std::string &Key, std::shared_ptr<std::string> Payload)
KafkaMessage(const char * Topic, const std::string &Key, const std::string &Payload)
: Topic_(Topic), Key_(Key), Payload_(Payload) {}

inline const char * Topic() { return Topic_; }
inline const std::string &Key() { return Key_; }
inline const std::string &Payload() { return *Payload_; }
inline const std::string &Payload() { return Payload_; }

private:
const char *Topic_;
std::string Key_;
std::shared_ptr<std::string> Payload_;
std::string Payload_;
};

class KafkaProducer : public Poco::Runnable {
public:
void run() override;
void Start();
void Stop();
void Produce(const char *Topic, const std::string &Key, std::shared_ptr<std::string> Payload);
void Produce(const char *Topic, const std::string &Key, const std::string & Payload);

private:
std::recursive_mutex Mutex_;
Expand All @@ -63,7 +63,7 @@ namespace OpenWifi {
void Stop();
auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, int Id);
void Dispatch(const char *Topic, const std::string &Key, const std::shared_ptr<std::string> Payload);
void Dispatch(const char *Topic, const std::string &Key, const std::string & Payload);
void run() override;
void Topics(std::vector<std::string> &T);

Expand Down Expand Up @@ -92,9 +92,12 @@ namespace OpenWifi {
void Stop() override;

void PostMessage(const char *topic, const std::string &key,
std::shared_ptr<std::string> PayLoad, bool WrapMessage = true);
void Dispatch(const char *Topic, const std::string &Key, std::shared_ptr<std::string> Payload);
[[nodiscard]] const std::shared_ptr<std::string> WrapSystemId(std::shared_ptr<std::string> PayLoad);
const std::string &PayLoad, bool WrapMessage = true);
void PostMessage(const char *topic, const std::string &key,
const Poco::JSON::Object &Object, bool WrapMessage = true);

void Dispatch(const char *Topic, const std::string &Key, const std::string &Payload);
[[nodiscard]] std::string WrapSystemId(const std::string & PayLoad);
[[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; }
uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id);
Expand Down
14 changes: 10 additions & 4 deletions src/framework/ow_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,16 @@ namespace OpenWifi::RESTAPI::Errors {
1172, "The venue name already exists."
};

static const struct msg DefFirmwareNameExists { 1172, "Firmware name already exists." };
static const struct msg NotAValidECKey { 1173, "Provided key supplied is not valid." };
static const struct msg InvalidGlobalReachAccount { 1174, "Invalid Global Reach account information (id or key)." };
static const struct msg CannotCreateCSR { 1175, "Could not create CSR" };
static const struct msg InvalidGlobalReachAccount {
1173, "Invalid Global Reach account information."
};
static const struct msg CannotCreateCSR {
1174, "Cannot create a CSR certificate."
};

static const struct msg DefFirmwareNameExists { 1175, "Firmware name already exists." };

static const struct msg NotAValidECKey { 1176, "Not a valid Signing Key." };

static const struct msg SimulationDoesNotExist {
7000, "Simulation Instance ID does not exist."
Expand Down

0 comments on commit 7d995e7

Please sign in to comment.