diff --git a/influxdb3_server/src/http/v1.rs b/influxdb3_server/src/http/v1.rs index a5b2cedb868..fbfaa5037d7 100644 --- a/influxdb3_server/src/http/v1.rs +++ b/influxdb3_server/src/http/v1.rs @@ -894,4 +894,78 @@ mod tests { // Ensure we received exactly 4 responses assert_eq!(counter, 4, "Expected 4 responses, but received {}", counter); } + + #[tokio::test] + async fn test_partial_flag_one_stream() { + let meta = serde_json::to_string(&json!({ + "measurement_column_index": 0, + "tag_key_columns": [], + })) + .unwrap(); + let schema = Arc::new(Schema::new_with_metadata( + vec![ + Field::new("iox::measurement", DataType::Utf8, false), + Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("value", DataType::Utf8, true), + ], + HashMap::from([("iox::influxql::group_key::metadata".to_owned(), meta)]), + )); + let record_batch_0 = Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![ + strs(&[Some("cpu"), Some("cpu")]), + times(&[1157082300000000000, 1157082310000000000]), + strs(&[Some("cpu0"), Some("cpu0")]), + ], + ) + .unwrap()); + + let batch = vec![record_batch_0]; + let schema = batch[0].as_ref().unwrap().schema(); + let input_stream = stream::iter(batch); + let input: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( + schema, + Box::pin(input_stream), + )); + let chunk_size = Some(1); + let mut query_response_stream = + QueryResponseStream::new(0, input, chunk_size, QueryFormat::Json, None).unwrap(); + + // Counters for assertions + let mut counter = 0; + + while let Some(response) = query_response_stream.next().await { + match response { + Ok(resp) => { + println!("Received response: {:?}", resp); + + match counter { + 0 => { + assert!(resp.results[0].partial.unwrap()); + assert!(resp.results[0].series[0].partial.unwrap()); + assert_eq!(resp.results[0].series[0].name, "cpu"); + assert_eq!(resp.results[0].series[0].values.len(), 2); + } + 1 => { + assert_eq!(resp.results[0].partial, None); + assert_eq!(resp.results[0].series[0].partial, None); + assert_eq!(resp.results[0].series[0].name, "cpu"); + assert_eq!(resp.results[0].series[0].values.len(), 2); + } + _ => (), + } + + counter += 1; + } + Err(err) => panic!("Error while polling stream: {:?}", err), + } + } + + // Ensure we received exactly 4 responses + assert_eq!(counter, 2, "Expected 2 responses, but received {}", counter); + } }