@@ -4,6 +4,10 @@ use super::OneOffCompactMessage;
44use super :: RebuildMessage ;
55use crate :: compactor:: types:: ScheduledCompactMessage ;
66use crate :: config:: CompactionServiceConfig ;
7+ use crate :: execution:: operators:: purge_dirty_log:: PurgeDirtyLog ;
8+ use crate :: execution:: operators:: purge_dirty_log:: PurgeDirtyLogError ;
9+ use crate :: execution:: operators:: purge_dirty_log:: PurgeDirtyLogInput ;
10+ use crate :: execution:: operators:: purge_dirty_log:: PurgeDirtyLogOutput ;
711use crate :: execution:: orchestration:: CompactOrchestrator ;
812use crate :: execution:: orchestration:: CompactionResponse ;
913use async_trait:: async_trait;
@@ -18,8 +22,10 @@ use chroma_memberlist::memberlist_provider::Memberlist;
1822use chroma_segment:: spann_provider:: SpannProvider ;
1923use chroma_storage:: Storage ;
2024use chroma_sysdb:: SysDb ;
25+ use chroma_system:: wrap;
2126use chroma_system:: Dispatcher ;
2227use chroma_system:: Orchestrator ;
28+ use chroma_system:: TaskResult ;
2329use chroma_system:: { Component , ComponentContext , ComponentHandle , Handler , System } ;
2430use chroma_types:: CollectionUuid ;
2531use futures:: stream:: FuturesUnordered ;
@@ -58,6 +64,7 @@ pub(crate) struct CompactionManager {
5864 max_compaction_size : usize ,
5965 max_partition_size : usize ,
6066 fetch_log_batch_size : u32 ,
67+ purge_dirty_log_timeout_seconds : u64 ,
6168 on_next_memberlist_signal : Option < oneshot:: Sender < ( ) > > ,
6269}
6370
@@ -92,6 +99,7 @@ impl CompactionManager {
9299 max_compaction_size : usize ,
93100 max_partition_size : usize ,
94101 fetch_log_batch_size : u32 ,
102+ purge_dirty_log_timeout_seconds : u64 ,
95103 ) -> Self {
96104 CompactionManager {
97105 system,
@@ -110,6 +118,7 @@ impl CompactionManager {
110118 max_partition_size,
111119 on_next_memberlist_signal : None ,
112120 fetch_log_batch_size,
121+ purge_dirty_log_timeout_seconds,
113122 }
114123 }
115124
@@ -187,7 +196,7 @@ impl CompactionManager {
187196 }
188197
189198 #[ instrument( name = "CompactionManager::rebuild_batch" ) ]
190- pub ( crate ) async fn rebuild_batch ( & mut self , collection_ids : Vec < CollectionUuid > ) {
199+ pub ( crate ) async fn rebuild_batch ( & mut self , collection_ids : & [ CollectionUuid ] ) {
191200 let _ = collection_ids
192201 . iter ( )
193202 . map ( |id| self . compact ( * id, true ) )
@@ -196,6 +205,41 @@ impl CompactionManager {
196205 . await ;
197206 }
198207
208+ #[ instrument( name = "CompactionManager::purge_dirty_log" , skip( ctx) ) ]
209+ pub ( crate ) async fn purge_dirty_log ( & mut self , ctx : & ComponentContext < Self > ) {
210+ let deleted_collection_uuids = self . scheduler . drain_deleted_collections ( ) ;
211+ if deleted_collection_uuids. is_empty ( ) {
212+ tracing:: info!( "Skipping purge dirty log because there is no deleted collections" ) ;
213+ return ;
214+ }
215+ let purge_dirty_log = PurgeDirtyLog {
216+ log_client : self . log . clone ( ) ,
217+ timeout : Duration :: from_secs ( self . purge_dirty_log_timeout_seconds ) ,
218+ } ;
219+ let purge_dirty_log_input = PurgeDirtyLogInput {
220+ collection_uuids : deleted_collection_uuids. clone ( ) ,
221+ } ;
222+ let purge_dirty_log_task = wrap (
223+ Box :: new ( purge_dirty_log) ,
224+ purge_dirty_log_input,
225+ ctx. receiver ( ) ,
226+ ) ;
227+ let Some ( mut dispatcher) = self . dispatcher . clone ( ) else {
228+ tracing:: error!( "Unable to create background task to purge dirty log: Dispatcher is not set for compaction manager" ) ;
229+ return ;
230+ } ;
231+ if let Err ( err) = dispatcher
232+ . send ( purge_dirty_log_task, Some ( Span :: current ( ) ) )
233+ . await
234+ {
235+ tracing:: error!( "Unable to create background task to purge dirty log: {err}" ) ;
236+ return ;
237+ } ;
238+ tracing:: info!(
239+ "Purging dirty logs for deleted collections: [{deleted_collection_uuids:?}]" ,
240+ ) ;
241+ }
242+
199243 pub ( crate ) fn set_dispatcher ( & mut self , dispatcher : ComponentHandle < Dispatcher > ) {
200244 self . dispatcher = Some ( dispatcher) ;
201245 }
@@ -240,6 +284,7 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
240284 let max_compaction_size = config. compactor . max_compaction_size ;
241285 let max_partition_size = config. compactor . max_partition_size ;
242286 let fetch_log_batch_size = config. compactor . fetch_log_batch_size ;
287+ let purge_dirty_log_timeout_seconds = config. compactor . purge_dirty_log_timeout_seconds ;
243288 let mut disabled_collections =
244289 HashSet :: with_capacity ( config. compactor . disabled_collections . len ( ) ) ;
245290 for collection_id_str in & config. compactor . disabled_collections {
@@ -298,6 +343,7 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
298343 max_compaction_size,
299344 max_partition_size,
300345 fetch_log_batch_size,
346+ purge_dirty_log_timeout_seconds,
301347 ) )
302348 }
303349}
@@ -342,6 +388,7 @@ impl Handler<ScheduledCompactMessage> for CompactionManager {
342388 ) {
343389 tracing:: info!( "CompactionManager: Performing scheduled compaction" ) ;
344390 let _ = self . compact_batch ( ) . await ;
391+ self . purge_dirty_log ( ctx) . await ;
345392
346393 // Compaction is done, schedule the next compaction
347394 ctx. scheduler . schedule (
@@ -382,7 +429,11 @@ impl Handler<RebuildMessage> for CompactionManager {
382429 "Rebuild started for collections: {:?}" ,
383430 message. collection_ids
384431 ) ;
385- self . rebuild_batch ( message. collection_ids ) . await ;
432+ self . rebuild_batch ( & message. collection_ids ) . await ;
433+ tracing:: info!(
434+ "Rebuild completed for collections: {:?}" ,
435+ message. collection_ids
436+ ) ;
386437 }
387438}
388439
@@ -400,6 +451,21 @@ impl Handler<Memberlist> for CompactionManager {
400451 }
401452}
402453
454+ #[ async_trait]
455+ impl Handler < TaskResult < PurgeDirtyLogOutput , PurgeDirtyLogError > > for CompactionManager {
456+ type Result = ( ) ;
457+
458+ async fn handle (
459+ & mut self ,
460+ message : TaskResult < PurgeDirtyLogOutput , PurgeDirtyLogError > ,
461+ _ctx : & ComponentContext < CompactionManager > ,
462+ ) {
463+ if let Err ( err) = message. into_inner ( ) {
464+ tracing:: error!( "Error when purging dirty log: {err}" ) ;
465+ }
466+ }
467+ }
468+
/// Message type carrying a one-shot sender (`on_ready_tx`) with a unit payload.
///
/// NOTE(review): presumably the compaction manager fires this sender once it considers itself
/// ready (e.g. after the next memberlist update) — the handler is not visible here; confirm.
403469pub struct RegisterOnReadySignal {
404470 pub on_ready_tx : oneshot:: Sender < ( ) > ,
405471}
@@ -618,6 +684,7 @@ mod tests {
618684 let max_compaction_size = 1000 ;
619685 let max_partition_size = 1000 ;
620686 let fetch_log_batch_size = 100 ;
687+ let purge_dirty_log_timeout_seconds = 60 ;
621688
622689 // Set assignment policy
623690 let mut assignment_policy = Box :: new ( RendezvousHashingAssignmentPolicy :: default ( ) ) ;
@@ -682,6 +749,7 @@ mod tests {
682749 max_compaction_size,
683750 max_partition_size,
684751 fetch_log_batch_size,
752+ purge_dirty_log_timeout_seconds,
685753 ) ;
686754
687755 let dispatcher = Dispatcher :: new ( DispatcherConfig {
0 commit comments