Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implements row group level parallel unordered scanner #3992

Merged
merged 18 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ mod tests {
let file_metas: Vec<_> = data.version.ssts.levels()[0]
.files
.values()
.map(|file| file.meta())
.map(|file| file.meta_ref().clone())
.collect();

// 5 files for next compaction and removes old files.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/compaction/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl CompactionTaskImpl {
Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum());

for output in self.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));

info!(
"Compaction region {} output [{}]-> {}",
Expand Down Expand Up @@ -229,7 +229,7 @@ impl CompactionTaskImpl {
return Err(e);
}
};
deleted.extend(self.expired_ssts.iter().map(FileHandle::meta));
deleted.extend(self.expired_ssts.iter().map(|f| f.meta_ref().clone()));
let merge_time = merge_timer.stop_and_record();
info!(
"Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
Expand Down
3 changes: 1 addition & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,11 @@ impl MitoEngine {
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
.map_err(BoxedError::new)
}

/// Returns a scanner to scan for `request`.
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::sst::parquet::reader::RowGroupReader;

/// Storage internal representation of a batch of rows for a primary key (time series).
///
Expand Down Expand Up @@ -699,6 +700,8 @@ pub enum Source {
Iter(BoxedBatchIterator),
/// Source from a [BoxedBatchStream].
Stream(BoxedBatchStream),
/// Source from a [RowGroupReader].
RowGroupReader(RowGroupReader),
}

impl Source {
Expand All @@ -708,6 +711,7 @@ impl Source {
Source::Reader(reader) => reader.next_batch().await,
Source::Iter(iter) => iter.next().transpose(),
Source::Stream(stream) => stream.try_next().await,
Source::RowGroupReader(reader) => reader.next_batch().await,
}
}
}
Expand Down
44 changes: 34 additions & 10 deletions src/mito2/src/read/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
pub struct CompatReader<R> {
/// Underlying reader.
reader: R,
/// Optional primary key adapter.
compat_pk: Option<CompatPrimaryKey>,
/// Optional fields adapter.
compat_fields: Option<CompatFields>,
/// Helper to compat batches.
compat: CompatBatch,
}

impl<R> CompatReader<R> {
Expand All @@ -48,13 +46,9 @@ impl<R> CompatReader<R> {
reader_meta: RegionMetadataRef,
reader: R,
) -> Result<CompatReader<R>> {
let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
let compat_fields = may_compat_fields(mapper, &reader_meta)?;

Ok(CompatReader {
reader,
compat_pk,
compat_fields,
compat: CompatBatch::new(mapper, reader_meta)?,
})
}
}
Expand All @@ -66,14 +60,44 @@ impl<R: BatchReader> BatchReader for CompatReader<R> {
return Ok(None);
};

batch = self.compat.compat_batch(batch)?;

Ok(Some(batch))
}
}

/// A helper struct to adapt schema of the batch to an expected schema.
///
/// It holds two optional adapters: one for the primary key and one for the
/// fields. An adapter that is `None` is skipped when a batch is adapted.
pub(crate) struct CompatBatch {
/// Optional primary key adapter. When present, it is applied to a batch
/// before the fields adapter.
compat_pk: Option<CompatPrimaryKey>,
/// Optional fields adapter. When present, it is applied to a batch after
/// the primary key adapter.
compat_fields: Option<CompatFields>,
}

impl CompatBatch {
/// Builds a [CompatBatch] from the expected and the actual metadata.
/// - `mapper` is built from the metadata users expect to see.
/// - `reader_meta` is the metadata of the input reader.
///
/// Returns an error if building either the primary key adapter or the
/// fields adapter fails.
pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
    // Field initializers are evaluated in the order written, so the primary
    // key adapter is still resolved before the fields adapter.
    Ok(Self {
        compat_pk: may_compat_primary_key(mapper.metadata(), &reader_meta)?,
        compat_fields: may_compat_fields(mapper, &reader_meta)?,
    })
}

/// Adapts the `batch` to the expected schema.
pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
if let Some(compat_pk) = &self.compat_pk {
batch = compat_pk.compat(batch)?;
}
if let Some(compat_fields) = &self.compat_fields {
batch = compat_fields.compat(batch);
}

Ok(Some(batch))
Ok(batch)
}
}

Expand Down
121 changes: 106 additions & 15 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

//! Scans a region according to the scan request.

use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, warn};
use common_time::range::TimestampRange;
Expand All @@ -32,15 +34,16 @@ use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::CompatReader;
use crate::read::compat::{CompatBatch, CompatReader};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{compat, Batch, Source};
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
Expand All @@ -51,20 +54,24 @@ pub(crate) enum Scanner {
}

