Skip to content

Commit

Permalink
https://telecominfraproject.atlassian.net/browse/WIFI-7831
Browse files Browse the repository at this point in the history
Signed-off-by: stephb9959 <[email protected]>
  • Loading branch information
stephb9959 committed Sep 13, 2023
1 parent 6b49c68 commit caf1ec9
Show file tree
Hide file tree
Showing 21 changed files with 237 additions and 80 deletions.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
@@ -1 +1 @@
18
19
10 changes: 2 additions & 8 deletions src/AP_WS_Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,7 @@ namespace OpenWifi {
Details.set(uCentralProtocol::TIMESTAMP, Utils::Now());
Details.set(uCentralProtocol::UUID,uuid);
Disconnect.set(uCentralProtocol::DISCONNECTION, Details);
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(Disconnect, OS);
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber, std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber, Disconnect);
} catch (...) {
}
}
Expand Down Expand Up @@ -725,10 +722,7 @@ namespace OpenWifi {
PingDetails.set(uCentralProtocol::UUID, uuid_);
PingDetails.set("locale", State_.locale);
PingObject.set(uCentralProtocol::PING, PingDetails);
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(PingObject, OS);
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_, std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_,PingObject);
}
return;
} break;
Expand Down
2 changes: 1 addition & 1 deletion src/AP_WS_Process_alarm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace OpenWifi {
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::ALERTS, SerialNumber_, std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::ALERTS, SerialNumber_, OS.str());
}
}
}
Expand Down
14 changes: 3 additions & 11 deletions src/AP_WS_Process_connect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ namespace OpenWifi {
Event.set("type", "device.firmware_change");
Event.set("timestamp", Utils::Now());
Event.set("payload", EventDetails);
std::ostringstream OS;
Event.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber, std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber, Event);
}
}

Expand All @@ -51,9 +49,7 @@ namespace OpenWifi {
Event.set("type", "device.not_provisioned");
Event.set("timestamp", Utils::Now());
Event.set("payload", EventDetails);
std::ostringstream OS;
Event.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber, std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber, Event);
}
}

Expand Down Expand Up @@ -285,15 +281,11 @@ namespace OpenWifi {
GWWebSocketNotifications::DeviceConnected(Notification);

if (KafkaManager()->Enabled()) {
Poco::JSON::Stringifier Stringify;

ParamsObj->set(uCentralProtocol::CONNECTIONIP, CId_);
ParamsObj->set("locale", State_.locale);
ParamsObj->set(uCentralProtocol::TIMESTAMP, Utils::Now());
ParamsObj->set(uCentralProtocol::UUID, uuid_);
std::ostringstream OS;
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_, std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_, *ParamsObj);
}
} else {
poco_warning(
Expand Down
5 changes: 1 addition & 4 deletions src/AP_WS_Process_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ namespace OpenWifi {
FullEvent.set("type", EventType);
FullEvent.set("timestamp", EventTimeStamp);
FullEvent.set("payload", EventPayload);

std::ostringstream OS;
FullEvent.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber_,
std::make_shared<std::string>(OS.str()));
FullEvent);
}
}
} catch (const Poco::Exception &E) {
Expand Down
6 changes: 1 addition & 5 deletions src/AP_WS_Process_healthcheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ namespace OpenWifi {

SetLastHealthCheck(Check);
if (KafkaManager()->Enabled()) {
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
ParamsObj->set("timestamp", Utils::Now());
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::HEALTHCHECK, SerialNumber_, std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::HEALTHCHECK, SerialNumber_, *ParamsObj);
}
} else {
poco_warning(Logger_, fmt::format("HEALTHCHECK({}): Missing parameter", CId_));
Expand Down
5 changes: 1 addition & 4 deletions src/AP_WS_Process_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ namespace OpenWifi {
State_.Associations_5G, State_.Associations_6G);

if (KafkaManager()->Enabled()) {
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::STATE, SerialNumber_, std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::STATE, SerialNumber_, *ParamsObj);
}

