feat: Add worker that request tasks from scheduler and execute tasks #38

Merged
sitaowang1998 merged 4 commits into y-scope:main from the worker branch on Dec 18, 2024

Conversation

sitaowang1998
Collaborator

@sitaowang1998 sitaowang1998 commented Dec 18, 2024

Description

As titled. This PR also fixes an infrequent failure in the scheduler server unit test, where a connection could fail right after a server restart: the test now sleeps after each restart, and more server restarts are exercised.

Validation performed

  • GitHub workflows pass.
  • All unit tests pass in devcontainer.

Summary by CodeRabbit

  • New Features

    • Added a new method for handling task failures in multiple storage classes.
    • Introduced enhanced result handling and serialization capabilities in the FunctionManager.
    • Implemented a task loop for managing the task execution lifecycle in the worker.
    • Enhanced logging and error handling capabilities across various components.
    • Introduced a warmup time constant to improve server stability in tests.
  • Bug Fixes

    • Improved error handling in task scheduling and connection processes.
  • Documentation

    • Updated tests to enhance reliability and clarity.
  • Tests

    • Added new constants and updated assertions in test cases to improve accuracy and maintainability.


coderabbitai bot commented Dec 18, 2024

Walkthrough

This pull request introduces several enhancements to the Spider project, focusing on task management, error handling, and serialization across multiple components. Key modifications include updates to the metadata storage interface, MySQL storage implementation, function management, task execution, and worker client functionality. The changes improve error handling, logging, and flexibility in processing tasks and results, with a particular emphasis on supporting more complex serialization and result retrieval mechanisms.

Changes

File: Change Summary
  • src/spider/CMakeLists.txt: Added utils/StopToken.hpp to SPIDER_WORKER_SOURCES and SPIDER_SCHEDULER_SOURCES
  • src/spider/storage/MetadataStorage.hpp: Added virtual task_fail method for handling task failures
  • src/spider/storage/MysqlStorage.cpp: Implemented task_fail method and regex-based URL parsing in connect methods
  • src/spider/storage/MysqlStorage.hpp: Added task_fail method for MySqlMetadataStorage
  • src/spider/worker/FunctionManager.hpp: Enhanced result handling with new template-based serialization methods
  • src/spider/worker/TaskExecutor.cpp: Updated completed() and wait() methods; added get_result_buffers() method
  • src/spider/worker/TaskExecutor.hpp: Added new constructor, get_result_buffers method, and modified synchronization mechanism
  • src/spider/worker/WorkerClient.cpp: Updated get_next_task method for improved socket management and error handling
  • src/spider/worker/WorkerClient.hpp: Removed m_context member variable
  • src/spider/worker/worker.cpp: Introduced modular task processing with improved error handling and logging
  • tests/*: Updated test cases to reflect new serialization and result handling methods

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1b16fbf and 41fcd5b.

📒 Files selected for processing (1)
  • tests/scheduler/test-SchedulerServer.cpp (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/scheduler/test-SchedulerServer.cpp


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🔭 Outside diff range comments (2)
src/spider/worker/TaskExecutor.hpp (1)

Line range hint 38-74: Consider reducing code duplication and ensuring exception safety

  1. The constructor has complex logic (spawning processes, sending messages). If an exception is thrown partway through (e.g., because an environment variable is missing), any partially constructed state must be cleaned up properly.
  2. The portion that builds arguments and calls create_args_request closely mirrors logic in the second constructor. Extracting a common helper method would reduce code repetition.
src/spider/CMakeLists.txt (1)

Line range hint 98-98: Remove duplicate StopToken.hpp inclusion

Once moved to core headers, this duplicate inclusion can be removed.

-    utils/StopToken.hpp
🧹 Nitpick comments (17)
src/spider/worker/task_executor.cpp (3)

60-63: Consider externalizing logging configuration

While the logging setup is comprehensive, consider these improvements:

  1. Move the logging pattern to a configuration file for easier maintenance
  2. Standardize the component name format (e.g., "spider.executor" instead of "spider][executor")
-    spdlog::set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%^%l%$] [spider][executor] %v");
+    spdlog::set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%^%l%$] [spider.executor] %v");

89-90: Enhance debug logging with task context

Consider adding more context to help with debugging, such as the libraries loaded and any relevant task metadata.

-    spdlog::debug("Function to run: {}", func_name);
+    spdlog::debug("Executing task - function: {}, loaded libraries: {}", func_name, fmt::join(libs, ", "));

Line range hint 92-133: Add task execution safeguards

The task execution could benefit from additional safety measures:

  1. Implement a timeout mechanism for long-running tasks
  2. Add memory limits for result buffers
  3. Consider adding task cancellation support

Would you like me to provide an example implementation of these safeguards?

src/spider/worker/TaskExecutor.hpp (2)

76-109: Duplicate code for process initialization
The constructor replicates the same overall logic as the variadic template constructor, differing mainly in the way arguments are prepared. Consider consolidating the duplicated process setup (m_process, co_spawn, etc.) into a shared function.
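A minimal sketch of that consolidation, assuming nothing about the real TaskExecutor beyond what this comment describes. ExecutorSketch, SharedSetup, and serialize are hypothetical names, and std::string stands in for the real argument buffers. Both public constructors prepare their arguments in their own way and then delegate to one private constructor that owns the shared setup:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical executor. Both public constructors prepare their arguments
// differently, then delegate to one private constructor that owns the setup.
class ExecutorSketch {
public:
    // Form 1: typed values that must be serialized first.
    ExecutorSketch(std::string func_name, std::vector<int> const& args)
            : ExecutorSketch{SharedSetup{}, std::move(func_name), serialize(args)} {}

    // Form 2: arguments that are already serialized.
    ExecutorSketch(std::string func_name, std::vector<std::string> arg_buffers)
            : ExecutorSketch{SharedSetup{}, std::move(func_name), std::move(arg_buffers)} {}

    [[nodiscard]] auto func_name() const -> std::string const& { return m_func_name; }

    [[nodiscard]] auto arg_buffers() const -> std::vector<std::string> const& {
        return m_arg_buffers;
    }

private:
    struct SharedSetup {};  // tag that selects the private constructor

    ExecutorSketch(SharedSetup, std::string func_name, std::vector<std::string> arg_buffers)
            : m_func_name{std::move(func_name)},
              m_arg_buffers{std::move(arg_buffers)} {
        // In a real executor, process spawning and message sending would live
        // here, written once.
    }

    static auto serialize(std::vector<int> const& args) -> std::vector<std::string> {
        std::vector<std::string> buffers;
        buffers.reserve(args.size());
        for (int const arg : args) {
            buffers.push_back(std::to_string(arg));
        }
        return buffers;
    }

    std::string m_func_name;
    std::vector<std::string> m_arg_buffers;
};
```

With delegation, an exception thrown during the shared setup is handled in one place instead of two.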


133-133: Returning potentially large result buffers
get_result_buffers() returns a copy of the underlying vector. If these sbuffers can be large, consider returning a reference or a smart pointer to avoid extra copies. However, if the performance impact is minimal, this approach is acceptable.
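To make the trade-off concrete, here is a small sketch with a hypothetical ResultHolderSketch class, using std::string in place of msgpack::sbuffer. It is not the PR's code. It only contrasts a copying accessor with a borrowing one:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the executor; std::string replaces msgpack::sbuffer.
class ResultHolderSketch {
public:
    explicit ResultHolderSketch(std::vector<std::string> buffers)
            : m_result_buffers{std::move(buffers)} {}

    // Copying accessor: every call duplicates all buffers.
    [[nodiscard]] auto copy_result_buffers() const -> std::vector<std::string> {
        return m_result_buffers;
    }

    // Borrowing accessor: no copy, but the reference is only valid while the
    // holder is alive and the buffers are not being modified.
    [[nodiscard]] auto view_result_buffers() const -> std::vector<std::string> const& {
        return m_result_buffers;
    }

private:
    std::vector<std::string> m_result_buffers;
};
```

The borrowing form is cheaper but shifts a lifetime obligation onto the caller, which is why the copy can still be the safer default when buffers are small.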

src/spider/worker/FunctionManager.hpp (1)

Line range hint 199-218: create_result_response enhancements
The extension to handle tuples is cleanly implemented. Similarly to the tuple-based get_result approach, consider the readability and test coverage for multi-field returns.

src/spider/worker/worker.cpp (3)

2-75: parse_args and environment setup
The parse_args function is straightforward. If the application can handle optional parameters or defaults, consider descriptive usage text in the --help option for better clarity.


121-132: fetch_task infinite loop
fetch_task uses a while(true) with a sleep of cFetchTaskTimeout. Validate that an exceptionally long or permanent scheduler outage will not cause side effects. You may consider a backoff or a maximum wait limit.
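One possible shape for the backoff and maximum wait mentioned here, as a sketch only. poll_with_backoff and its parameters are hypothetical, and the fetch callable stands in for a call such as client.get_next_task():

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <optional>
#include <thread>

// Polls `try_fetch` until it yields a value or `max_wait` elapses. The sleep
// between attempts doubles each time, capped at `max_delay`.
// `try_fetch` must return a std::optional.
template <class Fetch>
auto poll_with_backoff(
        Fetch try_fetch,
        std::chrono::milliseconds initial_delay,
        std::chrono::milliseconds max_delay,
        std::chrono::milliseconds max_wait
) -> decltype(try_fetch()) {
    auto const deadline = std::chrono::steady_clock::now() + max_wait;
    auto delay = initial_delay;
    while (true) {
        auto result = try_fetch();
        if (result.has_value()) {
            return result;
        }
        auto const now = std::chrono::steady_clock::now();
        if (now >= deadline) {
            return std::nullopt;
        }
        // Never sleep past the deadline.
        auto const remaining
                = std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now);
        std::this_thread::sleep_for(std::min(delay, remaining));
        delay = std::min(delay * 2, max_delay);
    }
}
```

Returning an empty optional on timeout lets the caller decide whether to retry, log, or shut down, rather than blocking inside the helper.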


194-289: task_loop concurrency & error handling

  1. The approach is robust in marking tasks as running/fail/success.
  2. The while(!stop_requested()) loop could lead to indefinite waits if the scheduler stops sending tasks but does not trigger a stop_token. Consider a graceful shutdown mechanism.
  3. The metadata_store->task_fail call transitions tasks to 'fail' only after an instance. Ensure that repeated partial failures are covered if the system restarts.
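For readers unfamiliar with the stop-token pattern behind point 2, a minimal sketch follows. StopTokenSketch is a hypothetical stand-in for spider::core::StopToken and models only the two calls quoted in this review (request_stop and stop_requested); the real class may differ:

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for spider::core::StopToken.
class StopTokenSketch {
public:
    auto request_stop() -> void { m_stop.store(true, std::memory_order_release); }

    [[nodiscard]] auto stop_requested() const -> bool {
        return m_stop.load(std::memory_order_acquire);
    }

private:
    std::atomic<bool> m_stop{false};
};

// Runs `step` until it returns false or a stop is requested.
// Returns the number of steps that completed.
template <class Step>
auto run_until_stopped(StopTokenSketch const& token, Step step) -> int {
    int completed = 0;
    while (!token.stop_requested()) {
        if (!step()) {
            break;
        }
        ++completed;
    }
    return completed;
}
```

The loop only notices a stop between steps, so a step that blocks indefinitely (for example, waiting on a scheduler that never answers) needs its own timeout for shutdown to be prompt.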
src/spider/storage/MysqlStorage.cpp (2)

11-11: Potential large includes
Consider whether pulling in a regex header is the most efficient approach for URL parsing. If performance is an issue, a manual parse or a simpler library might be more efficient; if not, this is fine.


246-260: Regex-based JDBC URL parsing
This block checks user/password from the URL. Good approach. Consider adding more robust checking for edge cases, e.g., URL with multiple parameters.
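As an illustration of the multi-parameter edge case, the sketch below pulls every key=value pair out of a URL's query string with std::regex. The URL shape (jdbc:mariadb://host:port/db?user=...&password=...) and the function name parse_query_params are assumptions for the example, not taken from MysqlStorage.cpp, and percent-encoding is not handled:

```cpp
#include <cassert>
#include <map>
#include <regex>
#include <string>

// Extracts key=value pairs from the query part of a URL (everything after
// the first '?'). A repeated key keeps its last value.
auto parse_query_params(std::string const& url) -> std::map<std::string, std::string> {
    std::map<std::string, std::string> params;
    auto const query_start = url.find('?');
    if (std::string::npos == query_start) {
        return params;
    }
    std::string const query = url.substr(query_start + 1);
    static std::regex const pair_regex{"([^&=]+)=([^&]*)"};
    std::sregex_iterator const end;
    for (std::sregex_iterator it{query.begin(), query.end(), pair_regex}; it != end; ++it) {
        params[(*it)[1].str()] = (*it)[2].str();
    }
    return params;
}
```

Parsing all parameters into a map, then looking up user and password, avoids depending on the order in which they appear.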

tests/worker/worker-test.cpp (1)

8-8: Consider using a more test-appropriate logging approach.

Writing to standard error during tests could interfere with test runners and continuous integration systems. Consider these alternatives:

  • Use Catch2's built-in INFO macro for test-specific logging
  • Create a dedicated test logger that can be controlled via test configuration
-    std::cerr << x << " + " << y << " = " << x + y << "\n";
+    INFO("Computing sum: " << x << " + " << y << " = " << x + y);
src/spider/worker/WorkerClient.cpp (1)

73-74: Consider connection pooling for better performance.

Creating a new io_context and socket for each task request could impact performance, especially under high load. Consider implementing a connection pool to reuse connections to schedulers.

tests/worker/test-FunctionManager.cpp (1)

34-38: Improve test readability with constant and explicit serialization

The introduction of cExpected and explicit msgpack serialization enhances test clarity and maintainability. However, consider extracting the msgpack serialization into a test helper function for reuse across test cases.

+namespace {
+template<typename T>
+auto serialize_expected(T const& value) -> msgpack::sbuffer {
+    msgpack::sbuffer buffer{};
+    msgpack::pack(buffer, value);
+    return buffer;
+}
+}  // namespace

 constexpr int cExpected = 2 + 3;
 msgpack::sbuffer const result = (*function)(args_buffers);
-msgpack::sbuffer buffer{};
-msgpack::pack(buffer, cExpected);
+auto const expected_buffer = serialize_expected(cExpected);
tests/scheduler/test-SchedulerServer.cpp (1)

28-29: Consider documenting the rationale for the warmup time value

The 5ms warmup time constant helps address server connection failures, but it would be beneficial to document why this specific duration was chosen.

 constexpr int cServerWarmupTime = 5;
+// Allow 5ms for server to stabilize after state changes to prevent connection failures
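A related option, sketched here as an assumption rather than what the test does: giving the constant a std::chrono type puts the unit in the type itself, so the comment cannot drift from the value. wait_for_server_warmup is a hypothetical helper:

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical typed variant of the test constant: the unit is part of the type.
constexpr std::chrono::milliseconds cServerWarmupTime{5};

// Sleeps for the warmup period and returns how long the call actually took.
auto wait_for_server_warmup() -> std::chrono::steady_clock::duration {
    auto const start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(cServerWarmupTime);
    return std::chrono::steady_clock::now() - start;
}
```

A fixed sleep remains a heuristic; it reduces the chance of connecting before the server is ready but does not eliminate it.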
src/spider/worker/TaskExecutor.cpp (2)

55-60: Consider adding timeout to condition variable wait

While the synchronization is correct, consider adding a timeout to prevent indefinite waiting.

-    m_complete_cv.wait(lock, [this] {
+    if (!m_complete_cv.wait_for(lock, std::chrono::seconds(30), [this] {
         return TaskExecutorState::Succeed == m_state || TaskExecutorState::Error == m_state
                || TaskExecutorState::Cancelled == m_state;
-    });
+    })) {
+        m_state = TaskExecutorState::Error;
+        core::create_error_buffer(
+            core::FunctionInvokeError::FunctionExecutionError,
+            "Task execution timed out",
+            m_result_buffer
+        );
+    }

91-96: Ensure consistent state transition patterns

The state transitions and notifications follow a consistent pattern across different response types. Consider extracting the common pattern into a helper method to reduce code duplication.

+    private:
+    void transition_state(TaskExecutorState new_state, const msgpack::sbuffer& response) {
+        {
+            std::lock_guard const lock(m_state_mutex);
+            m_state = new_state;
+            m_result_buffer.write(response.data(), response.size());
+        }
+        m_complete_cv.notify_all();
+    }

Also applies to: 102-107, 111-116

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 88f6232 and 570f7fb.

📒 Files selected for processing (15)
  • src/spider/CMakeLists.txt (1 hunks)
  • src/spider/storage/MetadataStorage.hpp (1 hunks)
  • src/spider/storage/MysqlStorage.cpp (5 hunks)
  • src/spider/storage/MysqlStorage.hpp (1 hunks)
  • src/spider/worker/FunctionManager.hpp (5 hunks)
  • src/spider/worker/TaskExecutor.cpp (5 hunks)
  • src/spider/worker/TaskExecutor.hpp (4 hunks)
  • src/spider/worker/WorkerClient.cpp (2 hunks)
  • src/spider/worker/WorkerClient.hpp (0 hunks)
  • src/spider/worker/task_executor.cpp (4 hunks)
  • src/spider/worker/worker.cpp (1 hunks)
  • tests/scheduler/test-SchedulerServer.cpp (3 hunks)
  • tests/worker/test-FunctionManager.cpp (2 hunks)
  • tests/worker/test-MessagePipe.cpp (1 hunks)
  • tests/worker/worker-test.cpp (1 hunks)
💤 Files with no reviewable changes (1)
  • src/spider/worker/WorkerClient.hpp
🧰 Additional context used
🪛 cppcheck (2.10-2)
src/spider/worker/worker.cpp

[performance] 88-88: Variable 'id' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)


[performance] 107-107: Variable 'm_id' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)
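For context on the useInitializationList hint, the two hypothetical classes below show the general pattern cppcheck looks for. They are not the code at lines 88 and 107 of worker.cpp, which this page does not show:

```cpp
#include <cassert>
#include <string>
#include <utility>

// Pattern cppcheck flags: m_id is default-constructed, then assigned.
class AssignedInBody {
public:
    explicit AssignedInBody(std::string id) { m_id = std::move(id); }

    [[nodiscard]] auto id() const -> std::string const& { return m_id; }

private:
    std::string m_id;
};

// Preferred form: m_id is constructed once, directly from the argument.
class InitializedInList {
public:
    explicit InitializedInList(std::string id) : m_id{std::move(id)} {}

    [[nodiscard]] auto id() const -> std::string const& { return m_id; }

private:
    std::string m_id;
};
```

Both classes behave the same; the second skips one default construction, and it is the only form that works for const or reference members.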

🔇 Additional comments (22)
src/spider/worker/TaskExecutor.hpp (3)

4-4: Include looks fine
No concerns here; the inclusion of <condition_variable> is appropriate for the synchronization need.


129-129: Ensure thread-safe read of result buffer
Marking get_result() as const is a good addition, but confirm that m_result_buffer access is safe if called concurrently while the task is still running. If concurrency could occur, consider locking or clarifying usage expectations for calling this method.


141-141: m_complete_cv declared but usage not shown
Ensure that m_complete_cv is properly used—paired with a unique_lock on m_state_mutex and wait/notify calls. Otherwise, it may be superfluous.
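The pairing described here can be sketched as follows. CompletionSketch and State are hypothetical; only the member names m_state_mutex and m_complete_cv echo the review. The writer updates the state under the mutex and then notifies, and the reader waits on the same mutex with a predicate and a timeout:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

enum class State { Running, Succeed, Error };

class CompletionSketch {
public:
    // Writer side: change the state under the lock, then wake all waiters.
    auto finish(State final_state) -> void {
        {
            std::lock_guard<std::mutex> const lock{m_state_mutex};
            m_state = final_state;
        }
        m_complete_cv.notify_all();
    }

    // Reader side: returns true if a terminal state was reached in time.
    auto wait_for_completion(std::chrono::milliseconds timeout) -> bool {
        std::unique_lock<std::mutex> lock{m_state_mutex};
        return m_complete_cv.wait_for(lock, timeout, [this] {
            return State::Running != m_state;
        });
    }

    // Reads the state under the same mutex so concurrent calls are safe.
    auto state() -> State {
        std::lock_guard<std::mutex> const lock{m_state_mutex};
        return m_state;
    }

private:
    std::mutex m_state_mutex;
    std::condition_variable m_complete_cv;
    State m_state{State::Running};
};
```

The predicate guards against spurious wakeups and against a notification that arrives before the wait begins.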

src/spider/worker/FunctionManager.hpp (4)

Line range hint 108-125: Check exception handling
The function response_get_result() appropriately handles msgpack::type_error exceptions, returning nullopt. Verify that your calling code logs these issues or otherwise handles unexpected null returns gracefully.


132-159: Flexible tuple deserialization
The multiple-type tuple support is a solid addition. The for_n helper is well utilized to unpack the data. Keep in mind that large tuples can degrade readability; ensure call sites handle these results carefully.


161-197: Use of spdlog::error is helpful
Logging errors for incorrect response types or sizes is excellent for debugging. Double-check that these logs do not risk exposing sensitive information in production.


240-251: Concatenate argument buffers carefully
Creating a single msgpack array from many buffers is powerful. However, ensure all call sites provide valid, non-empty buffers, as any partial or empty buffer would be directly appended. Safeguard or clarify usage.

src/spider/worker/worker.cpp (4)

77-119: heartbeat_loop's retry mechanism
The fail_count logic is clear. Just ensure that failing the heartbeat due to transient conditions won't repeatedly kill the worker. Currently, cRetryCount is 5, which seems reasonable.


134-160: Preparation of argument buffers
The logic in get_args_buffers is direct but be cautious with raw string writes for large or binary data. If inputs can be large or embedded with null bytes, consider a more specialized encoding rather than .write(value.data(), value.size()).


162-192: parse_outputs handles multiple returns
Good approach for processing each result buffer. For Data type parsing, watch for potential mismatch between expected and actual binary content in result_buffers[i].


293-384: main function
The registration of the worker, creation of threads for heartbeat and tasks, and final join is a standard approach. Make sure the worker receives a graceful cancellation signal so that threads exit cleanly.

src/spider/storage/MysqlStorage.cpp (2)

1088-1123: task_fail method

  1. Removing the task instance first, then conditionally marking the task as fail is a good design for concurrency.
  2. Logging with spdlog::error is helpful. If a partial DB rollback is needed, confirm the rollback state if an exception occurs mid-logic.

1314-1329: MySqlDataStorage::connect
Again, your approach matches the MySqlMetadataStorage connect method. Ensure that connecting without credentials is safe and that the pointer is always set if the connection is successful.

tests/worker/test-MessagePipe.cpp (1)

47-47: LGTM! Improved type safety in response handling.

The change from tuple-based to individual type parameters provides better type safety and clearer intent.

src/spider/worker/WorkerClient.cpp (2)

96-98: LGTM! Robust task ID validation.

The added null check for task ID improves the robustness of the response handling.


73-74: Verify existing connection management patterns.

Let's check if there are similar connection management patterns in the codebase that we could leverage.

✅ Verification successful

Let me gather more specific information about the connection management patterns.


Let me check the actual implementation of SchedulerServer and WorkerClient to understand the connection management patterns better.


The current socket management approach is appropriate for this use case.

The local context and socket creation in WorkerClient is correct because:

  1. The WorkerClient establishes short-lived connections to schedulers for task scheduling
  2. Each connection is used for a single request-response cycle and then closed
  3. The SchedulerServer also follows this pattern by accepting connections and processing them independently

The code follows a consistent pattern across the codebase where transient connections are used for individual message exchanges, rather than maintaining long-lived connections or connection pools.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for connection pooling or socket management patterns
echo "Searching for connection pooling patterns..."
rg -l "io_context|tcp::socket" --type cpp

echo "Searching for socket reuse patterns..."
ast-grep --pattern 'class $_ {
  $$$
  boost::asio::io_context
  $$$
}'

Length of output: 759


Script:

#!/bin/bash
# Check BoostAsio.hpp for any connection management patterns
echo "Checking BoostAsio.hpp..."
cat src/spider/io/BoostAsio.hpp

# Check TaskExecutor implementation for connection handling
echo -e "\nChecking TaskExecutor implementation..."
cat src/spider/worker/TaskExecutor.hpp

# Check SchedulerServer for connection management
echo -e "\nChecking SchedulerServer implementation..."
cat src/spider/scheduler/SchedulerServer.hpp

Length of output: 9488


Script:

#!/bin/bash
# Check SchedulerServer and WorkerClient implementations
echo "Checking SchedulerServer.cpp..."
cat src/spider/scheduler/SchedulerServer.cpp

echo -e "\nChecking WorkerClient.cpp..."
cat src/spider/worker/WorkerClient.cpp

Length of output: 8827

tests/worker/test-FunctionManager.cpp (1)

70-73: LGTM: Improved type safety in tuple result handling

The change to use separate template parameters for response_get_result improves type safety by making the expected types explicit.

src/spider/storage/MetadataStorage.hpp (1)

53-54: LGTM: Well-designed task failure handling interface

The new task_fail virtual method provides a clean interface for handling task failures, maintaining consistency with existing error handling patterns. The method appropriately takes both the task instance and error message as parameters.

tests/scheduler/test-SchedulerServer.cpp (1)

57-58: Verify if the warmup time is sufficient across different environments

While the warmup time helps prevent connection failures, consider implementing a more robust solution:

  1. Add logging when connections fail
  2. Consider implementing a retry mechanism with exponential backoff

Also applies to: 86-87

✅ Verification successful

The previous git log command wasn't suitable as it's resource-intensive. Let's examine the current codebase state to understand the connection handling and warmup time implementation.


The current warmup time implementation appears adequate for test scenarios

The codebase shows:

  • A constant warmup time of 5ms is used consistently across test cases
  • No historical connection failures or errors are evident in the test file
  • The connection is made using boost::asio, which has built-in error handling
  • This is a test-only concern and doesn't affect production code

Since this is specifically for test scenarios and there's no evidence of connection failures, the current implementation is sufficient. Adding retry mechanisms would unnecessarily complicate the test code.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for any existing connection-related test failures
git log -p | rg -B 5 -A 5 "connection.*fail|connect.*error" tests/scheduler/test-SchedulerServer.cpp

Length of output: 0


Script:

#!/bin/bash
# Search for the warmup time constant and its usage
rg -A 2 "cServerWarmupTime" --type cpp

# Look for any test failures or connection issues in test logs
rg "fail|error|connect" tests/scheduler/test-SchedulerServer.cpp

# Find any retry mechanisms or connection handling patterns
ast-grep --pattern 'connect($$$)' --lang cpp

Length of output: 889

src/spider/worker/TaskExecutor.cpp (2)

22-23: LGTM: Proper handling of cancelled state

The addition of TaskExecutorState::Cancelled to completion check ensures proper task lifecycle management.


127-129: LGTM: Clean implementation of result buffer retrieval

The new method properly delegates to the core functionality while maintaining const correctness.

src/spider/storage/MysqlStorage.hpp (1)

57-57: LGTM: Clean interface extension for task failure handling

The addition of task_fail method properly extends the storage interface to handle task failure scenarios, which is essential for robust task execution in the worker implementation.

✅ Verification successful

Verified: Task failure handling is properly implemented across storage layers

The task_fail method is correctly declared as a pure virtual function in the MetadataStorage base class and properly overridden in MysqlStorage. The method signatures match exactly, ensuring consistent task failure handling across the storage implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify consistent implementation across storage types
ast-grep --pattern 'class $_ : public MetadataStorage {
  $$$
  auto task_fail($_, $_) -> StorageErr override;
  $$$
}'

Length of output: 121


Script:

#!/bin/bash
# First, let's find all storage implementation files
fd "Storage\.(hpp|cpp)$"

# Then, let's check for the base class definition
rg "class\s+MetadataStorage" -A 5

# Finally, let's search for task_fail method declarations
rg "task_fail.*StorageErr" -A 1

Length of output: 1315

@@ -105,6 +111,7 @@ auto main(int const argc, char** argv) -> int {
msgpack::sbuffer args_buffer;
msgpack::packer packer{args_buffer};
packer.pack(args_object);
spdlog::debug("Args buffer parsed");

🛠️ Refactor suggestion

Enhance execution logging for better observability

The current execution logging could be more comprehensive to aid in debugging and monitoring:

  1. Add execution timing
  2. Log result buffer size
  3. Add more detailed error logging
+    auto start_time = std::chrono::steady_clock::now();
     spdlog::debug("Args buffer parsed");
     // ... execution ...
-    spdlog::debug("Function executed");
+    auto end_time = std::chrono::steady_clock::now();
+    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
+    spdlog::debug("Function executed - duration: {}ms, result size: {} bytes", 
+                  duration.count(), result_buffer.size());

Also applies to: 130-130
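The timing part of this suggestion can be factored into a small helper so call sites stay short. This is a sketch; measure is a hypothetical name and the logging call is left to the caller:

```cpp
#include <cassert>
#include <chrono>
#include <thread>
#include <utility>

// Runs `work` and returns the elapsed wall time, measured with a steady clock
// and rounded down to whole milliseconds.
template <class Work>
auto measure(Work&& work) -> std::chrono::milliseconds {
    auto const start = std::chrono::steady_clock::now();
    std::forward<Work>(work)();
    auto const end = std::chrono::steady_clock::now();
    return std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
}
```

A caller could then log the returned duration's count() next to the result buffer size in a single debug line.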

@@ -51,6 +51,7 @@ set(SPIDER_WORKER_SOURCES
worker/message_pipe.hpp
worker/WorkerClient.hpp
worker/WorkerClient.cpp
utils/StopToken.hpp

🛠️ Refactor suggestion

Consider moving shared utilities to core component

StopToken.hpp is used by both worker and scheduler components, suggesting it's a core utility. Consider moving it to the SPIDER_CORE_HEADERS list to avoid duplication and better reflect its shared nature.

-    utils/StopToken.hpp

Add to SPIDER_CORE_HEADERS:

 set(SPIDER_CORE_HEADERS
     core/Error.hpp
     core/Data.hpp
+    utils/StopToken.hpp
     ...
 )

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (2)
src/spider/worker/worker.cpp (2)

50-50: Consider making retry count configurable

The hardcoded retry count might not be suitable for all deployment environments. Consider making this configurable through command-line arguments or environment variables.

-constexpr int cRetryCount = 5;
+// Read from environment variable with a default value.
+// Not constexpr: std::getenv and std::stoi can only run at runtime.
+int const cRetryCount = []() {
+    if (const char* env = std::getenv("SPIDER_RETRY_COUNT")) {
+        return std::stoi(env);
+    }
+    return 5;
+}();
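A more defensive variant is sketched below, since std::stoi throws on malformed input and an environment lookup cannot be evaluated at compile time. SPIDER_RETRY_COUNT is the variable name used in the suggestion above, not a documented setting, and parse_retry_count and get_retry_count are hypothetical helpers:

```cpp
#include <cassert>
#include <charconv>
#include <cstdlib>
#include <cstring>
#include <system_error>

// Parses a positive integer from `text`. Returns `fallback` if `text` is
// null, is not entirely a number, overflows, or is not positive.
auto parse_retry_count(char const* text, int fallback) -> int {
    if (nullptr == text) {
        return fallback;
    }
    int value = 0;
    char const* const end = text + std::strlen(text);
    auto const result = std::from_chars(text, end, value);
    if (std::errc{} != result.ec || result.ptr != end || value <= 0) {
        return fallback;
    }
    return value;
}

// Reads the (assumed) SPIDER_RETRY_COUNT variable once, defaulting to 5.
auto get_retry_count() -> int {
    return parse_retry_count(std::getenv("SPIDER_RETRY_COUNT"), 5);
}
```

Separating parsing from the environment lookup keeps the parsing logic testable without touching the process environment.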

362-362: Consider periodic refresh of environment variables

Environment variables are only fetched once at startup. If they change during runtime (e.g., PATH updates), these changes won't be reflected.

Consider implementing a periodic refresh mechanism or watching for environment changes if this is a requirement for your deployment environment.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 570f7fb and 1b16fbf.

📒 Files selected for processing (3)
  • src/spider/scheduler/scheduler.cpp (1 hunks)
  • src/spider/worker/task_executor.cpp (4 hunks)
  • src/spider/worker/worker.cpp (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/spider/scheduler/scheduler.cpp
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/spider/worker/task_executor.cpp

Comment on lines +123 to +132
auto fetch_task(spider::worker::WorkerClient& client) -> boost::uuids::uuid {
spdlog::debug("Fetching task");
while (true) {
std::optional<boost::uuids::uuid> const optional_task_id = client.get_next_task();
if (optional_task_id.has_value()) {
return optional_task_id.value();
}
std::this_thread::sleep_for(std::chrono::milliseconds(cFetchTaskTimeout));
}
}

⚠️ Potential issue

Add timeout handling to fetch_task

The infinite loop in fetch_task could potentially block indefinitely. Consider adding a maximum retry count or timeout mechanism.

-auto fetch_task(spider::worker::WorkerClient& client) -> boost::uuids::uuid {
+auto fetch_task(
+    spider::worker::WorkerClient& client,
+    std::chrono::seconds timeout = std::chrono::seconds(300)
+) -> std::optional<boost::uuids::uuid> {
     spdlog::debug("Fetching task");
+    auto start_time = std::chrono::steady_clock::now();
     while (true) {
         std::optional<boost::uuids::uuid> const optional_task_id = client.get_next_task();
         if (optional_task_id.has_value()) {
             return optional_task_id.value();
         }
+        if (std::chrono::steady_clock::now() - start_time > timeout) {
+            spdlog::warn("Timeout while fetching task");
+            return std::nullopt;
+        }
         std::this_thread::sleep_for(std::chrono::milliseconds(cFetchTaskTimeout));
     }
 }

return cStorageErr;
}

spider::core::StopToken stop_token;

⚠️ Potential issue

Add signal handling for graceful shutdown

The application should handle SIGTERM/SIGINT signals to ensure graceful shutdown of running tasks.

+    std::signal(SIGTERM, [](int) {
+        spdlog::info("Received SIGTERM, initiating graceful shutdown");
+        stop_token.request_stop();
+    });
+    std::signal(SIGINT, [](int) {
+        spdlog::info("Received SIGINT, initiating graceful shutdown");
+        stop_token.request_stop();
+    });

Committable suggestion skipped: line range outside the PR's diff.
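Two caveats apply to the snippet above. A captureless lambda passed to std::signal cannot refer to stop_token if that is a local variable, and logging calls are generally not safe inside a signal handler. A more conservative sketch, using hypothetical names throughout, only sets a lock-free atomic flag in the handler and lets ordinary code react to it:

```cpp
#include <atomic>
#include <cassert>
#include <csignal>

namespace {
// Set from the signal handler, read from normal code. A lock-free atomic is
// one of the few things a handler is allowed to touch.
std::atomic<bool> g_stop_requested{false};
static_assert(std::atomic<bool>::is_always_lock_free, "flag must be lock-free");

// Handler does the minimum: no logging, no allocation, no locks.
void handle_stop_signal(int /*signal*/) {
    g_stop_requested.store(true);
}
}  // namespace

// Installs the same handler for SIGTERM and SIGINT.
auto install_stop_handlers() -> void {
    std::signal(SIGTERM, handle_stop_signal);
    std::signal(SIGINT, handle_stop_signal);
}

// Polled by the main loop; logging and stop-token calls belong there.
auto stop_was_requested() -> bool {
    return g_stop_requested.load();
}
```

The main thread can poll stop_was_requested() in its loop, then log the shutdown and call stop_token.request_stop() from normal context.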

Comment on lines +242 to +243
context.run();
executor.wait();

⚠️ Potential issue

Add timeout handling to task execution

The blocking wait on task execution could potentially hang indefinitely. Consider adding a timeout mechanism.

-        context.run();
-        executor.wait();
+        auto future = std::async(std::launch::async, [&]() {
+            context.run();
+            executor.wait();
+        });
+        
+        if (future.wait_for(std::chrono::minutes(30)) == std::future_status::timeout) {
+            spdlog::error("Task execution timed out");
+            metadata_store->task_fail(
+                instance,
+                fmt::format("Task {} timed out", task.get_function_name())
+            );
+            continue;
+        }

Committable suggestion skipped: line range outside the PR's diff.
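One caveat with the std::async form above: the future returned by std::async blocks in its destructor until the task finishes, so leaving the scope after a timeout would still wait for the task. The sketch below, with hypothetical names, makes that explicit by requiring a cancel callback that forces the work to return before the thread is joined:

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Runs `work` on its own thread. If it has not finished within `timeout`,
// calls `cancel`, which must cause `work` to return, and then joins.
// Returns true if `work` finished before the timeout.
template <class Work, class Cancel>
auto run_with_timeout(Work work, Cancel cancel, std::chrono::milliseconds timeout) -> bool {
    std::packaged_task<void()> task{std::move(work)};
    std::future<void> done = task.get_future();
    std::thread runner{std::move(task)};
    bool const finished_in_time = std::future_status::ready == done.wait_for(timeout);
    if (!finished_in_time) {
        cancel();
    }
    runner.join();
    return finished_in_time;
}
```

In a worker, the cancel callback would be the place to stop the child process or io context; without some way to interrupt the work, a timeout can only report the hang, not recover from it.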

@sitaowang1998 sitaowang1998 merged commit 9169591 into y-scope:main Dec 18, 2024
4 checks passed
@sitaowang1998 sitaowang1998 deleted the worker branch December 18, 2024 07:03