Skip to content

Commit

Permalink
Merge pull request #106 from Telecominfraproject/main
Browse files Browse the repository at this point in the history
  • Loading branch information
stephb9959 authored Dec 15, 2023
2 parents 1063080 + d050635 commit 8166b75
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 49 deletions.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2
6
2 changes: 1 addition & 1 deletion src/AuthService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ namespace OpenWifi {
Expired = (WT.created_ + WT.expires_in_) < now;
if (StorageService()->UserDB().GetUserById(UserId, UInfo.userinfo)) {
UInfo.webtoken = WT;
poco_debug(Logger(), fmt::format("TokenValidation success for TID={} Token={}",
poco_trace(Logger(), fmt::format("TokenValidation success for TID={} Token={}",
TID, Utils::SanitizeToken(CallToken)));
return true;
}
Expand Down
2 changes: 0 additions & 2 deletions src/framework/EventBusManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

namespace OpenWifi {

EventBusManager::EventBusManager(Poco::Logger &L) : Logger_(L) {}

void EventBusManager::run() {
Running_ = true;
Utils::SetThreadName("fmwk:EventMgr");
Expand Down
12 changes: 12 additions & 0 deletions src/framework/EventBusManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ namespace OpenWifi {

class EventBusManager : public Poco::Runnable {
public:
EventBusManager() :
Logger_(Poco::Logger::create(
"EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel())) {
}

static auto instance() {
static auto instance_ = new EventBusManager;
return instance_;
}

explicit EventBusManager(Poco::Logger &L);
void run() final;
void Start();
Expand All @@ -24,4 +34,6 @@ namespace OpenWifi {
Poco::Logger &Logger_;
};

inline auto EventBusManager() { return EventBusManager::instance(); }

} // namespace OpenWifi
121 changes: 78 additions & 43 deletions src/framework/MicroService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,23 @@ namespace OpenWifi {

void MicroService::Exit(int Reason) { std::exit(Reason); }

static std::string MakeServiceListString(const Types::MicroServiceMetaMap &Services) {
std::string SvcList;
for (const auto &Svc : Services) {
if (SvcList.empty())
SvcList = Svc.second.Type;
else
SvcList += ", " + Svc.second.Type;
}
return SvcList;
}

void MicroService::BusMessageReceived([[maybe_unused]] const std::string &Key,
const std::string &Payload) {
std::lock_guard G(InfraMutex_);

Poco::Logger &BusLogger = EventBusManager()->Logger();

try {
Poco::JSON::Parser P;
auto Object = P.parse(Payload).extract<Poco::JSON::Object::Ptr>();
Expand All @@ -55,28 +69,18 @@ namespace OpenWifi {
Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) {
auto PrivateEndPoint =
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString();
if (Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE &&
Services_.find(PrivateEndPoint) != Services_.end()) {
Services_[PrivateEndPoint].LastUpdate = Utils::Now();
} else if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) {
if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) {
Services_.erase(PrivateEndPoint);
poco_debug(
logger(),
poco_information(
BusLogger,
fmt::format(
"Service {} ID={} leaving system.",
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID));
} else if (Event == KafkaTopics::ServiceEvents::EVENT_JOIN ||
Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE) {
poco_debug(
logger(),
fmt::format(
"Service {} ID={} joining system.",
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID));
Services_[PrivateEndPoint] = Types::MicroServiceMeta{
auto ServiceInfo = Types::MicroServiceMeta{
.Id = ID,
.Type = Poco::toLower(
Object->get(KafkaTopics::ServiceEvents::Fields::TYPE)
Expand All @@ -94,20 +98,46 @@ namespace OpenWifi {
.toString(),
.LastUpdate = Utils::Now()};

std::string SvcList;
for (const auto &Svc : Services_) {
if (SvcList.empty())
SvcList = Svc.second.Type;
else
SvcList += ", " + Svc.second.Type;
auto s1 = MakeServiceListString(Services_);
auto PreviousSize = Services_.size();
Services_[PrivateEndPoint] = ServiceInfo;
auto CurrentSize = Services_.size();
if(Event == KafkaTopics::ServiceEvents::EVENT_JOIN) {
if(!s1.empty()) {
poco_information(
BusLogger,
fmt::format(
"Service {} ID={} is joining the system.",
Object
->get(
KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID));
}
std::string SvcList;
for (const auto &Svc : Services_) {
if (SvcList.empty())
SvcList = Svc.second.Type;
else
SvcList += ", " + Svc.second.Type;
}
poco_information(
BusLogger,
fmt::format("Current list of microservices: {}", SvcList));
} else if(CurrentSize!=PreviousSize) {
poco_information(
BusLogger,
fmt::format(
"Service {} ID={} is being added back in.",
Object
->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID));
}
poco_information(
logger(),
fmt::format("Current list of microservices: {}", SvcList));
}
} else {
poco_error(
logger(),
poco_information(
BusLogger,
fmt::format("KAFKA-MSG: invalid event '{}', missing a field.",
Event));
}
Expand All @@ -118,32 +148,39 @@ namespace OpenWifi {
Object->get(KafkaTopics::ServiceEvents::Fields::TOKEN).toString());
#endif
} else {
poco_error(
logger(),
poco_information(
BusLogger,
fmt::format("KAFKA-MSG: invalid event '{}', missing token", Event));
}
} else {
poco_error(logger(),
poco_information(BusLogger,
fmt::format("Unknown Event: {} Source: {}", Event, ID));
}
}
} else {
poco_error(logger(), "Bad bus message.");
std::ostringstream os;
Object->stringify(std::cout);
std::ostringstream os;
Object->stringify(std::cout);
poco_error(BusLogger, fmt::format("Bad bus message: {}", os.str()));
}

auto i = Services_.begin();
auto ServiceHint = Services_.begin();
auto now = Utils::Now();
for (; i != Services_.end();) {
if ((now - i->second.LastUpdate) > 60) {
i = Services_.erase(i);
auto si1 = Services_.size();
auto ss1 = MakeServiceListString(Services_);
while(ServiceHint!=Services_.end()) {
if ((now - ServiceHint->second.LastUpdate) > 120) {
poco_information(BusLogger, fmt::format("ZombieService: Removing service {}, ", ServiceHint->second.PublicEndPoint));
ServiceHint = Services_.erase(ServiceHint);
} else
++i;
++ServiceHint;
}
if(Services_.size() != si1) {
auto ss2 = MakeServiceListString(Services_);
poco_information(BusLogger, fmt::format("Current list of microservices: {} -> {}", ss1, ss2));
}

} catch (const Poco::Exception &E) {
logger().log(E);
BusLogger.log(E);
}
}

Expand Down Expand Up @@ -412,7 +449,7 @@ namespace OpenWifi {
try {
DataDir.createDirectory();
} catch (const Poco::Exception &E) {
logger().log(E);
Logger_.log(E);
}
}
WWWAssetsDir_ = ConfigPath("openwifi.restapi.wwwassets", "");
Expand Down Expand Up @@ -530,14 +567,12 @@ namespace OpenWifi {
for (auto i : SubSystems_) {
i->Start();
}
EventBusManager_ = std::make_unique<EventBusManager>(Poco::Logger::create(
"EventBusManager", Poco::Logger::root().getChannel(), Poco::Logger::root().getLevel()));
EventBusManager_->Start();
EventBusManager()->Start();
}

void MicroService::StopSubSystemServers() {
AddActivity("Stopping");
EventBusManager_->Stop();
EventBusManager()->Stop();
for (auto i = SubSystems_.rbegin(); i != SubSystems_.rend(); ++i) {
(*i)->Stop();
}
Expand Down Expand Up @@ -697,7 +732,7 @@ namespace OpenWifi {
auto APIKEY = Request.get("X-API-KEY");
return APIKEY == MyHash_;
} catch (const Poco::Exception &E) {
logger().log(E);
Logger_.log(E);
}
return false;
}
Expand Down
1 change: 0 additions & 1 deletion src/framework/MicroService.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ namespace OpenWifi {
Poco::JWT::Signer Signer_;
Poco::Logger &Logger_;
Poco::ThreadPool TimerPool_{"timer:pool", 2, 32};
std::unique_ptr<EventBusManager> EventBusManager_;
};

inline MicroService *MicroService::instance_ = nullptr;
Expand Down
2 changes: 2 additions & 0 deletions src/framework/StorageClass.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ namespace OpenWifi {

}

Poco::Data::SessionPool &Pool() { return *Pool_; }

private:
inline int Setup_SQLite();
inline int Setup_MySQL();
Expand Down
7 changes: 6 additions & 1 deletion src/framework/orm.h
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,8 @@ namespace ORM {
bool UpdateRecord(field_name_t FieldName, const T &Value, const RecordType &R) {
try {
assert(ValidFieldName(FieldName));

Poco::Data::Session Session = Pool_.get();
Session.begin();
Poco::Data::Statement Update(Session);

RecordTuple RT;
Expand All @@ -593,6 +593,7 @@ namespace ORM {
Update.execute();
if (Cache_)
Cache_->UpdateCache(R);
Session.commit();
return true;
} catch (const Poco::Exception &E) {
Logger_.log(E);
Expand Down Expand Up @@ -662,6 +663,7 @@ namespace ORM {
assert(ValidFieldName(FieldName));

Poco::Data::Session Session = Pool_.get();
Session.begin();
Poco::Data::Statement Delete(Session);

std::string St = "delete from " + TableName_ + " where " + FieldName + "=?";
Expand All @@ -671,6 +673,7 @@ namespace ORM {
Delete.execute();
if (Cache_)
Cache_->Delete(FieldName, Value);
Session.commit();
return true;
} catch (const Poco::Exception &E) {
Logger_.log(E);
Expand All @@ -682,11 +685,13 @@ namespace ORM {
try {
assert(!WhereClause.empty());
Poco::Data::Session Session = Pool_.get();
Session.begin();
Poco::Data::Statement Delete(Session);

std::string St = "delete from " + TableName_ + " where " + WhereClause;
Delete << St;
Delete.execute();
Session.commit();
return true;
} catch (const Poco::Exception &E) {
Logger_.log(E);
Expand Down
Loading

0 comments on commit 8166b75

Please sign in to comment.