From ff304060870fb8faa48aaf8f1ada8ebc3f1b5b0f Mon Sep 17 00:00:00 2001 From: Mihai Dinculescu Date: Sat, 31 Aug 2024 18:59:54 +0100 Subject: [PATCH] tapo-py: Improve the concurrency of device handlers --- CHANGELOG.md | 1 + tapo-py/src/handlers/color_light_handler.rs | 73 +++++++++++++++---- .../src/handlers/generic_device_handler.rs | 32 ++++++-- tapo-py/src/handlers/hub_handler.rs | 41 ++++++++--- tapo-py/src/handlers/light_handler.rs | 42 ++++++++--- .../plug_energy_monitoring_handler.rs | 66 +++++++++++++---- tapo-py/src/handlers/plug_handler.rs | 35 ++++++--- tapo-py/src/runtime.rs | 18 ++--- 8 files changed, 228 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 28d0b57..4bf38ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ file. This change log follows the conventions of ### Fixed - Resolved an issue that led to unrecoverable process hangs when a device request timed out. +- The concurrency of device handlers has been significantly enhanced by replacing all `Mutex` instances with `RwLock`. - `DeviceInfoPlugResult` has been updated to correctly support the P100 and P105 devices. ## [Rust v0.7.13][v0.7.13] - 2024-08-26 diff --git a/tapo-py/src/handlers/color_light_handler.rs b/tapo-py/src/handlers/color_light_handler.rs index 6a96277..85af884 100644 --- a/tapo-py/src/handlers/color_light_handler.rs +++ b/tapo-py/src/handlers/color_light_handler.rs @@ -1,11 +1,12 @@ -use std::{ops::Deref, sync::Arc}; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; use pyo3::prelude::*; use pyo3::types::PyDict; use tapo::requests::{Color, ColorLightSetDeviceInfoParams}; use tapo::responses::{DeviceInfoColorLightResult, DeviceUsageEnergyMonitoringResult}; use tapo::ColorLightHandler; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use crate::call_handler_method; use crate::errors::ErrorWrapper; @@ -14,13 +15,13 @@ use crate::runtime::tokio; #[derive(Clone)] #[pyclass(name = "ColorLightHandler")] pub struct PyColorLightHandler { - handler: Arc>, + handler: Arc>, } impl PyColorLightHandler { pub fn new(handler: ColorLightHandler) -> Self { Self { - handler: Arc::new(Mutex::new(handler)), + handler: Arc::new(RwLock::new(handler)), } } } @@ -28,32 +29,55 @@ impl PyColorLightHandler { #[pymethods] impl PyColorLightHandler { pub async fn refresh_session(&self) -> PyResult<()> { - call_handler_method!(self, ColorLightHandler::refresh_session, discard_result) + let handler = self.handler.clone(); + call_handler_method!( + handler.write().await.deref_mut(), + ColorLightHandler::refresh_session, + discard_result + ) } pub async fn on(&self) -> PyResult<()> { - call_handler_method!(self, ColorLightHandler::on) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), ColorLightHandler::on) } pub async fn off(&self) -> PyResult<()> { - call_handler_method!(self, ColorLightHandler::off) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), ColorLightHandler::off) } pub async fn device_reset(&self) -> PyResult<()> { - call_handler_method!(self, ColorLightHandler::device_reset) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + ColorLightHandler::device_reset + ) } pub async fn get_device_info(&self) -> PyResult { - call_handler_method!(self, ColorLightHandler::get_device_info) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + ColorLightHandler::get_device_info + ) } pub async fn get_device_info_json(&self) -> PyResult> { - let result = call_handler_method!(self, ColorLightHandler::get_device_info_json)?; + let handler = self.handler.clone(); + let result = call_handler_method!( + handler.read().await.deref(), + ColorLightHandler::get_device_info_json, + )?; Python::with_gil(|py| tapo::python::serde_object_to_py_dict(py, &result)) } pub async fn get_device_usage(&self) -> PyResult { - call_handler_method!(self, ColorLightHandler::get_device_usage) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + ColorLightHandler::get_device_usage + ) } pub fn set(&self) -> PyColorLightSetDeviceInfoParams { @@ -61,20 +85,37 @@ impl PyColorLightHandler { } pub async fn set_brightness(&self, brightness: u8) -> PyResult<()> { - call_handler_method!(self, ColorLightHandler::set_brightness, brightness) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + ColorLightHandler::set_brightness, + brightness + ) } pub async fn set_color(&self, color: Color) -> PyResult<()> { - call_handler_method!(self, ColorLightHandler::set_color, color) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + ColorLightHandler::set_color, + color + ) } pub async fn set_hue_saturation(&self, hue: u16, saturation: u8) -> PyResult<()> { - call_handler_method!(self, ColorLightHandler::set_hue_saturation, hue, saturation) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + ColorLightHandler::set_hue_saturation, + hue, + saturation + ) } pub async fn set_color_temperature(&self, color_temperature: u16) -> PyResult<()> { + let handler = self.handler.clone(); call_handler_method!( - self, + handler.read().await.deref(), ColorLightHandler::set_color_temperature, color_temperature ) @@ -138,7 +179,7 @@ impl PyColorLightSetDeviceInfoParams { tokio() .spawn(async move { - let handler_lock = handler.handler.lock().await; + let handler_lock = handler.handler.read().await; params .send(handler_lock.deref()) diff --git a/tapo-py/src/handlers/generic_device_handler.rs b/tapo-py/src/handlers/generic_device_handler.rs index 1e5d731..84d2e54 100644 --- a/tapo-py/src/handlers/generic_device_handler.rs +++ b/tapo-py/src/handlers/generic_device_handler.rs @@ -1,10 +1,11 @@ +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use pyo3::prelude::*; use pyo3::types::PyDict; use tapo::responses::DeviceInfoGenericResult; use tapo::GenericDeviceHandler; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use crate::call_handler_method; use crate::errors::ErrorWrapper; @@ -12,13 +13,13 @@ use crate::errors::ErrorWrapper; #[derive(Clone)] #[pyclass(name = "GenericDeviceHandler")] pub struct PyGenericDeviceHandler { - handler: Arc>, + handler: Arc>, } impl PyGenericDeviceHandler { pub fn new(handler: GenericDeviceHandler) -> Self { Self { - handler: Arc::new(Mutex::new(handler)), + handler: Arc::new(RwLock::new(handler)), } } } @@ -26,23 +27,38 @@ impl PyGenericDeviceHandler { #[pymethods] impl PyGenericDeviceHandler { pub async fn refresh_session(&self) -> PyResult<()> { - call_handler_method!(self, GenericDeviceHandler::refresh_session, discard_result) + let handler = self.handler.clone(); + call_handler_method!( + handler.write().await.deref_mut(), + GenericDeviceHandler::refresh_session, + discard_result + ) } pub async fn on(&self) -> PyResult<()> { - call_handler_method!(self, GenericDeviceHandler::on) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), GenericDeviceHandler::on) } pub async fn off(&self) -> PyResult<()> { - call_handler_method!(self, GenericDeviceHandler::off) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), GenericDeviceHandler::off) } pub async fn get_device_info(&self) -> PyResult { - call_handler_method!(self, GenericDeviceHandler::get_device_info) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + GenericDeviceHandler::get_device_info + ) } pub async fn get_device_info_json(&self) -> PyResult> { - let result = call_handler_method!(self, GenericDeviceHandler::get_device_info_json)?; + let handler = self.handler.clone(); + let result = call_handler_method!( + handler.read().await.deref(), + GenericDeviceHandler::get_device_info_json, + )?; Python::with_gil(|py| tapo::python::serde_object_to_py_dict(py, &result)) } } diff --git a/tapo-py/src/handlers/hub_handler.rs b/tapo-py/src/handlers/hub_handler.rs index 67319a3..6912fea 100644 --- a/tapo-py/src/handlers/hub_handler.rs +++ b/tapo-py/src/handlers/hub_handler.rs @@ -1,10 +1,11 @@ +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use pyo3::prelude::*; use pyo3::types::{PyDict, PyList}; use tapo::responses::{ChildDeviceHubResult, DeviceInfoHubResult}; use tapo::HubHandler; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use crate::call_handler_method; use crate::errors::ErrorWrapper; @@ -12,13 +13,13 @@ use crate::errors::ErrorWrapper; #[derive(Clone)] #[pyclass(name = "HubHandler")] pub struct PyHubHandler { - handler: Arc>, + handler: Arc>, } impl PyHubHandler { pub fn new(handler: HubHandler) -> Self { Self { - handler: Arc::new(Mutex::new(handler)), + handler: Arc::new(RwLock::new(handler)), } } } @@ -26,20 +27,34 @@ impl PyHubHandler { #[pymethods] impl PyHubHandler { pub async fn refresh_session(&self) -> PyResult<()> { - call_handler_method!(self, HubHandler::refresh_session, discard_result) + let handler = self.handler.clone(); + call_handler_method!( + handler.write().await.deref_mut(), + HubHandler::refresh_session, + discard_result + ) } pub async fn get_device_info(&self) -> PyResult { - call_handler_method!(self, HubHandler::get_device_info) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), HubHandler::get_device_info) } pub async fn get_device_info_json(&self) -> PyResult> { - let result = call_handler_method!(self, HubHandler::get_device_info_json)?; + let handler = self.handler.clone(); + let result = call_handler_method!( + handler.read().await.deref(), + HubHandler::get_device_info_json + )?; Python::with_gil(|py| tapo::python::serde_object_to_py_dict(py, &result)) } pub async fn get_child_device_list(&self) -> PyResult> { - let children = call_handler_method!(self, HubHandler::get_child_device_list)?; + let handler = self.handler.clone(); + let children = call_handler_method!( + handler.read().await.deref(), + HubHandler::get_child_device_list + )?; let results = Python::with_gil(|py| { let results = PyList::empty_bound(py); @@ -80,12 +95,20 @@ impl PyHubHandler { } pub async fn get_child_device_list_json(&self) -> PyResult> { - let result = call_handler_method!(self, HubHandler::get_child_device_list_json)?; + let handler = self.handler.clone(); + let result = call_handler_method!( + handler.read().await.deref(), + HubHandler::get_child_device_list_json + )?; Python::with_gil(|py| tapo::python::serde_object_to_py_dict(py, &result)) } pub async fn get_child_device_component_list_json(&self) -> PyResult> { - let result = call_handler_method!(self, HubHandler::get_child_device_component_list_json)?; + let handler = self.handler.clone(); + let result = call_handler_method!( + handler.read().await.deref(), + HubHandler::get_child_device_component_list_json + )?; Python::with_gil(|py| tapo::python::serde_object_to_py_dict(py, &result)) } } diff --git a/tapo-py/src/handlers/light_handler.rs b/tapo-py/src/handlers/light_handler.rs index 84937a5..d2f571e 100644 --- a/tapo-py/src/handlers/light_handler.rs +++ b/tapo-py/src/handlers/light_handler.rs @@ -1,10 +1,11 @@ +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use pyo3::prelude::*; use pyo3::types::PyDict; use tapo::responses::{DeviceInfoLightResult, DeviceUsageEnergyMonitoringResult}; use tapo::LightHandler; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use crate::call_handler_method; use crate::errors::ErrorWrapper; @@ -12,13 +13,13 @@ use crate::errors::ErrorWrapper; #[derive(Clone)] #[pyclass(name = "LightHandler")] pub struct PyLightHandler { - handler: Arc>, + handler: Arc>, } impl PyLightHandler { pub fn new(handler: LightHandler) -> Self { Self { - handler: Arc::new(Mutex::new(handler)), + handler: Arc::new(RwLock::new(handler)), } } } @@ -26,35 +27,54 @@ impl PyLightHandler { #[pymethods] impl PyLightHandler { pub async fn refresh_session(&self) -> PyResult<()> { - call_handler_method!(self, LightHandler::refresh_session, discard_result) + let handler = self.handler.clone(); + call_handler_method!( + handler.write().await.deref_mut(), + LightHandler::refresh_session, + discard_result + ) } pub async fn on(&self) -> PyResult<()> { - call_handler_method!(self, LightHandler::on) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), LightHandler::on) } pub async fn off(&self) -> PyResult<()> { - call_handler_method!(self, LightHandler::off) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), LightHandler::off) } pub async fn device_reset(&self) -> PyResult<()> { - call_handler_method!(self, LightHandler::device_reset) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), LightHandler::device_reset) } pub async fn get_device_info(&self) -> PyResult { - call_handler_method!(self, LightHandler::get_device_info) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), LightHandler::get_device_info) } pub async fn get_device_info_json(&self) -> PyResult> { - let result = call_handler_method!(self, LightHandler::get_device_info_json)?; + let handler = self.handler.clone(); + let result = call_handler_method!( + handler.read().await.deref(), + LightHandler::get_device_info_json + )?; Python::with_gil(|py| tapo::python::serde_object_to_py_dict(py, &result)) } pub async fn get_device_usage(&self) -> PyResult { - call_handler_method!(self, LightHandler::get_device_usage) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), LightHandler::get_device_usage) } pub async fn set_brightness(&self, brightness: u8) -> PyResult<()> { - call_handler_method!(self, LightHandler::set_brightness, brightness) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + LightHandler::set_brightness, + brightness + ) } } diff --git a/tapo-py/src/handlers/plug_energy_monitoring_handler.rs b/tapo-py/src/handlers/plug_energy_monitoring_handler.rs index 0a642ba..d5b66a9 100644 --- a/tapo-py/src/handlers/plug_energy_monitoring_handler.rs +++ b/tapo-py/src/handlers/plug_energy_monitoring_handler.rs @@ -1,3 +1,4 @@ +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use chrono::NaiveDate; @@ -9,7 +10,7 @@ use tapo::responses::{ EnergyDataResult, EnergyUsageResult, }; use tapo::PlugEnergyMonitoringHandler; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use crate::call_handler_method; use crate::errors::ErrorWrapper; @@ -25,13 +26,13 @@ pub enum PyEnergyDataInterval { #[derive(Clone)] #[pyclass(name = "PlugEnergyMonitoringHandler")] pub struct PyPlugEnergyMonitoringHandler { - handler: Arc>, + handler: Arc>, } impl PyPlugEnergyMonitoringHandler { pub fn new(handler: PlugEnergyMonitoringHandler) -> Self { Self { - handler: Arc::new(Mutex::new(handler)), + handler: Arc::new(RwLock::new(handler)), } } } @@ -39,44 +40,77 @@ impl PyPlugEnergyMonitoringHandler { #[pymethods] impl PyPlugEnergyMonitoringHandler { pub async fn refresh_session(&self) -> PyResult<()> { + let handler = self.handler.clone(); call_handler_method!( - self, + handler.write().await.deref_mut(), PlugEnergyMonitoringHandler::refresh_session, discard_result ) } pub async fn on(&self) -> PyResult<()> { - call_handler_method!(self, PlugEnergyMonitoringHandler::on) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + PlugEnergyMonitoringHandler::on + ) } pub async fn off(&self) -> PyResult<()> { - call_handler_method!(self, PlugEnergyMonitoringHandler::off) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + PlugEnergyMonitoringHandler::off + ) } pub async fn device_reset(&self) -> PyResult<()> { - call_handler_method!(self, PlugEnergyMonitoringHandler::device_reset) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + PlugEnergyMonitoringHandler::device_reset, + ) } pub async fn get_device_info(&self) -> PyResult { - call_handler_method!(self, PlugEnergyMonitoringHandler::get_device_info) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + PlugEnergyMonitoringHandler::get_device_info, + ) } pub async fn get_device_info_json(&self) -> PyResult> { - let result = call_handler_method!(self, PlugEnergyMonitoringHandler::get_device_info_json)?; + let handler = self.handler.clone(); + let result = call_handler_method!( + handler.read().await.deref(), + PlugEnergyMonitoringHandler::get_device_info_json, + )?; Python::with_gil(|py| tapo::python::serde_object_to_py_dict(py, &result)) } pub async fn get_device_usage(&self) -> PyResult { - call_handler_method!(self, PlugEnergyMonitoringHandler::get_device_usage) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + PlugEnergyMonitoringHandler::get_device_usage, + ) } pub async fn get_current_power(&self) -> PyResult { - call_handler_method!(self, PlugEnergyMonitoringHandler::get_current_power) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + PlugEnergyMonitoringHandler::get_current_power, + ) } pub async fn get_energy_usage(&self) -> PyResult { - call_handler_method!(self, PlugEnergyMonitoringHandler::get_energy_usage) + let handler = self.handler.clone(); + call_handler_method!( + handler.read().await.deref(), + PlugEnergyMonitoringHandler::get_energy_usage, + ) } #[pyo3(signature = (interval, start_date, end_date=None))] @@ -95,8 +129,12 @@ impl PyPlugEnergyMonitoringHandler { PyEnergyDataInterval::Monthly => EnergyDataInterval::Monthly { start_date }, }; - let result = - call_handler_method!(self, PlugEnergyMonitoringHandler::get_energy_data, interval)?; + let handler = self.handler.clone(); + let result = call_handler_method!( + handler.read().await.deref(), + PlugEnergyMonitoringHandler::get_energy_data, + interval + )?; Ok(result) } } diff --git a/tapo-py/src/handlers/plug_handler.rs b/tapo-py/src/handlers/plug_handler.rs index 8969649..2bb53a2 100644 --- a/tapo-py/src/handlers/plug_handler.rs +++ b/tapo-py/src/handlers/plug_handler.rs @@ -1,10 +1,11 @@ +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use pyo3::prelude::*; use pyo3::types::PyDict; use tapo::responses::{DeviceInfoPlugResult, DeviceUsageResult}; use tapo::PlugHandler; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use crate::call_handler_method; use crate::errors::ErrorWrapper; @@ -12,13 +13,13 @@ use crate::errors::ErrorWrapper; #[derive(Clone)] #[pyclass(name = "PlugHandler")] pub struct PyPlugHandler { - handler: Arc>, + handler: Arc>, } impl PyPlugHandler { pub fn new(handler: PlugHandler) -> Self { Self { - handler: Arc::new(Mutex::new(handler)), + handler: Arc::new(RwLock::new(handler)), } } } @@ -26,31 +27,45 @@ impl PyPlugHandler { #[pymethods] impl PyPlugHandler { pub async fn refresh_session(&self) -> PyResult<()> { - call_handler_method!(self, PlugHandler::refresh_session, discard_result) + let handler = self.handler.clone(); + call_handler_method!( + handler.write().await.deref_mut(), + PlugHandler::refresh_session, + discard_result + ) } pub async fn on(&self) -> PyResult<()> { - call_handler_method!(self, PlugHandler::on) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), PlugHandler::on) } pub async fn off(&self) -> PyResult<()> { - call_handler_method!(self, PlugHandler::off) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), PlugHandler::off) } pub async fn device_reset(&self) -> PyResult<()> { - call_handler_method!(self, PlugHandler::device_reset) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), PlugHandler::device_reset) } pub async fn get_device_info(&self) -> PyResult { - call_handler_method!(self, PlugHandler::get_device_info) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), PlugHandler::get_device_info) } pub async fn get_device_info_json(&self) -> PyResult> { - let result = call_handler_method!(self, PlugHandler::get_device_info_json)?; + let handler = self.handler.clone(); + let result = call_handler_method!( + handler.read().await.deref(), + PlugHandler::get_device_info_json + )?; Python::with_gil(|py| tapo::python::serde_object_to_py_dict(py, &result)) } pub async fn get_device_usage(&self) -> PyResult { - call_handler_method!(self, PlugHandler::get_device_usage) + let handler = self.handler.clone(); + call_handler_method!(handler.read().await.deref(), PlugHandler::get_device_usage) } } diff --git a/tapo-py/src/runtime.rs b/tapo-py/src/runtime.rs index 63f4601..ce31e6d 100644 --- a/tapo-py/src/runtime.rs +++ b/tapo-py/src/runtime.rs @@ -25,15 +25,12 @@ macro_rules! call_handler_constructor { #[macro_export] macro_rules! call_handler_method { - ($self:expr, $method:path) => (call_handler_method!($self, $method,)); - ($self:expr, $method:path, discard_result) => (call_handler_method!($self, $method, discard_result,)); - ($self:expr, $method:path, $($param:expr),*) => {{ - let handler = $self.handler.clone(); + ($handler:expr, $method:path) => (call_handler_method!($handler, $method,)); + ($handler:expr, $method:path, discard_result) => (call_handler_method!($handler, $method, discard_result,)); + ($handler:expr, $method:path, $($param:expr),*) => {{ let result = $crate::runtime::tokio() .spawn(async move { - let mut handler = handler.lock().await; - - let result = $method(&mut handler, $($param),*) + let result = $method($handler, $($param),*) .await .map_err(ErrorWrapper)?; @@ -45,13 +42,10 @@ macro_rules! call_handler_method { Ok::<_, PyErr>(result) }}; - ($self:expr, $method:path, discard_result, $($param:expr),*) => {{ - let handler = $self.handler.clone(); + ($handler:expr, $method:path, discard_result, $($param:expr),*) => {{ let result = $crate::runtime::tokio() .spawn(async move { - let mut handler = handler.lock().await; - - $method(&mut handler, $($param),*) + $method($handler, $($param),*) .await .map_err(ErrorWrapper)?;