Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 83 additions & 0 deletions go/pkg/sysdb/coordinator/create_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// testMinimalUUIDv7 is the test's copy of minimalUUIDv7 from task.go
Expand Down Expand Up @@ -663,3 +664,85 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Param
func TestAttachFunctionTestSuite(t *testing.T) {
suite.Run(t, new(AttachFunctionTestSuite))
}

// TestGetSoftDeletedAttachedFunctions_TimestampConsistency verifies that timestamps
// are returned in microseconds (UnixMicro) to match other API methods
func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
ctx := context.Background()

// Create test timestamps with known values
testTime := time.Date(2025, 10, 30, 12, 0, 0, 123456000, time.UTC) // 123.456 milliseconds
expectedMicros := uint64(testTime.UnixMicro())

// Create mock coordinator with minimal setup
mockMetaDomain := &dbmodel_mocks.IMetaDomain{}
mockAttachedFunctionDb := &dbmodel_mocks.IAttachedFunctionDb{}
mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(mockAttachedFunctionDb)

// Mock the database response with our test timestamps
attachedFunctions := []*dbmodel.AttachedFunction{
{
ID: uuid.New(),
Name: "test_function",
InputCollectionID: "collection_123",
OutputCollectionName: "output_collection",
CompletionOffset: 100,
MinRecordsForInvocation: 10,
CreatedAt: testTime,
UpdatedAt: testTime,
NextRun: testTime,
},
}

mockAttachedFunctionDb.On("GetSoftDeletedAttachedFunctions", mock.Anything, mock.Anything).
Return(attachedFunctions, nil)

coordinator := &Coordinator{
catalog: Catalog{
metaDomain: mockMetaDomain,
},
}

// Call GetSoftDeletedAttachedFunctions
cutoffTime := timestamppb.New(testTime.Add(-24 * time.Hour))
resp, err := coordinator.GetSoftDeletedAttachedFunctions(ctx, &coordinatorpb.GetSoftDeletedAttachedFunctionsRequest{
CutoffTime: cutoffTime,
Limit: 100,
})

// Verify response
if err != nil {
t.Fatalf("GetSoftDeletedAttachedFunctions failed: %v", err)
}
if len(resp.AttachedFunctions) != 1 {
t.Fatalf("Expected 1 attached function, got %d", len(resp.AttachedFunctions))
}

af := resp.AttachedFunctions[0]

// Verify timestamps are in microseconds (not seconds)
if af.CreatedAt != expectedMicros {
t.Errorf("CreatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.CreatedAt)
}
if af.UpdatedAt != expectedMicros {
t.Errorf("UpdatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.UpdatedAt)
}
if af.NextRunAt != expectedMicros {
t.Errorf("NextRunAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.NextRunAt)
}

// Verify these are NOT in seconds (would be ~1000x smaller)
expectedSeconds := uint64(testTime.Unix())
if af.CreatedAt == expectedSeconds {
t.Error("CreatedAt appears to be in seconds instead of microseconds")
}
if af.UpdatedAt == expectedSeconds {
t.Error("UpdatedAt appears to be in seconds instead of microseconds")
}
if af.NextRunAt == expectedSeconds {
t.Error("NextRunAt appears to be in seconds instead of microseconds")
}

mockMetaDomain.AssertExpectations(t)
mockAttachedFunctionDb.AssertExpectations(t)
}
72 changes: 72 additions & 0 deletions go/pkg/sysdb/coordinator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,3 +822,75 @@ func (s *Coordinator) CleanupExpiredPartialAttachedFunctions(ctx context.Context
CleanedUpIds: cleanedAttachedFunctionIDStrings,
}, nil
}

// GetSoftDeletedAttachedFunctions retrieves attached functions that are soft deleted and were updated before the cutoff time
func (s *Coordinator) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) {
log := log.With(zap.String("method", "GetSoftDeletedAttachedFunctions"))

if req.CutoffTime == nil {
log.Error("GetSoftDeletedAttachedFunctions: cutoff_time is required")
return nil, status.Errorf(codes.InvalidArgument, "cutoff_time is required")
}

if req.Limit <= 0 {
log.Error("GetSoftDeletedAttachedFunctions: limit must be greater than 0")
return nil, status.Errorf(codes.InvalidArgument, "limit must be greater than 0")
}

cutoffTime := req.CutoffTime.AsTime()
attachedFunctions, err := s.catalog.metaDomain.AttachedFunctionDb(ctx).GetSoftDeletedAttachedFunctions(cutoffTime, req.Limit)
if err != nil {
log.Error("GetSoftDeletedAttachedFunctions: failed to get soft deleted attached functions", zap.Error(err))
return nil, err
}

// Convert to proto response
protoAttachedFunctions := make([]*coordinatorpb.AttachedFunction, len(attachedFunctions))
for i, af := range attachedFunctions {
protoAttachedFunctions[i] = &coordinatorpb.AttachedFunction{
Id: af.ID.String(),
Name: af.Name,
InputCollectionId: af.InputCollectionID,
OutputCollectionName: af.OutputCollectionName,
CompletionOffset: uint64(af.CompletionOffset),
MinRecordsForInvocation: uint64(af.MinRecordsForInvocation),
CreatedAt: uint64(af.CreatedAt.UnixMicro()),
UpdatedAt: uint64(af.UpdatedAt.UnixMicro()),
}

protoAttachedFunctions[i].NextRunAt = uint64(af.NextRun.UnixMicro())
if af.OutputCollectionID != nil {
protoAttachedFunctions[i].OutputCollectionId = proto.String(*af.OutputCollectionID)
}
}

log.Info("GetSoftDeletedAttachedFunctions: completed successfully",
zap.Int("count", len(attachedFunctions)))

return &coordinatorpb.GetSoftDeletedAttachedFunctionsResponse{
AttachedFunctions: protoAttachedFunctions,
}, nil
}

