Commit 6ad9462

test: refactor to use multiple recordbatch

JeanArhancet committed Jul 16, 2024
1 parent 2cbde5f commit 6ad9462

Showing 1 changed file with 43 additions and 30 deletions.

influxdb3_server/src/http/v1.rs: 73 changes (43 additions, 30 deletions)
@@ -20,7 +20,8 @@ use arrow::{
 use bytes::Bytes;
 use chrono::{format::SecondsFormat, DateTime};
 use datafusion::physical_plan::SendableRecordBatchStream;
-use futures::{ready, stream::Fuse, Stream, StreamExt};
+use futures::future::FusedFuture;
+use futures::{pin_mut, ready, stream::Fuse, Stream, StreamExt};
 use hyper::http::HeaderValue;
 use hyper::{
     header::ACCEPT, header::CONTENT_TYPE, header::TRANSFER_ENCODING, Body, Request, Response,
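
Note: the import change adds futures::pin_mut and futures::future::FusedFuture next to the existing Fuse/ready machinery, which suggests the response-streaming code now stack-pins a future and polls it until it reports completion. A minimal sketch of that pattern, using only the futures crate (the names drain and fut are illustrative, not from this commit):

    use futures::future::FusedFuture;
    use futures::{pin_mut, FutureExt};

    async fn drain() {
        // `fuse()` wraps the future so it can report completion via `is_terminated()`.
        let fut = async { 42 }.fuse();
        // `pin_mut!` pins the future to the stack so it can be polled by reference.
        pin_mut!(fut);
        assert!(!fut.is_terminated());
        let value = (&mut fut).await;
        assert!(fut.is_terminated());
        assert_eq!(value, 42);
    }

    fn main() {
        futures::executor::block_on(drain());
    }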
@@ -776,6 +777,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
     use arrow_array::{Float64Array, Int64Array, StringArray, TimestampNanosecondArray};
+    use datafusion::error::DataFusionError;
     use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
     use futures::stream::{self, StreamExt};
     use serde_json::json;
@@ -791,15 +793,7 @@
         Arc::new(StringArray::from_iter(vals))
     }
 
-    fn f64s(vals: &[Option<f64>]) -> ArrayRef {
-        Arc::new(Float64Array::from_iter(vals.iter()))
-    }
-
-    fn i64s(vals: &[Option<i64>]) -> ArrayRef {
-        Arc::new(Int64Array::from_iter(vals.iter().cloned()))
-    }
-
-    fn create_test_record_batch() -> RecordBatch {
+    fn create_test_record_batch() -> Vec<Result<RecordBatch, DataFusionError>> {
         let meta = serde_json::to_string(&json!({
             "measurement_column_index": 0,
             "tag_key_columns": [],
@@ -813,42 +807,57 @@
                     DataType::Timestamp(TimeUnit::Nanosecond, None),
                     false,
                 ),
-                Field::new("cpu", DataType::Utf8, true),
-                Field::new("device", DataType::Utf8, true),
-                Field::new("usage_idle", DataType::Float64, true),
-                Field::new("free", DataType::Int64, true),
+                Field::new("value", DataType::Utf8, true),
             ],
             HashMap::from([("iox::influxql::group_key::metadata".to_owned(), meta)]),
         ));
-        RecordBatch::try_new(
-            schema,
+        let record_batch_0 = Ok(RecordBatch::try_new(
+            Arc::clone(&schema),
             vec![
-                strs(&[Some("cpu"), Some("cpu"), Some("cpu"), Some("cpu")]),
+                strs(&[Some("cpu"), Some("cpu")]),
+                times(&[1157082300000000000, 1157082310000000000]),
+                strs(&[Some("cpu0"), Some("cpu0")]),
+            ],
+        )
+        .unwrap());
+
+        let record_batch_1 = Ok(RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                strs(&[Some("cpu"), Some("cpu"), Some("cpu")]),
                 times(&[
                     1157082300000000000,
                     1157082310000000000,
                     1157082400000000000,
-                    1157082320000000000,
                 ]),
-                strs(&[Some("cpu0"), Some("cpu0"), Some("cpu1"), Some("cpu2")]),
-                strs(&[Some("disk1s1"), None, Some("disk1s1"), None]),
-                f64s(&[Some(99.1), Some(99.8), Some(99.2), Some(99.3)]),
-                i64s(&[None, Some(2133), Some(4110), Some(1995)]),
+                strs(&[Some("cpu0"), Some("cpu0"), Some("cpu2")]),
             ],
         )
-        .unwrap()
+        .unwrap());
+
+        let record_batch_2 = Ok(RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                strs(&[Some("mem"), Some("mem")]),
+                times(&[1157082500000000000, 1157082420000000000]),
+                strs(&[Some("mem0"), Some("mem2")]),
+            ],
+        )
+        .unwrap());
+
+        vec![record_batch_0, record_batch_1, record_batch_2]
     }
 
     #[tokio::test]
     async fn test_partial_flag() {
         let batch = create_test_record_batch();
-        let schema = batch.schema();
-        let input_stream = stream::iter(vec![Ok(batch.clone())]);
+        let schema = batch[0].as_ref().unwrap().schema();
+        let input_stream = stream::iter(batch);
         let input: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
             schema,
             Box::pin(input_stream),
         ));
-        let chunk_size = Some(1);
+        let chunk_size = Some(2);
         let mut query_response_stream =
             QueryResponseStream::new(0, input, chunk_size, QueryFormat::Json, None).unwrap();

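Note: this setup is the core of the refactor: stream::iter over the pre-built Vec of Results, wrapped in RecordBatchStreamAdapter, stands in for a real query plan's output. A self-contained sketch of that wiring, assuming the arrow, datafusion, futures, and tokio crates (the schema and values here are illustrative, not the commit's):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::error::{DataFusionError, Result};
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
    use datafusion::physical_plan::SendableRecordBatchStream;
    use futures::{stream, StreamExt};

    #[tokio::main]
    async fn main() -> Result<()> {
        // One schema shared by every batch, as in create_test_record_batch.
        let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]));

        // Three small batches, each pre-wrapped in Ok, mirroring the refactored helper.
        let batches: Vec<Result<RecordBatch, DataFusionError>> = ["cpu0", "cpu1", "mem0"]
            .into_iter()
            .map(|v| {
                let col: ArrayRef = Arc::new(StringArray::from(vec![v, v]));
                Ok(RecordBatch::try_new(Arc::clone(&schema), vec![col]).unwrap())
            })
            .collect();

        // stream::iter + RecordBatchStreamAdapter produce the same
        // SendableRecordBatchStream type a real physical plan would hand back.
        let mut input: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&schema),
            stream::iter(batches),
        ));

        while let Some(batch) = input.next().await {
            println!("got a batch with {} rows", batch?.num_rows());
        }
        Ok(())
    }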
@@ -863,23 +872,27 @@
             match counter {
                 0 => {
                     assert!(resp.results[0].partial.unwrap());
+                    assert!(resp.results[0].series[0].partial.unwrap());
                     assert_eq!(resp.results[0].series[0].name, "cpu");
-                    assert_eq!(resp.results[0].series[0].values.len(), 1);
+                    assert_eq!(resp.results[0].series[0].values.len(), 2);
                 }
                 1 => {
                     assert!(resp.results[0].partial.unwrap());
+                    assert!(resp.results[0].series[0].partial.unwrap());
                     assert_eq!(resp.results[0].series[0].name, "cpu");
-                    assert_eq!(resp.results[0].series[0].values.len(), 1);
+                    assert_eq!(resp.results[0].series[0].values.len(), 2);
                 }
                 2 => {
                     assert!(resp.results[0].partial.unwrap());
+                    assert_eq!(resp.results[0].series[0].partial, None);
                     assert_eq!(resp.results[0].series[0].name, "cpu");
                     assert_eq!(resp.results[0].series[0].values.len(), 1);
                 }
                 3 => {
                     assert_eq!(resp.results[0].partial, None);
-                    assert_eq!(resp.results[0].series[0].name, "cpu");
-                    assert_eq!(resp.results[0].series[0].values.len(), 1);
+                    assert_eq!(resp.results[0].series[0].partial, None);
+                    assert_eq!(resp.results[0].series[0].name, "mem");
+                    assert_eq!(resp.results[0].series[0].values.len(), 2);
                 }
                 _ => panic!("Received more responses than expected"),
             }

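Note: with the fixture's three batches (two cpu rows, three cpu rows, two mem rows) and chunk_size = Some(2), the seven input rows come back as four responses, which is what the match arms assert:

    response 0: cpu rows 1-2  -> series partial = true,  result partial = true
    response 1: cpu rows 3-4  -> series partial = true,  result partial = true
    response 2: cpu row 5     -> cpu series complete (partial = None), result still partial = true because mem is pending
    response 3: mem rows 1-2  -> final response, result partial = None

The series-level partial flag marks a series that continues in a later chunk, while the result-level flag marks that more data of any kind is still to come; response 2 is the case where the two diverge.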