From 5b3cc6e6e668bdcf1eaf6ed96976bc85d4e9f4bd Mon Sep 17 00:00:00 2001 From: Paulo Bressan Date: Thu, 4 Apr 2024 17:08:32 -0300 Subject: [PATCH] Updated tiers watcher to use tokio channel (#26) --- proxy/src/tiers.rs | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/proxy/src/tiers.rs b/proxy/src/tiers.rs index 7d1fc2a..e543348 100644 --- a/proxy/src/tiers.rs +++ b/proxy/src/tiers.rs @@ -1,9 +1,10 @@ use async_trait::async_trait; -use notify::{PollWatcher, RecursiveMode, Watcher}; +use notify::{Event, PollWatcher, RecursiveMode, Watcher}; use pingora::{server::ShutdownWatch, services::background::BackgroundService}; use serde_json::Value; use std::error::Error; use std::{fs, sync::Arc}; +use tokio::runtime::{Handle, Runtime}; use tracing::{error, info, warn}; use crate::{config::Config, State, Tier}; @@ -48,13 +49,22 @@ impl BackgroundService for TierBackgroundService { return; } - let (tx, rx) = std::sync::mpsc::channel(); + let (tx, mut rx) = tokio::sync::mpsc::channel::(1); let watcher_config = notify::Config::default() .with_compare_contents(true) .with_poll_interval(self.config.proxy_tiers_poll_interval); - let watcher_result = PollWatcher::new(tx, watcher_config); + let watcher_result = PollWatcher::new( + move |res| { + if let Ok(event) = res { + runtime_handle() + .block_on(async { tx.send(event).await }) + .unwrap(); + } + }, + watcher_config, + ); if let Err(err) = watcher_result { error!(error = err.to_string(), "error to watcher tier"); return; @@ -67,18 +77,26 @@ impl BackgroundService for TierBackgroundService { return; } - for result in rx { - match result { - Ok(_event) => { - if let Err(err) = self.update_tiers().await { - error!(error = err.to_string(), "error to update tiers"); - continue; - } - - info!("tiers modified"); + loop { + let result = rx.recv().await; + if result.is_some() { + if let Err(err) = self.update_tiers().await { + error!(error = err.to_string(), "error to update tiers"); + continue; } - Err(err) => error!(error = err.to_string(), "watch error"), + + info!("tiers modified"); } } } } + +fn runtime_handle() -> Handle { + match Handle::try_current() { + Ok(h) => h, + Err(_) => { + let rt = Runtime::new().unwrap(); + rt.handle().clone() + } + } +}