// FinishAttachedFunctionDeletion permanently deletes an attached function from the database (hard delete)
// This should only be called after the soft delete grace period has passed
func (s *Coordinator) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) {
log := log.With(zap.String("method", "FinishAttachedFunctionDeletion"))

attachedFunctionID, err := uuid.Parse(req.AttachedFunctionId)
if err != nil {
log.Error("FinishAttachedFunctionDeletion: invalid attached_function_id", zap.Error(err))
return nil, status.Errorf(codes.InvalidArgument, "invalid attached_function_id: %v", err)
}

err = s.catalog.metaDomain.AttachedFunctionDb(ctx).HardDeleteAttachedFunction(attachedFunctionID)
if err != nil {
log.Error("FinishAttachedFunctionDeletion: failed to hard delete attached function", zap.Error(err))
return nil, err
}

log.Info("FinishAttachedFunctionDeletion: completed successfully",
zap.String("attached_function_id", attachedFunctionID.String()))

return &coordinatorpb.FinishAttachedFunctionDeletionResponse{}, nil
}
26 changes: 26 additions & 0 deletions go/pkg/sysdb/grpc/task_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,29 @@ func (s *Server) CleanupExpiredPartialAttachedFunctions(ctx context.Context, req
log.Info("CleanupExpiredPartialAttachedFunctions succeeded", zap.Uint64("cleaned_up_count", res.CleanedUpCount))
return res, nil
}

func (s *Server) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) {
log.Info("GetSoftDeletedAttachedFunctions", zap.Time("cutoff_time", req.CutoffTime.AsTime()), zap.Int32("limit", req.Limit))

res, err := s.coordinator.GetSoftDeletedAttachedFunctions(ctx, req)
if err != nil {
log.Error("GetSoftDeletedAttachedFunctions failed", zap.Error(err))
return nil, grpcutils.BuildInternalGrpcError(err.Error())
}

log.Info("GetSoftDeletedAttachedFunctions succeeded", zap.Int("count", len(res.AttachedFunctions)))
return res, nil
}

func (s *Server) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) {
log.Info("FinishAttachedFunctionDeletion", zap.String("id", req.AttachedFunctionId))

res, err := s.coordinator.FinishAttachedFunctionDeletion(ctx, req)
if err != nil {
log.Error("FinishAttachedFunctionDeletion failed", zap.Error(err))
return nil, grpcutils.BuildInternalGrpcError(err.Error())
}

log.Info("FinishAttachedFunctionDeletion succeeded", zap.String("id", req.AttachedFunctionId))
return res, nil
}
48 changes: 48 additions & 0 deletions go/pkg/sysdb/metastore/db/dao/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,51 @@ func (s *attachedFunctionDb) CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid

return ids, nil
}

// GetSoftDeletedAttachedFunctions returns attached functions that are soft deleted
// and were updated before the cutoff time (eligible for hard deletion)
func (s *attachedFunctionDb) GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*dbmodel.AttachedFunction, error) {
var attachedFunctions []*dbmodel.AttachedFunction
err := s.db.
Where("is_deleted = ?", true).
Where("updated_at < ?", cutoffTime).
Limit(int(limit)).
Find(&attachedFunctions).Error

if err != nil {
log.Error("GetSoftDeletedAttachedFunctions failed",
zap.Error(err),
zap.Time("cutoff_time", cutoffTime))
return nil, err
}

log.Debug("GetSoftDeletedAttachedFunctions found attached functions",
zap.Int("count", len(attachedFunctions)),
zap.Time("cutoff_time", cutoffTime))

return attachedFunctions, nil
}

