Skip to content

Commit d996d65

Browse files
committed
[ENH]: Garbage collection for soft deleted attached functions
1 parent 47287c4 commit d996d65

File tree

8 files changed

+358
-0
lines changed

8 files changed

+358
-0
lines changed

go/pkg/sysdb/coordinator/task.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -820,3 +820,75 @@ func (s *Coordinator) CleanupExpiredPartialAttachedFunctions(ctx context.Context
820820
CleanedUpIds: cleanedAttachedFunctionIDStrings,
821821
}, nil
822822
}
823+
824+
// GetSoftDeletedAttachedFunctions retrieves attached functions that are soft deleted and were updated before the cutoff time
825+
func (s *Coordinator) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) {
826+
log := log.With(zap.String("method", "GetSoftDeletedAttachedFunctions"))
827+
828+
if req.CutoffTime == nil {
829+
log.Error("GetSoftDeletedAttachedFunctions: cutoff_time is required")
830+
return nil, status.Errorf(codes.InvalidArgument, "cutoff_time is required")
831+
}
832+
833+
if req.Limit <= 0 {
834+
log.Error("GetSoftDeletedAttachedFunctions: limit must be greater than 0")
835+
return nil, status.Errorf(codes.InvalidArgument, "limit must be greater than 0")
836+
}
837+
838+
cutoffTime := req.CutoffTime.AsTime()
839+
attachedFunctions, err := s.catalog.metaDomain.AttachedFunctionDb(ctx).GetSoftDeletedAttachedFunctions(cutoffTime, req.Limit)
840+
if err != nil {
841+
log.Error("GetSoftDeletedAttachedFunctions: failed to get soft deleted attached functions", zap.Error(err))
842+
return nil, err
843+
}
844+
845+
// Convert to proto response
846+
protoAttachedFunctions := make([]*coordinatorpb.AttachedFunction, len(attachedFunctions))
847+
for i, af := range attachedFunctions {
848+
protoAttachedFunctions[i] = &coordinatorpb.AttachedFunction{
849+
Id: af.ID.String(),
850+
Name: af.Name,
851+
InputCollectionId: af.InputCollectionID,
852+
OutputCollectionName: af.OutputCollectionName,
853+
CompletionOffset: uint64(af.CompletionOffset),
854+
MinRecordsForInvocation: uint64(af.MinRecordsForInvocation),
855+
CreatedAt: uint64(af.CreatedAt.Unix()),
856+
UpdatedAt: uint64(af.UpdatedAt.Unix()),
857+
}
858+
859+
protoAttachedFunctions[i].NextRunAt = uint64(af.NextRun.Unix())
860+
if af.OutputCollectionID != nil {
861+
protoAttachedFunctions[i].OutputCollectionId = proto.String(*af.OutputCollectionID)
862+
}
863+
}
864+
865+
log.Info("GetSoftDeletedAttachedFunctions: completed successfully",
866+
zap.Int("count", len(attachedFunctions)))
867+
868+
return &coordinatorpb.GetSoftDeletedAttachedFunctionsResponse{
869+
AttachedFunctions: protoAttachedFunctions,
870+
}, nil
871+
}
872+
873+
// FinishAttachedFunctionDeletion permanently deletes an attached function from the database (hard delete)
874+
// This should only be called after the soft delete grace period has passed
875+
func (s *Coordinator) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) {
876+
log := log.With(zap.String("method", "FinishAttachedFunctionDeletion"))
877+
878+
attachedFunctionID, err := uuid.Parse(req.Id)
879+
if err != nil {
880+
log.Error("FinishAttachedFunctionDeletion: invalid attached_function_id", zap.Error(err))
881+
return nil, status.Errorf(codes.InvalidArgument, "invalid attached_function_id: %v", err)
882+
}
883+
884+
err = s.catalog.metaDomain.AttachedFunctionDb(ctx).HardDeleteAttachedFunction(attachedFunctionID)
885+
if err != nil {
886+
log.Error("FinishAttachedFunctionDeletion: failed to hard delete attached function", zap.Error(err))
887+
return nil, err
888+
}
889+
890+
log.Info("FinishAttachedFunctionDeletion: completed successfully",
891+
zap.String("attached_function_id", attachedFunctionID.String()))
892+
893+
return &coordinatorpb.FinishAttachedFunctionDeletionResponse{}, nil
894+
}

