Skip to content

Commit

Permalink
Limit scope of storage connection in worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sitaowang1998 committed Mar 4, 2025
1 parent fe17823 commit 9e10cff
Showing 1 changed file with 40 additions and 25 deletions.
65 changes: 40 additions & 25 deletions src/spider/worker/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,39 +230,44 @@ auto task_loop(
std::optional<boost::uuids::uuid> fail_task_id = std::nullopt;
while (!stop_token.stop_requested()) {
boost::asio::io_context context;
std::variant<spider::core::MySqlConnection, spider::core::StorageErr> conn_result
= spider::core::MySqlConnection::create(metadata_store->get_url());
if (std::holds_alternative<spider::core::StorageErr>(conn_result)) {
spdlog::error(
"Failed to connection to storage: {}",
std::get<spider::core::StorageErr>(conn_result).description
);
continue;
}
auto& conn = std::get<spider::core::MySqlConnection>(conn_result);

auto const [task_id, task_instance_id] = fetch_task(client, fail_task_id);
spider::core::TaskInstance const instance{task_instance_id, task_id};
spdlog::debug("Fetched task {}", boost::uuids::to_string(task_id));
fail_task_id = std::nullopt;
// Fetch task detail from metadata storage
spider::core::Task task{""};
spider::core::StorageErr err = metadata_store->get_task(conn, task_id, &task);
if (!err.success()) {
spdlog::error("Failed to fetch task detail: {}", err.description);
continue;
}
std::optional<std::vector<msgpack::sbuffer>> optional_args_buffers;
spider::core::StorageErr err;

{
// Keep the scope of RAII storage connection
std::variant<spider::core::MySqlConnection, spider::core::StorageErr> conn_result
= spider::core::MySqlConnection::create(metadata_store->get_url());
if (std::holds_alternative<spider::core::StorageErr>(conn_result)) {
spdlog::error(
"Failed to connection to storage: {}",
std::get<spider::core::StorageErr>(conn_result).description
);
continue;
}
auto& conn = std::get<spider::core::MySqlConnection>(conn_result);
err = metadata_store->get_task(conn, task_id, &task);
if (!err.success()) {
spdlog::error("Failed to fetch task detail: {}", err.description);
continue;
}

// Set up arguments
std::optional<std::vector<msgpack::sbuffer>> const optional_args_buffers
= get_args_buffers(task);
if (!optional_args_buffers.has_value()) {
metadata_store->task_fail(
conn,
instance,
fmt::format("Task {} failed to parse arguments", task.get_function_name())
);
continue;
// Set up arguments
optional_args_buffers = get_args_buffers(task);
if (!optional_args_buffers.has_value()) {
metadata_store->task_fail(
conn,
instance,
fmt::format("Task {} failed to parse arguments", task.get_function_name())
);
continue;
}
}
std::vector<msgpack::sbuffer> const& args_buffers = optional_args_buffers.value();

Expand All @@ -280,6 +285,16 @@ auto task_loop(
context.run();
executor.wait();

std::variant<spider::core::MySqlConnection, spider::core::StorageErr> conn_result
= spider::core::MySqlConnection::create(metadata_store->get_url());
if (std::holds_alternative<spider::core::StorageErr>(conn_result)) {
spdlog::error(
"Failed to connection to storage: {}",
std::get<spider::core::StorageErr>(conn_result).description
);
continue;
}
auto& conn = std::get<spider::core::MySqlConnection>(conn_result);
if (!executor.succeed()) {
spdlog::warn("Task {} failed", task.get_function_name());
metadata_store->task_fail(
Expand Down

0 comments on commit 9e10cff

Please sign in to comment.