Skip to content

Commit

Permalink
feat: add TimeSeriesRowSelector hint (#4327)
Browse files Browse the repository at this point in the history
* feat: Add TimeSeriesRowSelector

* feat: scan allow specify series row selector

* chore: Update comment
  • Loading branch information
evenyag authored Jul 9, 2024
1 parent 1ddf19d commit 458e5d7
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/metric-engine/src/metadata_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ impl MetadataRegion {
filters: vec![],
output_ordering: None,
limit: None,
series_row_selector: None,
};
let record_batch_stream = self
.mito
Expand Down Expand Up @@ -405,6 +406,7 @@ impl MetadataRegion {
filters: vec![filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
}
}

Expand Down Expand Up @@ -565,6 +567,7 @@ mod test {
filters: vec![expected_filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
};
let actual_scan_request = MetadataRegion::build_read_request(key);
assert_eq!(actual_scan_request, expected_scan_request);
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/engine/projection_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ async fn test_scan_projection() {
filters: Vec::new(),
output_ordering: None,
limit: None,
series_row_selector: None,
};
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down
18 changes: 16 additions & 2 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use smallvec::SmallVec;
use store_api::region_engine::RegionScannerRef;
use store_api::storage::ScanRequest;
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Mutex, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -297,7 +297,8 @@ impl ScanRegion {
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(filter_deleted)
.with_merge_mode(self.version.options.merge_mode());
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector.clone());
Ok(input)
}

Expand Down Expand Up @@ -410,6 +411,8 @@ pub(crate) struct ScanInput {
pub(crate) filter_deleted: bool,
/// Mode to merge duplicate rows.
pub(crate) merge_mode: MergeMode,
/// Hint to select rows from time series.
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
}

impl ScanInput {
Expand All @@ -431,6 +434,7 @@ impl ScanInput {
append_mode: false,
filter_deleted: true,
merge_mode: MergeMode::default(),
series_row_selector: None,
}
}

Expand Down Expand Up @@ -517,6 +521,16 @@ impl ScanInput {
self
}

/// Sets the time series row selector.
#[must_use]
pub(crate) fn with_series_row_selector(
mut self,
series_row_selector: Option<TimeSeriesRowSelector>,
) -> Self {
self.series_row_selector = series_row_selector;
self
}

/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.
Expand Down
2 changes: 1 addition & 1 deletion src/store-api/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ pub use datatypes::schema::{
};

pub use self::descriptors::*;
pub use self::requests::ScanRequest;
pub use self::requests::{ScanRequest, TimeSeriesRowSelector};
pub use self::types::SequenceNumber;
9 changes: 9 additions & 0 deletions src/store-api/src/storage/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
use common_recordbatch::OrderOption;
use datafusion_expr::expr::Expr;

/// A hint on how to select rows from a time-series.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeSeriesRowSelector {
/// Only keep the last row of each time-series.
LastRow,
}

#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ScanRequest {
/// Indices of columns to read, `None` to read all columns. This indices is
Expand All @@ -29,4 +36,6 @@ pub struct ScanRequest {
/// If set, it contains the amount of rows needed by the caller,
/// The data source should return *at least* this number of rows if available.
pub limit: Option<usize>,
/// Optional hint to select rows from time-series.
pub series_row_selector: Option<TimeSeriesRowSelector>,
}

0 comments on commit 458e5d7

Please sign in to comment.