Skip to content

Commit

Permalink
chore: update proto
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed May 20, 2024
1 parent a2b8d39 commit 747903f
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 105 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/v0y4g3r/greptime-proto.git", rev = "3025c3da404c2c9ecc074757e7e76a78440ef20c" }
greptime-proto = { git = "https://github.com/v0y4g3r/greptime-proto.git", rev = "b10371f5270e31170b5f4cc8e3e705c8f9882308" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
22 changes: 12 additions & 10 deletions src/common/function/src/table/flush_compact_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
use std::fmt;
use std::str::FromStr;

use api::v1::region::compact_type::Ty;
use api::v1::region::{CompactType, StrictWindow};
use api::v1::region::{compact_request, StrictWindow};
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
Expand Down Expand Up @@ -132,7 +131,10 @@ fn parse_compact_params(
);

let (table_name, compact_type) = match params {
[ValueRef::String(table_name)] => (table_name, CompactType::default()),
[ValueRef::String(table_name)] => (
table_name,
compact_request::Options::Regular(Default::default()),
),
[ValueRef::String(table_name), ValueRef::String(compact_ty_str)] => {
let compact_type = parse_compact_type(compact_ty_str, None)?;
(table_name, compact_type)
Expand Down Expand Up @@ -160,18 +162,18 @@ fn parse_compact_params(
catalog_name,
schema_name,
table_name,
compact_type,
compact_options: compact_type,
})
}

fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<CompactType> {
fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
if type_str.eq_ignore_ascii_case("strict_window") {
let window = option.and_then(|v| i64::from_str(v).ok()).unwrap_or(0);
Ok(CompactType {
ty: Some(Ty::StrictWindow(StrictWindow { window })),
})
let window_seconds = option.and_then(|v| i64::from_str(v).ok()).unwrap_or(0);
Ok(compact_request::Options::StrictWindow(StrictWindow {
window_seconds,
}))
} else {
Ok(CompactType::default())
Ok(compact_request::Options::Regular(Default::default()))
}
}

Expand Down
42 changes: 21 additions & 21 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::greptime_proto::v1::region::CompactType;
use api::v1::region::compact_type::Ty;
use api::v1::region::compact_request;
use common_query::logical_plan::DfExpr;
use common_telemetry::{debug, error};
use common_time::range::TimestampRange;
Expand Down Expand Up @@ -68,7 +67,7 @@ pub struct CompactionRequest {
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) current_version: VersionRef,
pub(crate) access_layer: AccessLayerRef,
pub(crate) compact_type: CompactType,
pub(crate) compact_options: compact_request::Options,
/// Sender to send notification to the region worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Waiters of the compaction request.
Expand Down Expand Up @@ -141,7 +140,7 @@ impl CompactionScheduler {
pub(crate) fn schedule_compaction(
&mut self,
region_id: RegionId,
compact_type: CompactType,
compact_options: compact_request::Options,
version_control: &VersionControlRef,
access_layer: &AccessLayerRef,
file_purger: &FilePurgerRef,
Expand All @@ -162,7 +161,7 @@ impl CompactionScheduler {
file_purger.clone(),
);
let request = status.new_compaction_request(
compact_type,
compact_options,
self.request_sender.clone(),
waiter,
self.engine_config.clone(),
Expand All @@ -186,7 +185,7 @@ impl CompactionScheduler {

// We should always try to compact the region until picker returns None.
let request = status.new_compaction_request(
CompactType::default(), // todo(hl): we should not trigger a regular compaction after major compaction.
compact_request::Options::Regular(Default::default()), // todo(hl): we should not trigger a regular compaction after major compaction.
self.request_sender.clone(),
OptionOutputTx::none(),
self.engine_config.clone(),
Expand Down Expand Up @@ -237,16 +236,17 @@ impl CompactionScheduler {
///
/// If the region has nothing to compact, it removes the region from the status map.
fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> {
let picker = if let Some(Ty::StrictWindow(window)) = &request.compact_type.ty {
let window = if window.window == 0 {
None
let picker =
if let compact_request::Options::StrictWindow(window) = &request.compact_options {
let window = if window.window_seconds == 0 {
None
} else {
Some(window.window_seconds)
};
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
} else {
Some(window.window)
compaction_options_to_picker(&request.current_version.options.compaction)
};
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
} else {
compaction_options_to_picker(&request.current_version.options.compaction)
};

let region_id = request.region_id();
debug!(
Expand Down Expand Up @@ -374,7 +374,7 @@ impl CompactionStatus {
#[allow(clippy::too_many_arguments)]
fn new_compaction_request(
&mut self,
compact_type: CompactType,
compact_options: compact_request::Options,
request_sender: Sender<WorkerRequest>,
waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
Expand All @@ -388,7 +388,7 @@ impl CompactionStatus {
engine_config,
current_version,
access_layer: self.access_layer.clone(),
compact_type,
compact_options,
request_sender: request_sender.clone(),
waiters: Vec::new(),
file_purger: self.file_purger.clone(),
Expand Down Expand Up @@ -562,7 +562,7 @@ mod tests {
scheduler
.schedule_compaction(
builder.region_id(),
CompactType::default(),
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,
Expand All @@ -581,7 +581,7 @@ mod tests {
scheduler
.schedule_compaction(
builder.region_id(),
CompactType::default(),
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,
Expand Down Expand Up @@ -644,7 +644,7 @@ mod tests {
scheduler
.schedule_compaction(
region_id,
CompactType::default(),
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,
Expand Down Expand Up @@ -673,7 +673,7 @@ mod tests {
scheduler
.schedule_compaction(
region_id,
CompactType::default(),
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,
Expand Down Expand Up @@ -705,7 +705,7 @@ mod tests {
scheduler
.schedule_compaction(
region_id,
CompactType::default(),
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,
Expand Down
13 changes: 6 additions & 7 deletions src/mito2/src/compaction/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
use std::collections::BTreeMap;
use std::fmt::Debug;

use api::v1::region::compact_type::Ty;
use api::v1::region::CompactType;
use api::v1::region::compact_request;
use common_telemetry::info;
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
Expand Down Expand Up @@ -53,13 +52,13 @@ impl WindowedCompactionPicker {
fn calculate_time_window(
&self,
region_id: RegionId,
compact_type: CompactType,
options: compact_request::Options,
current_version: &VersionRef,
) -> i64 {
let window = if let Some(Ty::StrictWindow(w)) = &compact_type.ty {
if w.window != 0 {
let window = if let compact_request::Options::StrictWindow(w) = &options {
if w.window_seconds != 0 {
// 0 means window is not provided.
Some(w.window)
Some(w.window_seconds)
} else {
None
}
Expand Down Expand Up @@ -90,7 +89,7 @@ impl Picker for WindowedCompactionPicker {
engine_config,
current_version,
access_layer,
compact_type,
compact_options: compact_type,
request_sender,
waiters,
file_purger,
Expand Down
5 changes: 1 addition & 4 deletions src/mito2/src/engine/append_mode_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

//! Tests for append mode.
use api::v1::region::CompactType;
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
Expand Down Expand Up @@ -140,9 +139,7 @@ async fn test_append_mode_compaction() {
let output = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest {
compact_type: CompactType::default(),
}),
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
Expand Down
12 changes: 3 additions & 9 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,7 @@ async fn test_compaction_region() {
let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest {
compact_type: Default::default(),
}),
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
Expand Down Expand Up @@ -186,9 +184,7 @@ async fn test_compaction_region_with_overlapping() {
let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest {
compact_type: Default::default(),
}),
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
Expand Down Expand Up @@ -239,9 +235,7 @@ async fn test_compaction_region_with_overlapping_delete_all() {
let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest {
compact_type: Default::default(),
}),
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
COMPACTION_REQUEST_COUNT.inc();
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
req.compact_type,
req.options,
&region.version_control,
&region.access_layer,
&region.file_purger,
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use std::sync::Arc;

use api::v1::region::CompactType;
use api::v1::region::compact_request;
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
Expand Down Expand Up @@ -237,7 +237,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Schedules compaction.
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
CompactType::default(),
compact_request::Options::Regular(Default::default()),
&region.version_control,
&region.access_layer,
&region.file_purger,
Expand Down
4 changes: 2 additions & 2 deletions src/operator/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Requester {
.map(|partition| {
RegionRequestBody::Compact(CompactRequest {
region_id: partition.id.into(),
compact_type: Some(request.compact_type.clone()),
options: Some(request.compact_options.clone()),
})
})
.collect();
Expand Down Expand Up @@ -146,7 +146,7 @@ impl Requester {
) -> Result<AffectedRows> {
let request = RegionRequestBody::Compact(CompactRequest {
region_id: region_id.into(),
compact_type: None, // todo(hl): maybe also support parameters in region compaction.
options: None, // todo(hl): maybe also support parameters in region compaction.
});

info!("Handle region manual compaction request: {region_id}");
Expand Down
21 changes: 16 additions & 5 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::fmt;
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::region::{
alter_request, region_request, AlterRequest, AlterRequests, CloseRequest, CompactRequest,
CompactType, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
alter_request, compact_request, region_request, AlterRequest, AlterRequests, CloseRequest,
CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{self, Rows, SemanticType};
Expand Down Expand Up @@ -199,10 +199,12 @@ fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest

fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = compact.region_id.into();
let compact_type = compact.compact_type.unwrap_or_default();
let options = compact
.options
.unwrap_or(compact_request::Options::Regular(Default::default()));
Ok(vec![(
region_id,
RegionRequest::Compact(RegionCompactRequest { compact_type }),
RegionRequest::Compact(RegionCompactRequest { options }),
)])
}

Expand Down Expand Up @@ -644,7 +646,16 @@ pub struct RegionFlushRequest {

#[derive(Debug)]
pub struct RegionCompactRequest {
pub compact_type: CompactType,
pub options: compact_request::Options,
}

impl Default for RegionCompactRequest {
fn default() -> Self {
Self {
// Default to regular compaction.
options: compact_request::Options::Regular(Default::default()),
}
}
}

/// Truncate region request.
Expand Down
Loading

0 comments on commit 747903f

Please sign in to comment.