go/pkg/sysdb/grpc/task_service.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,29 @@ func (s *Server) CleanupExpiredPartialAttachedFunctions(ctx context.Context, req
139139
log.Info("CleanupExpiredPartialAttachedFunctions succeeded", zap.Uint64("cleaned_up_count", res.CleanedUpCount))
140140
return res, nil
141141
}
142+
143+
func (s *Server) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) {
144+
log.Info("GetSoftDeletedAttachedFunctions", zap.Time("cutoff_time", req.CutoffTime.AsTime()), zap.Int32("limit", req.Limit))
145+
146+
res, err := s.coordinator.GetSoftDeletedAttachedFunctions(ctx, req)
147+
if err != nil {
148+
log.Error("GetSoftDeletedAttachedFunctions failed", zap.Error(err))
149+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
150+
}
151+
152+
log.Info("GetSoftDeletedAttachedFunctions succeeded", zap.Int("count", len(res.AttachedFunctions)))
153+
return res, nil
154+
}
155+
156+
func (s *Server) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) {
157+
log.Info("FinishAttachedFunctionDeletion", zap.String("id", req.Id))
158+
159+
res, err := s.coordinator.FinishAttachedFunctionDeletion(ctx, req)
160+
if err != nil {
161+
log.Error("FinishAttachedFunctionDeletion failed", zap.Error(err))
162+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
163+
}
164+
165+
log.Info("FinishAttachedFunctionDeletion succeeded", zap.String("id", req.Id))
166+
return res, nil
167+
}

go/pkg/sysdb/metastore/db/dao/task.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,51 @@ func (s *attachedFunctionDb) CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid
403403

404404
return ids, nil
405405
}
406+
407+
// GetSoftDeletedAttachedFunctions returns attached functions that are soft deleted
408+
// and were updated before the cutoff time (eligible for hard deletion)
409+
func (s *attachedFunctionDb) GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*dbmodel.AttachedFunction, error) {
410+
var attachedFunctions []*dbmodel.AttachedFunction
411+
err := s.db.
412+
Where("is_deleted = ?", true).
413+
Where("updated_at < ?", cutoffTime).
414+
Limit(int(limit)).
415+
Find(&attachedFunctions).Error
416+
417+
if err != nil {
418+
log.Error("GetSoftDeletedAttachedFunctions failed",
419+
zap.Error(err),
420+
zap.Time("cutoff_time", cutoffTime))
421+
return nil, err
422+
}
423+
424+
log.Debug("GetSoftDeletedAttachedFunctions found attached functions",
425+
zap.Int("count", len(attachedFunctions)),
426+
zap.Time("cutoff_time", cutoffTime))
427+
428+
return attachedFunctions, nil
429+
}
430+
431+
// HardDeleteAttachedFunction permanently deletes an attached function from the database
432+
// This should only be called after the soft delete grace period has passed
433+
func (s *attachedFunctionDb) HardDeleteAttachedFunction(id uuid.UUID) error {
434+
result := s.db.Unscoped().Delete(&dbmodel.AttachedFunction{}, "id = ?", id)
435+
436+
if result.Error != nil {
437+
log.Error("HardDeleteAttachedFunction failed",
438+
zap.Error(result.Error),
439+
zap.String("id", id.String()))
440+
return result.Error
441+
}
442+
443+
if result.RowsAffected == 0 {
444+
log.Warn("HardDeleteAttachedFunction: no rows affected (attached function not found)",
445+
zap.String("id", id.String()))
446+
return nil // Idempotent - no error if not found
447+
}
448+
449+
log.Info("HardDeleteAttachedFunction succeeded",
450+
zap.String("id", id.String()))
451+
452+
return nil
453+
}

go/pkg/sysdb/metastore/db/dbmodel/task.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,6 @@ type IAttachedFunctionDb interface {
5858
PeekScheduleByCollectionId(collectionIDs []string) ([]*AttachedFunction, error)
5959
GetMinCompletionOffsetForCollection(inputCollectionID string) (*int64, error)
6060
CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid.UUID, error)
61+
GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*AttachedFunction, error)
62+
HardDeleteAttachedFunction(id uuid.UUID) error
6163
}

idl/chromadb/proto/coordinator.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,21 @@ message PeekScheduleByCollectionIdResponse {
682682
repeated ScheduleEntry schedule = 1;
683683
}
684684

685+
// Request for SysDB.GetSoftDeletedAttachedFunctions.
// cutoff_time is required: only soft-deleted attached functions last updated
// strictly before it are returned. limit must be greater than 0 and caps the
// number of attached functions returned.
message GetSoftDeletedAttachedFunctionsRequest {
686+
google.protobuf.Timestamp cutoff_time = 1;
687+
int32 limit = 2;
688+
}
689+
690+
// Response for SysDB.GetSoftDeletedAttachedFunctions: the soft-deleted
// attached functions that matched the request's cutoff time and limit.
message GetSoftDeletedAttachedFunctionsResponse {
691+
repeated AttachedFunction attached_functions = 1;
692+
}
693+
694+
// Request for SysDB.FinishAttachedFunctionDeletion.
// id is the attached function to hard delete; it must be a valid UUID string.
message FinishAttachedFunctionDeletionRequest {
695+
string id = 1;
696+
}
697+
698+
// Response for SysDB.FinishAttachedFunctionDeletion; carries no fields.
message FinishAttachedFunctionDeletionResponse {}
699+
685700
service SysDB {
686701
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
687702
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
@@ -730,4 +745,6 @@ service SysDB {
730745
rpc CleanupExpiredPartialAttachedFunctions(CleanupExpiredPartialAttachedFunctionsRequest) returns (CleanupExpiredPartialAttachedFunctionsResponse) {}
731746
rpc GetFunctions(GetFunctionsRequest) returns (GetFunctionsResponse) {}
732747
rpc PeekScheduleByCollectionId(PeekScheduleByCollectionIdRequest) returns (PeekScheduleByCollectionIdResponse) {}
748+
rpc GetSoftDeletedAttachedFunctions(GetSoftDeletedAttachedFunctionsRequest) returns (GetSoftDeletedAttachedFunctionsResponse) {}
749+
rpc FinishAttachedFunctionDeletion(FinishAttachedFunctionDeletionRequest) returns (FinishAttachedFunctionDeletionResponse) {}
733750
}

rust/garbage_collector/src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ pub struct GarbageCollectorConfig {
2727
default = "GarbageCollectorConfig::default_collection_soft_delete_grace_period"
2828
)]
2929
pub(super) collection_soft_delete_grace_period: Duration,
30+
#[serde(
31+
rename = "attached_function_soft_delete_grace_period_seconds",
32+
deserialize_with = "deserialize_duration_from_seconds",
33+
default = "GarbageCollectorConfig::default_attached_function_soft_delete_grace_period"
34+
)]
35+
pub(super) attached_function_soft_delete_grace_period: Duration,
3036
#[serde(
3137
rename = "version_relative_cutoff_time_seconds",
3238
alias = "relative_cutoff_time_seconds",
@@ -119,6 +125,10 @@ impl GarbageCollectorConfig {
119125
Duration::from_secs(60 * 60 * 24) // 1 day
120126
}
121127

