diff --git a/Cargo.lock b/Cargo.lock index 956214f447019..3f225afd352b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10721,11 +10721,16 @@ dependencies = [ name = "vector-tap" version = "0.1.0" dependencies = [ + "chrono", "colored", + "futures-util", + "portpicker", + "serde_json", "serde_yaml 0.9.34+deprecated", "snafu 0.7.5", "tokio", "tokio-stream", + "tokio-tungstenite 0.20.1", "tracing 0.1.40", "url", "vector-api-client", diff --git a/lib/vector-tap/Cargo.toml b/lib/vector-tap/Cargo.toml index 2e7182345a73d..305a12d780eba 100644 --- a/lib/vector-tap/Cargo.toml +++ b/lib/vector-tap/Cargo.toml @@ -12,6 +12,17 @@ serde_yaml = { version = "0.9.34", default-features = false } snafu = { version = "0.7.5", default-features = false } tokio = { version = "1.38.0", default-features = false, features = ["time"] } tokio-stream = { version = "0.1.15", default-features = false, features = ["sync"] } +tokio-tungstenite = { version = "0.20.1", default-features = false } tracing = { version = "0.1.34", default-features = false } url = { version = "2.5.2", default-features = false } -vector-api-client = {path = "../vector-api-client" } +vector-api-client = { path = "../vector-api-client" } +futures-util = "0.3.30" + +[dev-dependencies] +chrono = { workspace = true } +portpicker = { path = "../portpicker" } +serde_json = { workspace = true } +tokio = { version = "1.38.0", default-features = false, features = ["test-util"] } + + + diff --git a/lib/vector-tap/src/lib.rs b/lib/vector-tap/src/lib.rs index fa06f5ad9992a..3e1eeb9b25703 100644 --- a/lib/vector-tap/src/lib.rs +++ b/lib/vector-tap/src/lib.rs @@ -1,13 +1,14 @@ #![deny(warnings)] +#[macro_use] extern crate tracing; -use std::time::{Duration, Instant}; use std::{borrow::Cow, collections::BTreeMap}; use colored::{ColoredString, Colorize}; -use snafu::Snafu; +use tokio::sync::mpsc as tokio_mpsc; use tokio::time::timeout; +use tokio::time::{Duration, Instant}; use tokio_stream::StreamExt; use url::Url; @@ -95,11 +96,16 @@ impl EventFormatter { } } +#[derive(Clone, Debug)] +pub enum OutputChannel { + Stdout(EventFormatter), + AsyncChannel(tokio_mpsc::Sender>), +} + /// Error type for DNS message parsing -#[derive(Debug, Snafu)] +#[derive(Debug)] pub enum TapExecutorError { - #[snafu(display("[tap] Couldn't connect to API via WebSockets"))] - ConnectionFailure, + ConnectionFailure(tokio_tungstenite::tungstenite::Error), GraphQLError, } @@ -108,7 +114,7 @@ pub struct TapRunner<'a> { url: &'a Url, input_patterns: Vec, output_patterns: Vec, - formatter: &'a EventFormatter, + output_channel: &'a OutputChannel, format: TapEncodingFormat, } @@ -117,20 +123,18 @@ impl<'a> TapRunner<'a> { url: &'a Url, input_patterns: Vec, output_patterns: Vec, - formatter: &'a EventFormatter, + output_channel: &'a OutputChannel, format: TapEncodingFormat, ) -> Self { TapRunner { url, input_patterns, output_patterns, - formatter, + output_channel, format, } } - #[allow(clippy::print_stdout)] - #[allow(clippy::print_stderr)] pub async fn run_tap( &self, interval: i64, @@ -140,10 +144,7 @@ impl<'a> TapRunner<'a> { ) -> Result<(), TapExecutorError> { let subscription_client = connect_subscription_client((*self.url).clone()) .await - .map_err(|error| { - eprintln!("[tap] Couldn't connect to API via WebSockets: {error}"); - TapExecutorError::ConnectionFailure - })?; + .map_err(TapExecutorError::ConnectionFailure)?; tokio::pin! { let stream = subscription_client.output_events_by_component_id_patterns_subscription( @@ -171,57 +172,187 @@ impl<'a> TapRunner<'a> { match message { Ok(Some(Some(res))) => { if let Some(d) = res.data { - for tap_event in d.output_events_by_component_id_patterns.iter() { - match tap_event { - GraphQLTapOutputEvent::Log(ev) => { - println!( - "{}", - self.formatter.format( - ev.component_id.as_ref(), - ev.component_kind.as_ref(), - ev.component_type.as_ref(), - ev.string.as_ref() - ) - ); - } - GraphQLTapOutputEvent::Metric(ev) => { - println!( - "{}", - self.formatter.format( - ev.component_id.as_ref(), - ev.component_kind.as_ref(), - ev.component_type.as_ref(), - ev.string.as_ref() - ) - ); - } - GraphQLTapOutputEvent::Trace(ev) => { - println!( - "{}", - self.formatter.format( - ev.component_id.as_ref(), - ev.component_kind.as_ref(), - ev.component_type.as_ref(), - ev.string.as_ref() - ) - ); - } - GraphQLTapOutputEvent::EventNotification(ev) => { - if !quiet { - eprintln!("{}", ev.message); - } + let output_events: Vec = d + .output_events_by_component_id_patterns + .into_iter() + .filter(|event| { + !matches!( + (quiet, event), + (true, GraphQLTapOutputEvent::EventNotification(_)) + ) + }) + .collect(); + + match &self.output_channel { + OutputChannel::Stdout(formatter) => { + self.output_event_stdout(&output_events, formatter); + } + OutputChannel::AsyncChannel(sender_tx) => { + if sender_tx.send(output_events).await.is_err() { + debug!("Could not send events"); } } } } } - Err(_) => { - // If the stream times out, that indicates the duration specified by the user - // has elapsed. We should exit gracefully. - return Ok(()); + Err(_) => + // If the stream times out, that indicates the duration specified by the user + // has elapsed. We should exit gracefully. + { + return Ok(()) } Ok(_) => return Err(TapExecutorError::GraphQLError), } } } + + #[allow(clippy::print_stdout)] + fn output_event_stdout( + &self, + output_events: &[GraphQLTapOutputEvent], + formatter: &EventFormatter, + ) { + for tap_event in output_events.iter() { + match tap_event { + GraphQLTapOutputEvent::Log(ev) => { + println!( + "{}", + formatter.format( + ev.component_id.as_ref(), + ev.component_kind.as_ref(), + ev.component_type.as_ref(), + ev.string.as_ref() + ) + ); + } + GraphQLTapOutputEvent::Metric(ev) => { + println!( + "{}", + formatter.format( + ev.component_id.as_ref(), + ev.component_kind.as_ref(), + ev.component_type.as_ref(), + ev.string.as_ref() + ) + ); + } + GraphQLTapOutputEvent::Trace(ev) => { + println!( + "{}", + formatter.format( + ev.component_id.as_ref(), + ev.component_kind.as_ref(), + ev.component_type.as_ref(), + ev.string.as_ref() + ) + ); + } + #[allow(clippy::print_stderr)] + GraphQLTapOutputEvent::EventNotification(ev) => { + eprintln!("{}", ev.message); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::net::{IpAddr, Ipv4Addr}; + + use chrono::Utc; + use futures_util::sink::SinkExt; + use futures_util::stream::StreamExt; + use serde_json::Value; + use tokio::net::TcpListener; + use tokio::time::sleep; + use tokio_tungstenite::accept_async; + use tokio_tungstenite::tungstenite::Message; + + use portpicker::pick_unused_port; + + use super::*; + + #[tokio::test(start_paused = true)] + async fn test_async_output_channel() { + let component_id = "test-component-id"; + let component_type = "test-component-type"; + let message = "test-message"; + let timestamp = Utc::now(); + let string_encoding = "test-str"; + + // Start a local WebSocket server to mimic Vector GraphQL API + let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST); + let port = pick_unused_port(ip_addr); + let addr = format!("{ip_addr}:{port}"); + + let listener = TcpListener::bind(&addr).await.unwrap(); + let server = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + let mut ws_stream = accept_async(stream).await.unwrap(); + if let Some(Ok(Message::Text(msg))) = ws_stream.next().await { + let client_init_msg: Value = + serde_json::from_str(&msg).expect("Init message should be in JSON format"); + let subscription_id = &client_init_msg["id"]; + + let message_to_send = format!( + "{{\ + \"type\":\"data\",\ + \"id\":{subscription_id},\ + \"payload\":{{\ + \"data\":{{\ + \"outputEventsByComponentIdPatterns\":[{{\ + \"__typename\":\"Log\",\ + \"componentId\":\"{component_id}\",\ + \"componentType\":\"{component_type}\",\ + \"componentKind\":\"source\",\ + \"message\":\"{message}\",\ + \"timestamp\":\"{timestamp}\",\ + \"string\":\"{string_encoding}\"\ + }}]\ + }}\ + }}\ + }}", + ); + + // Send 2 messages to client, mimicking 3 second interval + loop { + ws_stream + .send(Message::Text(message_to_send.clone())) + .await + .unwrap(); + sleep(Duration::from_secs(3)).await; + } + } + }); + + let (output_tx, mut output_rx) = tokio_mpsc::channel(10); + let url = Url::parse(&format!("ws://{addr}")).unwrap(); + let output_channel = OutputChannel::AsyncChannel(output_tx); + + let tap_runner = TapRunner::new( + &url, + vec![], + vec![], + &output_channel, + TapEncodingFormat::Json, + ); + assert!(tap_runner.run_tap(0, 0, Some(5000), false).await.is_ok()); + + let mut num_recv = 0; + while let Ok(events) = output_rx.try_recv() { + assert_eq!(events.len(), 1); + if let GraphQLTapOutputEvent::Log(ev) = &events[0] { + num_recv += 1; + assert_eq!(ev.component_id, component_id); + assert_eq!(ev.component_type, component_type); + assert_eq!(ev.message, Some(message.to_string())); + assert_eq!(ev.timestamp, Some(timestamp)); + assert_eq!(ev.string, string_encoding); + } + } + assert_eq!(num_recv, 2); + + server.abort(); + } } diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 7eefab29fe9a1..9a957584ff233 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -1,7 +1,7 @@ use std::time::Duration; use vector_lib::api_client::Client; -use vector_lib::tap::{EventFormatter, TapExecutorError, TapRunner}; +use vector_lib::tap::{EventFormatter, OutputChannel, TapRunner}; use crate::signal::{SignalRx, SignalTo}; @@ -38,12 +38,12 @@ pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::Ex /// Observe event flow from specified components pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode { let subscription_url = opts.web_socket_url(); - let formatter = EventFormatter::new(opts.meta, opts.format); + let output_channel = OutputChannel::Stdout(EventFormatter::new(opts.meta, opts.format)); let tap_runner = TapRunner::new( &subscription_url, opts.inputs_of.clone(), opts.outputs_patterns().clone(), - &formatter, + &output_channel, opts.format, ); @@ -61,11 +61,14 @@ pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitC Ok(_) => { break; } - Err(TapExecutorError::ConnectionFailure) | Err(TapExecutorError::GraphQLError) => { + Err(tap_executor_error) => { if !opts.no_reconnect { #[allow(clippy::print_stderr)] { - eprintln!("[tap] Connection failed. Reconnecting in {:?} seconds.", RECONNECT_DELAY / 1000); + eprintln!( + "[tap] Connection failed with error {:?}. Reconnecting in {:?} seconds.", + tap_executor_error, + RECONNECT_DELAY / 1000); } tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await; }