fix: Sort tasks when scheduler fetch tasks from storage #73

Merged: 3 commits merged into y-scope:main from the scheduler_sort branch on Mar 4, 2025

Conversation

sitaowang1998
Collaborator

@sitaowang1998 sitaowang1998 commented Mar 4, 2025

Description

Currently, the scheduler sorts the tasks each time a schedule-task request is served, which is inefficient.

This PR sorts tasks when the scheduler fetches available tasks from storage, so that a schedule-task request can be served by finding the first task that meets the locality requirement. The scheduler also caches data locality info to avoid unnecessary storage queries.
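
To illustrate the idea, here is a minimal sketch of a policy that sorts once at fetch time and then serves each request with a single scan. It is not the code in this PR: `Task`, `SortedFifoPolicy`, the integer ids and the `allowed_addrs` field are hypothetical stand-ins, and the sketch uses `std::sort` where the PR uses `std::ranges::sort`.

```cpp
#include <algorithm>
#include <chrono>
#include <iterator>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the scheduler's task type.
struct Task {
    int id;
    std::chrono::system_clock::time_point job_creation_time;
    // Empty means the task may run on any worker (assumption of this sketch).
    std::vector<std::string> allowed_addrs;
};

class SortedFifoPolicy {
public:
    // Replaces the pending list and sorts it once, newest job first, so the
    // oldest task ends up at the back of the vector.
    auto fetch_tasks(std::vector<Task> ready_tasks) -> void {
        m_tasks = std::move(ready_tasks);
        std::sort(m_tasks.begin(), m_tasks.end(), [](Task const& a, Task const& b) {
            return a.job_creation_time > b.job_creation_time;
        });
    }

    // Walks from the oldest task to the newest and hands out the first task
    // whose locality requirement the worker satisfies. No sorting happens here.
    auto schedule_next(std::string const& worker_addr) -> std::optional<int> {
        for (auto it = m_tasks.rbegin(); it != m_tasks.rend(); ++it) {
            if (locality_satisfied(*it, worker_addr)) {
                int const id = it->id;  // Copy the id before erasing.
                // std::next(it).base() is the forward iterator to *it.
                m_tasks.erase(std::next(it).base());
                return id;
            }
        }
        return std::nullopt;
    }

private:
    static auto locality_satisfied(Task const& task, std::string const& addr) -> bool {
        return task.allowed_addrs.empty()
               || std::find(task.allowed_addrs.begin(), task.allowed_addrs.end(), addr)
                          != task.allowed_addrs.end();
    }

    std::vector<Task> m_tasks;
};
```

With three tasks created at t+30s, t+10s (restricted to one address) and t+20s, a worker at a different address receives the t+20s task first, because the oldest task is skipped on locality and the scan continues to the next oldest.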

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • GitHub workflows pass
  • Unit tests pass in dev container
  • Integration tests pass in dev container

Summary by CodeRabbit

  • Refactor

    • Enhanced the scheduling system to optimise task processing and improve performance.
    • Updated the task retrieval and selection processes for a more efficient and reliable experience.
  • Chore

    • Removed outdated caching components to streamline internal task management.
    • Eliminated the SchedulerTaskCache class and its related functionality.

@sitaowang1998 sitaowang1998 requested a review from a team as a code owner March 4, 2025 01:56
Contributor

coderabbitai bot commented Mar 4, 2025

Walkthrough

This change removes the SchedulerTaskCache component from the spider scheduler. The build configuration in the CMakeLists.txt excludes its source files. In parallel, the FifoPolicy class (both implementation and header) has been substantially refactored by removing the get_next_task method, adding a fetch_tasks method, and updating task locality and caching logic via new member variables such as m_tasks and m_data_cache. The SchedulerTaskCache files have been completely removed from the source tree.

Changes

| File(s) | Change Summary |
|---|---|
| src/spider/CMakeLists.txt | Removed scheduler/SchedulerTaskCache.cpp and scheduler/SchedulerTaskCache.hpp from the build sources. |
| src/spider/scheduler/FifoPolicy.cpp, src/spider/scheduler/FifoPolicy.hpp | Refactored the FifoPolicy class: updated task scheduling logic; updated task_locality_satisfied to use member variables; restructured schedule_next with reverse iterators and added a new fetch_tasks method; removed the get_next_task method and outdated cache member variables; introduced new members (m_tasks and m_data_cache). |
| src/spider/scheduler/SchedulerTaskCache.cpp, src/spider/scheduler/SchedulerTaskCache.hpp | Removed the SchedulerTaskCache class and its associated task cache management methods. |

Sequence Diagram(s)

sequenceDiagram
    participant Worker
    participant FifoPolicy
    participant MetadataStore

    Worker->>FifoPolicy: schedule_next(worker_id, worker_addr)
    alt m_tasks is empty
        FifoPolicy->>FifoPolicy: fetch_tasks()
        FifoPolicy->>MetadataStore: Retrieve ready tasks
        MetadataStore-->>FifoPolicy: Return tasks list
    end
    FifoPolicy->>FifoPolicy: Evaluate task_locality_satisfied(task, worker_addr)
    FifoPolicy->>Worker: Return selected task UUID

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/spider/scheduler/FifoPolicy.cpp (1)

100-133: Potential for more granular caching.
Clearing and refilling m_data_cache on every fetch_tasks call might force re-fetches for tasks already known.

Consider persisting relevant cache entries until they are invalidated, possibly improving performance further.

🧰 Tools
🪛 Cppcheck (2.10-2)

[performance] 116-116: Variable 'm_id' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2d6163d and 0fc22a0.

📒 Files selected for processing (5)
  • src/spider/CMakeLists.txt (0 hunks)
  • src/spider/scheduler/FifoPolicy.cpp (4 hunks)
  • src/spider/scheduler/FifoPolicy.hpp (2 hunks)
  • src/spider/scheduler/SchedulerTaskCache.cpp (0 hunks)
  • src/spider/scheduler/SchedulerTaskCache.hpp (0 hunks)
💤 Files with no reviewable changes (3)
  • src/spider/CMakeLists.txt
  • src/spider/scheduler/SchedulerTaskCache.hpp
  • src/spider/scheduler/SchedulerTaskCache.cpp
🧰 Additional context used
🪛 Cppcheck (2.10-2)
src/spider/scheduler/FifoPolicy.cpp

[performance] 97-97: Variable 'id' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)


[performance] 116-116: Variable 'm_id' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: non-storage-unit-tests (ubuntu-24.04)
  • GitHub Check: lint
  • GitHub Check: non-storage-unit-tests (ubuntu-22.04)
🔇 Additional comments (12)
src/spider/scheduler/FifoPolicy.cpp (9)

5-5: Header inclusion looks fine.
No issues found with adding <iterator> here.


10-10: Tuple header inclusion is acceptable.
No concerns regarding <tuple>.


13-13: Good use of absl::flat_hash_map.
Adopting a flat hash map can offer performance benefits.


25-25: Namespace remark
The new namespace usage is clear and consistent.


27-28: Locality check method is concise.
This refactoring improves cohesion by encapsulating locality checks in the FifoPolicy class.


38-47: Effective caching logic for input data.
Storing data in m_data_cache eliminates repeated storage queries, which may improve performance.
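
A rough sketch of this kind of cached locality check is shown below. All names here (`Data`, `LocalityChecker`, `Loader`, `forget`) are hypothetical and not taken from the PR, integer ids replace UUIDs, and treating missing data as "not satisfied" is an assumption of the sketch rather than the behaviour of FifoPolicy.

```cpp
#include <algorithm>
#include <functional>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a stored data record.
struct Data {
    std::vector<std::string> locality;  // Addresses that hold the data.
    bool hard_locality = false;         // If true, the task must run on one of them.
};

class LocalityChecker {
public:
    // Stand-in for a storage query; returns std::nullopt if the data is missing.
    using Loader = std::function<std::optional<Data>(int)>;

    explicit LocalityChecker(Loader loader) : m_loader(std::move(loader)) {}

    // Returns true if every hard-locality input is available at worker_addr.
    auto satisfied(std::vector<int> const& input_data_ids, std::string const& worker_addr)
            -> bool {
        for (int const data_id : input_data_ids) {
            auto it = m_cache.find(data_id);
            if (it == m_cache.end()) {
                // Cache miss: query storage once and remember the result.
                std::optional<Data> loaded = m_loader(data_id);
                if (!loaded.has_value()) {
                    return false;  // Assumption: missing data fails the check.
                }
                it = m_cache.emplace(data_id, std::move(*loaded)).first;
            }
            Data const& data = it->second;
            if (!data.hard_locality) {
                continue;  // Soft locality never blocks scheduling.
            }
            if (std::find(data.locality.begin(), data.locality.end(), worker_addr)
                == data.locality.end())
            {
                return false;
            }
        }
        return true;
    }

    // Drops one entry, e.g. once the task that needed it has been scheduled.
    auto forget(int data_id) -> void { m_cache.erase(data_id); }

private:
    Loader m_loader;
    std::unordered_map<int, Data> m_cache;
};
```

Repeated checks of the same task against different workers then reach storage only on the first lookup of each data id.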


70-70: Initialization in constructor
Passing conn by reference in the member initializer list is suitable.


72-81: Efficient lazy fetching of tasks.
Fetching tasks only when needed can improve scheduling performance.


82-87: Reverse iteration approach looks correct.
Reversing the sorted vector ensures older tasks are scheduled first, aligning with FIFO.

src/spider/scheduler/FifoPolicy.hpp (3)

9-9: No immediate concerns with new absl header.
Using absl::flat_hash_map can improve lookup performance.


32-33: Helper methods properly scoped.
Introducing fetch_tasks and task_locality_satisfied as private methods enhances encapsulation.


40-42: Clear naming for member variables.
The addition of m_tasks and m_data_cache clarifies management of tasks and cached data within this policy.

Comment on lines 90 to 98
        m_tasks.erase(it.base());
        for (core::TaskInput const& input : it->get_inputs()) {
            std::optional<boost::uuids::uuid> const data_id = input.get_data_id();
            if (data_id.has_value()) {
                m_data_cache.erase(data_id.value());
            }
        }
        return it->get_id();
    }
Contributor

⚠️ Potential issue

Fix potential use-after-erase error and incorrect erase with reverse iterators.
Erasing with it.base() removes the wrong element in forward iteration, and continuing to reference it after erasing from m_tasks leads to undefined behaviour. Store the task in a local variable before erasing and use std::prev(it.base()) to remove the correct element:

-    m_tasks.erase(it.base());
-    for (core::TaskInput const& input : it->get_inputs()) {
-        std::optional<boost::uuids::uuid> const data_id = input.get_data_id();
-        if (data_id.has_value()) {
-            m_data_cache.erase(data_id.value());
-        }
-    }
-    return it->get_id();
+    auto selected_task = *it;
+    m_tasks.erase(std::prev(it.base()));
+    for (core::TaskInput const& input : selected_task.get_inputs()) {
+        std::optional<boost::uuids::uuid> const data_id = input.get_data_id();
+        if (data_id.has_value()) {
+            m_data_cache.erase(data_id.value());
+        }
+    }
+    return selected_task.get_id();
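
The relationship between a reverse iterator and its base() can be checked in isolation. The following is a small sketch on a plain std::vector<int>; the helper name `erase_at_reverse` is hypothetical and not part of the PR.

```cpp
#include <cassert>
#include <iterator>
#include <vector>

// Erases the element a reverse iterator refers to and returns its value.
// Precondition: rit is dereferenceable, i.e. rit != values.rend().
auto erase_at_reverse(std::vector<int>& values, std::vector<int>::reverse_iterator rit) -> int {
    assert(rit != values.rend());
    // Copy the value out first: erase invalidates rit.
    int const value = *rit;
    // rit.base() points one element past *rit, so step back once to reach
    // the element itself. std::next(rit).base() is an equivalent spelling.
    values.erase(std::prev(rit.base()));
    return value;
}
```

For a vector {10, 20, 30, 40} and `rit = rbegin() + 1`, `*rit` is 30 while `*rit.base()` is 40, so erasing `rit.base()` would remove 40 instead of 30. When `rit == rbegin()`, `rit.base()` equals `end()`, and erasing it is undefined behaviour.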

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
src/spider/scheduler/FifoPolicy.cpp (3)

27-61: Improved task locality check with caching mechanism

The refactoring of task_locality_satisfied to use a cache improves performance by reducing repeated data access calls. The implementation is sound with proper error handling.

Consider these minor improvements:

-        if (false == m_data_store->get_data(m_conn, data_id, &data).success()) {
+        if (!m_data_store->get_data(m_conn, data_id, &data).success()) {

Also, adding a check for m_data_store nullness before dereferencing would make the code more robust.


116-130: Cache job metadata more efficiently

The lambda function get_task_job_creation_time has to perform two storage operations when a job's metadata isn't cached. Consider prefetching all job metadata for the tasks in a single batch operation to reduce the number of storage accesses.

Instead of fetching job metadata one by one, consider adding a bulk retrieval method to the metadata store that can fetch multiple job metadata entries at once. This would significantly reduce the number of storage operations when there are many tasks from different jobs.


97-97: Optional readability alternative for the erase operation

Line 97 uses std::next(it).base() to erase the element, which is correct. Both forms erase from the middle of a std::vector, so an index-based call is not faster, but it may read more clearly.

The equivalent forward position can be computed from the distance between it and m_tasks.rend():

-    m_tasks.erase(std::next(it).base());
+    m_tasks.erase(m_tasks.begin() + (std::distance(it, m_tasks.rend()) - 1));

Alternatively, iterating with indices instead of iterators would make the erase operation more direct.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0fc22a0 and e8ccbe8.

📒 Files selected for processing (1)
  • src/spider/scheduler/FifoPolicy.cpp (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: non-storage-unit-tests (ubuntu-24.04)
  • GitHub Check: non-storage-unit-tests (ubuntu-22.04)
🔇 Additional comments (5)
src/spider/scheduler/FifoPolicy.cpp (5)

5-5: New includes support the refactored implementation

Good addition of necessary headers to support the implementation changes:

  • <iterator> for using iterator operations like std::next
  • <tuple> for task instance storage in fetch_tasks
  • <absl/container/flat_hash_map.h> for the job metadata cache implementation

Also applies to: 10-10, 13-13


70-70: Good use of initialization list in constructor

The member variable m_conn is properly initialized in the initialization list.


72-99: Fixed the previously identified use-after-erase issue in schedule_next

The implementation correctly uses reverse iterators to find tasks that satisfy locality requirements. The fix for the erase operation looks appropriate.

The erase operation now correctly uses std::next(it).base() to remove the task from m_tasks after capturing the task ID, which resolves the use-after-erase issue identified in the previous review.


76-81: Efficient task fetching approach

Fetching tasks only when needed is an efficient approach that aligns with the PR objective of sorting tasks at fetch time rather than at each scheduling request.


131-133:

❓ Verification inconclusive

Consider optimizing the sorting operation

The current sorting approach calls get_task_job_creation_time twice for each comparison, which could lead to repeated lookups for the same tasks during sorting.


🏁 Script executed:

#!/bin/bash
# Verify the complexity of task metadata retrieval

# Check if there's a batch method for retrieving job metadata
echo "Checking for batch retrieval methods in MetadataStorage:"
rg -A 3 "get_job_metadata.*batch|batch.*get_job_metadata" --glob "*.hpp" --glob "*.cpp"

# Check sort implementation patterns in the codebase
echo "Checking for sort optimization patterns in the codebase:"
rg "sort.*\[.*\].*get_\w+.*\(" --glob "*.cpp"

Length of output: 383


Action Required: Review and optimise sorting performance in FifoPolicy.cpp

It appears that the sorting lambda calls get_task_job_creation_time twice for each comparison. Given that no existing batch retrieval method or sort optimisation pattern has been detected in the codebase, please verify whether these repeated calls could lead to a noticeable performance impact. If the task list grows large or if get_task_job_creation_time is computationally intensive, it might be beneficial to precompute the creation time for each task (or use a batch retrieval approach, should one exist in the future) and then sort using the cached values.

  • Location: src/spider/scheduler/FifoPolicy.cpp (Lines 131-133)
  • Suggestion: Refactor the comparison so that each task's creation time is computed once rather than repeatedly during sorting.
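
One way to compute each creation time only once is to build a lookup table before sorting. The sketch below is self-contained and uses hypothetical names (`Task`, `sort_newest_first`, `creation_time_of`); the callback stands in for the storage queries behind get_task_job_creation_time, and integer ids and timestamps replace the UUID and time_point types used in FifoPolicy.cpp.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified task type.
struct Task {
    int id;
};

// Sorts tasks so that the newest creation time comes first. creation_time_of
// is invoked once per task in the vector, never inside the comparator.
auto sort_newest_first(
        std::vector<Task>& tasks,
        std::function<std::int64_t(int)> const& creation_time_of
) -> void {
    std::unordered_map<int, std::int64_t> time_by_task;
    time_by_task.reserve(tasks.size());
    for (Task const& task : tasks) {
        time_by_task.emplace(task.id, creation_time_of(task.id));
    }
    // stable_sort keeps the fetch order of tasks with equal timestamps. This
    // is a choice made for the sketch; the PR itself uses std::ranges::sort.
    std::stable_sort(tasks.begin(), tasks.end(), [&](Task const& a, Task const& b) {
        return time_by_task.at(a.id) > time_by_task.at(b.id);
    });
}
```

A comparison sort performs on the order of n log n comparisons, so moving the lookup out of the comparator reduces the number of calls from two per comparison to one per task.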

Comment on lines +101 to 134
auto FifoPolicy::fetch_tasks() -> void {
    m_data_cache.clear();
    m_metadata_store->get_ready_tasks(m_conn, &m_tasks);
    std::vector<std::tuple<core::TaskInstance, core::Task>> instances;
    m_metadata_store->get_task_timeout(m_conn, &instances);
    for (auto const& [instance, task] : instances) {
        m_tasks.emplace_back(task);
    }

    // Sort tasks based on job creation time in descending order.
    // NOLINTNEXTLINE(misc-include-cleaner)
    absl::flat_hash_map<boost::uuids::uuid, core::JobMetadata, std::hash<boost::uuids::uuid>>
            job_metadata_map;
    auto get_task_job_creation_time
            = [&](boost::uuids::uuid const task_id) -> std::chrono::system_clock::time_point {
        boost::uuids::uuid job_id;
        if (false == m_metadata_store->get_task_job_id(m_conn, task_id, &job_id).success()) {
            throw std::runtime_error(fmt::format("Task with id {} not exists.", to_string(task_id))
            );
        }
        if (job_metadata_map.contains(job_id)) {
            return job_metadata_map[job_id].get_creation_time();
        }
        core::JobMetadata job_metadata;
        if (false == m_metadata_store->get_job_metadata(m_conn, job_id, &job_metadata).success()) {
            throw std::runtime_error(fmt::format("Job with id {} not exists.", to_string(job_id)));
        }
        job_metadata_map[job_id] = job_metadata;
        return job_metadata.get_creation_time();
    };
    std::ranges::sort(m_tasks, [&](core::Task const& a, core::Task const& b) {
        return get_task_job_creation_time(a.get_id()) > get_task_job_creation_time(b.get_id());
    });
}
Contributor

🛠️ Refactor suggestion

Consider error handling improvements in fetch_tasks

The new method for fetching and sorting tasks looks well-implemented, but there are potential error paths that could be handled more gracefully:

  1. The error handling in the job metadata retrieval throws exceptions if tasks or jobs don't exist. This could cause the entire scheduling operation to fail if there's data inconsistency. Consider:
        if (false == m_metadata_store->get_task_job_id(m_conn, task_id, &job_id).success()) {
-            throw std::runtime_error(fmt::format("Task with id {} not exists.", to_string(task_id))
-            );
+            // Log error and use a default timestamp
+            return std::chrono::system_clock::now();
        }
  2. Consider using a more efficient sorting approach by pre-computing job creation times to avoid multiple lookups for the same job during sorting.
// Pre-compute job creation times once for all tasks
std::unordered_map<boost::uuids::uuid, std::chrono::system_clock::time_point, std::hash<boost::uuids::uuid>> task_creation_times;
for (const auto& task : m_tasks) {
    task_creation_times[task.get_id()] = get_task_job_creation_time(task.get_id());
}

// Then sort using the pre-computed times
std::ranges::sort(m_tasks, [&](core::Task const& a, core::Task const& b) {
    return task_creation_times[a.get_id()] > task_creation_times[b.get_id()];
});

@sitaowang1998 sitaowang1998 merged commit de086c5 into y-scope:main Mar 4, 2025
5 checks passed
@sitaowang1998 sitaowang1998 deleted the scheduler_sort branch March 4, 2025 03:34