From 31bdbeeb8a317e37379f6b19ca5b332c25ab8e49 Mon Sep 17 00:00:00 2001 From: Alexander Galibey <48586936+galibey@users.noreply.github.com> Date: Fri, 26 Apr 2024 08:57:44 +0400 Subject: [PATCH] feat(cli): graceful period for stream interruption (#3969) --- Cargo.lock | 1 + crates/fluvio-cli/Cargo.toml | 2 +- crates/fluvio-cli/src/client/consume/mod.rs | 31 +++++++++++++++------ 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d233cae5dc..def3513be2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2318,6 +2318,7 @@ name = "fluvio-cli" version = "0.0.0" dependencies = [ "anyhow", + "async-channel 1.9.0", "async-net", "async-trait", "atty", diff --git a/crates/fluvio-cli/Cargo.toml b/crates/fluvio-cli/Cargo.toml index 1846c7ffce..f469356f11 100644 --- a/crates/fluvio-cli/Cargo.toml +++ b/crates/fluvio-cli/Cargo.toml @@ -38,7 +38,7 @@ smartengine = ["fluvio-smartengine/default"] producer-file-io = ["fluvio-cli-common/file-records"] [dependencies] - +async-channel = { workspace = true } async-net = { workspace = true } async-trait = { workspace = true } anyhow = { workspace = true } diff --git a/crates/fluvio-cli/src/client/consume/mod.rs b/crates/fluvio-cli/src/client/consume/mod.rs index a052580c02..cd02464b96 100644 --- a/crates/fluvio-cli/src/client/consume/mod.rs +++ b/crates/fluvio-cli/src/client/consume/mod.rs @@ -13,6 +13,7 @@ pub use cmd::ConsumeOpt; mod cmd { + use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{UNIX_EPOCH, Duration}; use std::{io::Error as IoError, path::PathBuf}; use std::io::{self, ErrorKind, Stdout}; @@ -21,6 +22,7 @@ mod cmd { use std::sync::Arc; use fluvio_protocol::link::ErrorCode; + use futures_util::{Stream, StreamExt}; use tracing::{debug, trace, instrument}; use clap::{Parser, ValueEnum}; use futures::{select, FutureExt}; @@ -40,7 +42,6 @@ mod cmd { use fluvio_spu_schema::server::smartmodule::SmartModuleContextData; use fluvio_protocol::record::NO_TIMESTAMP; use fluvio::metadata::tableformat::TableFormatSpec; - use fluvio_future::io::StreamExt; use fluvio::{Fluvio, Offset, FluvioError}; use fluvio::consumer::{ConsumerConfigExt, ConsumerStream, OffsetManagementStrategy}; @@ -299,7 +300,7 @@ mod cmd { tableformat: Option, ) -> Result<()> { trace!(config = ?self, "Starting consumer:"); - self.init_ctrlc()?; + let stop_signal = self.init_ctrlc()?; let offset = self.calculate_offset()?; let mut builder = ConsumerConfigExt::builder(); @@ -379,7 +380,10 @@ mod cmd { debug!("consume config: {:#?}", consume_config); self.print_status(); - let mut stream = fluvio.consumer_with_config(consume_config).await?; + let mut stream = fluvio + .consumer_with_config(consume_config) + .await? + .take_until(stop_signal.recv()); self.consume_records_stream(&mut stream, tableformat) .await?; @@ -388,8 +392,8 @@ mod cmd { } if self.consumer.is_some() { - stream.offset_commit()?; - stream.offset_flush().await?; + stream.get_mut().offset_commit()?; + stream.get_mut().offset_flush().await?; } Ok(()) @@ -398,7 +402,7 @@ mod cmd { /// Consume records as a stream, waiting for new records to arrive async fn consume_records_stream( &self, - stream: &mut (impl ConsumerStream> + Unpin), + stream: &mut (impl Stream> + Unpin), tableformat: Option, ) -> Result<()> { let maybe_potential_end_offset: Option = self.end; @@ -702,10 +706,19 @@ mod cmd { } /// Initialize Ctrl-C event handler - fn init_ctrlc(&self) -> Result<()> { + fn init_ctrlc(&self) -> Result> { + let (s, r) = async_channel::bounded(1); + let invoked = AtomicBool::new(false); let result = ctrlc::set_handler(move || { debug!("detected control c, setting end"); - std::process::exit(0); + if invoked.load(Ordering::SeqCst) { + std::process::exit(0); + } else { + invoked.store(true, Ordering::SeqCst); + let _ = s.try_send(()); + std::thread::sleep(Duration::from_secs(2)); + std::process::exit(0); + } }); if let Err(err) = result { @@ -715,7 +728,7 @@ mod cmd { ) .into()); } - Ok(()) + Ok(r) } /// Calculate the Offset to use with the consumer based on the provided offset number