Skip to content

Commit

Permalink
chore: fix servesink (#1999)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Aug 24, 2024
1 parent ae02243 commit 86c381f
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 33 deletions.
7 changes: 2 additions & 5 deletions rust/monovertex/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,10 @@ impl Forwarder {
// Applies transformation to the messages if transformer is present
// we concurrently apply transformation to all the messages.
async fn apply_transformer(&self, messages: Vec<Message>) -> Result<Vec<Message>> {
let transformer_client;
if let Some(trf_client) = &self.transformer_client {
transformer_client = trf_client;
} else {
let Some(transformer_client) = &self.transformer_client else {
// return early if there is no transformer
return Ok(messages);
}
};

let start_time = tokio::time::Instant::now();
let mut jh = JoinSet::new();
Expand Down
4 changes: 1 addition & 3 deletions rust/monovertex/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ impl SourceClient {
Ok(self.client.ack_fn(request).await?.into_inner())
}

#[allow(dead_code)]
// TODO: remove dead_code
pub(crate) async fn pending_fn(&mut self) -> Result<i64> {
let request = Request::new(());
let response = self
Expand All @@ -114,7 +112,7 @@ impl SourceClient {
.await?
.into_inner()
.result
.map_or(0, |r| r.count);
.map_or(-1, |r| r.count); // default to -1(unavailable)
Ok(response)
}

Expand Down
200 changes: 175 additions & 25 deletions rust/servesink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use reqwest::Client;
use tracing::{error, warn};
use tracing_subscriber::prelude::*;

/// Name of the request header whose value is the callback URL; the sink POSTs the
/// payload to that URL with `_save` appended.
const NUMAFLOW_CALLBACK_URL_HEADER: &str = "X-Numaflow-Callback-Url";
/// Name of the request header carrying the Numaflow id, which the sink forwards on
/// the callback request.
const NUMAFLOW_ID_HEADER: &str = "X-Numaflow-Id";

/// servesink is a Numaflow Sink which forwards the payload to the Numaflow serving URL.
pub async fn servesink() -> Result<(), Box<dyn Error + Send + Sync>> {
tracing_subscriber::registry()
.with(
Expand All @@ -14,14 +18,14 @@ pub async fn servesink() -> Result<(), Box<dyn Error + Send + Sync>> {
.with(tracing_subscriber::fmt::layer().with_ansi(false))
.init();

sink::Server::new(Logger::new()).start().await
sink::Server::new(ServeSink::new()).start().await
}

struct Logger {
struct ServeSink {
client: Client,
}

impl Logger {
impl ServeSink {
fn new() -> Self {
Self {
client: Client::new(),
Expand All @@ -30,43 +34,189 @@ impl Logger {
}

#[tonic::async_trait]
impl sink::Sinker for Logger {
impl sink::Sinker for ServeSink {
async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {
let mut responses: Vec<Response> = Vec::new();

while let Some(datum) = input.recv().await {
// do something better, but for now let's just log it.
// please note that `from_utf8` is working because the input in this
// example uses utf-8 data.
let response = match std::str::from_utf8(&datum.value) {
Ok(_v) => {
// record the response
Response::ok(datum.id)
// if the callback url is absent, ignore the request
let url = match datum.headers.get(NUMAFLOW_CALLBACK_URL_HEADER) {
Some(url) => url,
None => {
warn!(
"Missing {} header, Ignoring the request",
NUMAFLOW_CALLBACK_URL_HEADER
);
responses.push(Response::ok(datum.id));
continue;
}
Err(e) => Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)),
};
// return the responses
responses.push(response);
let Some(url) = datum.headers.get("X-Numaflow-Callback-Url") else {
warn!("X-Numaflow-Callback-Url header is not found in the payload");
continue;
};
let Some(numaflow_id) = datum.headers.get("X-Numaflow-Id") else {
warn!("X-Numaflow-Id header is not found in the payload");
continue;

// if the numaflow id is absent, ignore the request
let numaflow_id = match datum.headers.get(NUMAFLOW_ID_HEADER) {
Some(id) => id,
None => {
warn!(
"Missing {} header, Ignoring the request",
NUMAFLOW_ID_HEADER
);
responses.push(Response::ok(datum.id));
continue;
}
};

let resp = self
.client
.post(format!("{}_{}", url, "save"))
.header("X-Numaflow-Id", numaflow_id)
.header(NUMAFLOW_ID_HEADER, numaflow_id)
.header("id", numaflow_id)
.body(datum.value)
.send()
.await;
if let Err(e) = resp {
error!(error=?e, url=url, "Sending result to numaserve")
}

let response = match resp {
Ok(_) => Response::ok(datum.id),
Err(e) => {
error!("Sending result to serving URL {:?}", e);
Response::failure(datum.id, format!("Failed to send: {}", e))
}
};

responses.push(response);
}
responses
}
}

#[cfg(test)]
mod tests {
use super::*;
use numaflow::sink::{SinkRequest, Sinker};
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::sync::mpsc;

/// A request that lacks the callback URL header is skipped by the sink but must
/// still be acknowledged with a successful response.
#[tokio::test]
async fn test_serve_sink_without_url_header() {
    // Only the id header is present; the callback URL header is deliberately omitted.
    let headers = HashMap::from([(NUMAFLOW_ID_HEADER.to_string(), "12345".to_string())]);
    let datum = SinkRequest {
        keys: vec![],
        id: "1".to_string(),
        value: b"test".to_vec(),
        watermark: Default::default(),
        headers,
        event_time: Default::default(),
    };

    let (sender, receiver) = mpsc::channel(1);
    sender.send(datum).await.unwrap();
    // Dropping the sender closes the channel, so `sink` returns after this one message.
    drop(sender);

    let sinker = ServeSink::new();
    let results = sinker.sink(receiver).await;
    assert_eq!(results.len(), 1);
    assert!(results[0].success);
}

/// A request that lacks the Numaflow id header is skipped by the sink but must
/// still be acknowledged with a successful response.
#[tokio::test]
async fn test_serve_sink_without_id_header() {
    // Only the callback URL header is present; the id header is deliberately omitted.
    let headers = HashMap::from([(
        NUMAFLOW_CALLBACK_URL_HEADER.to_string(),
        "http://localhost:8080".to_string(),
    )]);
    let datum = SinkRequest {
        keys: vec![],
        id: "1".to_string(),
        value: b"test".to_vec(),
        watermark: Default::default(),
        headers,
        event_time: Default::default(),
    };

    let (sender, receiver) = mpsc::channel(1);
    sender.send(datum).await.unwrap();
    // Dropping the sender closes the channel, so `sink` returns after this one message.
    drop(sender);

    let sinker = ServeSink::new();
    let results = sinker.sink(receiver).await;
    assert_eq!(results.len(), 1);
    assert!(results[0].success);
}

/// Starts a minimal HTTP server on an ephemeral loopback port for use in tests.
///
/// Returns the `host:port` string the server is listening on, together with a sender;
/// sending `()` on it stops the accept loop. Every accepted connection is read once and
/// answered with `500` when the request text contains `/error`, otherwise with `200`.
async fn start_server() -> (String, mpsc::Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
    // Bind to loopback rather than the wildcard address: the returned address is used
    // by clients as a connect target, and `0.0.0.0` is not a portable destination. It
    // also keeps the test server from being reachable on external interfaces.
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr_str = listener.local_addr().unwrap().to_string();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = shutdown_rx.recv() => {
                    break;
                }
                Ok((mut socket, _)) = listener.accept() => {
                    tokio::spawn(async move {
                        let mut buffer = [0; 1024];
                        let n = socket.read(&mut buffer).await.unwrap();
                        // Only inspect the bytes that were actually received, not the
                        // zero-filled remainder of the buffer.
                        let request = String::from_utf8_lossy(&buffer[..n]);
                        let response = if request.contains("/error") {
                            "HTTP/1.1 500 INTERNAL SERVER ERROR\r\n\
                            content-length: 0\r\n\
                            \r\n"
                        } else {
                            "HTTP/1.1 200 OK\r\n\
                            content-length: 0\r\n\
                            \r\n"
                        };
                        socket.write_all(response.as_bytes()).await.unwrap();
                    });
                }
            }
        }
    });
    (addr_str, shutdown_tx)
}

/// With both headers present and a reachable callback server, the sink forwards the
/// payload and reports success for the request.
#[tokio::test]
async fn test_serve_sink() {
    let (server_addr, stop_server) = start_server().await;

    let headers = HashMap::from([
        (NUMAFLOW_ID_HEADER.to_string(), "12345".to_string()),
        (
            NUMAFLOW_CALLBACK_URL_HEADER.to_string(),
            format!("http://{}/sync", server_addr),
        ),
    ]);
    let datum = SinkRequest {
        keys: vec![],
        id: "1".to_string(),
        value: b"test".to_vec(),
        watermark: Default::default(),
        headers,
        event_time: Default::default(),
    };

    let (sender, receiver) = mpsc::channel(1);
    sender.send(datum).await.unwrap();
    // Dropping the sender closes the channel, so `sink` returns after this one message.
    drop(sender);

    let sinker = ServeSink::new();
    let results = sinker.sink(receiver).await;
    assert_eq!(results.len(), 1);
    assert!(results[0].success);

    // Stop the server
    stop_server.send(()).await.unwrap();
}
}

0 comments on commit 86c381f

Please sign in to comment.