// HardDeleteAttachedFunction permanently deletes an attached function from the database
// This should only be called after the soft delete grace period has passed
func (s *attachedFunctionDb) HardDeleteAttachedFunction(id uuid.UUID) error {
result := s.db.Unscoped().Delete(&dbmodel.AttachedFunction{}, "id = ? AND is_deleted = true", id)

if result.Error != nil {
log.Error("HardDeleteAttachedFunction failed",
zap.Error(result.Error),
zap.String("id", id.String()))
return result.Error
}

if result.RowsAffected == 0 {
log.Warn("HardDeleteAttachedFunction: no rows affected (attached function not found)",
zap.String("id", id.String()))
return nil // Idempotent - no error if not found
}

log.Info("HardDeleteAttachedFunction succeeded",
zap.String("id", id.String()))

return nil
}
50 changes: 50 additions & 0 deletions go/pkg/sysdb/metastore/db/dbmodel/mocks/IAttachedFunctionDb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions go/pkg/sysdb/metastore/db/dbmodel/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ type IAttachedFunctionDb interface {
PeekScheduleByCollectionId(collectionIDs []string) ([]*AttachedFunction, error)
GetMinCompletionOffsetForCollection(inputCollectionID string) (*int64, error)
CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid.UUID, error)
GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*AttachedFunction, error)
HardDeleteAttachedFunction(id uuid.UUID) error
}
17 changes: 17 additions & 0 deletions idl/chromadb/proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,21 @@ message PeekScheduleByCollectionIdResponse {
repeated ScheduleEntry schedule = 1;
}

message GetSoftDeletedAttachedFunctionsRequest {
google.protobuf.Timestamp cutoff_time = 1;
int32 limit = 2;
}

message GetSoftDeletedAttachedFunctionsResponse {
repeated AttachedFunction attached_functions = 1;
}

message FinishAttachedFunctionDeletionRequest {
string attached_function_id = 1;
}

message FinishAttachedFunctionDeletionResponse {}

service SysDB {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
Expand Down Expand Up @@ -731,4 +746,6 @@ service SysDB {
rpc CleanupExpiredPartialAttachedFunctions(CleanupExpiredPartialAttachedFunctionsRequest) returns (CleanupExpiredPartialAttachedFunctionsResponse) {}
rpc GetFunctions(GetFunctionsRequest) returns (GetFunctionsResponse) {}
rpc PeekScheduleByCollectionId(PeekScheduleByCollectionIdRequest) returns (PeekScheduleByCollectionIdResponse) {}
rpc GetSoftDeletedAttachedFunctions(GetSoftDeletedAttachedFunctionsRequest) returns (GetSoftDeletedAttachedFunctionsResponse) {}
rpc FinishAttachedFunctionDeletion(FinishAttachedFunctionDeletionRequest) returns (FinishAttachedFunctionDeletionResponse) {}
}
5 changes: 5 additions & 0 deletions rust/frontend/src/impls/service_based_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,11 @@ impl ServiceBasedFrontend {
chroma_sysdb::DeleteAttachedFunctionError::FailedToDeleteAttachedFunction(s) => {
DetachFunctionError::Internal(Box::new(chroma_error::TonicError(s)))
}
chroma_sysdb::DeleteAttachedFunctionError::NotImplemented => {
DetachFunctionError::Internal(Box::new(chroma_error::TonicError(
tonic::Status::unimplemented("Not implemented"),
)))
}
Comment on lines +1978 to +1981
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

It's good that you're handling the NotImplemented error case. However, returning a generic tonic::Status::unimplemented("Not implemented") might be a bit opaque for clients. It would be more informative to specify which backend is missing the implementation, which can aid in debugging, especially in environments where different backends might be in use.

Consider making the error message more specific.

Context for Agents
[**BestPractice**]

It's good that you're handling the `NotImplemented` error case. However, returning a generic `tonic::Status::unimplemented("Not implemented")` might be a bit opaque for clients. It would be more informative to specify which backend is missing the implementation, which can aid in debugging, especially in environments where different backends might be in use.

Consider making the error message more specific.

File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1981

})?;

Ok(DetachFunctionResponse { success: true })
Expand Down
2 changes: 2 additions & 0 deletions rust/garbage_collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ chroma-index = { workspace = true }
chroma-memberlist = { workspace = true }
chroma-tracing = { workspace = true }
chroma-jemalloc-pprof-server = { workspace = true }
s3heap = { workspace = true }
s3heap-service = { workspace = true }
wal3 = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
Expand Down
1 change: 1 addition & 0 deletions rust/garbage_collector/garbage_collector_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ otel_filters:
filter_level: "debug"
relative_cutoff_time_seconds: 43200 # GC all versions created at time < now() - relative_cutoff_time_seconds (12 hours)
max_collections_to_gc: 1000 # Maximum number of collections to GC in one run
max_attached_functions_to_gc_per_run: 100 # Maximum number of attached functions to garbage collect per run
gc_interval_mins: 120 # Run GC every x mins
disallow_collections: [] # collection ids to disable GC on
sysdb_config:
Expand Down
Loading
Loading