diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index a5b8ab0fcc65..0da472629751 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -143,7 +143,7 @@ async fn test_read_version_basic() { .rev() .collect::>(); - let dummy_sst = StagingSstableInfo::new( + let dummy_sst = Arc::new(StagingSstableInfo::new( vec![ LocalSstableInfo::for_test(SstableInfo { object_id: 1, @@ -181,7 +181,7 @@ async fn test_read_version_basic() { epoch_id_vec_for_clear, batch_id_vec_for_clear, 1, - ); + )); { read_version.update(VersionUpdate::Staging(StagingData::Sst(dummy_sst))); @@ -368,7 +368,7 @@ async fn test_read_filter_basic() { // // Update read version via staging SSTs // let sst_id = 233; -// let staging_sst = gen_dummy_sst_info(sst_id, imms.clone(), table_id, epoch); +// let staging_sst = Arc::new(gen_dummy_sst_info(sst_id, imms.clone(), table_id, epoch)); // read_version_vec.iter().for_each(|v| { // v.write().update(VersionUpdate::Staging(StagingData::Sst( // StagingSstableInfo::new( diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index b18f698689c8..cbbc3f656e8e 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -392,9 +392,10 @@ impl HummockEventHandler { // older data first .rev() .for_each(|staging_sstable_info| { + let staging_sstable_info_ref = Arc::new(staging_sstable_info); self.for_each_read_version(|read_version| { read_version.update(VersionUpdate::Staging(StagingData::Sst( - staging_sstable_info.clone(), + staging_sstable_info_ref.clone(), ))) }); }); @@ -438,6 +439,7 @@ impl HummockEventHandler { fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) { // todo: do some prune for version update + let staging_sstable_info = Arc::new(staging_sstable_info); self.for_each_read_version(|read_version| { trace!("data_spilled. SST size {}", staging_sstable_info.imm_size()); read_version.update(VersionUpdate::Staging(StagingData::Sst( diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index b0e1f5911fea..892671f9364d 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -116,7 +116,7 @@ impl StagingSstableInfo { pub enum StagingData { ImmMem(ImmutableMemtable), MergedImmMem(ImmutableMemtable, Vec), - Sst(StagingSstableInfo), + Sst(Arc), } pub enum VersionUpdate { @@ -139,7 +139,7 @@ pub struct StagingVersion { pub imm: VecDeque, // newer data comes first - pub sst: VecDeque, + pub sst: VecDeque>, } impl StagingVersion { @@ -285,7 +285,7 @@ impl HummockReadVersion { StagingData::MergedImmMem(merged_imm, imm_ids) => { self.add_merged_imm(merged_imm, imm_ids); } - StagingData::Sst(staging_sst) => { + StagingData::Sst(staging_sst_ref) => { // The following properties must be ensured: // 1) self.staging.imm is sorted by imm id descendingly // 2) staging_sst.imm_ids preserves the imm id partial @@ -308,7 +308,7 @@ impl HummockReadVersion { self.staging.imm.iter().map(|imm| imm.batch_id()).collect(); // intersected batch_id order from oldest to newest - let intersect_imm_ids = staging_sst + let intersect_imm_ids = staging_sst_ref .imm_ids .iter() .rev() @@ -343,15 +343,15 @@ impl HummockReadVersion { staging_sst.epochs {:?}, local_imm_ids {:?}, intersect_imm_ids {:?}", - staging_sst.imm_size, - staging_sst.imm_ids, - staging_sst.epochs, + staging_sst_ref.imm_size, + staging_sst_ref.imm_ids, + staging_sst_ref.epochs, local_imm_ids, intersect_imm_ids, ); } } - self.staging.sst.push_front(staging_sst); + self.staging.sst.push_front(staging_sst_ref); } } },