GWWebSocketNotifications::SingleDevice_t N;
Expand Down
4 changes: 2 additions & 2 deletions src/AP_WS_Process_telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace OpenWifi {
std::ostringstream SS;
Payload->stringify(SS);
auto now = Utils::Now();
auto KafkaPayload = std::make_shared<std::string>(SS.str());
auto KafkaPayload = SS.str();
if (ParamsObj->has("adhoc")) {
KafkaManager()->PostMessage(KafkaTopics::DEVICE_TELEMETRY, SerialNumber_,
KafkaPayload);
Expand All @@ -39,7 +39,7 @@ namespace OpenWifi {
// std::endl;
TelemetryWebSocketPackets_++;
State_.websocketPackets = TelemetryWebSocketPackets_;
TelemetryStream()->NotifyEndPoint(SerialNumberInt_, *KafkaPayload);
TelemetryStream()->NotifyEndPoint(SerialNumberInt_, KafkaPayload);
} else {
StopWebSocketTelemetry(CommandManager()->Next_RPC_ID());
}
Expand Down
6 changes: 1 addition & 5 deletions src/AP_WS_Process_wifiscan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ namespace OpenWifi {

if (ParamsObj->has(uCentralProtocol::SERIAL) && ParamsObj->has(uCentralProtocol::DATA)) {
if (KafkaManager()->Enabled()) {
auto Data = ParamsObj->get(uCentralProtocol::DATA);
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::WIFISCAN, SerialNumber_, std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::WIFISCAN, SerialNumber_, *ParamsObj);
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/AP_WS_Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,7 @@ namespace OpenWifi {
FullEvent.set("timestamp", now);
FullEvent.set("payload", KafkaNotification);

std::ostringstream OS;
FullEvent.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, "system",
std::make_shared<std::string>(OS.str()));
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, "system", FullEvent);
}

void AP_WS_Server::Stop() {
Expand Down
3 changes: 1 addition & 2 deletions src/GWKafkaEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ namespace OpenWifi {
Event.set("payload", payload_);
std::ostringstream OS;
Event.stringify(OS);
auto payload = std::make_shared<std::string>(OS.str());
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, Utils::IntToSerialNumber(serialNumber_), payload);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, Utils::IntToSerialNumber(serialNumber_), OS.str());
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/RESTAPI/RESTAPI_device_commandHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ namespace OpenWifi {
RESTAPI_RPC::WaitForCommand(CMD_RPC, APCommands::Commands::wifiscan, false, Cmd, Params,
*Request, *Response, timeout, nullptr, this, Logger_);
if (Cmd.ErrorCode == 0) {
KafkaManager()->PostMessage(KafkaTopics::WIFISCAN, SerialNumber_, std::make_shared<std::string>(Cmd.Results));
KafkaManager()->PostMessage(KafkaTopics::WIFISCAN, SerialNumber_, Cmd.Results);
}
}

Expand Down Expand Up @@ -1069,7 +1069,7 @@ namespace OpenWifi {
Logger_);
if (Cmd.ErrorCode == 0) {
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber_,
std::make_shared<std::string>(Cmd.Results));
Cmd.Results);
}
return;
}
Expand Down Expand Up @@ -1126,9 +1126,6 @@ namespace OpenWifi {
BadRequest(RESTAPI::Errors::MissingOrInvalidParameters);
}

#define DBGLINE \
{ std::cout << __LINE__ << std::endl; }

void RESTAPI_device_commandHandler::Rtty(
const std::string &CMD_UUID, uint64_t CMD_RPC, std::chrono::milliseconds timeout,
[[maybe_unused]] const GWObjects::DeviceRestrictions &Restrictions) {
Expand Down
6 changes: 3 additions & 3 deletions src/framework/EventBusManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ namespace OpenWifi {
void EventBusManager::run() {
Running_ = true;
Utils::SetThreadName("fmwk:EventMgr");
auto Msg = std::make_shared<std::string>(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN));
auto Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN));
KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg,
false);
while (Running_) {
Poco::Thread::trySleep((unsigned long)MicroServiceDaemonBusTimer());
if (!Running_)
break;
Msg = std::make_shared<std::string>(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE));
Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE));
KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(),
Msg, false);
}
Msg = std::make_shared<std::string>(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE));
Msg = (MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE));
KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg,
false);
};
Expand Down
25 changes: 17 additions & 8 deletions src/framework/KafkaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ namespace OpenWifi {
Consumer.async_commit(Msg);
continue;
}
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), std::make_shared<std::string>(Msg.get_payload()));
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload());
if (!AutoCommit)
Consumer.async_commit(Msg);
}
Expand Down Expand Up @@ -213,7 +213,7 @@ namespace OpenWifi {
}

void KafkaProducer::Produce(const char *Topic, const std::string &Key,
std::shared_ptr<std::string> Payload) {
const std::string &Payload) {
std::lock_guard G(Mutex_);
Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload));
}
Expand Down Expand Up @@ -276,7 +276,7 @@ namespace OpenWifi {
}

void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key,
const std::shared_ptr<std::string> Payload) {
const std::string & Payload) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if (It != Notifiers_.end()) {
Expand Down Expand Up @@ -333,20 +333,29 @@ namespace OpenWifi {
}

void KafkaManager::PostMessage(const char *topic, const std::string &key,
const std::shared_ptr<std::string> PayLoad, bool WrapMessage) {
const std::string & PayLoad, bool WrapMessage) {
if (KafkaEnabled_) {
ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(PayLoad) : PayLoad);
}
}

