From 6315376aafa4a261ca03e5a057f50ed862fe3615 Mon Sep 17 00:00:00 2001 From: Marco Kirchner Date: Tue, 6 Feb 2024 21:05:26 +0100 Subject: [PATCH] Fix memory leak in kuma-client resolves #1 --- CHANGELOG.md | 2 ++ kuma-client/src/client.rs | 68 ++++++++++++++++++++++----------------- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 550a6b5..685b48e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- Memory leak in kuma-client [#1](https://github.com/BigBoot/AutoKuma/issues/1) ## [0.3.0] - 2024-01-13 ### Added diff --git a/kuma-client/src/client.rs b/kuma-client/src/client.rs index 2e85dfa..82076c3 100644 --- a/kuma-client/src/client.rs +++ b/kuma-client/src/client.rs @@ -20,7 +20,13 @@ use rust_socketio::{ }; use serde::de::DeserializeOwned; use serde_json::{json, Value}; -use std::{collections::HashMap, mem, str::FromStr, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + mem, + str::FromStr, + sync::{Arc, Weak}, + time::Duration, +}; use tokio::{runtime::Handle, sync::Mutex}; struct Ready { @@ -952,47 +958,49 @@ impl Worker { } let handle = Handle::current(); - let self_ref = self.to_owned(); + let self_ref = Arc::downgrade(self); let client = builder .on_any(move |event, payload, _| { let handle = handle.clone(); - let self_ref: Arc = self_ref.clone(); + let self_ref: Weak = self_ref.clone(); trace!("Client::on_any({:?}, {:?})", &event, &payload); async move { - match (event, payload) { - (SocketIOEvent::Message, Payload::Text(params)) => { - if let Ok(e) = Event::from_str( - ¶ms[0] - .as_str() - .log_warn(|| "Error while deserializing Event...") - .unwrap_or(""), - ) { - handle.clone().spawn(async move { - _ = self_ref.clone().on_event(e, json!(null)).await.log_warn( - |e| { + if let Some(arc) = self_ref.upgrade() { + match (event, payload) { + (SocketIOEvent::Message, Payload::Text(params)) => { + if let Ok(e) = Event::from_str( + ¶ms[0] + .as_str() + .log_warn(|| "Error while deserializing Event...") + .unwrap_or(""), + ) { + handle.clone().spawn(async move { + _ = arc.on_event(e, json!(null)).await.log_warn(|e| { format!( "Error while sending message event: {}", e.to_string() ) - }, - ); - }); - } - } - (event, Payload::Text(params)) => { - if let Ok(e) = Event::from_str(&String::from(event)) { - handle.clone().spawn(async move { - _ = self_ref - .clone() - .on_event(e, params.into_iter().next().unwrap()) - .await - .log_warn(|e| { - format!("Error while sending event: {}", e.to_string()) }); - }); + }); + } + } + (event, Payload::Text(params)) => { + if let Ok(e) = Event::from_str(&String::from(event)) { + handle.clone().spawn(async move { + _ = arc + .on_event(e, params.into_iter().next().unwrap()) + .await + .log_warn(|e| { + format!( + "Error while sending event: {}", + e.to_string() + ) + }); + }); + } } + _ => {} } - _ => {} } } .boxed()