Skip to content

Commit

Permalink
https://telecominfraproject.atlassian.net/browse/WIFI-13172
Browse files Browse the repository at this point in the history
Signed-off-by: stephb9959 <[email protected]>
  • Loading branch information
stephb9959 committed Dec 3, 2023
1 parent bc4da0a commit 7d5c130
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 41 deletions.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3
4
98 changes: 58 additions & 40 deletions src/framework/MicroService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ namespace OpenWifi {

void MicroService::Exit(int Reason) { std::exit(Reason); }

static std::string MakeServiceListString(const Types::MicroServiceMetaMap &Services) {
std::string SvcList;
for (const auto &Svc : Services) {
if (SvcList.empty())
SvcList = Svc.second.Type;
else
SvcList += ", " + Svc.second.Type;
}
return SvcList;
}

void MicroService::BusMessageReceived([[maybe_unused]] const std::string &Key,
const std::string &Payload) {
std::lock_guard G(InfraMutex_);
Expand All @@ -55,28 +66,18 @@ namespace OpenWifi {
Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) {
auto PrivateEndPoint =
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString();
if (Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE &&
Services_.find(PrivateEndPoint) != Services_.end()) {
Services_[PrivateEndPoint].LastUpdate = Utils::Now();
} else if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) {
if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) {
Services_.erase(PrivateEndPoint);
poco_debug(
logger(),
poco_information(
Logger_,
fmt::format(
"Service {} ID={} leaving system.",
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID));
} else if (Event == KafkaTopics::ServiceEvents::EVENT_JOIN ||
Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE) {
poco_debug(
logger(),
fmt::format(
"Service {} ID={} joining system.",
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID));
Services_[PrivateEndPoint] = Types::MicroServiceMeta{
auto ServiceInfo = Types::MicroServiceMeta{
.Id = ID,
.Type = Poco::toLower(
Object->get(KafkaTopics::ServiceEvents::Fields::TYPE)
Expand All @@ -94,20 +95,31 @@ namespace OpenWifi {
.toString(),
.LastUpdate = Utils::Now()};

std::string SvcList;
for (const auto &Svc : Services_) {
if (SvcList.empty())
SvcList = Svc.second.Type;
else
SvcList += ", " + Svc.second.Type;
auto s1 = MakeServiceListString(Services_);
Services_[PrivateEndPoint] = ServiceInfo;
if(Event == KafkaTopics::ServiceEvents::EVENT_JOIN) {
poco_information(
Logger_,
fmt::format(
"Service {} ID={} is joining the system. old={}",
Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE)
.toString(),
ID, s1));
std::string SvcList;
for (const auto &Svc : Services_) {
if (SvcList.empty())
SvcList = Svc.second.Type;
else
SvcList += ", " + Svc.second.Type;
}
poco_information(
Logger_,
fmt::format("Current list of microservices: {}", SvcList));
}
poco_information(
logger(),
fmt::format("Current list of microservices: {}", SvcList));
}
} else {
poco_error(
logger(),
poco_information(
Logger_,
fmt::format("KAFKA-MSG: invalid event '{}', missing a field.",
Event));
}
Expand All @@ -118,33 +130,39 @@ namespace OpenWifi {
Object->get(KafkaTopics::ServiceEvents::Fields::TOKEN).toString());
#endif
} else {
poco_error(
logger(),
poco_information(
Logger_,
fmt::format("KAFKA-MSG: invalid event '{}', missing token", Event));
}
} else {
poco_error(logger(),
poco_information(Logger_,
fmt::format("Unknown Event: {} Source: {}", Event, ID));
}
}
} else {
poco_error(logger(), "Bad bus message.");
std::ostringstream os;
Object->stringify(std::cout);
std::ostringstream os;
Object->stringify(std::cout);
poco_error(Logger_, fmt::format("Bad bus message: {}", os.str()));
}

auto i = Services_.begin();
auto ServiceHint = Services_.begin();
auto now = Utils::Now();
for (; i != Services_.end();) {
if ((now - i->second.LastUpdate) > 120) {
poco_warning(logger(), fmt::format("ZombieService: Removing service {}, ", i->second.PublicEndPoint));
i = Services_.erase(i);
auto si1 = Services_.size();
auto ss1 = MakeServiceListString(Services_);
while(ServiceHint!=Services_.end()) {
if ((now - ServiceHint->second.LastUpdate) > 120) {
poco_information(Logger_, fmt::format("ZombieService: Removing service {}, ", ServiceHint->second.PublicEndPoint));
ServiceHint = Services_.erase(ServiceHint);
} else
++i;
++ServiceHint;
}
if(Services_.size() != si1) {
auto ss2 = MakeServiceListString(Services_);
poco_information(Logger_, fmt::format("Current list of microservices: {} -> {}", ss1, ss2));
}

} catch (const Poco::Exception &E) {
logger().log(E);
Logger_.log(E);
}
}

Expand Down Expand Up @@ -413,7 +431,7 @@ namespace OpenWifi {
try {
DataDir.createDirectory();
} catch (const Poco::Exception &E) {
logger().log(E);
Logger_.log(E);
}
}
WWWAssetsDir_ = ConfigPath("openwifi.restapi.wwwassets", "");
Expand Down Expand Up @@ -698,7 +716,7 @@ namespace OpenWifi {
auto APIKEY = Request.get("X-API-KEY");
return APIKEY == MyHash_;
} catch (const Poco::Exception &E) {
logger().log(E);
Logger_.log(E);
}
return false;
}
Expand Down

0 comments on commit 7d5c130

Please sign in to comment.