Skip to content

Commit 19d4ecf

Browse files
committed
[ENH]: Add operator to finalize a task's completion
1 parent ff77a60 commit 19d4ecf

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

44 files changed

+3789
-336
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ tracing = { version = "0.1" }
5151
tracing-bunyan-formatter = "0.3"
5252
tracing-opentelemetry = "0.28.0"
5353
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
54-
uuid = { version = "1.11.0", features = ["v4", "fast-rng", "macro-diagnostics", "serde"] }
54+
uuid = { version = "1.11.0", features = ["v4", "v7", "fast-rng", "macro-diagnostics", "serde"] }
5555
utoipa = { version = "5.0.0", features = ["macros", "axum_extras", "debug", "uuid"] }
5656
sqlx = { version = "0.8.3", features = ["runtime-tokio", "sqlite", "postgres", "chrono"] }
5757
sha2 = "0.10.8"

examples/task_api_example.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"""
88

99
import chromadb
10+
import time
1011

1112
# Connect to Chroma server
1213
client = chromadb.HttpClient(host="localhost", port=8000)
@@ -60,6 +61,8 @@
6061
print("Task is now registered and will run on new data!")
6162
print("=" * 60)
6263

64+
time.sleep(10)
65+
6366
# Add more documents to trigger task execution
6467
print("\nAdding more documents...")
6568
collection.add(

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
1313
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
1414
"github.com/chroma-core/chroma/go/pkg/types"
15+
"github.com/google/uuid"
1516
"github.com/pingcap/log"
1617
"go.uber.org/zap"
1718
)
@@ -243,6 +244,17 @@ func (s *Coordinator) FlushCollectionCompaction(ctx context.Context, flushCollec
243244
return s.catalog.FlushCollectionCompaction(ctx, flushCollectionCompaction)
244245
}
245246

247+
func (s *Coordinator) FlushCollectionCompactionAndTask(
248+
ctx context.Context,
249+
flushCollectionCompaction *model.FlushCollectionCompaction,
250+
taskID uuid.UUID,
251+
taskRunNonce uuid.UUID,
252+
completionOffset int64,
253+
nextRunDelaySecs uint64,
254+
) (*model.FlushCollectionInfo, error) {
255+
return s.catalog.FlushCollectionCompactionAndTask(ctx, flushCollectionCompaction, taskID, taskRunNonce, completionOffset, nextRunDelaySecs)
256+
}
257+
246258
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string, minVersionsIfAlive *uint64) ([]*model.CollectionToGc, error) {
247259
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit, tenantID, minVersionsIfAlive)
248260
}

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"time"
55

66
"github.com/chroma-core/chroma/go/pkg/types"
7+
"github.com/google/uuid"
78
)
89

910
type Collection struct {
@@ -98,6 +99,10 @@ type FlushCollectionInfo struct {
9899
ID string
99100
CollectionVersion int32
100101
TenantLastCompactionTime int64
102+
// Optional task fields (only populated for task-based compactions)
103+
TaskNextNonce *uuid.UUID
104+
TaskNextRun *time.Time
105+
TaskCompletionOffset *int64
101106
}
102107

103108
func FilterCollection(collection *Collection, collectionID types.UniqueID, collectionName *string) bool {

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func (tc *Catalog) createCollectionImpl(txCtx context.Context, createCollection
280280
return nil, false, err
281281
}
282282
if len(databases) == 0 {
283-
log.Error("database not found", zap.Error(err))
283+
log.Error("database not found for database 283", zap.String("database_name", databaseName), zap.String("tenant_id", tenantID))
284284
return nil, false, common.ErrDatabaseNotFound
285285
}
286286

@@ -1343,7 +1343,7 @@ func (tc *Catalog) CreateCollectionAndSegments(ctx context.Context, createCollec
13431343
return nil, false, err
13441344
}
13451345
if len(databases) == 0 {
1346-
log.Error("database not found", zap.Error(err))
1346+
log.Error("database not found for database", zap.String("database_name", createCollection.DatabaseName), zap.String("tenant_id", createCollection.TenantID))
13471347
return nil, false, common.ErrDatabaseNotFound
13481348
}
13491349

@@ -1719,6 +1719,58 @@ func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectio
17191719
return flushCollectionInfo, nil
17201720
}
17211721

1722+
// FlushCollectionCompactionAndTask atomically updates collection compaction data and task completion offset.
1723+
// NOTE: This does NOT advance next_nonce - that is done separately by AdvanceTask in PrepareTask.
1724+
// This only updates the completion_offset to record how far we've processed.
1725+
// This is only supported for versioned collections (the modern/default path).
1726+
func (tc *Catalog) FlushCollectionCompactionAndTask(
1727+
ctx context.Context,
1728+
flushCollectionCompaction *model.FlushCollectionCompaction,
1729+
taskID uuid.UUID,
1730+
taskRunNonce uuid.UUID,
1731+
completionOffset int64,
1732+
nextRunDelaySecs uint64,
1733+
) (*model.FlushCollectionInfo, error) {
1734+
if !tc.versionFileEnabled {
1735+
// Task-based compactions are only supported with versioned collections
1736+
log.Error("FlushCollectionCompactionAndTask is only supported for versioned collections")
1737+
return nil, errors.New("task-based compaction requires versioned collections")
1738+
}
1739+
1740+
var flushCollectionInfo *model.FlushCollectionInfo
1741+
1742+
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
1743+
var err error
1744+
flushCollectionInfo, err = tc.FlushCollectionCompactionForVersionedCollection(txCtx, flushCollectionCompaction)
1745+
if err != nil {
1746+
return err
1747+
}
1748+
1749+
// Update ONLY completion_offset - next_nonce was already advanced in PrepareTask
1750+
// We still validate taskRunNonce to ensure we're updating the correct epoch
1751+
err = tc.metaDomain.TaskDb(txCtx).UpdateCompletionOffset(taskID, taskRunNonce, completionOffset)
1752+
if err != nil {
1753+
return err
1754+
}
1755+
1756+
return nil
1757+
})
1758+
1759+
if err != nil {
1760+
return nil, err
1761+
}
1762+
1763+
// Populate task fields with authoritative values from database
1764+
flushCollectionInfo.TaskCompletionOffset = &completionOffset
1765+
1766+
log.Info("FlushCollectionCompactionAndTask",
1767+
zap.String("collection_id", flushCollectionCompaction.ID.String()),
1768+
zap.String("task_id", taskID.String()),
1769+
zap.Int64("completion_offset", completionOffset))
1770+
1771+
return flushCollectionInfo, nil
1772+
}
1773+
17221774
func (tc *Catalog) validateVersionFile(versionFile *coordinatorpb.CollectionVersionFile, collectionID string, version int64) error {
17231775
if versionFile.GetCollectionInfoImmutable().GetCollectionId() != collectionID {
17241776
log.Error("collection id mismatch", zap.String("collection_id", collectionID), zap.String("version_file_collection_id", versionFile.GetCollectionInfoImmutable().GetCollectionId()))

0 commit comments

Comments
 (0)