Skip to content

Commit

Permalink
feat(cli): graceful period for stream interruption (#3969)
Browse files Browse the repository at this point in the history
  • Loading branch information
galibey authored Apr 26, 2024
1 parent 4da0cd9 commit 31bdbee
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fluvio-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ smartengine = ["fluvio-smartengine/default"]
producer-file-io = ["fluvio-cli-common/file-records"]

[dependencies]

async-channel = { workspace = true }
async-net = { workspace = true }
async-trait = { workspace = true }
anyhow = { workspace = true }
Expand Down
31 changes: 22 additions & 9 deletions crates/fluvio-cli/src/client/consume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use cmd::ConsumeOpt;

mod cmd {

use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{UNIX_EPOCH, Duration};
use std::{io::Error as IoError, path::PathBuf};
use std::io::{self, ErrorKind, Stdout};
Expand All @@ -21,6 +22,7 @@ mod cmd {
use std::sync::Arc;

use fluvio_protocol::link::ErrorCode;
use futures_util::{Stream, StreamExt};
use tracing::{debug, trace, instrument};
use clap::{Parser, ValueEnum};
use futures::{select, FutureExt};
Expand All @@ -40,7 +42,6 @@ mod cmd {
use fluvio_spu_schema::server::smartmodule::SmartModuleContextData;
use fluvio_protocol::record::NO_TIMESTAMP;
use fluvio::metadata::tableformat::TableFormatSpec;
use fluvio_future::io::StreamExt;
use fluvio::{Fluvio, Offset, FluvioError};
use fluvio::consumer::{ConsumerConfigExt, ConsumerStream, OffsetManagementStrategy};

Expand Down Expand Up @@ -299,7 +300,7 @@ mod cmd {
tableformat: Option<TableFormatSpec>,
) -> Result<()> {
trace!(config = ?self, "Starting consumer:");
self.init_ctrlc()?;
let stop_signal = self.init_ctrlc()?;
let offset = self.calculate_offset()?;

let mut builder = ConsumerConfigExt::builder();
Expand Down Expand Up @@ -379,7 +380,10 @@ mod cmd {
debug!("consume config: {:#?}", consume_config);

self.print_status();
let mut stream = fluvio.consumer_with_config(consume_config).await?;
let mut stream = fluvio
.consumer_with_config(consume_config)
.await?
.take_until(stop_signal.recv());
self.consume_records_stream(&mut stream, tableformat)
.await?;

Expand All @@ -388,8 +392,8 @@ mod cmd {
}

if self.consumer.is_some() {
stream.offset_commit()?;
stream.offset_flush().await?;
stream.get_mut().offset_commit()?;
stream.get_mut().offset_flush().await?;
}

Ok(())
Expand All @@ -398,7 +402,7 @@ mod cmd {
/// Consume records as a stream, waiting for new records to arrive
async fn consume_records_stream(
&self,
stream: &mut (impl ConsumerStream<Item = Result<Record, ErrorCode>> + Unpin),
stream: &mut (impl Stream<Item = Result<Record, ErrorCode>> + Unpin),
tableformat: Option<TableFormatSpec>,
) -> Result<()> {
let maybe_potential_end_offset: Option<u32> = self.end;
Expand Down Expand Up @@ -702,10 +706,19 @@ mod cmd {
}

/// Initialize Ctrl-C event handler
fn init_ctrlc(&self) -> Result<()> {
fn init_ctrlc(&self) -> Result<async_channel::Receiver<()>> {
let (s, r) = async_channel::bounded(1);
let invoked = AtomicBool::new(false);
let result = ctrlc::set_handler(move || {
debug!("detected control c, setting end");
std::process::exit(0);
if invoked.load(Ordering::SeqCst) {
std::process::exit(0);
} else {
invoked.store(true, Ordering::SeqCst);
let _ = s.try_send(());
std::thread::sleep(Duration::from_secs(2));
std::process::exit(0);
}
});

if let Err(err) = result {
Expand All @@ -715,7 +728,7 @@ mod cmd {
)
.into());
}
Ok(())
Ok(r)
}

/// Calculate the Offset to use with the consumer based on the provided offset number
Expand Down

0 comments on commit 31bdbee

Please sign in to comment.