void KafkaManager::PostMessage(const char *topic, const std::string &key,
const Poco::JSON::Object &Object, bool WrapMessage) {
if (KafkaEnabled_) {
std::ostringstream ObjectStr;
Object.stringify(ObjectStr);
ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(ObjectStr.str()) : ObjectStr.str());
}
}


void KafkaManager::Dispatch(const char *Topic, const std::string &Key,
const std::shared_ptr<std::string> Payload) {
const std::string &Payload) {
Dispatcher_.Dispatch(Topic, Key, Payload);
}

[[nodiscard]] const std::shared_ptr<std::string> KafkaManager::WrapSystemId(const std::shared_ptr<std::string> PayLoad) {
*PayLoad = SystemInfoWrapper_ + *PayLoad + "}";
return PayLoad;
[[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) {
return SystemInfoWrapper_ + PayLoad + "}";
}

uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic,
Expand Down
21 changes: 12 additions & 9 deletions src/framework/KafkaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include "Poco/Notification.h"
#include "Poco/NotificationQueue.h"

#include "Poco/JSON/Object.h"
#include "framework/KafkaTopics.h"
#include "framework/OpenWifiTypes.h"
#include "framework/SubSystemServer.h"
Expand All @@ -18,25 +18,25 @@ namespace OpenWifi {

class KafkaMessage : public Poco::Notification {
public:
KafkaMessage(const char * Topic, const std::string &Key, std::shared_ptr<std::string> Payload)
KafkaMessage(const char * Topic, const std::string &Key, const std::string &Payload)
: Topic_(Topic), Key_(Key), Payload_(Payload) {}

inline const char * Topic() { return Topic_; }
inline const std::string &Key() { return Key_; }
inline const std::string &Payload() { return *Payload_; }
inline const std::string &Payload() { return Payload_; }

private:
const char *Topic_;
std::string Key_;
std::shared_ptr<std::string> Payload_;
std::string Payload_;
};

class KafkaProducer : public Poco::Runnable {
public:
void run() override;
void Start();
void Stop();
void Produce(const char *Topic, const std::string &Key, std::shared_ptr<std::string> Payload);
void Produce(const char *Topic, const std::string &Key, const std::string & Payload);

private:
std::recursive_mutex Mutex_;
Expand All @@ -63,7 +63,7 @@ namespace OpenWifi {
void Stop();
auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, int Id);
void Dispatch(const char *Topic, const std::string &Key, const std::shared_ptr<std::string> Payload);
void Dispatch(const char *Topic, const std::string &Key, const std::string & Payload);
void run() override;
void Topics(std::vector<std::string> &T);

Expand Down Expand Up @@ -92,9 +92,12 @@ namespace OpenWifi {
void Stop() override;

void PostMessage(const char *topic, const std::string &key,
std::shared_ptr<std::string> PayLoad, bool WrapMessage = true);
void Dispatch(const char *Topic, const std::string &Key, std::shared_ptr<std::string> Payload);
[[nodiscard]] const std::shared_ptr<std::string> WrapSystemId(std::shared_ptr<std::string> PayLoad);
const std::string &PayLoad, bool WrapMessage = true);
void PostMessage(const char *topic, const std::string &key,
const Poco::JSON::Object &Object, bool WrapMessage = true);

void Dispatch(const char *Topic, const std::string &Key, const std::string &Payload);
[[nodiscard]] std::string WrapSystemId(const std::string & PayLoad);
[[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; }
uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id);
Expand Down
22 changes: 21 additions & 1 deletion src/framework/RESTAPI_Handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,27 @@ namespace OpenWifi {
return ReturnRawJSON(os.str());
}

inline void ReturnRawJSON(const std::string &json_doc) {
template<class T> void ReturnObject(const std::vector<T> &Objects) {
Poco::JSON::Array Arr;
for(const auto &Object:Objects) {
Poco::JSON::Object O;
Object.to_json(O);
Arr.add(O);
}
std::ostringstream os;
Arr.stringify(os);
return ReturnRawJSON(os.str());
}

template<class T> void ReturnObject(const T &Object) {
Poco::JSON::Object O;
Object.to_json(O);
std::ostringstream os;
O.stringify(os);
return ReturnRawJSON(os.str());
}

inline void ReturnRawJSON(const std::string &json_doc) {
PrepareResponse();
if (Request != nullptr) {
// can we compress ???
Expand Down
1 change: 1 addition & 0 deletions src/framework/ow_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace OpenWifi {
};
}

#define DBGLINE std::cout << __LINE__ << ":" << __FILE__ << ", " << __func__ << std::endl;
namespace OpenWifi::RESTAPI::Errors {
struct msg {
uint64_t err_num;
Expand Down
Loading

0 comments on commit caf1ec9

Please sign in to comment.