
Conversation


@tanujnay112 tanujnay112 commented Oct 20, 2025

Description of changes

This change implements the TaskRunner, which pulls scheduled Tasks from the heap and runs them. It reuses a large part of the CompactionManager's code, with the following additions:

  • The CompactionManager now takes in a strongly typed two-variant enum specifying whether it services Tasks or ordinary collection Compactions. All of the changes here take effect in Tasks mode.
  • The scheduler regularly pulls tasks from the S3 heap and adds them to an internal queue that the overlying CompactionManager polls.
  • CompactionOrchestrator has been refactored to take both an input_collection and an output_collection rather than just an input_collection to compact; its preexisting operators have been updated accordingly.
  • For each scheduled task the Task-mode CompactionManager finds, it kicks off a CompactionOrchestrator with PrepareTask as the initial operator (see the sketch after this list). PrepareTask does the following:
    • It takes in a task uuid and a task nonce and resolves them to a SysDB Task object, creating an output collection if needed.
    • It marks the beginning of this nonce's execution by generating a new value for next_nonce in the Tasks table in the SysDB.
      • It detects whether this step has already been done by checking whether Tasks.lowest_live_nonce != Tasks.next_nonce.
        • If the two are not equal, the increment to next_nonce described above does not occur. This case implies an incomplete execution of lowest_live_nonce, so PrepareTask compares the scout_logs result for the input collection with Tasks.completion_offset to determine whether this Task must be re-executed or can skip straight to the FinishTask operator.
  • The Register operator has been updated to write the current Task's completion offset in the same SysDB transaction that flushes the output collection's records.
  • A FinishTask operator has been added to run after Register in Task-mode CompactionOrchestrators. It re-runs scout_logs on the input collection; if the resulting offset is too far ahead of the completion_offset we sent to the SysDB in Register, we schedule a new item in the heap to run this task again later. We then unconditionally set lowest_live_nonce to next_nonce in the SysDB, marking the old nonce's work as completed and allowing all heap entries below the new value of lowest_live_nonce to be pruned.
  • Improvements & Bug fixes
    • See the description above.
  • New functionality
    • The TaskRunner's operation as a separate system component in the Compactor node is disabled by default; it can be enabled with the following configuration in the compaction_service config:

  task_runner:
    enabled: true
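
To make the nonce handling above concrete, here is a minimal sketch of the PrepareTask decision, under illustrative names (`SysDbTask`, `PrepareOutcome`, and the `scout_logs_offset` parameter are assumptions, not this PR's actual types):

```rust
/// Illustrative outcome of PrepareTask; names are assumptions, not
/// the actual types introduced by this PR.
enum PrepareOutcome {
    /// Fresh or unfinished work: run the task body, then Register and FinishTask.
    Execute,
    /// A previous run already flushed its results; jump straight to FinishTask.
    SkipToFinish,
}

/// Hypothetical view of a row in the SysDB Tasks table.
struct SysDbTask {
    lowest_live_nonce: u64,
    next_nonce: u64,
    completion_offset: i64,
}

fn prepare_task(task: &mut SysDbTask, scout_logs_offset: i64) -> PrepareOutcome {
    if task.lowest_live_nonce == task.next_nonce {
        // No execution in flight: mark the start of this nonce's run by
        // generating a new next_nonce (the SysDB write is elided here).
        task.next_nonce += 1;
        return PrepareOutcome::Execute;
    }
    // lowest_live_nonce != next_nonce: a previous execution of
    // lowest_live_nonce did not finish. Compare the log frontier with the
    // recorded completion offset to decide whether the work must be redone.
    if scout_logs_offset > task.completion_offset {
        PrepareOutcome::Execute
    } else {
        PrepareOutcome::SkipToFinish
    }
}
```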

Relevant TODOs after this:

  • Adjust scheduler to be aware of lowest_live_nonce
  • Send a new heap item in FinishTask if the scout_logs check deems it necessary.
  • Adjust create_task to do a two-phase commit, and adjust the TaskRunner logic to handle partially created tasks. create_task must also send an initial backfill item to the heap if its input collection existed before the Task did.
  • Validate operator parameters and pull in operator data during TaskRunner execution.
  • TaskRunner currently pulls data only from the compacted segments of the input collection; it could also pull log data in this process.
  • Support for incremental tasks.

Test plan

How are these changes tested?

An integration test has been added in compact.rs.

  • Tests pass locally with pytest for python, yarn test for js, cargo test for rust

Migration plan

Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?

Observability plan

What is the plan to instrument and monitor this change?

Documentation Changes

Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the _docs_ section?


This stack of pull requests is managed by Graphite. Learn more about stacking.

@github-actions

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of an unexpectedly high quality? (Readability, Modularity, Intuitiveness)

@tanujnay112 tanujnay112 changed the title [ENH]: Add operator to finalize a task's completion [ENH]: TaskRunner based off CompactionManager Oct 20, 2025
@tanujnay112 tanujnay112 marked this pull request as ready for review October 21, 2025 00:01

propel-code-bot bot commented Oct 21, 2025

Introduction of TaskRunner based on CompactionManager

This pull request introduces the TaskRunner, a new component designed to execute background tasks by pulling scheduled jobs from an S3-based heap. The implementation heavily reuses and refactors the existing CompactionManager, generalizing it to handle both standard collection compactions and new task executions, distinguished by a JobMode enum.

The core of the change is a new task execution lifecycle managed by a series of operators. A PrepareTask operator fetches the task definition, validates an execution nonce, creates an output collection if needed, and advances the task's state in SysDB by updating next_nonce. It can also detect incomplete previous runs and determine if the current execution can be skipped. The Register operator has been modified to transactionally flush task output data while updating the task's completion_offset. Finally, a new FinishTask operator finalizes the run by re-checking for new data (and potentially rescheduling the task) before marking the current nonce as complete. This entire feature is disabled by default and can be activated via a configuration flag.
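
A minimal sketch of the FinishTask step in that lifecycle, assuming a hypothetical `TaskState` struct and an explicit `reschedule_threshold` (the PR only says the offset being "too far ahead" triggers a reschedule):

```rust
/// Hypothetical task state; only the field names mirror the PR's columns.
struct TaskState {
    lowest_live_nonce: u64,
    next_nonce: u64,
    completion_offset: i64,
}

/// Returns true if a new heap item should be scheduled to rerun the task.
fn finish_task(state: &mut TaskState, scout_logs_offset: i64, reschedule_threshold: i64) -> bool {
    // Re-check the log frontier: if it moved well past the completion
    // offset that Register just flushed, the task should run again later.
    let needs_reschedule = scout_logs_offset - state.completion_offset > reschedule_threshold;
    // Unconditionally mark this nonce complete; heap entries below the new
    // lowest_live_nonce can now be pruned.
    state.lowest_live_nonce = state.next_nonce;
    needs_reschedule
}
```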

Key Changes

• Introduced TaskRunner, a new component for executing background tasks, built upon a refactored CompactionManager.
• Generalized CompactionManager to handle both Compaction and Task jobs via a new JobMode enum in rust/worker/src/compactor/compaction_manager.rs (see the sketch after this list).
• Created new operators PrepareTask and FinishTask to manage the lifecycle of a task execution, located in rust/worker/src/execution/operators/.
• Implemented a nonce-based state management system (next_nonce, lowest_live_nonce) in the Tasks table to ensure correct execution and handle failures.
• Refactored CompactionOrchestrator to support distinct input_collection and output_collection parameters.
• Modified the Register operator to transactionally update a task's completion_offset alongside flushing data via a new flush_compaction_and_task SysDB call.
• Added new SysDB RPCs (AdvanceTask, FinishTask, GetTaskByUuid, CreateOutputCollectionForTask) and database schema changes to support task state management.
• The TaskRunner is disabled by default and can be enabled via the task_runner.enabled configuration in rust/worker/src/config.rs.
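
As a rough illustration of the JobMode split (only the JobMode name and the Compaction variant are visible in this PR's diff; the Task variant name and the helper method here are assumptions):

```rust
/// Two-variant mode enum; the Task variant name is an assumption.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum JobMode {
    /// Ordinary collection compactions scheduled by the compactor.
    Compaction,
    /// Scheduled Tasks pulled from the S3 heap.
    Task,
}

struct CompactionManager {
    mode: JobMode,
}

impl CompactionManager {
    /// Hypothetical helper: does a run start with PrepareTask?
    fn starts_with_prepare_task(&self) -> bool {
        // Task runs begin with PrepareTask and end with FinishTask;
        // Compaction runs keep the preexisting operator pipeline.
        matches!(self.mode, JobMode::Task)
    }
}
```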

Affected Areas

• rust/worker/src/compactor/
• rust/worker/src/execution/orchestration/
• rust/worker/src/execution/operators/
• go/pkg/sysdb/coordinator/
• go/pkg/sysdb/metastore/db/dao/
• go/pkg/sysdb/metastore/db/migrations/
• idl/chromadb/proto/
• rust/types/
• rust/s3heap-service/

This summary was automatically generated by @propel-code-bot

Comment on lines +193 to +194
JobMode::Compaction => {
let jobs_iter = self.scheduler.get_jobs();
[**PerformanceOptimization**]

Potential N+1 query pattern:

```rust
let jobs_iter = self.scheduler.get_jobs();
for job in jobs_iter {
    // ... process each job
}
```

While not a direct database query, verify that `get_jobs()` doesn't internally make individual queries for each job. If it does, consider fetching all jobs in a single batch operation before the loop.

File: rust/worker/src/compactor/compaction_manager.rs
Line: 194
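
If `get_jobs()` did turn out to issue a lookup per job, one way to batch it might look like the following sketch (the `Scheduler` type and both method names here are hypothetical stand-ins, not this codebase's API):

```rust
// All of these types and methods are illustrative stand-ins.
struct Job;
struct JobId(u64);

struct Scheduler;

impl Scheduler {
    fn get_job_ids(&self) -> Vec<JobId> {
        Vec::new() // stand-in: would list scheduled job ids
    }
    fn get_jobs_batch(&self, _ids: &[JobId]) -> Vec<Job> {
        Vec::new() // stand-in: one batched fetch for all ids
    }
}

fn run_jobs(scheduler: &Scheduler) {
    // Fetch every job up front in one batch instead of querying inside the loop.
    let ids = scheduler.get_job_ids();
    let jobs = scheduler.get_jobs_batch(&ids);
    for _job in jobs {
        // ... process each job
    }
}
```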

Comment on lines +1967 to +1968
std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_micros(task.next_run_at);

[**BestPractice**]

Potential integer overflow in timestamp conversion:

```rust
let next_run = std::time::UNIX_EPOCH + std::time::Duration::from_micros(task.next_run_at);
```

`from_micros()` can panic if `task.next_run_at` is too large to fit in a Duration. This could happen with invalid or corrupted database values. Use `checked_add` or validate the timestamp range before conversion:

```rust
let duration = std::time::Duration::from_micros(task.next_run_at);
let next_run = std::time::UNIX_EPOCH.checked_add(duration)
    .ok_or_else(|| GetTaskError::ServerReturnedInvalidData)?;
```

File: rust/sysdb/src/sysdb.rs
Line: 1968

suite.Require().Equal(common.ErrTaskNotFound, err)

suite.db.Unscoped().Delete(&dbmodel.Task{}, "task_id = ?", task.ID)
}
[**TestCoverage**]

The new functions `FinishTask` and `UpdateCompletionOffset` have been added to `taskDb`, but only `UpdateCompletionOffset` has a test. It would be beneficial to add a test case for `FinishTask` as well to ensure the nonce logic for completing a task epoch is fully covered.

File: go/pkg/sysdb/metastore/db/dao/task_test.go
Line: 520

MinRecordsForTask: uint64(task.MinRecordsForTask),
TenantId: task.TenantID,
DatabaseId: task.DatabaseID,
NextRunAt: uint64(task.NextRun.UnixMicro()),
[**BestPractice**]

In `GetTaskByName`, `NextRunAt` is converted using `UnixMicro()`. However, in `PeekScheduleByCollectionId` (line 539) and `AdvanceTask` (line 491), `UnixMilli()` is used for similar timestamp fields. For consistency, should this also be `UnixMilli()` unless microsecond precision is specifically required here?

File: go/pkg/sysdb/coordinator/task.go
Line: 210


tanujnay112 commented Oct 21, 2025

3 Jobs Failed:

PR checks / all-required-pr-checks-passed failed on "Decide whether the needed jobs succeeded or failed"
[...]
EOM
)"
shell: /usr/bin/bash --noprofile --norc -e -o pipefail {0}
env:
  GITHUB_REPO_NAME: chroma-core/chroma
  PYTHONPATH: /home/runner/_work/_actions/re-actors/alls-green/release/v1/src
# ❌ Some of the required to succeed jobs failed 😢😢😢

📝 Job statuses:
📝 python-tests → ✓ success [required to succeed or be skipped]
📝 python-vulnerability-scan → ✓ success [required to succeed or be skipped]
📝 javascript-client-tests → ✓ success [required to succeed or be skipped]
📝 rust-tests → ✓ success [required to succeed or be skipped]
📝 rust-feature-tests → ✓ success [required to succeed or be skipped]
📝 go-tests → ❌ failure [required to succeed or be skipped]
📝 lint → ❌ failure [required to succeed]
📝 check-helm-version-bump → ⬜ skipped [required to succeed or be skipped]
📝 delete-helm-comment → ✓ success [required to succeed or be skipped]
Error: Process completed with exit code 1.
PR checks / Go tests / cluster-test failed on "Run bin/cluster-test.sh bash -c 'cd go && make test'"
[...]
	have (uuid.UUID, uuid.UUID)
	want (uuid.UUID, uuid.UUID, int64, uint64)
Error: pkg/sysdb/metastore/db/dao/task_test.go:440:8: assignment mismatch: 1 variable but suite.Db.AdvanceTask returns 2 values
Error: pkg/sysdb/metastore/db/dao/task_test.go:440:37: not enough arguments in call to suite.Db.AdvanceTask
	have (uuid.UUID, uuid.UUID)
	want (uuid.UUID, uuid.UUID, int64, uint64)
Error: pkg/sysdb/metastore/db/dao/task_test.go:448:9: assignment mismatch: 1 variable but suite.Db.AdvanceTask returns 2 values
FAIL	github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dao [build failed]
	github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dao/daotest		coverage: 0.0% of statements
	github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbcore		coverage: 0.0% of statements
	github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel		coverage: 0.0% of statements
	github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel/mocks		coverage: 0.0% of statements
	github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3		coverage: 0.0% of statements
	github.com/chroma-core/chroma/go/pkg/types		coverage: 0.0% of statements
ok  	github.com/chroma-core/chroma/go/pkg/utils	1.058s	coverage: 31.2% of statements
	github.com/chroma-core/chroma/go/shared/libs		coverage: 0.0% of statements
	github.com/chroma-core/chroma/go/shared/otel		coverage: 0.0% of statements
FAIL
make: *** [Makefile:34: test] Error 1
Error: Process completed with exit code 2.
PR checks / Lint failed on "Clippy"
[...]
2170 | |             &mut sysdb,
2171 | |             output_collection_id,
2172 | |             &test_segments.blockfile_provider,
2173 | |         )
     | |_________^
     |
     = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#large_futures
help: consider `Box::pin` on it
     |
2169 ~         let total_count_2 = Box::pin(get_total_count_output(
2170 +             &mut sysdb,
2171 +             output_collection_id,
2172 +             &test_segments.blockfile_provider,
2173 +         ))
     |

error: could not compile `worker` (lib test) due to 3 previous errors
warning: build failed, waiting for other jobs to finish...
    Checking chroma-load v0.1.0 (/home/runner/_work/chroma/chroma/rust/load)
Error: Process completed with exit code 101.

Summary: 1 successful workflow, 1 failed workflow

Last updated: 2025-10-21 00:45:57 UTC
