diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs
index b48dc2ccfb08..7eb77b87ed7e 100644
--- a/src/mito2/src/engine/alter_test.rs
+++ b/src/mito2/src/engine/alter_test.rs
@@ -23,6 +23,12 @@ use common_error::status_code::StatusCode;
 use common_recordbatch::RecordBatches;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
+use futures::TryStreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
 use store_api::metadata::ColumnMetadata;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{
@@ -34,9 +40,12 @@ use crate::config::MitoConfig;
 use crate::engine::listener::AlterFlushListener;
 use crate::engine::MitoEngine;
 use crate::test_util::{
-    build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
-    TestEnv,
+    build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
+    prepare_test_for_kafka_log_store, put_rows, rows_schema, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, TestEnv,
 };
+use crate::wal::entry_reader::decode_stream;
+use crate::wal::raw_entry_reader::flatten_stream;
 
 async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
     let request = ScanRequest::default();
@@ -68,6 +77,51 @@ fn add_tag1() -> RegionAlterRequest {
     }
 }
 
+#[apply(single_kafka_log_store_factory)]
+async fn test_alter_region_notification(factory: Option<LogStoreFactory>) {
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env = TestEnv::new().with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let region_id = RegionId::new(1, 1);
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows_for_key("a", 0, 3, 0),
+    };
+    put_rows(&engine, region_id, rows).await;
+    let request = add_tag1();
+    engine
+        .handle_request(region_id, RegionRequest::Alter(request))
+        .await
+        .unwrap();
+
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+    // Flush sst notification
+    assert_eq!(entries[1].1.mutations[0].op_type(), api::v1::OpType::Notify);
+    // Modify table metadata notification
+    assert_eq!(entries[2].1.mutations[0].op_type(), api::v1::OpType::Notify);
+}
+
 #[tokio::test]
 async fn test_alter_region() {
     common_telemetry::init_default_ut_logging();
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index e19f95088c46..f60e79ce1bad 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -19,6 +19,12 @@ use api::v1::{ColumnSchema, Rows};
 use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
 use datatypes::prelude::ScalarVector;
 use datatypes::vectors::TimestampMillisecondVector;
+use futures::TryStreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{
     RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
@@ -30,8 +36,12 @@ use crate::config::MitoConfig;
 use crate::engine::listener::CompactionListener;
 use crate::engine::MitoEngine;
 use crate::test_util::{
-    build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
+    build_rows_for_key, column_metadata_to_column_schema, kafka_log_store_factory,
+    prepare_test_for_kafka_log_store, put_rows, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, TestEnv,
 };
+use crate::wal::entry_reader::decode_stream;
+use crate::wal::raw_entry_reader::flatten_stream;
 
 async fn put_and_flush(
     engine: &MitoEngine,
@@ -105,6 +115,65 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
     res
 }
 
+#[apply(single_kafka_log_store_factory)]
+async fn test_compaction_region_notification(factory: Option<LogStoreFactory>) {
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env = TestEnv::new().with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .insert_option("compaction.type", "twcs")
+        .insert_option("compaction.twcs.max_active_window_runs", "1")
+        .insert_option("compaction.twcs.max_inactive_window_runs", "1")
+        .build();
+
+    let column_schemas = request
+        .column_metadatas
+        .iter()
+        .map(column_metadata_to_column_schema)
+        .collect::<Vec<_>>();
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    // Flush 5 SSTs for compaction.
+    put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
+    put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
+    put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
+    delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
+    put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
+
+    let result = engine
+        .handle_request(
+            region_id,
+            RegionRequest::Compact(RegionCompactRequest::default()),
+        )
+        .await
+        .unwrap();
+    assert_eq!(result.affected_rows, 0);
+
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+    let notifications = entries
+        .into_iter()
+        .filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
+        .count();
+    assert_eq!(notifications, 6);
+}
+
 #[tokio::test]
 async fn test_compaction_region() {
     common_telemetry::init_default_ut_logging();
diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs
index 51f2a976b343..f3c22357b395 100644
--- a/src/mito2/src/engine/edit_region_test.rs
+++ b/src/mito2/src/engine/edit_region_test.rs
@@ -16,7 +16,13 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use common_time::util::current_time_millis;
+use futures::TryStreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
 use object_store::ObjectStore;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::RegionId;
@@ -29,7 +35,69 @@ use crate::engine::MitoEngine;
 use crate::manifest::action::RegionEdit;
 use crate::region::MitoRegionRef;
 use crate::sst::file::{FileId, FileMeta};
-use crate::test_util::{CreateRequestBuilder, TestEnv};
+use crate::test_util::{
+    kafka_log_store_factory, prepare_test_for_kafka_log_store, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, TestEnv,
+};
+use crate::wal::entry_reader::decode_stream;
+use crate::wal::raw_entry_reader::flatten_stream;
+
+#[apply(single_kafka_log_store_factory)]
+async fn test_edit_region_notification(factory: Option<LogStoreFactory>) {
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env = TestEnv::new().with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    let region = engine.get_region(region_id).unwrap();
+    let file_id = FileId::random();
+    // Simulating the ingestion of an SST file.
+    env.get_object_store()
+        .unwrap()
+        .write(
+            &format!("{}/{}.parquet", region.region_dir(), file_id),
+            b"x".as_slice(),
+        )
+        .await
+        .unwrap();
+    let edit = RegionEdit {
+        files_to_add: vec![FileMeta {
+            region_id: region.region_id,
+            file_id,
+            level: 0,
+            ..Default::default()
+        }],
+        files_to_remove: vec![],
+        compaction_time_window: None,
+        flushed_entry_id: None,
+        flushed_sequence: None,
+    };
+    engine.edit_region(region.region_id, edit).await.unwrap();
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+    let notifications = entries
+        .into_iter()
+        .filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
+        .count();
+    assert_eq!(notifications, 1);
+}
 
 #[tokio::test]
 async fn test_edit_region_schedule_compaction() {
diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs
index aac02db91ef2..c211eb7ea73f 100644
--- a/src/mito2/src/engine/flush_test.rs
+++ b/src/mito2/src/engine/flush_test.rs
@@ -24,6 +24,7 @@ use common_time::util::current_time_millis;
 use common_wal::options::WAL_OPTIONS_KEY;
 use rstest::rstest;
 use rstest_reuse::{self, apply};
+use store_api::logstore::LogStore;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::{RegionId, ScanRequest};
@@ -33,8 +34,8 @@ use crate::engine::listener::{FlushListener, StallListener};
 use crate::test_util::{
     build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
     multiple_log_store_factories, prepare_test_for_kafka_log_store, put_rows,
-    raft_engine_log_store_factory, reopen_region, rows_schema, CreateRequestBuilder,
-    LogStoreFactory, MockWriteBufferManager, TestEnv,
+    raft_engine_log_store_factory, reopen_region, rows_schema, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv,
 };
 use crate::time_provider::TimeProvider;
 use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
@@ -305,6 +306,54 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
     assert_eq!(5, version_data.committed_sequence);
 }
 
+#[apply(single_kafka_log_store_factory)]
+async fn test_flush_notification(factory: Option<LogStoreFactory>) {
+    use futures::TryStreamExt;
+    use log_store::kafka::log_store::KafkaLogStore;
+    use store_api::logstore::provider::Provider;
+
+    use crate::wal::entry_reader::decode_stream;
+    use crate::wal::raw_entry_reader::flatten_stream;
+
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env = TestEnv::new().with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let region_id = RegionId::new(1, 1);
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows_for_key("a", 0, 3, 0),
+    };
+    put_rows(&engine, region_id, rows).await;
+    flush_region(&engine, region_id, None).await;
+
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+    let notifications = entries
+        .into_iter()
+        .filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
+        .count();
+    assert_eq!(notifications, 1);
+}
+
 #[derive(Debug)]
 pub(crate) struct MockTimeProvider {
     now: AtomicI64,
diff --git a/src/mito2/src/manifest_notifier.rs b/src/mito2/src/manifest_notifier.rs
index d860a83fdd78..89d016c07c37 100644
--- a/src/mito2/src/manifest_notifier.rs
+++ b/src/mito2/src/manifest_notifier.rs
@@ -70,7 +70,7 @@ impl ManifestNotifier {
     pub(crate) fn push_notification(&mut self, version: ManifestVersion) {
         self.wal_entry.mutations.push(Mutation {
             op_type: OpType::Notify.into(),
-            sequence: self.next_entry_id,
+            sequence: self.next_sequence,
             rows: None,
             manifest_notification: Some(api::v1::ManifestNotification { version }),
         });
@@ -99,6 +99,6 @@ impl ManifestNotifier {
 
     pub(crate) fn finish(&mut self) {
         self.version_control
-            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_sequence - 1);
+            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
     }
 }
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index da0451bc1aa8..315dfdf62bbc 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -112,6 +112,12 @@ pub(crate) fn kafka_log_store_factory() -> Option<LogStoreFactory> {
 #[tokio::test]
 pub(crate) fn multiple_log_store_factories(#[case] factory: Option<LogStoreFactory>) {}
 
+#[template]
+#[rstest]
+#[case::with_kafka(kafka_log_store_factory())]
+#[tokio::test]
+pub(crate) fn single_kafka_log_store_factory(#[case] factory: Option<LogStoreFactory>) {}
+
 #[derive(Clone)]
 pub(crate) struct RaftEngineLogStoreFactory;
 
@@ -182,6 +188,15 @@ pub(crate) enum LogStoreImpl {
     RaftEngine(Arc<RaftEngineLogStore>),
     Kafka(Arc<KafkaLogStore>),
 }
 
+impl LogStoreImpl {
+    pub(crate) fn into_kafka_log_store(self) -> Arc<KafkaLogStore> {
+        match self {
+            LogStoreImpl::RaftEngine(_) => unreachable!(),
+            LogStoreImpl::Kafka(log_store) => log_store,
+        }
+    }
+}
+
 /// Env to test mito engine.
 pub struct TestEnv {
     /// Path to store data.
@@ -248,6 +263,10 @@ impl TestEnv {
         self.object_store_manager.clone()
     }
 
+    pub(crate) fn log_store(&self) -> Option<LogStoreImpl> {
+        self.log_store.clone()
+    }
+
     /// Creates a new engine with specific config under this env.
     pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine {
         let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
diff --git a/src/mito2/src/wal/entry_reader.rs b/src/mito2/src/wal/entry_reader.rs
index 848f0c523870..14dfab47be36 100644
--- a/src/mito2/src/wal/entry_reader.rs
+++ b/src/mito2/src/wal/entry_reader.rs
@@ -21,6 +21,7 @@ use snafu::{ensure, ResultExt};
 use store_api::logstore::entry::Entry;
 use store_api::logstore::provider::Provider;
 
+use super::raw_entry_reader::EntryStream;
 use crate::error::{CorruptedEntrySnafu, DecodeWalSnafu, Result};
 use crate::wal::raw_entry_reader::RawEntryReader;
 use crate::wal::{EntryId, WalEntryStream};
@@ -64,31 +65,34 @@ impl<R> LogStoreEntryReader<R> {
 impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
     fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
         let LogStoreEntryReader { reader } = self;
-        let mut stream = reader.read(ns, start_id)?;
-
-        let stream = stream! {
-            let mut buffered_entry = None;
-            while let Some(next_entry) = stream.next().await {
-                match buffered_entry.take() {
-                    Some(entry) => {
-                        yield decode_raw_entry(entry);
-                        buffered_entry = Some(next_entry?);
-                    },
-                    None => {
-                        buffered_entry = Some(next_entry?);
-                    }
-                };
-            }
-            if let Some(entry) = buffered_entry {
-                // Ignores tail corrupted data.
-                if entry.is_complete() {
+        let stream = reader.read(ns, start_id)?;
+
+        Ok(decode_stream(stream))
+    }
+}
+
+pub(crate) fn decode_stream(mut stream: EntryStream<'static>) -> WalEntryStream<'static> {
+    stream! {
+        let mut buffered_entry = None;
+        while let Some(next_entry) = stream.next().await {
+            match buffered_entry.take() {
+                Some(entry) => {
                     yield decode_raw_entry(entry);
+                    buffered_entry = Some(next_entry?);
+                },
+                None => {
+                    buffered_entry = Some(next_entry?);
                 }
+            };
+        }
+        if let Some(entry) = buffered_entry {
+            // Ignores tail corrupted data.
+            if entry.is_complete() {
+                yield decode_raw_entry(entry);
             }
-        };
-
-        Ok(Box::pin(stream))
+        }
     }
+    .boxed()
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs
index 85a0c945b9fd..2258ef99410d 100644
--- a/src/mito2/src/wal/raw_entry_reader.rs
+++ b/src/mito2/src/wal/raw_entry_reader.rs
@@ -17,12 +17,14 @@ use std::sync::Arc;
 use async_stream::try_stream;
 use common_error::ext::BoxedError;
 use futures::stream::BoxStream;
+use futures::StreamExt;
 use snafu::ResultExt;
 use store_api::logstore::entry::Entry;
 use store_api::logstore::provider::Provider;
+#[cfg(test)]
+use store_api::logstore::SendableEntryStream;
 use store_api::logstore::{LogStore, WalIndex};
 use store_api::storage::RegionId;
-use tokio_stream::StreamExt;
 
 use crate::error::{self, Result};
 use crate::wal::EntryId;
@@ -123,6 +125,29 @@ where
     }
 }
 
+#[cfg(test)]
+pub(crate) fn flatten_stream<S: LogStore>(
+    mut stream: SendableEntryStream<'static, Entry, S::Error>,
+    provider: Provider,
+) -> EntryStream<'static> {
+    let stream = try_stream!({
+        while let Some(entries) = stream.next().await {
+            let entries =
+                entries
+                    .map_err(BoxedError::new)
+                    .with_context(|_| error::ReadWalSnafu {
+                        provider: provider.clone(),
+                    })?;
+
+            for entry in entries {
+                yield entry
+            }
+        }
+    });
+
+    stream.boxed()
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs
index edc0feb07adc..be9290bf1ddd 100644
--- a/src/mito2/src/worker/handle_compaction.rs
+++ b/src/mito2/src/worker/handle_compaction.rs
@@ -14,6 +14,7 @@
 
 use api::v1::region::compact_request;
 use common_telemetry::{error, info, warn};
+use store_api::logstore::LogStore;
 use store_api::region_request::RegionCompactRequest;
 use store_api::storage::RegionId;
 
@@ -23,7 +24,7 @@ use crate::region::MitoRegionRef;
 use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
 use crate::worker::RegionWorkerLoop;
 
-impl<S> RegionWorkerLoop<S> {
+impl<S: LogStore> RegionWorkerLoop<S> {
     /// Handles compaction request submitted to region worker.
     pub(crate) async fn handle_compaction_request(
         &mut self,