diff --git a/http/get_simple/rs/.gitignore b/http/get_simple/rs/.gitignore new file mode 100644 index 0000000..95a3a86 --- /dev/null +++ b/http/get_simple/rs/.gitignore @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +/target +Cargo.lock diff --git a/http/get_simple/rs/Cargo.toml b/http/get_simple/rs/Cargo.toml new file mode 100644 index 0000000..f8670a8 --- /dev/null +++ b/http/get_simple/rs/Cargo.toml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[workspace] +resolver = "2" +members = ["client", "server"] + +[workspace.dependencies] +arrow-array = "50.0.0" +arrow-ipc = "50.0.0" +arrow-schema = "50.0.0" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" diff --git a/http/get_simple/rs/client/Cargo.toml b/http/get_simple/rs/client/Cargo.toml new file mode 100644 index 0000000..1eec1bd --- /dev/null +++ b/http/get_simple/rs/client/Cargo.toml @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "client" +version = "0.1.0" +edition = "2021" + +[dependencies] +arrow-ipc.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true diff --git a/http/get_simple/rs/client/README.md b/http/get_simple/rs/client/README.md new file mode 100644 index 0000000..caeeab1 --- /dev/null +++ b/http/get_simple/rs/client/README.md @@ -0,0 +1,34 @@ + + +# HTTP GET Arrow Data: Simple Rust Client Example + +This directory contains a minimal example of an HTTP client implemented in Rust. The client: + +1. Sends an HTTP GET request to a server. +2. Receives an HTTP 200 response from the server, with the response body containing an Arrow IPC stream of record batches. +3. Adds the record batches to a list as they are received. + +To run this example, first start one of the server examples in the parent directory, then: + +```sh +cargo r --release +``` +> [!NOTE] +> This client example implements low-level HTTP/1.1 details directly, instead of using an HTTP library. We intend to update the example to use [hyper](https://docs.rs/hyper/latest/hyper/) after [arrow-rs has an async Arrow IPC reader](https://github.com/apache/arrow-rs/issues/1207)). diff --git a/http/get_simple/rs/client/src/main.rs b/http/get_simple/rs/client/src/main.rs new file mode 100644 index 0000000..7e593b4 --- /dev/null +++ b/http/get_simple/rs/client/src/main.rs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_ipc::reader::StreamReader; +use std::{ + io::{BufRead, BufReader, Read, Write}, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, +}; +use tracing::{error, info, info_span}; +use tracing_subscriber::fmt::format::FmtSpan; + +fn main() { + // Configure tracing subscriber. + tracing_subscriber::fmt() + .with_span_events(FmtSpan::CLOSE) + .init(); + + info_span!("get_simple").in_scope(|| { + // Connect to server. + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000); + match TcpStream::connect(addr) { + Ok(mut stream) => { + info_span!("Reading Arrow IPC stream", %addr).in_scope(|| { + info!("Connected"); + + // Send request. + stream + .write_all(format!("GET / HTTP/1.1\r\nHost: {addr}\r\n\r\n").as_bytes()) + .unwrap(); + + // Ignore response header. + let mut reader = BufReader::new(&mut stream); + let mut chunked = false; + loop { + let mut line = String::default(); + reader.read_line(&mut line).unwrap(); + if let Some(("transfer-encoding", "chunked")) = line + .to_lowercase() + .split_once(':') + .map(|(key, value)| (key.trim(), value.trim())) + { + chunked = true; + } + if line == "\r\n" { + break; + } + } + + // Read Arrow IPC stream + let batches: Vec<_> = if chunked { + let mut buffer = Vec::default(); + loop { + // Chunk size + let mut line = String::default(); + reader.read_line(&mut line).unwrap(); + let chunk_size = u64::from_str_radix(line.trim(), 16).unwrap(); + + if chunk_size == 0 { + // Terminating chunk + break; + } else { + // Append chunk to buffer + let mut chunk_reader = reader.take(chunk_size); + chunk_reader.read_to_end(&mut buffer).unwrap(); + // Terminating CR-LF sequence + reader = chunk_reader.into_inner(); + reader.read_line(&mut String::default()).unwrap(); + } + } + StreamReader::try_new_unbuffered(buffer.as_slice(), None) + .unwrap() + .flat_map(Result::ok) + .collect() + } else { + StreamReader::try_new_unbuffered(reader, None) + .unwrap() + .flat_map(Result::ok) + .collect() + }; + + info!( + batches = batches.len(), + rows = batches.iter().map(|rb| rb.num_rows()).sum::() + ); + }); + } + Err(error) => { + error!(%error, "Connection failed") + } + } + }) +} diff --git a/http/get_simple/rs/server/Cargo.toml b/http/get_simple/rs/server/Cargo.toml new file mode 100644 index 0000000..f961a4c --- /dev/null +++ b/http/get_simple/rs/server/Cargo.toml @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "server" +version = "0.1.0" +edition = "2021" + +[dependencies] +arrow-array.workspace = true +arrow-ipc.workspace = true +arrow-schema.workspace = true +once_cell = "1.19.0" +rand = "0.8.5" +rayon = "1.9.0" +tracing.workspace = true +tracing-subscriber.workspace = true diff --git a/http/get_simple/rs/server/README.md b/http/get_simple/rs/server/README.md new file mode 100644 index 0000000..70ac8f7 --- /dev/null +++ b/http/get_simple/rs/server/README.md @@ -0,0 +1,34 @@ + + +# HTTP GET Arrow Data: Simple Rust Server Example + +This directory contains a minimal example of an HTTP server implemented in Rust. The server: + +1. Creates a list of record batches and populates it with synthesized data. +2. Listens for HTTP requests from clients. +3. Upon receiving a request, sends an HTTP 200 response with the body containing an Arrow IPC stream of record batches. + +To run this example: + +```sh +cargo r --release +``` +> [!NOTE] +> This server example implements low-level HTTP/1.1 details directly, instead of using an HTTP library. We intend to update the example to use [hyper](https://docs.rs/hyper/latest/hyper/) after [arrow-rs has an async Arrow IPC writer](https://github.com/apache/arrow-rs/issues/1207)). diff --git a/http/get_simple/rs/server/src/main.rs b/http/get_simple/rs/server/src/main.rs new file mode 100644 index 0000000..be3d981 --- /dev/null +++ b/http/get_simple/rs/server/src/main.rs @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + io::{BufRead, BufReader, Result, Write}, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener}, + sync::Arc, + thread, +}; + +use arrow_array::{Int64Array, RecordBatch}; +use arrow_ipc::writer::StreamWriter; +use arrow_schema::{DataType, Field, Fields, Schema}; +use once_cell::sync::Lazy; +use rand::{distributions::Standard, prelude::*}; +use rayon::{iter, prelude::*}; +use tracing::{error, info, info_span}; +use tracing_subscriber::fmt::format::FmtSpan; + +const RECORDS_PER_BATCH: usize = 4096; +const TOTAL_RECORDS: usize = if cfg!(debug_assertions) { + 100_000 +} else { + 100_000_000 +}; + +/// Schema for random data +static SCHEMA: Lazy> = Lazy::new(|| { + Arc::new(Schema::new( + ('a'..='d') + .map(|field_name| Field::new(field_name, DataType::Int64, true)) + .collect::(), + )) +}); + +/// Random data +static DATA: Lazy> = Lazy::new(|| { + info_span!("data", TOTAL_RECORDS, RECORDS_PER_BATCH).in_scope(|| { + info!("Generating random data"); + // Generate recordbatches with random data + iter::repeatn( + RECORDS_PER_BATCH, + TOTAL_RECORDS.div_euclid(RECORDS_PER_BATCH), + ) + .chain(iter::once(TOTAL_RECORDS.rem_euclid(RECORDS_PER_BATCH))) + .map_init(rand::thread_rng, |rng, len| { + RecordBatch::try_new( + Arc::clone(&SCHEMA), + (0..SCHEMA.all_fields().len()) + .map(|_| { + Arc::new( + rng.sample_iter::(Standard) + .take(len) + .collect::(), + ) as _ + }) + .collect(), + ) + }) + .flatten() + .collect() + }) +}); + +fn get_simple(mut stream: std::net::TcpStream) { + info!("Incoming connection"); + + // Ignore incoming request. + for _ in BufReader::new(&mut stream) + .lines() + .take_while(|line| line.as_ref().is_ok_and(|line| !line.is_empty())) + {} + + // Write response header. + stream + .write_all( + "HTTP/1.1 200 OK\r\ncontent-type: application/vnd.apache.arrow.stream\r\n\r\n" + .as_bytes(), + ) + .unwrap(); + + // Stream the body. + let mut writer = StreamWriter::try_new(stream, &SCHEMA).unwrap(); + for batch in DATA.iter() { + writer.write(batch).unwrap(); + } + writer.finish().unwrap(); + + let stream = writer.into_inner().unwrap(); + stream.shutdown(std::net::Shutdown::Both).unwrap(); +} + +fn main() -> Result<()> { + // Configure tracing subscriber. + tracing_subscriber::fmt() + .with_span_events(FmtSpan::CLOSE) + .init(); + + // Generate random data. + let _ = Lazy::force(&DATA); + + // Start listening. + let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000); + let listener = TcpListener::bind(bind_addr)?; + info!(%bind_addr, "Listening"); + + // Handle incoming connections. + loop { + match listener.accept() { + Ok((stream, remote_peer)) => { + thread::spawn(move || { + info_span!("Writing Arrow IPC stream", %remote_peer) + .in_scope(|| get_simple(stream)) + }); + } + Err(error) => { + error!(%error, "Connection failed"); + } + } + } +}