Skip to content

Commit

Permalink
feat: enable sort spilling when memory reaches the limit. (databendlabs…
Browse files Browse the repository at this point in the history
…#13989)

* Add metrics for sort spill.

* refactor: use builder pattern to build `TransformSortMerge(Limit)`.

* Enable `TransformSortSpill`.

* Ensure spill is activated.

* Move sort spill test to stateless tests.

* Fix bugs and test results.

* Fix.

* Fix tests.

---------

Co-authored-by: sundyli <[email protected]>
  • Loading branch information
RinChanNOWWW and sundy-li authored Dec 13, 2023
1 parent 193ed56 commit e69c920
Show file tree
Hide file tree
Showing 15 changed files with 899 additions and 353 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions src/common/metrics/src/metrics/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,43 @@ pub fn metrics_inc_exchange_read_count(v: usize) {
/// Adds `c` to the `EXCHANGE_READ_BYTES` metric.
pub fn metrics_inc_exchange_read_bytes(c: usize) {
    let read_bytes = c as u64;
    EXCHANGE_READ_BYTES.inc_by(read_bytes);
}

// Sort spill metrics
/// Increments the `SPILL_COUNT` series labelled `spill = "sort_spill"`.
pub fn metrics_inc_sort_spill_count() {
    let sort_spill_labels = vec![("spill", String::from("sort_spill"))];
    SPILL_COUNT.get_or_create(&sort_spill_labels).inc();
}

/// Increments the `SPILL_WRITE_COUNT` series labelled `spill = "sort_spill"`.
pub fn metrics_inc_sort_spill_write_count() {
    let sort_spill_labels = vec![("spill", String::from("sort_spill"))];
    SPILL_WRITE_COUNT.get_or_create(&sort_spill_labels).inc();
}

/// Adds `c` to the `SPILL_WRITE_BYTES` series labelled `spill = "sort_spill"`.
pub fn metrics_inc_sort_spill_write_bytes(c: u64) {
    let sort_spill_labels = vec![("spill", String::from("sort_spill"))];
    SPILL_WRITE_BYTES.get_or_create(&sort_spill_labels).inc_by(c);
}

/// Records `c` (converted to `f64`) as an observation on the
/// `SPILL_WRITE_MILLISECONDS` series labelled `spill = "sort_spill"`.
pub fn metrics_inc_sort_spill_write_milliseconds(c: u64) {
    let sort_spill_labels = vec![("spill", String::from("sort_spill"))];
    let observed = c as f64;
    SPILL_WRITE_MILLISECONDS
        .get_or_create(&sort_spill_labels)
        .observe(observed);
}

/// Increments the `SPILL_READ_COUNT` series labelled `spill = "sort_spill"`.
pub fn metrics_inc_sort_spill_read_count() {
    let sort_spill_labels = vec![("spill", String::from("sort_spill"))];
    SPILL_READ_COUNT.get_or_create(&sort_spill_labels).inc();
}

/// Adds `c` to the `SPILL_READ_BYTES` series labelled `spill = "sort_spill"`.
pub fn metrics_inc_sort_spill_read_bytes(c: u64) {
    let sort_spill_labels = vec![("spill", String::from("sort_spill"))];
    SPILL_READ_BYTES.get_or_create(&sort_spill_labels).inc_by(c);
}

/// Records `c` (converted to `f64`) as an observation on the
/// `SPILL_READ_MILLISECONDS` series labelled `spill = "sort_spill"`.
pub fn metrics_inc_sort_spill_read_milliseconds(c: u64) {
    let sort_spill_labels = vec![("spill", String::from("sort_spill"))];
    let observed = c as f64;
    SPILL_READ_MILLISECONDS
        .get_or_create(&sort_spill_labels)
        .observe(observed);
}
5 changes: 5 additions & 0 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ impl DataBlock {
})
}

/// Sets this block's meta to `meta`, discarding whatever meta was attached before.
#[inline]
pub fn replace_meta(&mut self, meta: BlockMetaInfoPtr) {
    self.meta = Some(meta);
}

#[inline]
pub fn get_meta(&self) -> Option<&BlockMetaInfoPtr> {
self.meta.as_ref()
Expand Down
2 changes: 2 additions & 0 deletions src/query/pipeline/transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ async-backtrace = { workspace = true }
async-trait = { workspace = true }
jsonb = { workspace = true }
match-template = { workspace = true }
serde = { workspace = true }
typetag = { workspace = true }

[package.metadata.cargo-machete]
ignored = ["match-template"]
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ pub use transform_dummy::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_sort_merge::sort_merge;
pub use transform_sort_merge::*;
pub use transform_sort_merge_base::*;
pub use transform_sort_merge_limit::*;
pub use transform_sort_partial::*;
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

mod cursor;
mod rows;
mod spill;
pub mod utils;

pub use cursor::*;
pub use rows::*;
pub use spill::*;
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_expression::BlockMetaInfo;

/// Marks a partially sorted [`DataBlock`] as a block that needs to be spilled,
/// and carries the parameters that accompany the spill request.
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct SortSpillMetaWithParams {
    // NOTE(review): presumably the number of rows per batch used by the spill
    // transform — confirm against the consumer of this meta.
    pub batch_size: usize,
    // NOTE(review): presumably how many sorted streams are merged at once —
    // confirm against the consumer of this meta.
    pub num_merge: usize,
}

// The typetag name has to be unique among all `BlockMetaInfo` implementations:
// `SortSpillMeta` in this file is already registered as "sort_spill", and two
// impls sharing one tag make that tag ambiguous when deserializing a
// `dyn BlockMetaInfo`.
#[typetag::serde(name = "sort_spill_with_params")]
impl BlockMetaInfo for SortSpillMetaWithParams {
    /// Equality is not supported for this meta; calling it panics.
    fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
        unimplemented!("Unimplemented equals SortSpillMetaWithParams")
    }

    /// Cloning is not supported for this meta; calling it panics.
    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
        unimplemented!("Unimplemented clone SortSpillMetaWithParams")
    }
}

/// Marker meta attached to a partially sorted [`DataBlock`] to flag it as a
/// block that needs to be spilled. It carries no data of its own.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SortSpillMeta {}

#[typetag::serde(name = "sort_spill")]
impl BlockMetaInfo for SortSpillMeta {
    /// Equality is not supported for this marker; calling it panics.
    fn equals(&self, _other: &Box<dyn BlockMetaInfo>) -> bool {
        unimplemented!("Unimplemented equals SortSpillMeta")
    }

    /// Cloning is not supported for this marker; calling it panics.
    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
        unimplemented!("Unimplemented clone SortSpillMeta")
    }
}
Loading

0 comments on commit e69c920

Please sign in to comment.