Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Sort tasks when scheduler fetch tasks from storage #73

Merged
merged 3 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/spider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ set(SPIDER_SCHEDULER_SOURCES
scheduler/SchedulerMessage.hpp
scheduler/SchedulerServer.cpp
scheduler/SchedulerServer.hpp
scheduler/SchedulerTaskCache.cpp
scheduler/SchedulerTaskCache.hpp
utils/StopToken.hpp
CACHE INTERNAL
"spider scheduler source files"
Expand Down
152 changes: 71 additions & 81 deletions src/spider/scheduler/FifoPolicy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

#include <algorithm>
#include <chrono>
#include <iterator>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <tuple>
#include <vector>

#include <absl/container/flat_hash_map.h>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <fmt/format.h>
Expand All @@ -18,16 +21,11 @@
#include "../storage/DataStorage.hpp"
#include "../storage/MetadataStorage.hpp"
#include "../storage/StorageConnection.hpp"
#include "SchedulerTaskCache.hpp"

namespace {
namespace spider::scheduler {

auto task_locality_satisfied(
std::shared_ptr<spider::core::DataStorage> const& data_store,
spider::core::StorageConnection& conn,
spider::core::Task const& task,
std::string const& addr
) -> bool {
auto FifoPolicy::task_locality_satisfied(spider::core::Task const& task, std::string const& addr)
-> bool {
for (auto const& input : task.get_inputs()) {
if (input.get_value().has_value()) {
continue;
Expand All @@ -37,11 +35,16 @@ auto task_locality_satisfied(
continue;
}
boost::uuids::uuid const data_id = optional_data_id.value();
spider::core::Data data;
if (false == data_store->get_data(conn, data_id, &data).success()) {
throw std::runtime_error(
fmt::format("Data with id {} not exists.", boost::uuids::to_string((data_id)))
);
core::Data data;
if (m_data_cache.contains(data_id)) {
data = m_data_cache[data_id];
} else {
if (false == m_data_store->get_data(m_conn, data_id, &data).success()) {
throw std::runtime_error(
fmt::format("Data with id {} not exists.", to_string((data_id)))
);
}
m_data_cache.emplace(data_id, data);
}
if (false == data.is_hard_locality()) {
continue;
Expand All @@ -57,89 +60,76 @@ auto task_locality_satisfied(
return true;
}

} // namespace

namespace spider::scheduler {

FifoPolicy::FifoPolicy(
std::shared_ptr<core::MetadataStorage> const& metadata_store,
std::shared_ptr<core::DataStorage> const& data_store,
core::StorageConnection& conn
)
: m_metadata_store{metadata_store},
m_data_store{data_store},
m_conn{conn},
m_task_cache{
metadata_store,
data_store,
conn,
[&](std::vector<core::Task>& tasks,
boost::uuids::uuid const& worker_id,
std::string const& worker_addr) -> std::optional<boost::uuids::uuid> {
return get_next_task(tasks, worker_id, worker_addr);
}
} {}
m_conn{conn} {}

auto FifoPolicy::get_next_task(
std::vector<core::Task>& tasks,
boost::uuids::uuid const& /*worker_id*/,
auto FifoPolicy::schedule_next(
boost::uuids::uuid const /*worker_id*/,
std::string const& worker_addr
) -> std::optional<boost::uuids::uuid> {
std::erase_if(tasks, [this, worker_addr](core::Task const& task) -> bool {
return !task_locality_satisfied(m_data_store, m_conn, task, worker_addr);
if (m_tasks.empty()) {
fetch_tasks();
if (m_tasks.empty()) {
return std::nullopt;
}
}
auto const reverse_begin = std::reverse_iterator(m_tasks.end());
auto const reverse_end = std::reverse_iterator(m_tasks.begin());
auto const it = std::find_if(reverse_begin, reverse_end, [&](core::Task const& task) {
return task_locality_satisfied(task, worker_addr);
});

if (tasks.empty()) {
if (it == reverse_end) {
return std::nullopt;
}

auto const earliest_task = std::ranges::min_element(
tasks,
{},
[this](core::Task const& task) -> std::chrono::system_clock::time_point {
boost::uuids::uuid const task_id = task.get_id();
boost::uuids::uuid job_id;
std::optional<boost::uuids::uuid> const optional_job_id
= m_task_job_cache.get(task_id);
if (optional_job_id.has_value()) {
job_id = optional_job_id.value();
} else {
if (false
== m_metadata_store->get_task_job_id(m_conn, task_id, &job_id).success()) {
throw std::runtime_error(fmt::format(
"Task with id {} not exists.",
boost::uuids::to_string(task_id)
));
}
m_task_job_cache.put(task_id, job_id);
}

std::optional<std::chrono::system_clock::time_point> const optional_time
= m_job_time_cache.get(job_id);
if (optional_time.has_value()) {
return optional_time.value();
}

core::JobMetadata job_metadata;
if (false
== m_metadata_store->get_job_metadata(m_conn, job_id, &job_metadata).success())
{
throw std::runtime_error(fmt::format(
"Job with id {} not exists.",
boost::uuids::to_string(job_id)
));
}
m_job_time_cache.put(job_id, job_metadata.get_creation_time());
return job_metadata.get_creation_time();
}
);

return earliest_task->get_id();
m_tasks.erase(it.base());
for (core::TaskInput const& input : it->get_inputs()) {
std::optional<boost::uuids::uuid> const data_id = input.get_data_id();
if (data_id.has_value()) {
m_data_cache.erase(data_id.value());
}
}
return it->get_id();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential use-after-erase error and incorrect erase with reverse iterators.
Erasing with it.base() removes the wrong element in forward iteration, and continuing to reference it after erasing from m_tasks leads to undefined behaviour. Store the task in a local variable before erasing and use std::prev(it.base()) to remove the correct element:

-    m_tasks.erase(it.base());
-    for (core::TaskInput const& input : it->get_inputs()) {
-        std::optional<boost::uuids::uuid> const data_id = input.get_data_id();
-        if (data_id.has_value()) {
-            m_data_cache.erase(data_id.value());
-        }
-    }
-    return it->get_id();
+    auto selected_task = *it;
+    m_tasks.erase(std::prev(it.base()));
+    for (core::TaskInput const& input : selected_task.get_inputs()) {
+        std::optional<boost::uuids::uuid> const data_id = input.get_data_id();
+        if (data_id.has_value()) {
+            m_data_cache.erase(data_id.value());
+        }
+    }
+    return selected_task.get_id();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
m_tasks.erase(it.base());
for (core::TaskInput const& input : it->get_inputs()) {
std::optional<boost::uuids::uuid> const data_id = input.get_data_id();
if (data_id.has_value()) {
m_data_cache.erase(data_id.value());
}
}
return it->get_id();
}
auto selected_task = *it;
m_tasks.erase(std::prev(it.base()));
for (core::TaskInput const& input : selected_task.get_inputs()) {
std::optional<boost::uuids::uuid> const data_id = input.get_data_id();
if (data_id.has_value()) {
m_data_cache.erase(data_id.value());
}
}
return selected_task.get_id();
}
🧰 Tools
🪛 Cppcheck (2.10-2)

[performance] 97-97: Variable 'id' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)


auto FifoPolicy::schedule_next(boost::uuids::uuid const worker_id, std::string const& worker_addr)
-> std::optional<boost::uuids::uuid> {
return m_task_cache.get_ready_task(worker_id, worker_addr);
auto FifoPolicy::fetch_tasks() -> void {
m_data_cache.clear();
m_metadata_store->get_ready_tasks(m_conn, &m_tasks);
std::vector<std::tuple<core::TaskInstance, core::Task>> instances;
m_metadata_store->get_task_timeout(m_conn, &instances);
for (auto const& [instance, task] : instances) {
m_tasks.emplace_back(task);
}

// Sort tasks based on job creation time in descending order.
// NOLINTNEXTLINE(misc-include-cleaner)
absl::flat_hash_map<boost::uuids::uuid, core::JobMetadata, std::hash<boost::uuids::uuid>>
job_metadata_map;
auto get_task_job_creation_time
= [&](boost::uuids::uuid const task_id) -> std::chrono::system_clock::time_point {
boost::uuids::uuid job_id;
if (false == m_metadata_store->get_task_job_id(m_conn, task_id, &job_id).success()) {
throw std::runtime_error(fmt::format("Task with id {} not exists.", to_string(task_id))
);
}
if (job_metadata_map.contains(job_id)) {
return job_metadata_map[job_id].get_creation_time();
}
core::JobMetadata job_metadata;
if (false == m_metadata_store->get_job_metadata(m_conn, job_id, &job_metadata).success()) {
throw std::runtime_error(fmt::format("Job with id {} not exists.", to_string(job_id)));
}
job_metadata_map[job_id] = job_metadata;
return job_metadata.get_creation_time();
};
std::ranges::sort(m_tasks, [&](core::Task const& a, core::Task const& b) {
return get_task_job_creation_time(a.get_id()) > get_task_job_creation_time(b.get_id());
});
}
Comment on lines +101 to 134
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider error handling improvements in fetch_tasks

The new method for fetching and sorting tasks looks well-implemented, but there are potential error paths that could be handled more gracefully:

  1. The error handling in the job metadata retrieval throws exceptions if tasks or jobs don't exist. This could cause the entire scheduling operation to fail if there's data inconsistency. Consider:
        if (false == m_metadata_store->get_task_job_id(m_conn, task_id, &job_id).success()) {
-            throw std::runtime_error(fmt::format("Task with id {} not exists.", to_string(task_id))
-            );
+            // Log error and use a default timestamp
+            return std::chrono::system_clock::now();
        }
  1. Consider using a more efficient sorting approach by pre-computing job creation times to avoid multiple lookups for the same job during sorting.
// Pre-compute job creation times once for all tasks
std::unordered_map<boost::uuids::uuid, std::chrono::system_clock::time_point, std::hash<boost::uuids::uuid>> task_creation_times;
for (const auto& task : m_tasks) {
    task_creation_times[task.get_id()] = get_task_job_creation_time(task.get_id());
}

// Then sort using the pre-computed times
std::ranges::sort(m_tasks, [&](core::Task const& a, core::Task const& b) {
    return task_creation_times[a.get_id()] > task_creation_times[b.get_id()];
});
🧰 Tools
🪛 Cppcheck (2.10-2)

[performance] 116-116: Variable 'm_id' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)


} // namespace spider::scheduler
19 changes: 6 additions & 13 deletions src/spider/scheduler/FifoPolicy.hpp
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
#ifndef SPIDER_SCHEDULER_FIFOPOLICY_HPP
#define SPIDER_SCHEDULER_FIFOPOLICY_HPP

#include <chrono>
#include <memory>
#include <optional>
#include <string>
#include <vector>

#include <absl/container/flat_hash_map.h>
#include <boost/uuid/uuid.hpp>

#include "../core/Task.hpp"
#include "../storage/DataStorage.hpp"
#include "../storage/MetadataStorage.hpp"
#include "../storage/StorageConnection.hpp"
#include "../utils/LruCache.hpp"
#include "SchedulerPolicy.hpp"
#include "SchedulerTaskCache.hpp"

namespace spider::scheduler {

Expand All @@ -25,28 +23,23 @@ class FifoPolicy final : public SchedulerPolicy {
std::shared_ptr<core::MetadataStorage> const& metadata_store,
std::shared_ptr<core::DataStorage> const& data_store,
core::StorageConnection& conn

);

auto schedule_next(boost::uuids::uuid worker_id, std::string const& worker_addr)
-> std::optional<boost::uuids::uuid> override;

private:
auto get_next_task(
std::vector<core::Task>& tasks,
boost::uuids::uuid const& worker_id,
std::string const& worker_addr
) -> std::optional<boost::uuids::uuid>;
auto fetch_tasks() -> void;
auto task_locality_satisfied(core::Task const& task, std::string const& addr) -> bool;

std::shared_ptr<core::MetadataStorage> m_metadata_store;
std::shared_ptr<core::DataStorage> m_data_store;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
core::StorageConnection& m_conn;

SchedulerTaskCache m_task_cache;

core::LruCache<boost::uuids::uuid, boost::uuids::uuid> m_task_job_cache;
core::LruCache<boost::uuids::uuid, std::chrono::system_clock::time_point> m_job_time_cache;
std::vector<core::Task> m_tasks;
// NOLINTNEXTLINE(misc-include-cleaner)
absl::flat_hash_map<boost::uuids::uuid, core::Data, std::hash<boost::uuids::uuid>> m_data_cache;
};

} // namespace spider::scheduler
Expand Down
99 changes: 0 additions & 99 deletions src/spider/scheduler/SchedulerTaskCache.cpp

This file was deleted.

Loading
Loading