Skip to content

Commit

Permalink
feat(tap): Implement async output channel type for vector-tap lib (#2…
Browse files Browse the repository at this point in the history
…0876)

* implement async output channel type

* implement async output channel type

* rename changelog file

* added suggested changes

* remove/move 'allow(clippy...)' macros

* formatting
  • Loading branch information
ArunPiduguDD committed Jul 17, 2024
1 parent e9b4fe1 commit ef4f175
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 63 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion lib/vector-tap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ serde_yaml = { version = "0.9.34", default-features = false }
snafu = { version = "0.7.5", default-features = false }
tokio = { version = "1.38.0", default-features = false, features = ["time"] }
tokio-stream = { version = "0.1.15", default-features = false, features = ["sync"] }
tokio-tungstenite = { version = "0.20.1", default-features = false }
tracing = { version = "0.1.34", default-features = false }
url = { version = "2.5.2", default-features = false }
vector-api-client = {path = "../vector-api-client" }
vector-api-client = { path = "../vector-api-client" }
futures-util = "0.3.30"

[dev-dependencies]
chrono = { workspace = true }
portpicker = { path = "../portpicker" }
serde_json = { workspace = true }
tokio = { version = "1.38.0", default-features = false, features = ["test-util"] }



245 changes: 188 additions & 57 deletions lib/vector-tap/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#![deny(warnings)]

#[macro_use]
extern crate tracing;

use std::time::{Duration, Instant};
use std::{borrow::Cow, collections::BTreeMap};

use colored::{ColoredString, Colorize};
use snafu::Snafu;
use tokio::sync::mpsc as tokio_mpsc;
use tokio::time::timeout;
use tokio::time::{Duration, Instant};
use tokio_stream::StreamExt;
use url::Url;

Expand Down Expand Up @@ -95,11 +96,16 @@ impl EventFormatter {
}
}

#[derive(Clone, Debug)]
pub enum OutputChannel {
Stdout(EventFormatter),
AsyncChannel(tokio_mpsc::Sender<Vec<GraphQLTapOutputEvent>>),
}

/// Error type for DNS message parsing
#[derive(Debug, Snafu)]
#[derive(Debug)]
pub enum TapExecutorError {
#[snafu(display("[tap] Couldn't connect to API via WebSockets"))]
ConnectionFailure,
ConnectionFailure(tokio_tungstenite::tungstenite::Error),
GraphQLError,
}

Expand All @@ -108,7 +114,7 @@ pub struct TapRunner<'a> {
url: &'a Url,
input_patterns: Vec<String>,
output_patterns: Vec<String>,
formatter: &'a EventFormatter,
output_channel: &'a OutputChannel,
format: TapEncodingFormat,
}

Expand All @@ -117,20 +123,18 @@ impl<'a> TapRunner<'a> {
url: &'a Url,
input_patterns: Vec<String>,
output_patterns: Vec<String>,
formatter: &'a EventFormatter,
output_channel: &'a OutputChannel,
format: TapEncodingFormat,
) -> Self {
TapRunner {
url,
input_patterns,
output_patterns,
formatter,
output_channel,
format,
}
}

