Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ tracing = { version = "0.1" }
tracing-bunyan-formatter = "0.3"
tracing-opentelemetry = "0.28.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.11.0", features = ["v4", "fast-rng", "macro-diagnostics", "serde"] }
uuid = { version = "1.11.0", features = ["v4", "v7", "fast-rng", "macro-diagnostics", "serde"] }
utoipa = { version = "5.0.0", features = ["macros", "axum_extras", "debug", "uuid"] }
sqlx = { version = "0.8.3", features = ["runtime-tokio", "sqlite", "postgres", "chrono"] }
sha2 = "0.10.8"
Expand Down
3 changes: 3 additions & 0 deletions examples/task_api_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import chromadb
import time

# Connect to Chroma server
client = chromadb.HttpClient(host="localhost", port=8000)
Expand Down Expand Up @@ -60,6 +61,8 @@
print("Task is now registered and will run on new data!")
print("=" * 60)

time.sleep(10)

# Add more documents to trigger task execution
print("\nAdding more documents...")
collection.add(
Expand Down
12 changes: 12 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
"github.com/chroma-core/chroma/go/pkg/types"
"github.com/google/uuid"
"github.com/pingcap/log"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -243,6 +244,17 @@ func (s *Coordinator) FlushCollectionCompaction(ctx context.Context, flushCollec
return s.catalog.FlushCollectionCompaction(ctx, flushCollectionCompaction)
}

func (s *Coordinator) FlushCollectionCompactionAndTask(
ctx context.Context,
flushCollectionCompaction *model.FlushCollectionCompaction,
taskID uuid.UUID,
taskRunNonce uuid.UUID,
completionOffset int64,
nextRunDelaySecs uint64,
) (*model.FlushCollectionInfo, error) {
return s.catalog.FlushCollectionCompactionAndTask(ctx, flushCollectionCompaction, taskID, taskRunNonce, completionOffset, nextRunDelaySecs)
}

func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string, minVersionsIfAlive *uint64) ([]*model.CollectionToGc, error) {
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit, tenantID, minVersionsIfAlive)
}
Expand Down
5 changes: 5 additions & 0 deletions go/pkg/sysdb/coordinator/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/chroma-core/chroma/go/pkg/types"
"github.com/google/uuid"
)

type Collection struct {
Expand Down Expand Up @@ -98,6 +99,10 @@ type FlushCollectionInfo struct {
ID string
CollectionVersion int32
TenantLastCompactionTime int64
// Optional task fields (only populated for task-based compactions)
TaskNextNonce *uuid.UUID
TaskNextRun *time.Time
TaskCompletionOffset *int64
}

func FilterCollection(collection *Collection, collectionID types.UniqueID, collectionName *string) bool {
Expand Down
56 changes: 54 additions & 2 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (tc *Catalog) createCollectionImpl(txCtx context.Context, createCollection
return nil, false, err
}
if len(databases) == 0 {
log.Error("database not found", zap.Error(err))
log.Error("database not found for database", zap.String("database_name", databaseName), zap.String("tenant_id", tenantID))
return nil, false, common.ErrDatabaseNotFound
}

Expand Down Expand Up @@ -1343,7 +1343,7 @@ func (tc *Catalog) CreateCollectionAndSegments(ctx context.Context, createCollec
return nil, false, err
}
if len(databases) == 0 {
log.Error("database not found", zap.Error(err))
log.Error("database not found for database", zap.String("database_name", createCollection.DatabaseName), zap.String("tenant_id", createCollection.TenantID))
return nil, false, common.ErrDatabaseNotFound
}

Expand Down Expand Up @@ -1719,6 +1719,58 @@ func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectio
return flushCollectionInfo, nil
}

// FlushCollectionCompactionAndTask atomically updates collection compaction data and task completion offset.
// NOTE: This does NOT advance next_nonce - that is done separately by AdvanceTask in PrepareTask.
// This only updates the completion_offset to record how far we've processed.
// This is only supported for versioned collections (the modern/default path).
func (tc *Catalog) FlushCollectionCompactionAndTask(
ctx context.Context,
flushCollectionCompaction *model.FlushCollectionCompaction,
taskID uuid.UUID,
taskRunNonce uuid.UUID,
completionOffset int64,
nextRunDelaySecs uint64,
) (*model.FlushCollectionInfo, error) {
if !tc.versionFileEnabled {
// Task-based compactions are only supported with versioned collections
log.Error("FlushCollectionCompactionAndTask is only supported for versioned collections")
return nil, errors.New("task-based compaction requires versioned collections")
}

var flushCollectionInfo *model.FlushCollectionInfo

err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
var err error
flushCollectionInfo, err = tc.FlushCollectionCompactionForVersionedCollection(txCtx, flushCollectionCompaction)
if err != nil {
return err
}

// Update ONLY completion_offset - next_nonce was already advanced in PrepareTask
// We still validate taskRunNonce to ensure we're updating the correct epoch
err = tc.metaDomain.TaskDb(txCtx).UpdateCompletionOffset(taskID, taskRunNonce, completionOffset)
if err != nil {
return err
}

return nil
})

if err != nil {
return nil, err
}

// Populate task fields with authoritative values from database
flushCollectionInfo.TaskCompletionOffset = &completionOffset

log.Info("FlushCollectionCompactionAndTask",
zap.String("collection_id", flushCollectionCompaction.ID.String()),
zap.String("task_id", taskID.String()),
zap.Int64("completion_offset", completionOffset))

return flushCollectionInfo, nil
}

func (tc *Catalog) validateVersionFile(versionFile *coordinatorpb.CollectionVersionFile, collectionID string, version int64) error {
if versionFile.GetCollectionInfoImmutable().GetCollectionId() != collectionID {
log.Error("collection id mismatch", zap.String("collection_id", collectionID), zap.String("version_file_collection_id", versionFile.GetCollectionInfoImmutable().GetCollectionId()))
Expand Down
Loading
Loading