From 5b0e33973bffb512f6da5d3dc084e4db48862e01 Mon Sep 17 00:00:00 2001 From: JeanArhancet Date: Mon, 8 Jul 2024 00:08:19 +0200 Subject: [PATCH 1/6] feat: add partial param v1 --- influxdb3/tests/server/query.rs | 30 ++++++++++++++++++++---------- influxdb3_server/src/http/v1.rs | 31 ++++++++++++++++++++++++++----- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/influxdb3/tests/server/query.rs b/influxdb3/tests/server/query.rs index 5bd23bf9c93..208674f3186 100644 --- a/influxdb3/tests/server/query.rs +++ b/influxdb3/tests/server/query.rs @@ -1003,10 +1003,12 @@ async fn api_v1_query_chunked() { "values": [ [1, "a", 0.9], [2, "a", 0.89], - ] + ], + "partial": true } ], - "statement_id": 0 + "statement_id": 0, + "partial": true } ] }), @@ -1044,10 +1046,12 @@ async fn api_v1_query_chunked() { [1, "a", 0.9], [2, "a", 0.89], [3, "a", 0.85] - ] + ], + "partial": true } ], - "statement_id": 0 + "statement_id": 0, + "partial": true } ] }), @@ -1086,10 +1090,12 @@ async fn api_v1_query_chunked() { "values": [ [1, "a", 0.9], [2, "a", 0.89], - ] + ], + "partial": true } ], - "statement_id": 0 + "statement_id": 0, + "partial": true } ] }), @@ -1102,10 +1108,12 @@ async fn api_v1_query_chunked() { "columns": ["time","host","usage"], "values": [ [3, "a", 0.85] - ] + ], + "partial": true } ], - "statement_id": 0 + "statement_id": 0, + "partial": true } ] }), @@ -1119,10 +1127,12 @@ async fn api_v1_query_chunked() { "values": [ [4, "a", 0.5], [5, "a", 0.6], - ] + ], + "partial": true } ], - "statement_id": 0 + "statement_id": 0, + "partial": true } ] }), diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs index 5a850934bd5..8f9cc616d9d 100644 --- a/influxdb3_server/src/http/v1.rs +++ b/influxdb3_server/src/http/v1.rs @@ -22,7 +22,10 @@ use chrono::{format::SecondsFormat, DateTime}; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{ready, stream::Fuse, Stream, StreamExt}; use hyper::http::HeaderValue; -use hyper::{header::ACCEPT, header::CONTENT_TYPE, Body, Request, Response, StatusCode}; +use hyper::{ + header::ACCEPT, header::CONTENT_TYPE, header::TRANSFER_ENCODING, Body, Request, Response, + StatusCode, +}; use influxdb3_write::WriteBuffer; use iox_time::TimeProvider; use observability_deps::tracing::info; @@ -35,6 +38,7 @@ use crate::QueryExecutor; use super::{Error, HttpApi, Result}; const DEFAULT_CHUNK_SIZE: usize = 10_000; +const TRANSFER_ENCODING_CHUNKED: &str = "chunked"; impl HttpApi where @@ -74,11 +78,17 @@ where QueryResponseStream::new(0, stream, chunk_size, format, epoch).map_err(QueryError)?; let body = Body::wrap_stream(stream); - Ok(Response::builder() + let mut builder = Response::builder() .status(StatusCode::OK) - .header(CONTENT_TYPE, format.as_content_type()) - .body(body) - .unwrap()) + .header(CONTENT_TYPE, format.as_content_type()); + + // Check if the response is chunked. + // If it is, add the "Transfer-Encoding: chunked" header to the response builder. + if chunked { + builder = builder.header(TRANSFER_ENCODING, TRANSFER_ENCODING_CHUNKED); + } + + Ok(builder.body(body).unwrap()) } } @@ -303,6 +313,8 @@ impl From for Bytes { struct StatementResponse { statement_id: usize, series: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + partial: Option, } /// The records produced for a single time series (measurement) @@ -311,6 +323,8 @@ struct Series { name: String, columns: Vec, values: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + partial: Option, } /// A single row, or record in a time series @@ -535,6 +549,9 @@ impl QueryResponseStream { /// Flush a single chunk, or time series, when operating in chunked mode fn flush_one(&mut self) -> QueryResponse { let columns = self.columns(); + + let partial = self.buffer.can_flush().then_some(true); + // this unwrap is okay because we only ever call flush_one // after calling can_flush on the buffer: let (name, values) = self.buffer.flush_one().unwrap(); @@ -542,11 +559,13 @@ impl QueryResponseStream { name, columns, values, + partial, }]; QueryResponse { results: vec![StatementResponse { statement_id: self.statement_id, series, + partial, }], format: self.format, } @@ -563,12 +582,14 @@ impl QueryResponseStream { name, columns: columns.clone(), values, + partial: None, }) .collect(); Ok(QueryResponse { results: vec![StatementResponse { statement_id: self.statement_id, series, + partial: None, }], format: self.format, }) From 7e610e7a9f57e07494bbea4787f2c62077c2b8b1 Mon Sep 17 00:00:00 2001 From: JeanArhancet Date: Fri, 12 Jul 2024 12:15:10 +0200 Subject: [PATCH 2/6] fix: partial field in series --- influxdb3/tests/server/query.rs | 6 ++---- influxdb3_server/src/http/v1.rs | 17 +++++++++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/influxdb3/tests/server/query.rs b/influxdb3/tests/server/query.rs index 208674f3186..87775e33442 100644 --- a/influxdb3/tests/server/query.rs +++ b/influxdb3/tests/server/query.rs @@ -1046,8 +1046,7 @@ async fn api_v1_query_chunked() { [1, "a", 0.9], [2, "a", 0.89], [3, "a", 0.85] - ], - "partial": true + ] } ], "statement_id": 0, @@ -1108,8 +1107,7 @@ async fn api_v1_query_chunked() { "columns": ["time","host","usage"], "values": [ [3, "a", 0.85] - ], - "partial": true + ] } ], "statement_id": 0, diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs index 8f9cc616d9d..149da6c61a7 100644 --- a/influxdb3_server/src/http/v1.rs +++ b/influxdb3_server/src/http/v1.rs @@ -393,6 +393,15 @@ impl ChunkBuffer { } } + /// This function returns true if the number of rows in the current series exceeds the chunk size + fn is_partial_series(&self) -> bool { + if let (Some(size), Some(m)) = (self.size, self.series.back()) { + m.1.len() > size + } else { + false + } + } + /// The [`ChunkBuffer`] is operating in chunked mode, and can flush a chunk fn can_flush(&self) -> bool { if let (Some(size), Some(m)) = (self.size, self.series.back()) { @@ -550,8 +559,8 @@ impl QueryResponseStream { fn flush_one(&mut self) -> QueryResponse { let columns = self.columns(); - let partial = self.buffer.can_flush().then_some(true); - + let partial_series = self.buffer.is_partial_series().then_some(true); + let partial_results = self.buffer.can_flush().then_some(true); // this unwrap is okay because we only ever call flush_one // after calling can_flush on the buffer: let (name, values) = self.buffer.flush_one().unwrap(); @@ -559,13 +568,13 @@ impl QueryResponseStream { name, columns, values, - partial, + partial: partial_series, }]; QueryResponse { results: vec![StatementResponse { statement_id: self.statement_id, series, - partial, + partial: partial_results, }], format: self.format, } From 2cbde5ffb47b8ac40aae78fc2fe8bc8ad10becb7 Mon Sep 17 00:00:00 2001 From: JeanArhancet Date: Mon, 15 Jul 2024 20:37:48 +0200 Subject: [PATCH 3/6] test: add test partial flag --- Cargo.lock | 350 ++++++++++++++++---------------- Cargo.toml | 2 +- influxdb3_server/src/http/v1.rs | 126 ++++++++++++ 3 files changed, 302 insertions(+), 176 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f38b688e57..b9d041d4e02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -486,7 +486,7 @@ dependencies = [ "num-traits", "once_cell", "regex", - "snafu 0.8.3", + "snafu 0.8.4", "uuid", "workspace-hack", ] @@ -559,18 +559,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] name = "async-trait" -version = "0.1.80" +version = "0.1.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -595,7 +595,7 @@ dependencies = [ "iox_time", "metric", "observability_deps", - "snafu 0.8.3", + "snafu 0.8.4", "tonic 0.11.0", "workspace-hack", ] @@ -619,7 +619,7 @@ dependencies = [ "futures-util", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.29", + "hyper 0.14.30", "itoa", "matchit", "memchr", @@ -658,7 +658,7 @@ source = "git+https://github.com/influxdata/influxdb3_core?rev=ca040f2a6e5b6470e dependencies = [ "observability_deps", "rand", - "snafu 0.8.3", + "snafu 0.8.4", "tokio", "workspace-hack", ] @@ -737,9 +737,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30cca6d3674597c30ddf2c587bf8d9d65c9a84d2326d941cc79c9842dfe0ef52" +checksum = "3d08263faac5cde2a4d52b513dadb80846023aade56fcd8fc99ba73ba8050e92" dependencies = [ "arrayref", "arrayvec", @@ -803,9 +803,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" [[package]] name = "bzip2" @@ -837,11 +837,11 @@ dependencies = [ "dashmap", "futures", "generated_types", - "hyper 0.14.29", + "hyper 0.14.30", "metric", "observability_deps", "reqwest 0.11.27", - "snafu 0.8.3", + "snafu 0.8.4", "tokio", "tokio-util", "url", @@ -850,9 +850,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.104" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74b6a57f98764a267ff415d50a25e6e166f3831a5071af4995296ea97d210490" +checksum = "47de7e88bbbd467951ae7f5a6f34f70d1b4d9cfce53d5fd70f74ebe118b3db56" dependencies = [ "jobserver", "libc", @@ -883,7 +883,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -932,9 +932,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.8" +version = "4.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84b3edb18336f4df585bc9aa31dd99c036dfa5dc5e9a2939a722a188f3a8970d" +checksum = "64acc1846d54c1fe936a78dc189c34e28d3f5afc348403f28ecf53660b9b8462" dependencies = [ "clap_builder", "clap_derive", @@ -961,7 +961,7 @@ dependencies = [ "observability_deps", "paste", "reqwest 0.11.27", - "snafu 0.8.3", + "snafu 0.8.4", "sysinfo", "tokio", "trace_exporters", @@ -974,9 +974,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.8" +version = "4.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1c09dd5ada6c6c78075d6fd0da3f90d8080651e2d6cc8eb2f1aaa4034ced708" +checksum = "6fb8393d67ba2e7bfaf28a23458e4e2b543cc73a99595511eb207fdb8aede942" dependencies = [ "anstream", "anstyle", @@ -993,7 +993,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -1285,14 +1285,14 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] name = "darling" -version = "0.20.9" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83b2eb4d90d12bdda5ed17de686c2acb4c57914f8f921b8da7e112b5a36f3fe1" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" dependencies = [ "darling_core", "darling_macro", @@ -1300,27 +1300,27 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.9" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622687fe0bac72a04e5599029151f5796111b90f1baaa9b544d807a5e31cd120" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" dependencies = [ "fnv", "ident_case", "proc-macro2", "quote", "strsim", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] name = "darling_macro" -version = "0.20.9" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -1359,7 +1359,7 @@ dependencies = [ "serde_json", "sha2", "siphasher 1.0.1", - "snafu 0.8.3", + "snafu 0.8.4", "sqlx", "thiserror", "uuid", @@ -1872,7 +1872,7 @@ dependencies = [ "metric", "observability_deps", "parking_lot", - "snafu 0.8.3", + "snafu 0.8.4", "tokio", "tokio_metrics_bridge", "tokio_watchdog", @@ -1934,7 +1934,7 @@ dependencies = [ "observability_deps", "once_cell", "prost 0.12.6", - "snafu 0.8.3", + "snafu 0.8.4", "tonic 0.11.0", "workspace-hack", ] @@ -2041,7 +2041,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -2286,9 +2286,9 @@ dependencies = [ [[package]] name = "http-body" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", "http 1.1.0", @@ -2303,7 +2303,7 @@ dependencies = [ "bytes", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -2333,9 +2333,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.29" +version = "0.14.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" +checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" dependencies = [ "bytes", "futures-channel", @@ -2357,15 +2357,15 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4fe55fb7a772d59a5ff1dfbff4fe0258d19b89fec4b233e75d35d5d2316badc" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "httparse", "itoa", "pin-project-lite", @@ -2382,7 +2382,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper 0.14.29", + "hyper 0.14.30", "rustls 0.21.12", "tokio", "tokio-rustls 0.24.1", @@ -2396,10 +2396,10 @@ checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.0", + "hyper 1.4.1", "hyper-util", - "rustls 0.23.10", - "rustls-native-certs 0.7.0", + "rustls 0.23.11", + "rustls-native-certs 0.7.1", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -2412,7 +2412,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper 0.14.29", + "hyper 0.14.30", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -2428,8 +2428,8 @@ dependencies = [ "futures-channel", "futures-util", "http 1.1.0", - "http-body 1.0.0", - "hyper 1.4.0", + "http-body 1.0.1", + "hyper 1.4.1", "pin-project-lite", "socket2", "tokio", @@ -2508,7 +2508,7 @@ dependencies = [ "log", "nom", "smallvec", - "snafu 0.8.3", + "snafu 0.8.4", ] [[package]] @@ -2531,7 +2531,7 @@ dependencies = [ "futures", "hashbrown 0.14.5", "hex", - "hyper 0.14.29", + "hyper 0.14.30", "influxdb3_client", "influxdb3_process", "influxdb3_server", @@ -2649,7 +2649,7 @@ dependencies = [ "futures", "hex", "http 0.2.12", - "hyper 0.14.29", + "hyper 0.14.30", "influxdb-line-protocol", "influxdb3_process", "influxdb3_write", @@ -2800,7 +2800,7 @@ dependencies = [ "prost-build", "query_functions", "serde", - "snafu 0.8.3", + "snafu 0.8.4", "tonic 0.11.0", "tonic-build", "workspace-hack", @@ -2863,7 +2863,7 @@ dependencies = [ "ring", "serde", "siphasher 1.0.1", - "snafu 0.8.3", + "snafu 0.8.4", "sqlx", "sqlx-hotswap-pool", "thiserror", @@ -2883,7 +2883,7 @@ dependencies = [ "async-trait", "authz", "data_types", - "hyper 0.14.29", + "hyper 0.14.30", "parking_lot", "serde", "serde_urlencoded", @@ -2920,7 +2920,7 @@ dependencies = [ "predicate", "query_functions", "schema", - "snafu 0.8.3", + "snafu 0.8.4", "tokio", "tokio-stream", "trace", @@ -3015,7 +3015,7 @@ dependencies = [ "generated_types", "hashbrown 0.14.5", "http 0.2.12", - "hyper 0.14.29", + "hyper 0.14.30", "log", "metric", "metric_exporters", @@ -3027,7 +3027,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "service_grpc_testing", - "snafu 0.8.3", + "snafu 0.8.4", "tokio", "tokio-stream", "tokio-util", @@ -3391,7 +3391,7 @@ checksum = "d2f6e023aa5bdf392aa06c78e4a4e6d498baab5138d0c993503350ebbc37bf1e" dependencies = [ "assert-json-diff", "futures-core", - "hyper 0.14.29", + "hyper 0.14.30", "log", "rand", "regex", @@ -3425,7 +3425,7 @@ dependencies = [ "iox_time", "itertools 0.13.0", "schema", - "snafu 0.8.3", + "snafu 0.8.4", "workspace-hack", ] @@ -3438,7 +3438,7 @@ dependencies = [ "influxdb-line-protocol", "itertools 0.13.0", "mutable_batch", - "snafu 0.8.3", + "snafu 0.8.4", "workspace-hack", ] @@ -3453,7 +3453,7 @@ dependencies = [ "hashbrown 0.14.5", "mutable_batch", "schema", - "snafu 0.8.3", + "snafu 0.8.4", "workspace-hack", ] @@ -3642,7 +3642,7 @@ dependencies = [ "chrono", "futures", "humantime", - "hyper 0.14.29", + "hyper 0.14.30", "itertools 0.12.1", "md-5", "parking_lot", @@ -3739,7 +3739,7 @@ dependencies = [ "libc", "redox_syscall 0.5.2", "smallvec", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -3796,7 +3796,7 @@ dependencies = [ "pbjson-types", "prost 0.12.6", "schema", - "snafu 0.8.3", + "snafu 0.8.4", "thiserror", "thrift", "tokio", @@ -3937,7 +3937,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -4056,7 +4056,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -4151,7 +4151,7 @@ dependencies = [ "prost 0.12.6", "prost-types 0.12.6", "regex", - "syn 2.0.68", + "syn 2.0.71", "tempfile", ] @@ -4178,7 +4178,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -4211,7 +4211,7 @@ dependencies = [ "regex", "regex-syntax 0.8.4", "schema", - "snafu 0.8.3", + "snafu 0.8.4", "workspace-hack", ] @@ -4242,7 +4242,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.10", + "rustls 0.23.11", "thiserror", "tokio", "tracing", @@ -4258,7 +4258,7 @@ dependencies = [ "rand", "ring", "rustc-hash", - "rustls 0.23.10", + "rustls 0.23.11", "slab", "thiserror", "tinyvec", @@ -4423,7 +4423,7 @@ dependencies = [ "h2", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.29", + "hyper 0.14.30", "hyper-rustls 0.24.2", "ipnet", "js-sys", @@ -4465,9 +4465,9 @@ dependencies = [ "futures-core", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "http-body-util", - "hyper 1.4.0", + "hyper 1.4.1", "hyper-rustls 0.27.2", "hyper-util", "ipnet", @@ -4478,8 +4478,8 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.10", - "rustls-native-certs 0.7.0", + "rustls 0.23.11", + "rustls-native-certs 0.7.1", "rustls-pemfile 2.1.2", "rustls-pki-types", "serde", @@ -4588,21 +4588,21 @@ dependencies = [ "log", "ring", "rustls-pki-types", - "rustls-webpki 0.102.4", + "rustls-webpki 0.102.5", "subtle", "zeroize", ] [[package]] name = "rustls" -version = "0.23.10" +version = "0.23.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +checksum = "4828ea528154ae444e5a642dbb7d5623354030dc9822b83fd9bb79683c7399d0" dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.102.4", + "rustls-webpki 0.102.5", "subtle", "zeroize", ] @@ -4621,9 +4621,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" dependencies = [ "openssl-probe", "rustls-pemfile 2.1.2", @@ -4669,9 +4669,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.4" +version = "0.102.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" +checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78" dependencies = [ "ring", "rustls-pki-types", @@ -4730,7 +4730,7 @@ dependencies = [ "indexmap 2.2.6", "observability_deps", "once_cell", - "snafu 0.8.3", + "snafu 0.8.4", "workspace-hack", ] @@ -4796,22 +4796,22 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.203" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" +checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.203" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" +checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -4839,9 +4839,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.8.2" +version = "3.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "079f3a42cd87588d924ed95b533f8d30a483388c4e400ab736a7058e34f16169" +checksum = "e73139bc5ec2d45e6c5fd85be5a46949c1c39a4c18e56915f5eb4c12f975e377" dependencies = [ "base64 0.22.1", "chrono", @@ -4857,14 +4857,14 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.8.2" +version = "3.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc03aad67c1d26b7de277d51c86892e7d9a0110a2fe44bf6b26cc569fba302d6" +checksum = "b80d3d6b56b64335c0180e5ffde23b3c5e08c14c585b51a15bd0e95393f46703" dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -4901,7 +4901,7 @@ dependencies = [ "serde", "serde_json", "service_common", - "snafu 0.8.3", + "snafu 0.8.4", "tokio", "tonic 0.11.0", "tower_trailer", @@ -5017,11 +5017,11 @@ dependencies = [ [[package]] name = "snafu" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418b8136fec49956eba89be7da2847ec1909df92a9ae4178b5ff0ff092c8d95e" +checksum = "2b835cb902660db3415a672d862905e791e54d306c6e8189168c7f3d9ae1c79d" dependencies = [ - "snafu-derive 0.8.3", + "snafu-derive 0.8.4", ] [[package]] @@ -5038,14 +5038,14 @@ dependencies = [ [[package]] name = "snafu-derive" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a4812a669da00d17d8266a0439eddcacbc88b17f732f927e52eeb9d196f7fb5" +checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5" dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -5111,7 +5111,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -5368,7 +5368,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -5390,9 +5390,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.68" +version = "2.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" +checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462" dependencies = [ "proc-macro2", "quote", @@ -5413,9 +5413,9 @@ checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" [[package]] name = "sysinfo" -version = "0.30.12" +version = "0.30.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "732ffa00f53e6b2af46208fba5718d9662a421049204e156328b66791ffa15ae" +checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" dependencies = [ "cfg-if", "core-foundation-sys", @@ -5503,7 +5503,7 @@ dependencies = [ "futures-util", "generated_types", "http 0.2.12", - "hyper 0.14.29", + "hyper 0.14.30", "influxdb_iox_client", "ingester_query_grpc", "insta", @@ -5522,7 +5522,7 @@ dependencies = [ "regex", "reqwest 0.11.27", "serde_json", - "snafu 0.8.3", + "snafu 0.8.4", "sqlx", "tempfile", "test_helpers", @@ -5534,22 +5534,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -5657,9 +5657,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.6.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c55115c6fbe2d2bef26eb09ad74bde02d8255476fc0c7b515ef09fbb35742d82" +checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" dependencies = [ "tinyvec_macros", ] @@ -5708,7 +5708,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -5738,7 +5738,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.10", + "rustls 0.23.11", "rustls-pki-types", "tokio", ] @@ -5805,7 +5805,7 @@ dependencies = [ "h2", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.29", + "hyper 0.14.30", "hyper-timeout", "percent-encoding", "pin-project", @@ -5832,12 +5832,12 @@ dependencies = [ "h2", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.29", + "hyper 0.14.30", "hyper-timeout", "percent-encoding", "pin-project", "prost 0.12.6", - "rustls-native-certs 0.7.0", + "rustls-native-certs 0.7.1", "rustls-pemfile 2.1.2", "rustls-pki-types", "tokio", @@ -5859,7 +5859,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -5975,7 +5975,7 @@ dependencies = [ "futures", "iox_time", "observability_deps", - "snafu 0.8.3", + "snafu 0.8.4", "socket2", "thrift", "tokio", @@ -5998,7 +5998,7 @@ dependencies = [ "observability_deps", "parking_lot", "pin-project", - "snafu 0.8.3", + "snafu 0.8.4", "tower", "trace", "workspace-hack", @@ -6024,7 +6024,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -6233,9 +6233,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.9.1" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" dependencies = [ "getrandom", "serde", @@ -6340,7 +6340,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", "wasm-bindgen-shared", ] @@ -6374,7 +6374,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6462,7 +6462,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ "windows-core", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -6471,7 +6471,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -6489,7 +6489,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -6509,18 +6509,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.5", - "windows_aarch64_msvc 0.52.5", - "windows_i686_gnu 0.52.5", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc 0.52.5", - "windows_x86_64_gnu 0.52.5", - "windows_x86_64_gnullvm 0.52.5", - "windows_x86_64_msvc 0.52.5", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -6531,9 +6531,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" @@ -6543,9 +6543,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" @@ -6555,15 +6555,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" [[package]] name = "windows_i686_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" @@ -6573,9 +6573,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" @@ -6585,9 +6585,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" @@ -6597,9 +6597,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" @@ -6609,9 +6609,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winreg" @@ -6669,7 +6669,7 @@ dependencies = [ "getrandom", "hashbrown 0.14.5", "heck 0.4.1", - "hyper 0.14.29", + "hyper 0.14.30", "indexmap 2.2.6", "itertools 0.12.1", "lalrpop-util", @@ -6723,7 +6723,7 @@ dependencies = [ "strum", "subtle", "syn 1.0.109", - "syn 2.0.68", + "syn 2.0.71", "thrift", "tokio", "tokio-stream", @@ -6760,22 +6760,22 @@ checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" [[package]] name = "zerocopy" -version = "0.7.34" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.34" +version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -6795,32 +6795,32 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] name = "zstd" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "7.1.0" +version = "7.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a" +checksum = "fa556e971e7b568dc775c136fc9de8c779b1c2fc3a63defaafadffdbd3181afa" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.11+zstd.1.5.6" +version = "2.0.12+zstd.1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75652c55c0b6f3e6f12eb786fe1bc960396bf05a1eb3bf1f3691c3610ac2e6d4" +checksum = "0a4e40c320c3cb459d9a9ff6de98cff88f4751ee9275d140e2be94a2b74e4c13" dependencies = [ "cc", "pkg-config", diff --git a/Cargo.toml b/Cargo.toml index 336e8c34466..9ab6d157147 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ dotenvy = "0.15.7" flate2 = "1.0.27" futures = "0.3.28" futures-util = "0.3.30" -hashbrown = "0.14.5" +hashbrown = { version = "0.14.5", features = ["serde"] } hex = "0.4.3" http = "0.2.9" humantime = "2.1.0" diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs index 149da6c61a7..a3e0af9af92 100644 --- a/influxdb3_server/src/http/v1.rs +++ b/influxdb3_server/src/http/v1.rs @@ -768,3 +768,129 @@ impl Stream for QueryResponseStream { } } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::ArrayRef; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use arrow_array::{Float64Array, Int64Array, StringArray, TimestampNanosecondArray}; + use datafusion::physical_plan::stream::RecordBatchStreamAdapter; + use futures::stream::{self, StreamExt}; + use serde_json::json; + use std::sync::Arc; + + fn times(vals: &[i64]) -> ArrayRef { + Arc::new(TimestampNanosecondArray::from_iter_values( + vals.iter().cloned(), + )) + } + + fn strs>(vals: &[Option]) -> ArrayRef { + Arc::new(StringArray::from_iter(vals)) + } + + fn f64s(vals: &[Option]) -> ArrayRef { + Arc::new(Float64Array::from_iter(vals.iter())) + } + + fn i64s(vals: &[Option]) -> ArrayRef { + Arc::new(Int64Array::from_iter(vals.iter().cloned())) + } + + fn create_test_record_batch() -> RecordBatch { + let meta = serde_json::to_string(&json!({ + "measurement_column_index": 0, + "tag_key_columns": [], + })) + .unwrap(); + let schema = Arc::new(Schema::new_with_metadata( + vec![ + Field::new("iox::measurement", DataType::Utf8, false), + Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("cpu", DataType::Utf8, true), + Field::new("device", DataType::Utf8, true), + Field::new("usage_idle", DataType::Float64, true), + Field::new("free", DataType::Int64, true), + ], + HashMap::from([("iox::influxql::group_key::metadata".to_owned(), meta)]), + )); + RecordBatch::try_new( + schema, + vec![ + strs(&[Some("cpu"), Some("cpu"), Some("cpu"), Some("cpu")]), + times(&[ + 1157082300000000000, + 1157082310000000000, + 1157082400000000000, + 1157082320000000000, + ]), + strs(&[Some("cpu0"), Some("cpu0"), Some("cpu1"), Some("cpu2")]), + strs(&[Some("disk1s1"), None, Some("disk1s1"), None]), + f64s(&[Some(99.1), Some(99.8), Some(99.2), Some(99.3)]), + i64s(&[None, Some(2133), Some(4110), Some(1995)]), + ], + ) + .unwrap() + } + + #[tokio::test] + async fn test_partial_flag() { + let batch = create_test_record_batch(); + let schema = batch.schema(); + let input_stream = stream::iter(vec![Ok(batch.clone())]); + let input: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( + schema, + Box::pin(input_stream), + )); + let chunk_size = Some(1); + let mut query_response_stream = + QueryResponseStream::new(0, input, chunk_size, QueryFormat::Json, None).unwrap(); + + // Counters for assertions + let mut counter = 0; + + while let Some(response) = query_response_stream.next().await { + match response { + Ok(resp) => { + println!("Received response: {:?}", resp); + + match counter { + 0 => { + assert!(resp.results[0].partial.unwrap()); + assert_eq!(resp.results[0].series[0].name, "cpu"); + assert_eq!(resp.results[0].series[0].values.len(), 1); + } + 1 => { + assert!(resp.results[0].partial.unwrap()); + assert_eq!(resp.results[0].series[0].name, "cpu"); + assert_eq!(resp.results[0].series[0].values.len(), 1); + } + 2 => { + assert!(resp.results[0].partial.unwrap()); + assert_eq!(resp.results[0].series[0].name, "cpu"); + assert_eq!(resp.results[0].series[0].values.len(), 1); + } + 3 => { + assert_eq!(resp.results[0].partial, None); + assert_eq!(resp.results[0].series[0].name, "cpu"); + assert_eq!(resp.results[0].series[0].values.len(), 1); + } + _ => panic!("Received more responses than expected"), + } + + counter += 1; + } + Err(err) => panic!("Error while polling stream: {:?}", err), + } + } + + // Ensure we received exactly 4 responses + assert_eq!(counter, 4, "Expected 4 responses, but received {}", counter); + } +} From 6ad9462ebed91ca941edac7dd393cbd3e1a6e509 Mon Sep 17 00:00:00 2001 From: JeanArhancet Date: Tue, 16 Jul 2024 23:30:30 +0200 Subject: [PATCH 4/6] test: refactor to use multiple recordbatch --- influxdb3_server/src/http/v1.rs | 73 +++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs index a3e0af9af92..73edc316b81 100644 --- a/influxdb3_server/src/http/v1.rs +++ b/influxdb3_server/src/http/v1.rs @@ -20,7 +20,8 @@ use arrow::{ use bytes::Bytes; use chrono::{format::SecondsFormat, DateTime}; use datafusion::physical_plan::SendableRecordBatchStream; -use futures::{ready, stream::Fuse, Stream, StreamExt}; +use futures::future::FusedFuture; +use futures::{pin_mut, ready, stream::Fuse, Stream, StreamExt}; use hyper::http::HeaderValue; use hyper::{ header::ACCEPT, header::CONTENT_TYPE, header::TRANSFER_ENCODING, Body, Request, Response, @@ -776,6 +777,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use arrow_array::{Float64Array, Int64Array, StringArray, TimestampNanosecondArray}; + use datafusion::error::DataFusionError; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::stream::{self, StreamExt}; use serde_json::json; @@ -791,15 +793,7 @@ mod tests { Arc::new(StringArray::from_iter(vals)) } - fn f64s(vals: &[Option]) -> ArrayRef { - Arc::new(Float64Array::from_iter(vals.iter())) - } - - fn i64s(vals: &[Option]) -> ArrayRef { - Arc::new(Int64Array::from_iter(vals.iter().cloned())) - } - - fn create_test_record_batch() -> RecordBatch { + fn create_test_record_batch() -> Vec> { let meta = serde_json::to_string(&json!({ "measurement_column_index": 0, "tag_key_columns": [], @@ -813,42 +807,57 @@ mod tests { DataType::Timestamp(TimeUnit::Nanosecond, None), false, ), - Field::new("cpu", DataType::Utf8, true), - Field::new("device", DataType::Utf8, true), - Field::new("usage_idle", DataType::Float64, true), - Field::new("free", DataType::Int64, true), + Field::new("value", DataType::Utf8, true), ], HashMap::from([("iox::influxql::group_key::metadata".to_owned(), meta)]), )); - RecordBatch::try_new( - schema, + let record_batch_0 = Ok(RecordBatch::try_new( + Arc::clone(&schema), vec![ - strs(&[Some("cpu"), Some("cpu"), Some("cpu"), Some("cpu")]), + strs(&[Some("cpu"), Some("cpu")]), + times(&[1157082300000000000, 1157082310000000000]), + strs(&[Some("cpu0"), Some("cpu0")]), + ], + ) + .unwrap()); + + let record_batch_1 = Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![ + strs(&[Some("cpu"), Some("cpu"), Some("cpu")]), times(&[ 1157082300000000000, - 1157082310000000000, 1157082400000000000, 1157082320000000000, ]), - strs(&[Some("cpu0"), Some("cpu0"), Some("cpu1"), Some("cpu2")]), - strs(&[Some("disk1s1"), None, Some("disk1s1"), None]), - f64s(&[Some(99.1), Some(99.8), Some(99.2), Some(99.3)]), - i64s(&[None, Some(2133), Some(4110), Some(1995)]), + strs(&[Some("cpu0"), Some("cpu0"), Some("cpu2")]), ], ) - .unwrap() + .unwrap()); + + let record_batch_2 = Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![ + strs(&[Some("mem"), Some("mem")]), + times(&[1157082500000000000, 1157082420000000000]), + strs(&[Some("mem0"), Some("mem2")]), + ], + ) + .unwrap()); + + vec![record_batch_0, record_batch_1, record_batch_2] } #[tokio::test] async fn test_partial_flag() { let batch = create_test_record_batch(); - let schema = batch.schema(); - let input_stream = stream::iter(vec![Ok(batch.clone())]); + let schema = batch[0].as_ref().unwrap().schema(); + let input_stream = stream::iter(batch); let input: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( schema, Box::pin(input_stream), )); - let chunk_size = Some(1); + let chunk_size = Some(2); let mut query_response_stream = QueryResponseStream::new(0, input, chunk_size, QueryFormat::Json, None).unwrap(); @@ -863,23 +872,27 @@ mod tests { match counter { 0 => { assert!(resp.results[0].partial.unwrap()); + assert!(resp.results[0].series[0].partial.unwrap()); assert_eq!(resp.results[0].series[0].name, "cpu"); - assert_eq!(resp.results[0].series[0].values.len(), 1); + assert_eq!(resp.results[0].series[0].values.len(), 2); } 1 => { assert!(resp.results[0].partial.unwrap()); + assert!(resp.results[0].series[0].partial.unwrap()); assert_eq!(resp.results[0].series[0].name, "cpu"); - assert_eq!(resp.results[0].series[0].values.len(), 1); + assert_eq!(resp.results[0].series[0].values.len(), 2); } 2 => { assert!(resp.results[0].partial.unwrap()); + assert_eq!(resp.results[0].series[0].partial, None); assert_eq!(resp.results[0].series[0].name, "cpu"); assert_eq!(resp.results[0].series[0].values.len(), 1); } 3 => { assert_eq!(resp.results[0].partial, None); - assert_eq!(resp.results[0].series[0].name, "cpu"); - assert_eq!(resp.results[0].series[0].values.len(), 1); + assert_eq!(resp.results[0].series[0].partial, None); + assert_eq!(resp.results[0].series[0].name, "mem"); + assert_eq!(resp.results[0].series[0].values.len(), 2); } _ => panic!("Received more responses than expected"), } From 25f05a6da253f0aff93a5635d15ab562d4c05e29 Mon Sep 17 00:00:00 2001 From: JeanArhancet Date: Thu, 25 Jul 2024 13:15:42 +0200 Subject: [PATCH 5/6] refactor: use peekable --- influxdb3_server/src/http/v1.rs | 42 ++++++++++++--------------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs index 73edc316b81..a5b2cedb868 100644 --- a/influxdb3_server/src/http/v1.rs +++ b/influxdb3_server/src/http/v1.rs @@ -16,12 +16,11 @@ use arrow::{ }, record_batch::RecordBatch, }; - use bytes::Bytes; use chrono::{format::SecondsFormat, DateTime}; use datafusion::physical_plan::SendableRecordBatchStream; -use futures::future::FusedFuture; -use futures::{pin_mut, ready, stream::Fuse, Stream, StreamExt}; +use futures::{future::FusedFuture, stream::FusedStream}; +use futures::{pin_mut, ready, stream::Fuse, stream::Peekable, FutureExt, Stream, StreamExt}; use hyper::http::HeaderValue; use hyper::{ header::ACCEPT, header::CONTENT_TYPE, header::TRANSFER_ENCODING, Body, Request, Response, @@ -394,15 +393,6 @@ impl ChunkBuffer { } } - /// This function returns true if the number of rows in the current series exceeds the chunk size - fn is_partial_series(&self) -> bool { - if let (Some(size), Some(m)) = (self.size, self.series.back()) { - m.1.len() > size - } else { - false - } - } - /// The [`ChunkBuffer`] is operating in chunked mode, and can flush a chunk fn can_flush(&self) -> bool { if let (Some(size), Some(m)) = (self.size, self.series.back()) { @@ -433,9 +423,10 @@ impl ChunkBuffer { /// /// The input stream is wrapped in [`Fuse`], because of the [`Stream`] implementation /// below, it is possible that the input stream is polled after completion. + struct QueryResponseStream { buffer: ChunkBuffer, - input: Fuse, + input: Peekable>, column_map: HashMap, statement_id: usize, format: QueryFormat, @@ -471,7 +462,7 @@ impl QueryResponseStream { Ok(Self { buffer, column_map, - input: input.fuse(), + input: input.fuse().peekable(), format, statement_id, epoch, @@ -557,13 +548,12 @@ impl QueryResponseStream { } /// Flush a single chunk, or time series, when operating in chunked mode - fn flush_one(&mut self) -> QueryResponse { + fn flush_one(&mut self, has_more_data: bool, stream_finished: bool) -> QueryResponse { let columns = self.columns(); + let partial_series = if has_more_data { Some(true) } else { None }; + let partial_results = if stream_finished { Some(true) } else { None }; - let partial_series = self.buffer.is_partial_series().then_some(true); - let partial_results = self.buffer.can_flush().then_some(true); - // this unwrap is okay because we only ever call flush_one - // after calling can_flush on the buffer: + // This unwrap is okay because we only ever call flush_one after calling can_flush on the buffer let (name, values) = self.buffer.flush_one().unwrap(); let series = vec![Series { name, @@ -729,22 +719,20 @@ impl Stream for QueryResponseStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // check for data in the buffer that can be flushed, if we are operating in chunked mode, - // this will drain the buffer as much as possible by repeatedly returning Ready here - // until the buffer can no longer flush, and before the input stream is polled again: if self.buffer.can_flush() { - return Poll::Ready(Some(Ok(self.flush_one()))); + let has_more_data = ready!(Pin::new(&mut self.input).poll_peek(cx)).is_some(); + let stream_finished = false; + return Poll::Ready(Some(Ok(self.flush_one(has_more_data, stream_finished)))); } // poll the input record batch stream: match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { - // buffer the yielded batch: if let Err(e) = self.buffer_record_batch(batch) { return Poll::Ready(Some(Err(e))); } if self.buffer.can_flush() { - // if we can flush the buffer, do so now, and return - Poll::Ready(Some(Ok(self.flush_one()))) + let has_more_data = ready!(Pin::new(&mut self.input).poll_peek(cx)).is_some(); + Poll::Ready(Some(Ok(self.flush_one(has_more_data, true)))) } else { // otherwise, we want to poll again in order to pull more // batches from the input record batch stream: @@ -894,7 +882,7 @@ mod tests { assert_eq!(resp.results[0].series[0].name, "mem"); assert_eq!(resp.results[0].series[0].values.len(), 2); } - _ => panic!("Received more responses than expected"), + _ => (), } counter += 1; From 53a1bb6ac593d67d2963025d589981eb68f1b7ff Mon Sep 17 00:00:00 2001 From: JeanArhancet Date: Sun, 28 Jul 2024 21:22:45 +0200 Subject: [PATCH 6/6] test: add another test --- influxdb3_server/src/http/v1.rs | 73 +++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs index a5b2cedb868..c68f4f3ad79 100644 --- a/influxdb3_server/src/http/v1.rs +++ b/influxdb3_server/src/http/v1.rs @@ -894,4 +894,77 @@ mod tests { // Ensure we received exactly 4 responses assert_eq!(counter, 4, "Expected 4 responses, but received {}", counter); } + + #[tokio::test] + async fn test_partial_flag_one_stream() { + let meta = serde_json::to_string(&json!({ + "measurement_column_index": 0, + "tag_key_columns": [], + })) + .unwrap(); + let schema = Arc::new(Schema::new_with_metadata( + vec![ + Field::new("iox::measurement", DataType::Utf8, false), + Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("value", DataType::Utf8, true), + ], + HashMap::from([("iox::influxql::group_key::metadata".to_owned(), meta)]), + )); + let record_batch_0 = Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![ + strs(&[Some("cpu"), Some("cpu")]), + times(&[1157082300000000000, 1157082310000000000]), + strs(&[Some("cpu0"), Some("cpu0")]), + ], + ) + .unwrap()); + + let batch = vec![record_batch_0]; + let schema = batch[0].as_ref().unwrap().schema(); + let input_stream = stream::iter(batch); + let input: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( + schema, + Box::pin(input_stream), + )); + let chunk_size = Some(1); + let mut query_response_stream = + QueryResponseStream::new(0, input, chunk_size, QueryFormat::Json, None).unwrap(); + + // Counters for assertions + let mut counter = 0; + + while let Some(response) = query_response_stream.next().await { + match response { + Ok(resp) => { + println!("Received response: {:?}", resp); + + match counter { + 0 => { + assert!(resp.results[0].partial.unwrap()); + assert!(resp.results[0].series[0].partial.unwrap()); + assert_eq!(resp.results[0].series[0].name, "cpu"); + assert_eq!(resp.results[0].series[0].values.len(), 1); + } + 1 => { + assert_eq!(resp.results[0].partial, None); + assert_eq!(resp.results[0].series[0].partial, None); + assert_eq!(resp.results[0].series[0].name, "cpu"); + assert_eq!(resp.results[0].series[0].values.len(), 1); + } + _ => (), + } + + counter += 1; + } + Err(err) => panic!("Error while polling stream: {:?}", err), + } + } + + assert_eq!(counter, 2, "Expected 2 responses, but received {}", counter); + } }