128+
/// Serde default for `attached_function_soft_delete_grace_period`:
/// 86,400 seconds (one day).
fn default_attached_function_soft_delete_grace_period() -> Duration {
129+
Duration::from_secs(60 * 60 * 24) // 1 day
130+
}
131+
122132
/// Returns the all-zero UUID as a `String`.
/// NOTE(review): presumably the serde default for the tenant threshold that
/// enables log GC — the field using it is not visible here; confirm.
fn enable_log_gc_for_tenant_threshold() -> String {
123133
"00000000-0000-0000-0000-000000000000".to_string()
124134
}

rust/garbage_collector/src/garbage_collector_component.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,61 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
390390
self.config.collection_soft_delete_grace_period
391391
);
392392

393+
let attached_function_soft_delete_absolute_cutoff_time =
394+
now - self.config.attached_function_soft_delete_grace_period;
395+
tracing::debug!(
396+
"Using absolute cutoff time: {:?} for soft deleted attached functions (grace period: {:?})",
397+
attached_function_soft_delete_absolute_cutoff_time,
398+
self.config.attached_function_soft_delete_grace_period
399+
);
400+
401+
// Garbage collect soft-deleted attached functions that are past the grace period
402+
tracing::info!("Checking for soft-deleted attached functions to hard delete");
403+
match self
404+
.sysdb_client
405+
.get_soft_deleted_attached_functions(
406+
attached_function_soft_delete_absolute_cutoff_time,
407+
100, // Limit to 100 at a time
408+
)
409+
.await
410+
{
411+
Ok(attached_functions_to_delete) => {
412+
if !attached_functions_to_delete.is_empty() {
413+
tracing::info!(
414+
"Found {} soft-deleted attached functions to hard delete",
415+
attached_functions_to_delete.len()
416+
);
417+
418+
for attached_function_id in attached_functions_to_delete {
419+
match self
420+
.sysdb_client
421+
.finish_attached_function_deletion(attached_function_id)
422+
.await
423+
{
424+
Ok(_) => {
425+
tracing::info!(
426+
"Successfully hard deleted attached function: {}",
427+
attached_function_id
428+
);
429+
}
430+
Err(e) => {
431+
tracing::error!(
432+
"Failed to hard delete attached function {}: {}",
433+
attached_function_id,
434+
e
435+
);
436+
}
437+
}
438+
}
439+
} else {
440+
tracing::debug!("No soft-deleted attached functions found to hard delete");
441+
}
442+
}
443+
Err(e) => {
444+
tracing::error!("Failed to get soft-deleted attached functions: {}", e);
445+
}
446+
}
447+
393448
// Get all collections to gc and create gc orchestrator for each.
394449
tracing::info!("Getting collections to gc");
395450
let collections_to_gc = self
@@ -840,6 +895,7 @@ mod tests {
840895
}],
841896
version_cutoff_time: Duration::from_secs(1),
842897
collection_soft_delete_grace_period: Duration::from_secs(1),
898+
attached_function_soft_delete_grace_period: Duration::from_secs(1),
843899
max_collections_to_gc: 100,
844900
max_collections_to_fetch: None,
845901
min_versions_to_keep: 2,
@@ -1045,6 +1101,7 @@ mod tests {
10451101
otel_endpoint: "none".to_string(),
10461102
version_cutoff_time: Duration::from_secs(1),
10471103
collection_soft_delete_grace_period: Duration::from_secs(1),
1104+
attached_function_soft_delete_grace_period: Duration::from_secs(1),
10481105
max_collections_to_gc: 100,
10491106
min_versions_to_keep: 2,
10501107
filter_min_versions_if_alive: None,

0 commit comments

Comments
 (0)