Skip to content

Commit

Permalink
Merge pull request #338 from Telecominfraproject/WIFI-13450
Browse files Browse the repository at this point in the history
  • Loading branch information
stephb9959 authored Mar 6, 2024
2 parents 4e4b69e + 071922c commit 1a0a6d4
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 94 deletions.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
@@ -1 +1 @@
88
89
11 changes: 7 additions & 4 deletions src/AP_WS_Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ namespace OpenWifi {
WS_->setKeepAlive(true);
WS_->setBlocking(false);
uuid_ = MicroServiceRandom(std::numeric_limits<std::uint64_t>::max()-1);

AP_WS_Server()->IncrementConnectionCount();
}

void AP_WS_Connection::Start() {
Expand All @@ -76,12 +78,13 @@ namespace OpenWifi {
AP_WS_Connection::~AP_WS_Connection() {
// poco_information(Logger_, fmt::format("DESTRUCTOR({}): 0 - Session={} Connection closed.", SerialNumber_,
// State_.sessionId));
std::lock_guard G(ConnectionMutex_);
// std::lock_guard G(ConnectionMutex_);
// poco_information(Logger_, fmt::format("DESTRUCTOR({}): 1 - Session={} Connection closed.", SerialNumber_,
// State_.sessionId));
EndConnection(false);
poco_debug(Logger_, fmt::format("TERMINATION({}): Session={}, Connection removed.", SerialNumber_,
State_.sessionId));
AP_WS_Server()->DecrementConnectionCount();
}

static void NotifyKafkaDisconnect(const std::string &SerialNumber, std::uint64_t uuid) {
Expand Down Expand Up @@ -564,14 +567,14 @@ namespace OpenWifi {
void AP_WS_Connection::OnSocketShutdown(
[[maybe_unused]] const Poco::AutoPtr<Poco::Net::ShutdownNotification> &pNf) {
poco_trace(Logger_, fmt::format("SOCKET-SHUTDOWN({}): Closing.", CId_));
std::lock_guard G(ConnectionMutex_);
// std::lock_guard G(ConnectionMutex_);
return EndConnection();
}

void AP_WS_Connection::OnSocketError(
[[maybe_unused]] const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf) {
poco_trace(Logger_, fmt::format("SOCKET-ERROR({}): Closing.", CId_));
std::lock_guard G(ConnectionMutex_);
// std::lock_guard G(ConnectionMutex_);
return EndConnection();
}

Expand All @@ -581,7 +584,7 @@ namespace OpenWifi {
if (Dead_) // we are dead, so we do not process anything.
return;

std::lock_guard DeviceLock(ConnectionMutex_);
std::lock_guard G(ConnectionMutex_);

State_.LastContact = LastContact_ = Utils::Now();
if (AP_WS_Server()->Running() && (DeviceValidated_ || ValidatedDevice())) {
Expand Down
16 changes: 6 additions & 10 deletions src/AP_WS_Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ namespace OpenWifi {
}
}

[[nodiscard]] inline bool HasGPS() const { return hasGPS_; }
[[nodiscard]] bool ValidatedDevice();

inline void GetRestrictions(GWObjects::DeviceRestrictions &R) {
inline GWObjects::DeviceRestrictions GetRestrictions() {
std::lock_guard G(ConnectionMutex_);
R = Restrictions_;
return Restrictions_;
}

[[nodiscard]] inline bool HasGPS() const { return hasGPS_; }
[[nodiscard]] bool ValidatedDevice();

inline bool GetTelemetryParameters(bool &Reporting, uint64_t &Interval,
uint64_t &WebSocketTimer, uint64_t &KafkaTimer,
uint64_t &WebSocketCount, uint64_t &KafkaCount,
Expand All @@ -105,10 +105,6 @@ namespace OpenWifi {

friend class AP_WS_Server;

inline GWObjects::DeviceRestrictions Restrictions() {
std::lock_guard G(ConnectionMutex_);
return Restrictions_;
}
void Start();

private:
Expand Down Expand Up @@ -148,7 +144,7 @@ namespace OpenWifi {
std::double_t memory_used_=0.0, cpu_load_ = 0.0, temperature_ = 0.0;
std::uint64_t uuid_=0;
bool Simulated_=false;
std::uint64_t LastContact_=0;
std::atomic_uint64_t LastContact_=0;

static inline std::atomic_uint64_t ConcurrentStartingDevices_ = 0;

Expand Down
97 changes: 21 additions & 76 deletions src/AP_WS_Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ namespace OpenWifi {
try {
poco_information(LocalLogger,
fmt::format("Garbage collecting zombies... (step 1)"));
NumberOfConnectedDevices_ = 0;
NumberOfConnectingDevices_ = 0;
AverageDeviceConnectionTime_ = 0;
int waits = 0;
Expand All @@ -242,36 +241,25 @@ namespace OpenWifi {
poco_information(
LocalLogger,
fmt::format("Dead device found in hash index {}", hashIndex));
// hint = SerialNumbers_[hashIndex].erase(hint);
hint++;
hint = SerialNumbers_[hashIndex].erase(hint);
continue;
}
auto Device = hint->second;
if(Device->ConnectionMutex_.try_lock()) {
auto RightNow = Utils::Now();
if (RightNow > Device->LastContact_ &&
(RightNow - Device->LastContact_) > SessionTimeOut_) {
poco_information(
LocalLogger,
fmt::format("{}: Session seems idle. Controller disconnecting device.",
Device->SerialNumber_));
hint = SerialNumbers_[hashIndex].erase(hint);
} else if (Device->State_.Connected) {
NumberOfConnectedDevices_++;
total_connected_time +=
(RightNow - Device->State_.started);
++hint;
} else {
++hint;
}
Device->ConnectionMutex_.unlock();
continue;
auto RightNow = Utils::Now();
if (RightNow > Device->LastContact_ &&
(RightNow - Device->LastContact_) > SessionTimeOut_) {
poco_information(
LocalLogger,
fmt::format("{}: Session seems idle. Controller disconnecting device.",
Device->SerialNumber_));
hint = SerialNumbers_[hashIndex].erase(hint);
} else if (Device->State_.Connected) {
total_connected_time +=
(RightNow - Device->State_.started);
++hint;
} else {
poco_warning(LocalLogger, fmt::format("Could not lock device mutex for {}",
Device->SerialNumber_));
++hint;
}
++NumberOfConnectingDevices_;
++hint;
}
SerialNumbersMutex_[hashIndex].unlock();
break;
Expand Down Expand Up @@ -334,28 +322,13 @@ namespace OpenWifi {
poco_error(LocalLogger, fmt::format("exception:Garbage collecting zombies failed: {}", "unknown"));
}

} else {
NumberOfConnectedDevices_=0;
int wait=0;
for(int i=0;i<MACHash::HashMax();i++) {
if(SerialNumbersMutex_[i].try_lock()) {
wait=0;
NumberOfConnectedDevices_ += SerialNumbers_[i].size();
SerialNumbersMutex_[i].unlock();
} else if (wait<5) {
++wait;
Poco::Thread::sleep(10);
} else {
continue;
}
}
if(NumberOfConnectedDevices_) {
if (last_garbage_run > 0) {
AverageDeviceConnectionTime_ += (now - last_garbage_run);
}
} else {
AverageDeviceConnectionTime_ = 0;
}
if(NumberOfConnectedDevices_) {
if (last_garbage_run > 0) {
AverageDeviceConnectionTime_ += (now - last_garbage_run);
}
} else {
AverageDeviceConnectionTime_ = 0;
}

try {
Expand Down Expand Up @@ -516,7 +489,7 @@ namespace OpenWifi {
return false;
}
if(!DeviceHint->second->Dead_) {
DeviceHint->second->GetRestrictions(Restrictions);
Restrictions = DeviceHint->second->GetRestrictions();
return DeviceHint->second->State_.Connected;
}
return false;
Expand Down Expand Up @@ -695,32 +668,4 @@ namespace OpenWifi {
return false;
}

/* bool AP_WS_Server::ExtendedAttributes(const std::string &serialNumber,
bool & hasGPS,
std::uint64_t &Sanity,
std::double_t &MemoryUsed,
std::double_t &Load,
std::double_t &Temperature
) {
auto serialNumberInt = Utils::SerialNumberToInt(serialNumber);
auto hashIndex = MACHash::Hash(serialNumberInt);
std::lock_guard DevicesGuard(SerialNumbersMutex_[hashIndex]);
auto DeviceHint = SerialNumbers_[hashIndex].find(Utils::SerialNumberToInt(serialNumber));
if(DeviceHint==end(SerialNumbers_[hashIndex])) {
return false;
}
if(DeviceHint->second->Dead_) {
return false;
}
std::lock_guard DeviceGuard(DeviceHint->second->ConnectionMutex_);
hasGPS = DeviceHint->second->hasGPS_;
Sanity = DeviceHint->second->RawLastHealthcheck_.Sanity;
MemoryUsed = DeviceHint->second->memory_used_;
Load = DeviceHint->second->cpu_load_;
Temperature = DeviceHint->second->temperature_;
return true;
}
*/

} // namespace OpenWifi
15 changes: 12 additions & 3 deletions src/AP_WS_Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,21 @@ namespace OpenWifi {
bool KafkaDisableState() const { return KafkaDisableState_; }
bool KafkaDisableHealthChecks() const { return KafkaDisableHealthChecks_; }

inline void IncrementConnectionCount() {
++NumberOfConnectedDevices_;
}

inline void DecrementConnectionCount() {
--NumberOfConnectedDevices_;
}

private:
std::array<std::mutex,SessionHashMax> SessionMutex_;
std::array<std::map<std::uint64_t, std::shared_ptr<AP_WS_Connection>>,SessionHashMax> Sessions_;
using SerialNumberMap = std::map<uint64_t /* serial number */,
std::shared_ptr<AP_WS_Connection>>;
std::array<SerialNumberMap,MACHashMax> SerialNumbers_;
mutable std::array<std::recursive_mutex,MACHashMax> SerialNumbersMutex_;
mutable std::array<std::mutex,MACHashMax> SerialNumbersMutex_;

std::unique_ptr<Poco::Crypto::X509Certificate> IssuerCert_;
std::list<std::unique_ptr<Poco::Net::HTTPServer>> WebServers_;
Expand All @@ -218,8 +226,9 @@ namespace OpenWifi {
std::atomic_bool Running_ = false;

std::uint64_t MismatchDepth_ = 2;
std::uint64_t NumberOfConnectedDevices_ = 0;
std::uint64_t AverageDeviceConnectionTime_ = 0;

std::atomic_uint64_t NumberOfConnectedDevices_ = 0;
std::atomic_uint64_t AverageDeviceConnectionTime_ = 0;
std::uint64_t NumberOfConnectingDevices_ = 0;
std::uint64_t SessionTimeOut_ = 10*60;
std::uint64_t LeftOverSessions_ = 0;
Expand Down

0 comments on commit 1a0a6d4

Please sign in to comment.