Skip to content

Commit

Permalink
restrict maturity period to retention (quickwit-oss#5543)
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a authored Nov 8, 2024
1 parent d4ad40d commit db1751b
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 9 deletions.
9 changes: 8 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use quickwit_actors::{
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
use quickwit_config::{IndexingSettings, SourceConfig};
use quickwit_config::{IndexingSettings, RetentionPolicy, SourceConfig};
use quickwit_doc_mapper::DocMapper;
use quickwit_ingest::IngesterPool;
use quickwit_proto::indexing::IndexingPipelineId;
Expand Down Expand Up @@ -367,6 +367,7 @@ impl IndexingPipeline {
UploaderType::IndexUploader,
self.params.metastore.clone(),
self.params.merge_policy.clone(),
self.params.retention_policy.clone(),
self.params.split_store.clone(),
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
self.params.max_concurrent_split_uploads_index,
Expand Down Expand Up @@ -585,6 +586,7 @@ pub struct IndexingPipelineParams {

// Merge-related parameters
pub merge_policy: Arc<dyn MergePolicy>,
pub retention_policy: Option<RetentionPolicy>,
pub merge_planner_mailbox: Mailbox<MergePlanner>,
pub max_concurrent_split_uploads_merge: usize,

Expand Down Expand Up @@ -717,6 +719,7 @@ mod tests {
storage,
split_store,
merge_policy: default_merge_policy(),
retention_policy: None,
queues_dir_path: PathBuf::from("./queues"),
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
Expand Down Expand Up @@ -831,6 +834,7 @@ mod tests {
storage,
split_store,
merge_policy: default_merge_policy(),
retention_policy: None,
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
Expand Down Expand Up @@ -908,6 +912,7 @@ mod tests {
metastore: metastore.clone(),
split_store: split_store.clone(),
merge_policy: default_merge_policy(),
retention_policy: None,
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
merge_scheduler_service: universe.get_or_spawn_one(),
Expand All @@ -930,6 +935,7 @@ mod tests {
storage,
split_store,
merge_policy: default_merge_policy(),
retention_policy: None,
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
Expand Down Expand Up @@ -1057,6 +1063,7 @@ mod tests {
storage,
split_store,
merge_policy: default_merge_policy(),
retention_policy: None,
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ impl IndexingService {
})?;
let merge_policy =
crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings);
let retention_policy = index_config.retention_policy_opt.clone();
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());

let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
Expand All @@ -301,6 +302,7 @@ impl IndexingService {
split_store: split_store.clone(),
merge_scheduler_service: self.merge_scheduler_service.clone(),
merge_policy: merge_policy.clone(),
retention_policy: retention_policy.clone(),
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
Expand Down Expand Up @@ -329,6 +331,7 @@ impl IndexingService {

// Merge-related parameters
merge_policy,
retention_policy,
max_concurrent_split_uploads_merge,
merge_planner_mailbox,

Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use quickwit_common::io::{IoControls, Limiter};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
use quickwit_config::RetentionPolicy;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata,
Expand Down Expand Up @@ -286,6 +287,7 @@ impl MergePipeline {
UploaderType::MergeUploader,
self.params.metastore.clone(),
self.params.merge_policy.clone(),
self.params.retention_policy.clone(),
self.params.split_store.clone(),
merge_publisher_mailbox.into(),
self.params.max_concurrent_split_uploads,
Expand Down Expand Up @@ -572,6 +574,7 @@ pub struct MergePipelineParams {
pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
pub split_store: IndexingSplitStore,
pub merge_policy: Arc<dyn MergePolicy>,
pub retention_policy: Option<RetentionPolicy>,
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
pub merge_io_throughput_limiter_opt: Option<Limiter>,
pub event_broker: EventBroker,
Expand Down Expand Up @@ -635,6 +638,7 @@ mod tests {
merge_scheduler_service: universe.get_or_spawn_one(),
split_store,
merge_policy: default_merge_policy(),
retention_policy: None,
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
Expand Down
12 changes: 12 additions & 0 deletions quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use once_cell::sync::OnceCell;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::spawn_named_task;
use quickwit_config::RetentionPolicy;
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_metastore::{SplitMetadata, StageSplitsRequestExt};
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, StageSplitsRequest};
Expand Down Expand Up @@ -166,6 +167,7 @@ pub struct Uploader {
uploader_type: UploaderType,
metastore: MetastoreServiceClient,
merge_policy: Arc<dyn MergePolicy>,
retention_policy: Option<RetentionPolicy>,
split_store: IndexingSplitStore,
split_update_mailbox: SplitsUpdateMailbox,
max_concurrent_split_uploads: usize,
Expand All @@ -174,10 +176,12 @@ pub struct Uploader {
}

impl Uploader {
#[allow(clippy::too_many_arguments)]
pub fn new(
uploader_type: UploaderType,
metastore: MetastoreServiceClient,
merge_policy: Arc<dyn MergePolicy>,
retention_policy: Option<RetentionPolicy>,
split_store: IndexingSplitStore,
split_update_mailbox: SplitsUpdateMailbox,
max_concurrent_split_uploads: usize,
Expand All @@ -187,6 +191,7 @@ impl Uploader {
uploader_type,
metastore,
merge_policy,
retention_policy,
split_store,
split_update_mailbox,
max_concurrent_split_uploads,
Expand Down Expand Up @@ -300,6 +305,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
let index_uid = batch.index_uid();
let ctx_clone = ctx.clone();
let merge_policy = self.merge_policy.clone();
let retention_policy = self.retention_policy.clone();
debug!(split_ids=?split_ids, "start-stage-and-store-splits");
let event_broker = self.event_broker.clone();
spawn_named_task(
Expand All @@ -324,6 +330,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
)?;
let split_metadata = create_split_metadata(
&merge_policy,
retention_policy.as_ref(),
&packaged_split.split_attrs,
packaged_split.tags.clone(),
split_streamer.footer_range.start..split_streamer.footer_range.end,
Expand Down Expand Up @@ -535,6 +542,7 @@ mod tests {
UploaderType::IndexUploader,
MetastoreServiceClient::from_mock(mock_metastore),
merge_policy,
None,
split_store,
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
4,
Expand Down Expand Up @@ -650,6 +658,7 @@ mod tests {
UploaderType::IndexUploader,
MetastoreServiceClient::from_mock(mock_metastore),
merge_policy,
None,
split_store,
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
4,
Expand Down Expand Up @@ -797,6 +806,7 @@ mod tests {
UploaderType::IndexUploader,
MetastoreServiceClient::from_mock(mock_metastore),
merge_policy,
None,
split_store,
SplitsUpdateMailbox::Publisher(publisher_mailbox),
4,
Expand Down Expand Up @@ -870,6 +880,7 @@ mod tests {
UploaderType::IndexUploader,
MetastoreServiceClient::from_mock(mock_metastore),
default_merge_policy(),
None,
split_store,
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
4,
Expand Down Expand Up @@ -974,6 +985,7 @@ mod tests {
UploaderType::IndexUploader,
MetastoreServiceClient::from_mock(mock_metastore),
merge_policy,
None,
split_store,
SplitsUpdateMailbox::Publisher(publisher_mailbox),
4,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/merge_policy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ pub mod tests {
source_id: "test_source".to_string(),
};
let split_attrs = merge_split_attrs(pipeline_id, merged_split_id, splits).unwrap();
create_split_metadata(merge_policy, &split_attrs, tags, 0..0)
create_split_metadata(merge_policy, None, &split_attrs, tags, 0..0)
}

fn apply_merge(
Expand Down
101 changes: 95 additions & 6 deletions quickwit/quickwit-indexing/src/models/split_attrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use std::collections::BTreeSet;
use std::fmt;
use std::ops::{Range, RangeInclusive};
use std::sync::Arc;
use std::time::Duration;

use quickwit_metastore::SplitMetadata;
use quickwit_metastore::{SplitMaturity, SplitMetadata};
use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId, SourceId, SplitId};
use tantivy::DateTime;
use time::OffsetDateTime;
Expand Down Expand Up @@ -92,13 +93,27 @@ impl fmt::Debug for SplitAttrs {

pub fn create_split_metadata(
merge_policy: &Arc<dyn MergePolicy>,
retention_policy: Option<&quickwit_config::RetentionPolicy>,
split_attrs: &SplitAttrs,
tags: BTreeSet<String>,
footer_offsets: Range<u64>,
) -> SplitMetadata {
let create_timestamp = OffsetDateTime::now_utc().unix_timestamp();
let maturity =

let time_range = split_attrs
.time_range
.as_ref()
.map(|range| range.start().into_timestamp_secs()..=range.end().into_timestamp_secs());

let mut maturity =
merge_policy.split_maturity(split_attrs.num_docs as usize, split_attrs.num_merge_ops);
if let Some(max_maturity) = max_maturity_before_end_of_retention(
retention_policy,
create_timestamp,
time_range.as_ref().map(|time_range| *time_range.end()),
) {
maturity = maturity.min(max_maturity);
}
SplitMetadata {
node_id: split_attrs.node_id.to_string(),
index_uid: split_attrs.index_uid.clone(),
Expand All @@ -107,10 +122,7 @@ pub fn create_split_metadata(
split_id: split_attrs.split_id.clone(),
partition_id: split_attrs.partition_id,
num_docs: split_attrs.num_docs as usize,
time_range: split_attrs
.time_range
.as_ref()
.map(|range| range.start().into_timestamp_secs()..=range.end().into_timestamp_secs()),
time_range,
uncompressed_docs_size_in_bytes: split_attrs.uncompressed_docs_size_in_bytes,
create_timestamp,
maturity,
Expand All @@ -120,3 +132,80 @@ pub fn create_split_metadata(
num_merge_ops: split_attrs.num_merge_ops,
}
}

/// reduce the maturity period of a split based on retention policy, so that it doesn't get merged
/// after it expires.
fn max_maturity_before_end_of_retention(
retention_policy: Option<&quickwit_config::RetentionPolicy>,
create_timestamp: i64,
time_range_end: Option<i64>,
) -> Option<SplitMaturity> {
let time_range_end = time_range_end? as u64;
let retention_period_s = retention_policy?.retention_period().ok()?.as_secs();

let maturity = if let Some(maturation_period_s) =
(time_range_end + retention_period_s).checked_sub(create_timestamp as u64)
{
SplitMaturity::Immature {
maturation_period: Duration::from_secs(maturation_period_s),
}
} else {
// this split could be deleted as soon as it is created. Ideally we would
// handle that sooner.
SplitMaturity::Mature
};
Some(maturity)
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use quickwit_metastore::SplitMaturity;

use super::max_maturity_before_end_of_retention;

#[test]
fn test_max_maturity_before_end_of_retention() {
let retention_policy = quickwit_config::RetentionPolicy {
evaluation_schedule: "daily".to_string(),
retention_period: "300 sec".to_string(),
};
let create_timestamp = 1000;

// this should be deleted asap, not subject to merge
assert_eq!(
max_maturity_before_end_of_retention(
Some(&retention_policy),
create_timestamp,
Some(200),
),
Some(SplitMaturity::Mature)
);

// retention ends at 750 + 300 = 1050, which is 50s from now
assert_eq!(
max_maturity_before_end_of_retention(
Some(&retention_policy),
create_timestamp,
Some(750),
),
Some(SplitMaturity::Immature {
maturation_period: Duration::from_secs(50)
})
);

// no retention policy
assert_eq!(
max_maturity_before_end_of_retention(None, create_timestamp, Some(850),),
None,
);

// no timestamp_range.end but a retention policy, that's odd, don't change anything about
// the maturity period
assert_eq!(
max_maturity_before_end_of_retention(Some(&retention_policy), create_timestamp, None,),
None,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl DeleteTaskPipeline {
UploaderType::DeleteUploader,
self.metastore.clone(),
merge_policy,
index_config.retention_policy_opt.clone(),
split_store.clone(),
SplitsUpdateMailbox::Publisher(publisher_mailbox),
self.max_concurrent_split_uploads,
Expand Down
19 changes: 18 additions & 1 deletion quickwit/quickwit-metastore/src/split_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl FromStr for SplitState {
/// or `Immature` with a given maturation period.
/// The maturity is determined by the `MergePolicy`.
#[serde_as]
#[derive(Clone, Copy, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Copy, Debug, Default, Eq, Serialize, Deserialize, PartialEq, PartialOrd, Ord)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum SplitMaturity {
Expand Down Expand Up @@ -439,4 +439,21 @@ mod tests {

assert_eq!(format!("{:?}", split_metadata), expected_output);
}

#[test]
fn test_spit_maturity_order() {
assert!(
SplitMaturity::Mature
< SplitMaturity::Immature {
maturation_period: Duration::from_secs(0)
}
);
assert!(
SplitMaturity::Immature {
maturation_period: Duration::from_secs(0)
} < SplitMaturity::Immature {
maturation_period: Duration::from_secs(1)
}
);
}
}

0 comments on commit db1751b

Please sign in to comment.