impl Scanner {
/// Returns a [SendableRecordBatchStream] to retrieve scan results.
pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream> {
/// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
match self {
Scanner::Seq(seq_scan) => seq_scan.build_stream().await,
Scanner::Seq(seq_scan) => seq_scan.build_stream().await.map_err(BoxedError::new),
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
}
}

/// Returns a [RegionScanner] to scan the region.
pub(crate) async fn region_scanner(&self) -> Result<RegionScannerRef> {
let stream = self.scan().await?;
let scanner = SinglePartitionScanner::new(stream);

Ok(Arc::new(scanner))
pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => {
let stream = seq_scan.build_stream().await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}
Scanner::Unordered(unordered_scan) => Ok(Arc::new(unordered_scan)),
}
}
}

Expand Down Expand Up @@ -222,9 +229,7 @@ impl ScanRegion {
/// Unordered scan.
pub(crate) fn unordered_scan(self) -> Result<UnorderedScan> {
let input = self.scan_input(true)?;
let scan = UnorderedScan::new(input);

Ok(scan)
Ok(UnorderedScan::new(input))
}

#[cfg(test)]
Expand Down Expand Up @@ -386,7 +391,7 @@ pub(crate) struct ScanInput {
/// Time range filter for time index.
time_range: Option<TimestampRange>,
/// Predicate to push down.
predicate: Option<Predicate>,
pub(crate) predicate: Option<Predicate>,
/// Memtables to scan.
pub(crate) memtables: Vec<MemtableRef>,
/// Handles to SST files to scan.
Expand Down Expand Up @@ -498,7 +503,6 @@ impl ScanInput {
}

/// Sets whether to remove deletion markers during scan.
#[allow(unused)]
#[must_use]
pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
self.filter_deleted = filter_deleted;
Expand Down Expand Up @@ -572,6 +576,61 @@ impl ScanInput {
Ok(sources)
}

/// Prunes file ranges to scan and adds them to the `collector`.
pub(crate) async fn prune_file_ranges(
    &self,
    collector: &mut impl FileRangeCollector,
) -> Result<()> {
    for file in &self.files {
        // Ask the access layer for the reader input of this file: a shared
        // context plus the row groups (with their row selections) to read.
        let build_result = self
            .access_layer
            .read_sst(file.clone())
            .predicate(self.predicate.clone())
            .time_range(self.time_range)
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_manager.clone())
            .index_applier(self.index_applier.clone())
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .build_reader_input()
            .await;

        let (mut ctx, selected_row_groups) = match build_result {
            Ok(input) => input,
            // A missing file is only tolerated when the scan is configured to
            // ignore it; the file is logged and skipped.
            Err(e) if e.is_object_not_found() && self.ignore_file_not_found => {
                error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                continue;
            }
            Err(e) => return Err(e),
        };

        // When the file's columns differ from the expected ones, attach an
        // adapter so batches can be converted before the mapper sees them.
        let schema_matches =
            compat::has_same_columns(self.mapper.metadata(), ctx.read_format().metadata());
        if !schema_matches {
            let adapter = CompatBatch::new(&self.mapper, ctx.read_format().metadata().clone())?;
            ctx.set_compat_batch(Some(adapter));
        }

        // Every range of the same file shares one context; each row group
        // becomes one file range.
        let shared_ctx = Arc::new(ctx);
        let ranges = selected_row_groups
            .into_iter()
            .map(|(idx, selection)| FileRange::new(shared_ctx.clone(), idx, selection));
        collector.append_file_ranges(file.meta_ref(), ranges);
    }

    // The metric records the number of input files, including skipped ones.
    READ_SST_COUNT.observe(self.files.len() as f64);

    Ok(())
}

/// Scans the input source in another task and sends batches to the sender.
pub(crate) fn spawn_scan_task(
&self,
Expand Down Expand Up @@ -620,3 +679,35 @@ impl ScanInput {
self.files.iter().map(|file| file.file_id()).collect()
}
}

/// A partition of a scanner to read.
/// It contains memtables and file ranges to scan.
///
/// The derived `Default` value has no memtables and no file ranges.
#[derive(Default)]
pub(crate) struct ScanPart {
/// Memtables to scan.
/// We scan the whole memtable now. We might scan a range of the memtable in the future.
pub(crate) memtables: Vec<MemtableRef>,
/// File ranges to scan.
pub(crate) file_ranges: Vec<FileRange>,
}

impl fmt::Debug for ScanPart {
    /// Prints only the number of memtables and file ranges, not their contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (memtable_num, range_num) = (self.memtables.len(), self.file_ranges.len());
        write!(
            f,
            "ScanPart({} memtables, {} file ranges)",
            memtable_num, range_num
        )
    }
}

/// A trait to collect file ranges to scan.
///
/// `ScanInput::prune_file_ranges` calls the collector once for each file whose
/// reader input is built successfully.
pub(crate) trait FileRangeCollector {
/// Appends file ranges from the **same file** to the collector.
///
/// - `file_meta` is the metadata of the file the ranges belong to.
/// - `file_ranges` yields the ranges of that file to scan.
fn append_file_ranges(
&mut self,
file_meta: &FileMeta,
file_ranges: impl Iterator<Item = FileRange>,
);
}
Loading