diff --git a/Cargo.lock b/Cargo.lock index a8c544b99e5..86088bf4f1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3500,6 +3500,8 @@ dependencies = [ "proptest-state-machine", "prost 0.13.5", "rand 0.8.5", + "s3heap", + "s3heap-service", "serde", "serde_json", "sqlx", diff --git a/go/pkg/sysdb/coordinator/create_task_test.go b/go/pkg/sysdb/coordinator/create_task_test.go index 2fcfef94cb2..2b187f5872d 100644 --- a/go/pkg/sysdb/coordinator/create_task_test.go +++ b/go/pkg/sysdb/coordinator/create_task_test.go @@ -16,6 +16,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" ) // testMinimalUUIDv7 is the test's copy of minimalUUIDv7 from task.go @@ -663,3 +664,85 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Param func TestAttachFunctionTestSuite(t *testing.T) { suite.Run(t, new(AttachFunctionTestSuite)) } + +// TestGetSoftDeletedAttachedFunctions_TimestampConsistency verifies that timestamps +// are returned in microseconds (UnixMicro) to match other API methods +func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) { + ctx := context.Background() + + // Create test timestamps with known values + testTime := time.Date(2025, 10, 30, 12, 0, 0, 123456000, time.UTC) // 123.456 milliseconds + expectedMicros := uint64(testTime.UnixMicro()) + + // Create mock coordinator with minimal setup + mockMetaDomain := &dbmodel_mocks.IMetaDomain{} + mockAttachedFunctionDb := &dbmodel_mocks.IAttachedFunctionDb{} + mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(mockAttachedFunctionDb) + + // Mock the database response with our test timestamps + attachedFunctions := []*dbmodel.AttachedFunction{ + { + ID: uuid.New(), + Name: "test_function", + InputCollectionID: "collection_123", + OutputCollectionName: "output_collection", + CompletionOffset: 100, + MinRecordsForInvocation: 10, + CreatedAt: testTime, + UpdatedAt: testTime, + NextRun: testTime, + }, + } + + mockAttachedFunctionDb.On("GetSoftDeletedAttachedFunctions", mock.Anything, mock.Anything). + Return(attachedFunctions, nil) + + coordinator := &Coordinator{ + catalog: Catalog{ + metaDomain: mockMetaDomain, + }, + } + + // Call GetSoftDeletedAttachedFunctions + cutoffTime := timestamppb.New(testTime.Add(-24 * time.Hour)) + resp, err := coordinator.GetSoftDeletedAttachedFunctions(ctx, &coordinatorpb.GetSoftDeletedAttachedFunctionsRequest{ + CutoffTime: cutoffTime, + Limit: 100, + }) + + // Verify response + if err != nil { + t.Fatalf("GetSoftDeletedAttachedFunctions failed: %v", err) + } + if len(resp.AttachedFunctions) != 1 { + t.Fatalf("Expected 1 attached function, got %d", len(resp.AttachedFunctions)) + } + + af := resp.AttachedFunctions[0] + + // Verify timestamps are in microseconds (not seconds) + if af.CreatedAt != expectedMicros { + t.Errorf("CreatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.CreatedAt) + } + if af.UpdatedAt != expectedMicros { + t.Errorf("UpdatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.UpdatedAt) + } + if af.NextRunAt != expectedMicros { + t.Errorf("NextRunAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.NextRunAt) + } + + // Verify these are NOT in seconds (would be ~1000x smaller) + expectedSeconds := uint64(testTime.Unix()) + if af.CreatedAt == expectedSeconds { + t.Error("CreatedAt appears to be in seconds instead of microseconds") + } + if af.UpdatedAt == expectedSeconds { + t.Error("UpdatedAt appears to be in seconds instead of microseconds") + } + if af.NextRunAt == expectedSeconds { + t.Error("NextRunAt appears to be in seconds instead of microseconds") + } + + mockMetaDomain.AssertExpectations(t) + mockAttachedFunctionDb.AssertExpectations(t) +} diff --git a/go/pkg/sysdb/coordinator/task.go b/go/pkg/sysdb/coordinator/task.go index aee0e2c426a..b821d1699fa 100644 --- a/go/pkg/sysdb/coordinator/task.go +++ b/go/pkg/sysdb/coordinator/task.go @@ -822,3 +822,75 @@ func (s *Coordinator) CleanupExpiredPartialAttachedFunctions(ctx context.Context CleanedUpIds: cleanedAttachedFunctionIDStrings, }, nil } + +// GetSoftDeletedAttachedFunctions retrieves attached functions that are soft deleted and were updated before the cutoff time +func (s *Coordinator) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) { + log := log.With(zap.String("method", "GetSoftDeletedAttachedFunctions")) + + if req.CutoffTime == nil { + log.Error("GetSoftDeletedAttachedFunctions: cutoff_time is required") + return nil, status.Errorf(codes.InvalidArgument, "cutoff_time is required") + } + + if req.Limit <= 0 { + log.Error("GetSoftDeletedAttachedFunctions: limit must be greater than 0") + return nil, status.Errorf(codes.InvalidArgument, "limit must be greater than 0") + } + + cutoffTime := req.CutoffTime.AsTime() + attachedFunctions, err := s.catalog.metaDomain.AttachedFunctionDb(ctx).GetSoftDeletedAttachedFunctions(cutoffTime, req.Limit) + if err != nil { + log.Error("GetSoftDeletedAttachedFunctions: failed to get soft deleted attached functions", zap.Error(err)) + return nil, err + } + + // Convert to proto response + protoAttachedFunctions := make([]*coordinatorpb.AttachedFunction, len(attachedFunctions)) + for i, af := range attachedFunctions { + protoAttachedFunctions[i] = &coordinatorpb.AttachedFunction{ + Id: af.ID.String(), + Name: af.Name, + InputCollectionId: af.InputCollectionID, + OutputCollectionName: af.OutputCollectionName, + CompletionOffset: uint64(af.CompletionOffset), + MinRecordsForInvocation: uint64(af.MinRecordsForInvocation), + CreatedAt: uint64(af.CreatedAt.UnixMicro()), + UpdatedAt: uint64(af.UpdatedAt.UnixMicro()), + } + + protoAttachedFunctions[i].NextRunAt = uint64(af.NextRun.UnixMicro()) + if af.OutputCollectionID != nil { + protoAttachedFunctions[i].OutputCollectionId = proto.String(*af.OutputCollectionID) + } + } + + log.Info("GetSoftDeletedAttachedFunctions: completed successfully", + zap.Int("count", len(attachedFunctions))) + + return &coordinatorpb.GetSoftDeletedAttachedFunctionsResponse{ + AttachedFunctions: protoAttachedFunctions, + }, nil +} + +// FinishAttachedFunctionDeletion permanently deletes an attached function from the database (hard delete) +// This should only be called after the soft delete grace period has passed +func (s *Coordinator) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) { + log := log.With(zap.String("method", "FinishAttachedFunctionDeletion")) + + attachedFunctionID, err := uuid.Parse(req.AttachedFunctionId) + if err != nil { + log.Error("FinishAttachedFunctionDeletion: invalid attached_function_id", zap.Error(err)) + return nil, status.Errorf(codes.InvalidArgument, "invalid attached_function_id: %v", err) + } + + err = s.catalog.metaDomain.AttachedFunctionDb(ctx).HardDeleteAttachedFunction(attachedFunctionID) + if err != nil { + log.Error("FinishAttachedFunctionDeletion: failed to hard delete attached function", zap.Error(err)) + return nil, err + } + + log.Info("FinishAttachedFunctionDeletion: completed successfully", + zap.String("attached_function_id", attachedFunctionID.String())) + + return &coordinatorpb.FinishAttachedFunctionDeletionResponse{}, nil +} diff --git a/go/pkg/sysdb/grpc/task_service.go b/go/pkg/sysdb/grpc/task_service.go index 5f6e1656c76..c13e67a124e 100644 --- a/go/pkg/sysdb/grpc/task_service.go +++ b/go/pkg/sysdb/grpc/task_service.go @@ -139,3 +139,29 @@ func (s *Server) CleanupExpiredPartialAttachedFunctions(ctx context.Context, req log.Info("CleanupExpiredPartialAttachedFunctions succeeded", zap.Uint64("cleaned_up_count", res.CleanedUpCount)) return res, nil } + +func (s *Server) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) { + log.Info("GetSoftDeletedAttachedFunctions", zap.Time("cutoff_time", req.CutoffTime.AsTime()), zap.Int32("limit", req.Limit)) + + res, err := s.coordinator.GetSoftDeletedAttachedFunctions(ctx, req) + if err != nil { + log.Error("GetSoftDeletedAttachedFunctions failed", zap.Error(err)) + return nil, grpcutils.BuildInternalGrpcError(err.Error()) + } + + log.Info("GetSoftDeletedAttachedFunctions succeeded", zap.Int("count", len(res.AttachedFunctions))) + return res, nil +} + +func (s *Server) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) { + log.Info("FinishAttachedFunctionDeletion", zap.String("id", req.AttachedFunctionId)) + + res, err := s.coordinator.FinishAttachedFunctionDeletion(ctx, req) + if err != nil { + log.Error("FinishAttachedFunctionDeletion failed", zap.Error(err)) + return nil, grpcutils.BuildInternalGrpcError(err.Error()) + } + + log.Info("FinishAttachedFunctionDeletion succeeded", zap.String("id", req.AttachedFunctionId)) + return res, nil +} diff --git a/go/pkg/sysdb/metastore/db/dao/task.go b/go/pkg/sysdb/metastore/db/dao/task.go index da0fef8b769..6eec73b7bec 100644 --- a/go/pkg/sysdb/metastore/db/dao/task.go +++ b/go/pkg/sysdb/metastore/db/dao/task.go @@ -403,3 +403,51 @@ func (s *attachedFunctionDb) CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid return ids, nil } + +// GetSoftDeletedAttachedFunctions returns attached functions that are soft deleted +// and were updated before the cutoff time (eligible for hard deletion) +func (s *attachedFunctionDb) GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*dbmodel.AttachedFunction, error) { + var attachedFunctions []*dbmodel.AttachedFunction + err := s.db. + Where("is_deleted = ?", true). + Where("updated_at < ?", cutoffTime). + Limit(int(limit)). + Find(&attachedFunctions).Error + + if err != nil { + log.Error("GetSoftDeletedAttachedFunctions failed", + zap.Error(err), + zap.Time("cutoff_time", cutoffTime)) + return nil, err + } + + log.Debug("GetSoftDeletedAttachedFunctions found attached functions", + zap.Int("count", len(attachedFunctions)), + zap.Time("cutoff_time", cutoffTime)) + + return attachedFunctions, nil +} + +// HardDeleteAttachedFunction permanently deletes an attached function from the database +// This should only be called after the soft delete grace period has passed +func (s *attachedFunctionDb) HardDeleteAttachedFunction(id uuid.UUID) error { + result := s.db.Unscoped().Delete(&dbmodel.AttachedFunction{}, "id = ? AND is_deleted = true", id) + + if result.Error != nil { + log.Error("HardDeleteAttachedFunction failed", + zap.Error(result.Error), + zap.String("id", id.String())) + return result.Error + } + + if result.RowsAffected == 0 { + log.Warn("HardDeleteAttachedFunction: no rows affected (attached function not found)", + zap.String("id", id.String())) + return nil // Idempotent - no error if not found + } + + log.Info("HardDeleteAttachedFunction succeeded", + zap.String("id", id.String())) + + return nil +} diff --git a/go/pkg/sysdb/metastore/db/dbmodel/mocks/IAttachedFunctionDb.go b/go/pkg/sysdb/metastore/db/dbmodel/mocks/IAttachedFunctionDb.go index c2e726c4627..51783a00445 100644 --- a/go/pkg/sysdb/metastore/db/dbmodel/mocks/IAttachedFunctionDb.go +++ b/go/pkg/sysdb/metastore/db/dbmodel/mocks/IAttachedFunctionDb.go @@ -3,6 +3,8 @@ package mocks import ( + "time" + dbmodel "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel" mock "github.com/stretchr/testify/mock" @@ -368,6 +370,54 @@ func (_m *IAttachedFunctionDb) CleanupExpiredPartialAttachedFunctions(maxAgeSeco return r0, r1 } +// GetSoftDeletedAttachedFunctions provides a mock function with given fields: cutoffTime, limit +func (_m *IAttachedFunctionDb) GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*dbmodel.AttachedFunction, error) { + ret := _m.Called(cutoffTime, limit) + + if len(ret) == 0 { + panic("no return value specified for GetSoftDeletedAttachedFunctions") + } + + var r0 []*dbmodel.AttachedFunction + var r1 error + if rf, ok := ret.Get(0).(func(time.Time, int32) ([]*dbmodel.AttachedFunction, error)); ok { + return rf(cutoffTime, limit) + } + if rf, ok := ret.Get(0).(func(time.Time, int32) []*dbmodel.AttachedFunction); ok { + r0 = rf(cutoffTime, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*dbmodel.AttachedFunction) + } + } + + if rf, ok := ret.Get(1).(func(time.Time, int32) error); ok { + r1 = rf(cutoffTime, limit) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// HardDeleteAttachedFunction provides a mock function with given fields: id +func (_m *IAttachedFunctionDb) HardDeleteAttachedFunction(id uuid.UUID) error { + ret := _m.Called(id) + + if len(ret) == 0 { + panic("no return value specified for HardDeleteAttachedFunction") + } + + var r0 error + if rf, ok := ret.Get(0).(func(uuid.UUID) error); ok { + r0 = rf(id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewIAttachedFunctionDb creates a new instance of IAttachedFunctionDb. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewIAttachedFunctionDb(t interface { diff --git a/go/pkg/sysdb/metastore/db/dbmodel/task.go b/go/pkg/sysdb/metastore/db/dbmodel/task.go index 455d41e7f88..81474b7ee8a 100644 --- a/go/pkg/sysdb/metastore/db/dbmodel/task.go +++ b/go/pkg/sysdb/metastore/db/dbmodel/task.go @@ -58,4 +58,6 @@ type IAttachedFunctionDb interface { PeekScheduleByCollectionId(collectionIDs []string) ([]*AttachedFunction, error) GetMinCompletionOffsetForCollection(inputCollectionID string) (*int64, error) CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid.UUID, error) + GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*AttachedFunction, error) + HardDeleteAttachedFunction(id uuid.UUID) error } diff --git a/idl/chromadb/proto/coordinator.proto b/idl/chromadb/proto/coordinator.proto index 456e3cbf8e5..b1f9fd0b45d 100644 --- a/idl/chromadb/proto/coordinator.proto +++ b/idl/chromadb/proto/coordinator.proto @@ -683,6 +683,21 @@ message PeekScheduleByCollectionIdResponse { repeated ScheduleEntry schedule = 1; } +message GetSoftDeletedAttachedFunctionsRequest { + google.protobuf.Timestamp cutoff_time = 1; + int32 limit = 2; +} + +message GetSoftDeletedAttachedFunctionsResponse { + repeated AttachedFunction attached_functions = 1; +} + +message FinishAttachedFunctionDeletionRequest { + string attached_function_id = 1; +} + +message FinishAttachedFunctionDeletionResponse {} + service SysDB { rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {} rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {} @@ -731,4 +746,6 @@ service SysDB { rpc CleanupExpiredPartialAttachedFunctions(CleanupExpiredPartialAttachedFunctionsRequest) returns (CleanupExpiredPartialAttachedFunctionsResponse) {} rpc GetFunctions(GetFunctionsRequest) returns (GetFunctionsResponse) {} rpc PeekScheduleByCollectionId(PeekScheduleByCollectionIdRequest) returns (PeekScheduleByCollectionIdResponse) {} + rpc GetSoftDeletedAttachedFunctions(GetSoftDeletedAttachedFunctionsRequest) returns (GetSoftDeletedAttachedFunctionsResponse) {} + rpc FinishAttachedFunctionDeletion(FinishAttachedFunctionDeletionRequest) returns (FinishAttachedFunctionDeletionResponse) {} } diff --git a/rust/frontend/src/impls/service_based_frontend.rs b/rust/frontend/src/impls/service_based_frontend.rs index 21315f31e67..8372091da1d 100644 --- a/rust/frontend/src/impls/service_based_frontend.rs +++ b/rust/frontend/src/impls/service_based_frontend.rs @@ -1974,6 +1974,11 @@ impl ServiceBasedFrontend { chroma_sysdb::DeleteAttachedFunctionError::FailedToDeleteAttachedFunction(s) => { DetachFunctionError::Internal(Box::new(chroma_error::TonicError(s))) } + chroma_sysdb::DeleteAttachedFunctionError::NotImplemented => { + DetachFunctionError::Internal(Box::new(chroma_error::TonicError( + tonic::Status::unimplemented("Not implemented"), + ))) + } })?; Ok(DetachFunctionResponse { success: true }) diff --git a/rust/garbage_collector/Cargo.toml b/rust/garbage_collector/Cargo.toml index 4a3ef5a7bb5..ee73fbaa090 100644 --- a/rust/garbage_collector/Cargo.toml +++ b/rust/garbage_collector/Cargo.toml @@ -55,6 +55,8 @@ chroma-index = { workspace = true } chroma-memberlist = { workspace = true } chroma-tracing = { workspace = true } chroma-jemalloc-pprof-server = { workspace = true } +s3heap = { workspace = true } +s3heap-service = { workspace = true } wal3 = { workspace = true } [target.'cfg(not(target_env = "msvc"))'.dependencies] diff --git a/rust/garbage_collector/garbage_collector_config.yaml b/rust/garbage_collector/garbage_collector_config.yaml index b5cf5236794..5847f5b03db 100644 --- a/rust/garbage_collector/garbage_collector_config.yaml +++ b/rust/garbage_collector/garbage_collector_config.yaml @@ -5,6 +5,7 @@ otel_filters: filter_level: "debug" relative_cutoff_time_seconds: 43200 # GC all versions created at time < now() - relative_cutoff_time_seconds (12 hours) max_collections_to_gc: 1000 # Maximum number of collections to GC in one run +max_attached_functions_to_gc_per_run: 100 # Maximum number of attached functions to garbage collect per run gc_interval_mins: 120 # Run GC every x mins disallow_collections: [] # collection ids to disable GC on sysdb_config: diff --git a/rust/garbage_collector/src/config.rs b/rust/garbage_collector/src/config.rs index 1e31edde372..f3038973eeb 100644 --- a/rust/garbage_collector/src/config.rs +++ b/rust/garbage_collector/src/config.rs @@ -27,6 +27,12 @@ pub struct GarbageCollectorConfig { default = "GarbageCollectorConfig::default_collection_soft_delete_grace_period" )] pub(super) collection_soft_delete_grace_period: Duration, + #[serde( + rename = "attached_function_soft_delete_grace_period_seconds", + deserialize_with = "deserialize_duration_from_seconds", + default = "GarbageCollectorConfig::default_attached_function_soft_delete_grace_period" + )] + pub(super) attached_function_soft_delete_grace_period: Duration, #[serde( rename = "version_relative_cutoff_time_seconds", alias = "relative_cutoff_time_seconds", @@ -67,6 +73,12 @@ pub struct GarbageCollectorConfig { pub log: LogConfig, #[serde(default)] pub enable_dangerous_option_to_ignore_min_versions_for_wal3: bool, + #[serde(default = "GarbageCollectorConfig::default_heap_prune_buckets_to_read")] + pub heap_prune_buckets_to_read: u32, + #[serde(default = "GarbageCollectorConfig::default_heap_prune_max_items")] + pub heap_prune_max_items: u32, + #[serde(default = "GarbageCollectorConfig::default_max_attached_functions_to_gc_per_run")] + pub max_attached_functions_to_gc_per_run: i32, } impl GarbageCollectorConfig { @@ -119,6 +131,22 @@ impl GarbageCollectorConfig { Duration::from_secs(60 * 60 * 24) // 1 day } + fn default_attached_function_soft_delete_grace_period() -> Duration { + Duration::from_secs(60 * 60 * 24) // 1 day + } + + fn default_heap_prune_buckets_to_read() -> u32 { + 10 // Scan up to 10 time buckets per shard + } + + fn default_heap_prune_max_items() -> u32 { + 10000 // Prune up to 10k items per shard per GC pass + } + + fn default_max_attached_functions_to_gc_per_run() -> i32 { + 100 + } + fn enable_log_gc_for_tenant_threshold() -> String { "00000000-0000-0000-0000-000000000000".to_string() } diff --git a/rust/garbage_collector/src/garbage_collector_component.rs b/rust/garbage_collector/src/garbage_collector_component.rs index ff2965a1463..56678306347 100644 --- a/rust/garbage_collector/src/garbage_collector_component.rs +++ b/rust/garbage_collector/src/garbage_collector_component.rs @@ -25,8 +25,10 @@ use futures::StreamExt; use opentelemetry::metrics::{Counter, Histogram}; use opentelemetry::trace::TraceContextExt; use parking_lot::Mutex; +use s3heap_service::SysDbScheduler; use std::{ fmt::{Debug, Formatter}, + sync::Arc, time::{Duration, SystemTime}, }; use thiserror::Error; @@ -147,6 +149,167 @@ impl GarbageCollector { Ok(result) } + async fn prune_heap_across_shards(&self, cutoff_time: chrono::DateTime) { + tracing::info!( + "Pruning completed tasks from all heap shards (buckets_to_read={}, max_items={})", + self.config.heap_prune_buckets_to_read, + self.config.heap_prune_max_items + ); + + let prune_limits = s3heap::Limits::default() + .with_buckets(self.config.heap_prune_buckets_to_read as usize) + .with_items(self.config.heap_prune_max_items as usize) + .with_time_cut_off(cutoff_time); + + let mut total_stats = s3heap::PruneStats::default(); + let mut service_index = 0; + + // Create scheduler for checking task completion status + let scheduler: Arc = + Arc::new(SysDbScheduler::new(self.sysdb_client.clone())); + + // Iterate over all log service shards (rust-log-service-0, rust-log-service-1, ...) + // Limit to 100 shards to prevent infinite loop if we get unexpected errors + const MAX_SHARDS: u32 = 100; + loop { + if service_index >= MAX_SHARDS { + tracing::warn!( + "Reached max shard limit of {} during heap pruning", + MAX_SHARDS + ); + break; + } + let heap_prefix = + s3heap::heap_path_from_hostname(&format!("rust-log-service-{}", service_index)); + + let pruner = match s3heap::HeapPruner::new( + self.storage.clone(), + heap_prefix.clone(), + Arc::clone(&scheduler), + ) { + Ok(pruner) => pruner, + Err(s3heap::Error::UninitializedHeap(_)) => { + tracing::debug!( + "No heap found at shard {} - stopping iteration", + service_index + ); + break; + } + Err(e) => { + tracing::warn!( + "Error creating heap pruner for shard {}: {:?} - continuing", + service_index, + e + ); + service_index += 1; + continue; + } + }; + + match pruner.prune(prune_limits.clone()).await { + Ok(stats) => { + tracing::debug!( + "Pruned shard {}: {} items pruned, {} buckets deleted", + service_index, + stats.items_pruned, + stats.buckets_deleted + ); + total_stats.merge(&stats); + } + Err(e) => { + tracing::error!("Failed to prune heap shard {}: {}", service_index, e); + } + } + + service_index += 1; + } + + tracing::info!( + "Heap pruning complete: {} items pruned, {} items retained, {} buckets deleted, {} buckets updated across {} shards", + total_stats.items_pruned, + total_stats.items_retained, + total_stats.buckets_deleted, + total_stats.buckets_updated, + service_index + ); + } + + async fn garbage_collect_attached_functions( + &mut self, + attached_function_soft_delete_absolute_cutoff_time: SystemTime, + ) -> (u32, u32) { + tracing::info!("Checking for soft-deleted attached functions to hard delete"); + match self + .sysdb_client + .get_soft_deleted_attached_functions( + attached_function_soft_delete_absolute_cutoff_time, + self.config.max_attached_functions_to_gc_per_run, + ) + .await + { + Ok(attached_functions_to_delete) => { + if !attached_functions_to_delete.is_empty() { + tracing::info!( + "Found {} soft-deleted attached functions to hard delete", + attached_functions_to_delete.len() + ); + + let deletion_jobs = attached_functions_to_delete.into_iter().map(|attached_function_id| { + tracing::info!( + "Hard deleting attached function: {}", + attached_function_id + ); + + let instrumented_span = span!(parent: None, tracing::Level::INFO, "Hard delete attached function", attached_function_id = %attached_function_id); + Span::current().add_link(instrumented_span.context().span().span_context().clone()); + + let mut sysdb = self.sysdb_client.clone(); + Box::pin(async move { + // tanujnay112: Could batch this but just following the pattern of collection deletion below. + sysdb.finish_attached_function_deletion(attached_function_id).await + .map(|_| attached_function_id) + }.instrument(instrumented_span)) as std::pin::Pin> + Send + '_>> + }); + + let mut deletion_stream = + futures::stream::iter(deletion_jobs).buffer_unordered(10); + + let mut num_deleted = 0; + let mut num_failed = 0; + while let Some(result) = deletion_stream.next().await { + match result { + Ok(attached_function_id) => { + tracing::info!( + "Successfully hard deleted attached function: {}", + attached_function_id + ); + num_deleted += 1; + } + Err(e) => { + tracing::error!("Failed to hard delete attached function: {}", e); + num_failed += 1; + } + } + } + + tracing::info!( + "Attached function deletion completed: {} deleted, {} failed", + num_deleted, + num_failed + ); + (num_deleted, num_failed) + } else { + tracing::debug!("No soft-deleted attached functions found to hard delete"); + (0, 0) + } + } + Err(e) => { + tracing::error!("Failed to get soft-deleted attached functions: {}", e); + (0, 0) + } + } + } + async fn garbage_collect_collection( &self, version_absolute_cutoff_time: DateTime, @@ -390,6 +553,30 @@ impl Handler for GarbageCollector { self.config.collection_soft_delete_grace_period ); + let attached_function_soft_delete_absolute_cutoff_time = + now - self.config.attached_function_soft_delete_grace_period; + tracing::debug!( + "Using absolute cutoff time: {:?} for soft deleted attached functions (grace period: {:?})", + attached_function_soft_delete_absolute_cutoff_time, + self.config.attached_function_soft_delete_grace_period + ); + + // Garbage collect soft-deleted attached functions that are past the grace period + let (num_attached_functions_deleted, num_attached_functions_failed) = self + .garbage_collect_attached_functions(attached_function_soft_delete_absolute_cutoff_time) + .await; + tracing::debug!( + "Garbage collected {} soft-deleted attached functions, {} failed", + num_attached_functions_deleted, + num_attached_functions_failed + ); + + // Prune heap of completed tasks across all log service shards + let cutoff_time = chrono::DateTime::::from( + attached_function_soft_delete_absolute_cutoff_time, + ); + self.prune_heap_across_shards(cutoff_time).await; + // Get all collections to gc and create gc orchestrator for each. tracing::info!("Getting collections to gc"); let collections_to_gc = self @@ -840,6 +1027,7 @@ mod tests { }], version_cutoff_time: Duration::from_secs(1), collection_soft_delete_grace_period: Duration::from_secs(1), + attached_function_soft_delete_grace_period: Duration::from_secs(1), max_collections_to_gc: 100, max_collections_to_fetch: None, min_versions_to_keep: 2, @@ -868,6 +1056,9 @@ mod tests { log: LogConfig::Grpc(GrpcLogConfig::default()), enable_dangerous_option_to_ignore_min_versions_for_wal3: false, max_concurrent_list_files_operations_per_collection: 10, + heap_prune_buckets_to_read: 10, + heap_prune_max_items: 10000, + max_attached_functions_to_gc_per_run: 100, }; let registry = Registry::new(); @@ -1045,6 +1236,7 @@ mod tests { otel_endpoint: "none".to_string(), version_cutoff_time: Duration::from_secs(1), collection_soft_delete_grace_period: Duration::from_secs(1), + attached_function_soft_delete_grace_period: Duration::from_secs(1), max_collections_to_gc: 100, min_versions_to_keep: 2, filter_min_versions_if_alive: None, @@ -1195,4 +1387,245 @@ mod tests { } ); } + + #[tokio::test] + async fn test_k8s_integration_gc_prunes_heap_after_attached_function_deletion() { + use chroma_storage::Storage; + use chroma_sysdb::GrpcSysDb; + use s3heap::{HeapReader, HeapWriter, Limits, Schedule, Triggerable}; + use s3heap_service::SysDbScheduler; + + let tenant_id = format!("test-tenant-{}", Uuid::new_v4()); + + // Use actual log service shard naming for the heap + let log_service_index = 0; + let heap_prefix = + s3heap::heap_path_from_hostname(&format!("rust-log-service-{}", log_service_index)); + + let config = GarbageCollectorConfig { + service_name: "gc".to_string(), + otel_endpoint: "none".to_string(), + otel_filters: vec![], + version_cutoff_time: Duration::from_secs(1), + collection_soft_delete_grace_period: Duration::from_secs(1), + attached_function_soft_delete_grace_period: Duration::from_secs(1), + max_collections_to_gc: 0, // Don't GC collections in this test, only test heap pruning + max_collections_to_fetch: None, + min_versions_to_keep: 2, + filter_min_versions_if_alive: None, + gc_interval_mins: 10, + disallow_collections: HashSet::new(), + sysdb_config: GrpcSysDbConfig { + host: "localhost".to_string(), + port: 50051, + connect_timeout_ms: 5000, + request_timeout_ms: 10000, + num_channels: 1, + }, + dispatcher_config: DispatcherConfig::default(), + storage_config: s3_config_for_localhost_with_bucket_name("chroma-storage").await, + default_mode: CleanupMode::DeleteV2, + tenant_mode_overrides: None, + assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig::default(), + my_member_id: "test-gc-heap-pruning".to_string(), + memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig::default(), + port: 50056, + root_cache_config: Default::default(), + jemalloc_pprof_server_port: None, + enable_log_gc_for_tenant: Vec::new(), + enable_log_gc_for_tenant_threshold: "tenant-threshold".to_string(), + log: LogConfig::Grpc(GrpcLogConfig::default()), + enable_dangerous_option_to_ignore_min_versions_for_wal3: false, + max_concurrent_list_files_operations_per_collection: 10, + heap_prune_buckets_to_read: 10, // Reduce for faster test + heap_prune_max_items: 100, // Reduce for faster test + max_attached_functions_to_gc_per_run: 100, + }; + + let registry = Registry::new(); + + // Initialize storage and sysdb + let storage = Storage::try_from_config(&config.storage_config, ®istry) + .await + .unwrap(); + let mut sysdb = GrpcSysDb::try_from_config(&config.sysdb_config, ®istry) + .await + .unwrap(); + + // Create a test collection + let mut clients = ChromaGrpcClients::new().await.unwrap(); + let collection_name = format!("test-collection-{}", Uuid::new_v4()); + let database_name = format!("test-db-{}", Uuid::new_v4()); + let collection_id_str = clients + .create_database_and_collection(&tenant_id, &database_name, &collection_name, false) + .await + .unwrap(); + let collection_id = CollectionUuid::from_str(&collection_id_str).unwrap(); + + tracing::info!(%collection_id, "Created test collection"); + + // Create an attached function using the record_counter operator that exists + let attached_function_id = sysdb + .create_attached_function( + "test_function".to_string(), + "record_counter".to_string(), // Use existing operator + collection_id, + format!("test_output_{}", collection_id), + serde_json::json!({}), + tenant_id.clone(), + database_name.clone(), + 1, + ) + .await + .unwrap(); + + tracing::info!( + ?attached_function_id, + "Created attached function with record_counter operator" + ); + + // Write some scheduled tasks to the heap (using the actual log service heap path) + let scheduler: Arc = + Arc::new(SysDbScheduler::new(SysDb::Grpc(sysdb.clone()))); + + let writer = HeapWriter::new(storage.clone(), heap_prefix.clone(), Arc::clone(&scheduler)) + .await + .unwrap(); + + // Schedule 3 tasks + let now = chrono::Utc::now(); + let schedules = vec![ + Schedule { + triggerable: Triggerable { + partitioning: collection_id.0.into(), + scheduling: attached_function_id.0.into(), + }, + nonce: uuid::Uuid::new_v4(), + next_scheduled: now + chrono::Duration::seconds(10), + }, + Schedule { + triggerable: Triggerable { + partitioning: collection_id.0.into(), + scheduling: attached_function_id.0.into(), + }, + nonce: uuid::Uuid::new_v4(), + next_scheduled: now + chrono::Duration::seconds(20), + }, + Schedule { + triggerable: Triggerable { + partitioning: collection_id.0.into(), + scheduling: attached_function_id.0.into(), + }, + nonce: uuid::Uuid::new_v4(), + next_scheduled: now + chrono::Duration::seconds(30), + }, + ]; + + writer.push(&schedules).await.unwrap(); + tracing::info!("Pushed {} schedules to heap", schedules.len()); + + // Verify tasks are in the heap before GC + let reader = HeapReader::new(storage.clone(), heap_prefix.clone(), Arc::clone(&scheduler)) + .await + .unwrap(); + + let items_before = reader.peek(|_, _| true, Limits::default()).await.unwrap(); + let items_before_count = items_before.len(); + tracing::info!("Items in heap before GC: {}", items_before_count); + assert!( + items_before_count > 0, + "Should have at least 1 item in heap before GC" + ); + + // Count items for our specific attached function + let our_items_before = items_before + .iter() + .filter(|(_, item)| item.trigger.scheduling == attached_function_id.0.into()) + .count(); + tracing::info!( + "Items for our attached function before GC: {}", + our_items_before + ); + assert!( + our_items_before > 0, + "Should have items for our attached function" + ); + + // Soft delete the attached function + sysdb + .soft_delete_attached_function( + attached_function_id, + false, // don't delete output + ) + .await + .unwrap(); + tracing::info!("Soft deleted attached function"); + + // Wait for grace period to expire + tokio::time::sleep(Duration::from_secs(2)).await; + tracing::info!("Grace period expired, starting GC"); + + // Now run the garbage collector - it should both hard-delete the function AND prune the heap + let system = System::new(); + let mut garbage_collector = + GarbageCollector::try_from_config(&(config.clone(), system.clone()), ®istry) + .await + .unwrap(); + + // Don't set dispatcher - this test only cares about heap pruning + // Without a dispatcher, truncate_dirty_log will return early + garbage_collector.set_system(system.clone()); + + let mut gc_handle = system.start_component(garbage_collector); + + // Send memberlist update (required for GC to start processing) + gc_handle + .send( + vec![Member { + member_id: config.my_member_id.clone(), + member_ip: "0.0.0.0".to_string(), + member_node_name: format!("{}-node", config.my_member_id), + }], + None, + ) + .await + .unwrap(); + + tracing::info!("Sending GC request..."); + + let result = gc_handle + .request( + GarbageCollectMessage { tenant: None }, + Some(Span::current()), + ) + .await + .unwrap(); + + tracing::info!(?result, "Garbage collection completed"); + + // The GC should have: + // 1. Hard deleted the attached function (grace period expired) + // 2. Pruned the heap items associated with it + + // Verify heap items for our attached function were pruned - this is the key assertion + let items_after = reader.peek(|_, _| true, Limits::default()).await.unwrap(); + let items_after_count = items_after.len(); + tracing::info!("Items in heap after GC: {}", items_after_count); + + // Count items for our specific attached function after GC + let our_items_after = items_after + .iter() + .filter(|(_, item)| item.trigger.scheduling == attached_function_id.0.into()) + .count(); + tracing::info!( + "Items for our attached function after GC: {}", + our_items_after + ); + + assert_eq!( + our_items_after, 0, + "GC should have pruned all heap items for the deleted attached function (before: {}, after: {})", + our_items_before, our_items_after + ); + } } diff --git a/rust/sysdb/src/sysdb.rs b/rust/sysdb/src/sysdb.rs index caff1b7594a..471c188ee19 100644 --- a/rust/sysdb/src/sysdb.rs +++ b/rust/sysdb/src/sysdb.rs @@ -2198,6 +2198,69 @@ impl GrpcSysDb { .collect::, ScheduleEntryConversionError>>() .map_err(PeekScheduleError::Conversion) } + + async fn get_soft_deleted_attached_functions( + &mut self, + cutoff_time: SystemTime, + limit: i32, + ) -> Result, GetSoftDeletedAttachedFunctionsError> { + let cutoff_timestamp = prost_types::Timestamp { + seconds: cutoff_time + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + nanos: 0, + }; + + let req = chroma_proto::GetSoftDeletedAttachedFunctionsRequest { + cutoff_time: Some(cutoff_timestamp), + limit, + }; + + let res = self + .client + .get_soft_deleted_attached_functions(req) + .await + .map_err(|e| { + GetSoftDeletedAttachedFunctionsError::FailedToGetSoftDeletedAttachedFunctions(e) + })?; + + let attached_function_ids: Result, _> = res + .into_inner() + .attached_functions + .into_iter() + .map(|af| { + uuid::Uuid::parse_str(&af.id) + .map(chroma_types::AttachedFunctionUuid) + .map_err(|e| { + tracing::error!( + attached_function_id = %af.id, + error = %e, + "Server returned invalid attached_function_id UUID" + ); + GetSoftDeletedAttachedFunctionsError::ServerReturnedInvalidData + }) + }) + .collect(); + + attached_function_ids + } + + async fn finish_attached_function_deletion( + &mut self, + attached_function_id: chroma_types::AttachedFunctionUuid, + ) -> Result<(), FinishAttachedFunctionDeletionError> { + let req = chroma_proto::FinishAttachedFunctionDeletionRequest { + attached_function_id: attached_function_id.to_string(), + }; + + self.client + .finish_attached_function_deletion(req) + .await + .map_err(FinishAttachedFunctionDeletionError::FailedToFinishDeletion)?; + + Ok(()) + } } #[derive(Error, Debug)] @@ -2217,6 +2280,36 @@ impl ChromaError for PeekScheduleError { } } +#[derive(Error, Debug)] +pub enum GetSoftDeletedAttachedFunctionsError { + #[error("Failed to get soft deleted attached functions: {0}")] + FailedToGetSoftDeletedAttachedFunctions(#[from] tonic::Status), + #[error("Server returned invalid data - response contains corrupt attached function IDs")] + ServerReturnedInvalidData, + #[error("Not implemented for this SysDb backend")] + NotImplemented, +} + +impl ChromaError for GetSoftDeletedAttachedFunctionsError { + fn code(&self) -> ErrorCodes { + ErrorCodes::Internal + } +} + +#[derive(Error, Debug)] +pub enum FinishAttachedFunctionDeletionError { + #[error("Failed to finish attached function deletion: {0}")] + FailedToFinishDeletion(#[from] tonic::Status), + #[error("Not implemented for this SysDb backend")] + NotImplemented, +} + +impl ChromaError for FinishAttachedFunctionDeletionError { + fn code(&self) -> ErrorCodes { + ErrorCodes::Internal + } +} + #[derive(Error, Debug)] pub enum GetLastCompactionTimeError { #[error("Failed to fetch")] @@ -2424,13 +2517,37 @@ impl SysDb { grpc.soft_delete_attached_function(attached_function_id, delete_output) .await } - SysDb::Sqlite(_) => { - // SQLite implementation doesn't support soft_delete_attached_function yet - todo!("soft_delete_attached_function not implemented for SQLite") + SysDb::Sqlite(_) => Err(DeleteAttachedFunctionError::NotImplemented), + SysDb::Test(_) => Err(DeleteAttachedFunctionError::NotImplemented), + } + } + + pub async fn get_soft_deleted_attached_functions( + &mut self, + cutoff_time: SystemTime, + limit: i32, + ) -> Result, GetSoftDeletedAttachedFunctionsError> { + match self { + SysDb::Grpc(grpc) => { + grpc.get_soft_deleted_attached_functions(cutoff_time, limit) + .await } - SysDb::Test(_) => { - todo!() + SysDb::Sqlite(_) => Err(GetSoftDeletedAttachedFunctionsError::NotImplemented), + SysDb::Test(_) => Err(GetSoftDeletedAttachedFunctionsError::NotImplemented), + } + } + + pub async fn finish_attached_function_deletion( + &mut self, + attached_function_id: chroma_types::AttachedFunctionUuid, + ) -> Result<(), FinishAttachedFunctionDeletionError> { + match self { + SysDb::Grpc(grpc) => { + grpc.finish_attached_function_deletion(attached_function_id) + .await } + SysDb::Sqlite(_) => Err(FinishAttachedFunctionDeletionError::NotImplemented), + SysDb::Test(_) => Err(FinishAttachedFunctionDeletionError::NotImplemented), } } } @@ -2513,6 +2630,8 @@ pub enum DeleteAttachedFunctionError { NotFound, #[error("Failed to delete attached function: {0}")] FailedToDeleteAttachedFunction(#[from] tonic::Status), + #[error("Not implemented for this SysDb backend")] + NotImplemented, } impl ChromaError for DeleteAttachedFunctionError { @@ -2520,6 +2639,7 @@ impl ChromaError for DeleteAttachedFunctionError { match self { DeleteAttachedFunctionError::NotFound => ErrorCodes::NotFound, DeleteAttachedFunctionError::FailedToDeleteAttachedFunction(e) => e.code().into(), + DeleteAttachedFunctionError::NotImplemented => ErrorCodes::Internal, } } }