#[allow(clippy::print_stdout)]
#[allow(clippy::print_stderr)]
pub async fn run_tap(
&self,
interval: i64,
Expand All @@ -140,10 +144,7 @@ impl<'a> TapRunner<'a> {
) -> Result<(), TapExecutorError> {
let subscription_client = connect_subscription_client((*self.url).clone())
.await
.map_err(|error| {
eprintln!("[tap] Couldn't connect to API via WebSockets: {error}");
TapExecutorError::ConnectionFailure
})?;
.map_err(TapExecutorError::ConnectionFailure)?;

tokio::pin! {
let stream = subscription_client.output_events_by_component_id_patterns_subscription(
Expand Down Expand Up @@ -171,57 +172,187 @@ impl<'a> TapRunner<'a> {
match message {
Ok(Some(Some(res))) => {
if let Some(d) = res.data {
for tap_event in d.output_events_by_component_id_patterns.iter() {
match tap_event {
GraphQLTapOutputEvent::Log(ev) => {
println!(
"{}",
self.formatter.format(
ev.component_id.as_ref(),
ev.component_kind.as_ref(),
ev.component_type.as_ref(),
ev.string.as_ref()
)
);
}
GraphQLTapOutputEvent::Metric(ev) => {
println!(
"{}",
self.formatter.format(
ev.component_id.as_ref(),
ev.component_kind.as_ref(),
ev.component_type.as_ref(),
ev.string.as_ref()
)
);
}
GraphQLTapOutputEvent::Trace(ev) => {
println!(
"{}",
self.formatter.format(
ev.component_id.as_ref(),
ev.component_kind.as_ref(),
ev.component_type.as_ref(),
ev.string.as_ref()
)
);
}
GraphQLTapOutputEvent::EventNotification(ev) => {
if !quiet {
eprintln!("{}", ev.message);
}
let output_events: Vec<GraphQLTapOutputEvent> = d
.output_events_by_component_id_patterns
.into_iter()
.filter(|event| {
!matches!(
(quiet, event),
(true, GraphQLTapOutputEvent::EventNotification(_))
)
})
.collect();

match &self.output_channel {
OutputChannel::Stdout(formatter) => {
self.output_event_stdout(&output_events, formatter);
}
OutputChannel::AsyncChannel(sender_tx) => {
if sender_tx.send(output_events).await.is_err() {
debug!("Could not send events");
}
}
}
}
}
Err(_) => {
// If the stream times out, that indicates the duration specified by the user
// has elapsed. We should exit gracefully.
return Ok(());
Err(_) =>
// If the stream times out, that indicates the duration specified by the user
// has elapsed. We should exit gracefully.
{
return Ok(())
}
Ok(_) => return Err(TapExecutorError::GraphQLError),
}
}
}

#[allow(clippy::print_stdout)]
fn output_event_stdout(
&self,
output_events: &[GraphQLTapOutputEvent],
formatter: &EventFormatter,
) {
for tap_event in output_events.iter() {
match tap_event {
GraphQLTapOutputEvent::Log(ev) => {
println!(
"{}",
formatter.format(
ev.component_id.as_ref(),
ev.component_kind.as_ref(),
ev.component_type.as_ref(),
ev.string.as_ref()
)
);
}
GraphQLTapOutputEvent::Metric(ev) => {
println!(
"{}",
formatter.format(
ev.component_id.as_ref(),
ev.component_kind.as_ref(),
ev.component_type.as_ref(),
ev.string.as_ref()
)
);
}
GraphQLTapOutputEvent::Trace(ev) => {
println!(
"{}",
formatter.format(
ev.component_id.as_ref(),
ev.component_kind.as_ref(),
ev.component_type.as_ref(),
ev.string.as_ref()
)
);
}
#[allow(clippy::print_stderr)]
GraphQLTapOutputEvent::EventNotification(ev) => {
eprintln!("{}", ev.message);
}
}
}
}
}

#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr};

use chrono::Utc;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use serde_json::Value;
use tokio::net::TcpListener;
use tokio::time::sleep;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;

use portpicker::pick_unused_port;

use super::*;

#[tokio::test(start_paused = true)]
async fn test_async_output_channel() {
let component_id = "test-component-id";
let component_type = "test-component-type";
let message = "test-message";
let timestamp = Utc::now();
let string_encoding = "test-str";

// Start a local WebSocket server to mimic Vector GraphQL API
let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
let port = pick_unused_port(ip_addr);
let addr = format!("{ip_addr}:{port}");

let listener = TcpListener::bind(&addr).await.unwrap();
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut ws_stream = accept_async(stream).await.unwrap();
if let Some(Ok(Message::Text(msg))) = ws_stream.next().await {
let client_init_msg: Value =
serde_json::from_str(&msg).expect("Init message should be in JSON format");
let subscription_id = &client_init_msg["id"];

let message_to_send = format!(
"{{\
\"type\":\"data\",\
\"id\":{subscription_id},\
\"payload\":{{\
\"data\":{{\
\"outputEventsByComponentIdPatterns\":[{{\
\"__typename\":\"Log\",\
\"componentId\":\"{component_id}\",\
\"componentType\":\"{component_type}\",\
\"componentKind\":\"source\",\
\"message\":\"{message}\",\
\"timestamp\":\"{timestamp}\",\
\"string\":\"{string_encoding}\"\
}}]\
}}\
}}\
}}",
);

// Send 2 messages to client, mimicking 3 second interval
loop {
ws_stream
.send(Message::Text(message_to_send.clone()))
.await
.unwrap();
sleep(Duration::from_secs(3)).await;
}
}
});

let (output_tx, mut output_rx) = tokio_mpsc::channel(10);
let url = Url::parse(&format!("ws://{addr}")).unwrap();
let output_channel = OutputChannel::AsyncChannel(output_tx);

let tap_runner = TapRunner::new(
&url,
vec![],
vec![],
&output_channel,
TapEncodingFormat::Json,
);
assert!(tap_runner.run_tap(0, 0, Some(5000), false).await.is_ok());

let mut num_recv = 0;
while let Ok(events) = output_rx.try_recv() {
assert_eq!(events.len(), 1);
if let GraphQLTapOutputEvent::Log(ev) = &events[0] {
num_recv += 1;
assert_eq!(ev.component_id, component_id);
assert_eq!(ev.component_type, component_type);
assert_eq!(ev.message, Some(message.to_string()));
assert_eq!(ev.timestamp, Some(timestamp));
assert_eq!(ev.string, string_encoding);
}
}
assert_eq!(num_recv, 2);

server.abort();
}
}
13 changes: 8 additions & 5 deletions src/tap/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use vector_lib::api_client::Client;
use vector_lib::tap::{EventFormatter, TapExecutorError, TapRunner};
use vector_lib::tap::{EventFormatter, OutputChannel, TapRunner};

use crate::signal::{SignalRx, SignalTo};

Expand Down Expand Up @@ -38,12 +38,12 @@ pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::Ex
/// Observe event flow from specified components
pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
let subscription_url = opts.web_socket_url();
let formatter = EventFormatter::new(opts.meta, opts.format);
let output_channel = OutputChannel::Stdout(EventFormatter::new(opts.meta, opts.format));
let tap_runner = TapRunner::new(
&subscription_url,
opts.inputs_of.clone(),
opts.outputs_patterns().clone(),
&formatter,
&output_channel,
opts.format,
);

Expand All @@ -61,11 +61,14 @@ pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitC
Ok(_) => {
break;
}
Err(TapExecutorError::ConnectionFailure) | Err(TapExecutorError::GraphQLError) => {
Err(tap_executor_error) => {
if !opts.no_reconnect {
#[allow(clippy::print_stderr)]
{
eprintln!("[tap] Connection failed. Reconnecting in {:?} seconds.", RECONNECT_DELAY / 1000);
eprintln!(
"[tap] Connection failed with error {:?}. Reconnecting in {:?} seconds.",
tap_executor_error,
RECONNECT_DELAY / 1000);
}
tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
}
Expand Down

0 comments on commit ef4f175

Please sign in to comment.