From 458e5d7e6601aaabfcc941111863bf7200af46bd Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 9 Jul 2024 20:29:47 +0800 Subject: [PATCH] feat: add `TimeSeriesRowSelector` hint (#4327) * feat: Add TimeSeriesRowSelector * feat: scan allow specify series row selector * chore: Update comment --- src/metric-engine/src/metadata_region.rs | 3 +++ src/mito2/src/engine/projection_test.rs | 1 + src/mito2/src/read/scan_region.rs | 18 ++++++++++++++++-- src/store-api/src/storage.rs | 2 +- src/store-api/src/storage/requests.rs | 9 +++++++++ 5 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index a246977649ea..8e892e7ba726 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -348,6 +348,7 @@ impl MetadataRegion { filters: vec![], output_ordering: None, limit: None, + series_row_selector: None, }; let record_batch_stream = self .mito @@ -405,6 +406,7 @@ impl MetadataRegion { filters: vec![filter_expr], output_ordering: None, limit: None, + series_row_selector: None, } } @@ -565,6 +567,7 @@ mod test { filters: vec![expected_filter_expr], output_ordering: None, limit: None, + series_row_selector: None, }; let actual_scan_request = MetadataRegion::build_read_request(key); assert_eq!(actual_scan_request, expected_scan_request); diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index 6e31a56c8b37..37a458082086 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -78,6 +78,7 @@ async fn test_scan_projection() { filters: Vec::new(), output_ordering: None, limit: None, + series_row_selector: None, }; let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index d5c128c6bf5d..3ba32250bf23 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -26,7 +26,7 @@ use common_time::Timestamp; use datafusion::physical_plan::DisplayFormatType; use smallvec::SmallVec; use store_api::region_engine::RegionScannerRef; -use store_api::storage::ScanRequest; +use store_api::storage::{ScanRequest, TimeSeriesRowSelector}; use table::predicate::{build_time_range_predicate, Predicate}; use tokio::sync::{mpsc, Mutex, Semaphore}; use tokio_stream::wrappers::ReceiverStream; @@ -297,7 +297,8 @@ impl ScanRegion { .with_start_time(self.start_time) .with_append_mode(self.version.options.append_mode) .with_filter_deleted(filter_deleted) - .with_merge_mode(self.version.options.merge_mode()); + .with_merge_mode(self.version.options.merge_mode()) + .with_series_row_selector(self.request.series_row_selector.clone()); Ok(input) } @@ -410,6 +411,8 @@ pub(crate) struct ScanInput { pub(crate) filter_deleted: bool, /// Mode to merge duplicate rows. pub(crate) merge_mode: MergeMode, + /// Hint to select rows from time series. + pub(crate) series_row_selector: Option, } impl ScanInput { @@ -431,6 +434,7 @@ impl ScanInput { append_mode: false, filter_deleted: true, merge_mode: MergeMode::default(), + series_row_selector: None, } } @@ -517,6 +521,16 @@ impl ScanInput { self } + /// Sets the time series row selector. + #[must_use] + pub(crate) fn with_series_row_selector( + mut self, + series_row_selector: Option, + ) -> Self { + self.series_row_selector = series_row_selector; + self + } + /// Scans sources in parallel. /// /// # Panics if the input doesn't allow parallel scan. diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index fcbf4ef09cda..d3331e4e707e 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -25,5 +25,5 @@ pub use datatypes::schema::{ }; pub use self::descriptors::*; -pub use self::requests::ScanRequest; +pub use self::requests::{ScanRequest, TimeSeriesRowSelector}; pub use self::types::SequenceNumber; diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 5e43a2ab0dbc..42af126ff9db 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -15,6 +15,13 @@ use common_recordbatch::OrderOption; use datafusion_expr::expr::Expr; +/// A hint on how to select rows from a time-series. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum TimeSeriesRowSelector { + /// Only keep the last row of each time-series. + LastRow, +} + #[derive(Default, Clone, Debug, PartialEq, Eq)] pub struct ScanRequest { /// Indices of columns to read, `None` to read all columns. This indices is @@ -29,4 +36,6 @@ pub struct ScanRequest { /// If set, it contains the amount of rows needed by the caller, /// The data source should return *at least* this number of rows if available. pub limit: Option, + /// Optional hint to select rows from time-series. + pub series_row_selector: Option, }