Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage mover port (#2039) #2127

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -305,6 +305,7 @@ set(arcticdb_srcs
storage/storage.hpp
storage/storage_override.hpp
storage/store.hpp
storage/storage_utils.hpp
stream/aggregator.hpp
stream/aggregator-inl.hpp
stream/append_map.hpp
@@ -325,6 +326,7 @@ set(arcticdb_srcs
stream/stream_utils.hpp
stream/stream_writer.hpp
toolbox/library_tool.hpp
toolbox/storage_mover.hpp
util/allocator.hpp
util/bitset.hpp
util/buffer.hpp
@@ -481,6 +483,7 @@ set(arcticdb_srcs
storage/s3/s3_storage.cpp
storage/s3/s3_storage_tool.cpp
storage/storage_factory.cpp
storage/storage_utils.cpp
stream/aggregator.cpp
stream/append_map.cpp
stream/index.cpp
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
@@ -51,11 +51,11 @@ class AsyncStore : public Store {
public:
AsyncStore(
std::shared_ptr<storage::Library> library,
const arcticdb::proto::encoding::VariantCodec &codec,
const proto::encoding::VariantCodec &codec,
EncodingVersion encoding_version
) :
library_(std::move(library)),
codec_(std::make_shared<arcticdb::proto::encoding::VariantCodec>(codec)),
codec_(std::make_shared<proto::encoding::VariantCodec>(codec)),
encoding_version_(encoding_version) {
}

4 changes: 2 additions & 2 deletions cpp/arcticdb/async/task_scheduler.hpp
Original file line number Diff line number Diff line change
@@ -300,13 +300,13 @@ inline auto& io_executor() {
}

template <typename Task>
inline auto submit_cpu_task(Task&& task) {
auto submit_cpu_task(Task&& task) {
return TaskScheduler::instance()->submit_cpu_task(std::forward<decltype(task)>(task));
}


template <typename Task>
inline auto submit_io_task(Task&& task) {
auto submit_io_task(Task&& task) {
return TaskScheduler::instance()->submit_io_task(std::forward<decltype(task)>(task));
}

4 changes: 2 additions & 2 deletions cpp/arcticdb/entity/atom_key.hpp
Original file line number Diff line number Diff line change
@@ -91,8 +91,8 @@ class AtomKeyImpl {
}

friend bool operator<(const AtomKeyImpl &l, const AtomKeyImpl &r) {
auto lt = std::tie(l.id_, l.version_id_, l.index_start_, l.index_end_, l.creation_ts_);
auto rt = std::tie(r.id_, r.version_id_, r.index_start_, r.index_end_, r.creation_ts_);
const auto lt = std::tie(l.id_, l.version_id_, l.index_start_, l.index_end_, l.creation_ts_);
const auto rt = std::tie(r.id_, r.version_id_, r.index_start_, r.index_end_, r.creation_ts_);
return lt < rt;
}

4 changes: 4 additions & 0 deletions cpp/arcticdb/entity/key.cpp
Original file line number Diff line number Diff line change
@@ -89,6 +89,10 @@ KeyClass key_class_from_key_type(KeyType key_type) {
return get_key_data(key_type).key_class_;
}

// Returns the human-readable description string for `key_type`, looked up
// via the same per-key-type metadata record used by key_class_from_key_type.
const char* get_key_description(KeyType key_type) {
    return get_key_data(key_type).description_;
}

bool is_string_key_type(KeyType key_type){
return variant_type_from_key_type(key_type) == VariantType::STRING_TYPE;
}
34 changes: 17 additions & 17 deletions cpp/arcticdb/entity/key.hpp
Original file line number Diff line number Diff line change
@@ -16,6 +16,10 @@
#include <memory>
#include <algorithm>
#include <variant>
#include <array>
#include <ranges>

namespace rng = std::ranges;

namespace arcticdb::entity {

@@ -191,10 +195,10 @@ enum class KeyType : int {
UNDEFINED
};

inline std::vector<KeyType> key_types_write_precedence() {
consteval auto key_types_write_precedence() {
// TOMBSTONE[_ALL] keys are not included because they're not written to the storage,
// they just exist inside version keys
return {
return std::array {
KeyType::LIBRARY_CONFIG,
KeyType::TABLE_DATA,
KeyType::TABLE_INDEX,
@@ -213,9 +217,9 @@ inline std::vector<KeyType> key_types_write_precedence() {
};
}

inline std::vector<KeyType> key_types_read_precedence() {
consteval auto key_types_read_precedence() {
auto output = key_types_write_precedence();
std::reverse(std::begin(output), std::end(output));
rng::reverse(output);
return output;
}

@@ -245,7 +249,7 @@ enum class VariantType : char {

VariantType variant_type_from_key_type(KeyType key_type);

inline bool is_index_key_type(KeyType key_type) {
constexpr bool is_index_key_type(KeyType key_type) {
// TODO: Change name probably.
return (key_type == KeyType::TABLE_INDEX) || (key_type == KeyType::MULTI_KEY);
}
@@ -256,30 +260,26 @@ bool is_ref_key_class(KeyType k);

bool is_block_ref_key_class(KeyType k);

inline KeyType get_key_type_for_data_stream(const StreamId &) {
constexpr KeyType get_key_type_for_data_stream(const StreamId &) {
return KeyType::TABLE_DATA;
}

inline KeyType get_key_type_for_index_stream(const StreamId &) {
constexpr KeyType get_key_type_for_index_stream(const StreamId &) {
return KeyType::TABLE_INDEX;
}

const char* get_key_description(KeyType type);

template <typename Function>
auto foreach_key_type_read_precedence(Function&& func) {
auto types = key_types_read_precedence();
for(auto type : types) {
func(KeyType(type));
}
constexpr auto foreach_key_type_read_precedence(Function&& func) {
rng::for_each(key_types_read_precedence(), func);
}

template <typename Function>
auto foreach_key_type_write_precedence(Function&& func) {
auto types = key_types_write_precedence();
for(auto type : types) {
func(KeyType(type));
}
constexpr auto foreach_key_type_write_precedence(Function&& func) {
rng::for_each(key_types_write_precedence(), func);
}

inline KeyType key_type_from_int(int type_num) {
util::check(type_num > 0 && type_num < int(KeyType::UNDEFINED), "Unrecognized key type number {}", type_num);
return KeyType(type_num);
4 changes: 2 additions & 2 deletions cpp/arcticdb/entity/metrics.hpp
Original file line number Diff line number Diff line change
@@ -32,8 +32,8 @@ namespace arcticdb {

const std::string MONGO_INSTANCE_LABEL = "mongo_instance";
const std::string PROMETHEUS_ENV_LABEL = "env";
const int SUMMARY_MAX_AGE = 30;
const int SUMMARY_AGE_BUCKETS = 5;
constexpr int SUMMARY_MAX_AGE = 30;
constexpr int SUMMARY_AGE_BUCKETS = 5;

class MetricsConfig {
public:
147 changes: 147 additions & 0 deletions cpp/arcticdb/storage/storage_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#include <arcticdb/storage/storage_utils.hpp>
#include <arcticdb/pipeline/index_writer.hpp>
#include <arcticdb/stream/index_aggregator.hpp>
#include <arcticdb/async/tasks.hpp>

namespace arcticdb {

std::vector<VariantKey> filter_keys_on_existence(
const std::vector<VariantKey>& keys,
const std::shared_ptr<Store>& store,
bool pred
){
auto key_existence = folly::collect(store->batch_key_exists(keys)).get();
std::vector<VariantKey> res;
for (size_t i = 0; i != keys.size(); i++) {
if (key_existence[i] == pred) {
res.push_back(keys[i]);
}
}
return res;
}

// In-place overload: shrinks `keys` to the entries whose existence in `store`
// matches `pred` (true keeps existing keys, false keeps missing ones).
// Relative order of the surviving keys is preserved.
void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred) {
    // batch_key_exists takes VariantKeys, so copy the AtomKeys into a
    // variant vector first (the originals stay intact until compaction below).
    std::vector<VariantKey> var_vector;
    var_vector.reserve(keys.size());
    rng::copy(keys, std::back_inserter(var_vector));

    auto key_existence = store->batch_key_exists(var_vector);

    // Erase-remove style compaction: keys_itr marks the end of the kept
    // prefix; matching keys are moved back out of var_vector into place.
    auto keys_itr = keys.begin();
    for (size_t i = 0; i != var_vector.size(); i++) {
        // Waits on each existence future in order; value() rethrows any error.
        bool resolved = key_existence[i].wait().value();
        if (resolved == pred) {
            *keys_itr = std::move(std::get<AtomKey>(var_vector[i]));
            ++keys_itr;
        }
    }
    // Drop the tail left over after compaction.
    keys.erase(keys_itr, keys.end());
}

// Copies a TABLE_INDEX key and every data key it references from
// source_store to target_store, writing a fresh index segment on the target.
// If new_version_id is set it replaces the version id on the new index key
// and on every copied data key; otherwise the source version ids are kept.
// Data copies run as IO tasks; failures per target are logged but do not
// abort the commit. Returns the atom key of the newly written index.
AtomKey write_table_index_tree_from_source_to_target(
        const std::shared_ptr<Store>& source_store,
        const std::shared_ptr<Store>& target_store,
        const AtomKey& index_key,
        std::optional<VersionId> new_version_id
) {
    ARCTICDB_SAMPLE(WriteIndexSourceToTarget, 0)
    // In: read the source index segment synchronously and wrap it in a reader.
    auto [_, index_seg] = source_store->read_sync(index_key);
    index::IndexSegmentReader index_segment_reader{std::move(index_seg)};
    // Out: writer for the replacement index on the target, reusing the source
    // timeseries descriptor and overriding the version id if requested.
    index::IndexWriter<stream::RowCountIndex> writer(target_store,
        {index_key.id(), new_version_id.value_or(index_key.version_id())},
        std::move(index_segment_reader.mutable_tsd()));
    std::vector<folly::Future<async::CopyCompressedInterStoreTask::ProcessingResult>> futures;
    // Process: for each slice in the source index, register the (possibly
    // re-versioned) key in the new index and schedule the compressed copy.
    for (auto iter = index_segment_reader.begin(); iter != index_segment_reader.end(); ++iter) {
        auto& sk = *iter;
        auto& key = sk.key();
        // Rebuild the data key with a fresh creation timestamp; all other
        // identity fields (id, index range, content hash, type) are preserved.
        std::optional<entity::AtomKey> key_to_write = atom_key_builder()
            .version_id(new_version_id.value_or(key.version_id()))
            .creation_ts(util::SysClock::nanos_since_epoch())
            .start_index(key.start_index())
            .end_index(key.end_index())
            .content_hash(key.content_hash())
            .build(key.id(), key.type());

        writer.add(*key_to_write, sk.slice()); // Both const ref
        // Copy the compressed data from the old key to the new one on the
        // target store, without decompressing.
        futures.emplace_back(submit_io_task(async::CopyCompressedInterStoreTask{
            sk.key(),
            std::move(key_to_write),
            false,
            false,
            source_store,
            {target_store}}));
    }
    // Wait for every copy; log any targets that failed rather than throwing.
    const std::vector<async::CopyCompressedInterStoreTask::ProcessingResult> store_results = collect(futures).get();
    for (const async::CopyCompressedInterStoreTask::ProcessingResult& res: store_results) {
        util::variant_match(
            res,
            [&](const async::CopyCompressedInterStoreTask::FailedTargets& failed) {
                log::storage().error("Failed to move targets: {} from {} to {}", failed, source_store->name(), target_store->name());
            },
            [](const auto&){});
    }
    // FUTURE: clean up already written keys if exception
    return to_atom(writer.commit().get());
}

// Copies a MULTI_KEY (an index of index keys) from source_store to
// target_store: each child index key is copied recursively first, then a new
// MULTI_KEY segment referencing the copies is written to the target, carrying
// over the source segment's metadata and index descriptor when present.
// If new_version_id is set it overrides the version id of the new MULTI_KEY
// (and, via recursion, of the copied children).
// Returns the atom key of the newly written MULTI_KEY.
AtomKey copy_multi_key_from_source_to_target(
        const std::shared_ptr<Store>& source_store,
        const std::shared_ptr<Store>& target_store,
        const AtomKey& index_key,
        std::optional<VersionId> new_version_id) {
    using namespace arcticdb::stream;
    auto fut_index = source_store->read(index_key);
    auto [_, index_seg] = std::move(fut_index).get();
    // Each row of the MULTI_KEY segment encodes one child key.
    std::vector<AtomKey> keys;
    for (size_t idx = 0; idx < index_seg.row_count(); idx++) {
        keys.push_back(stream::read_key_row(index_seg, static_cast<ssize_t>(idx)));
    }
    // Recurse on the index keys inside MULTI_KEY
    std::vector<VariantKey> new_data_keys;
    for (const auto &k: keys) {
        auto new_key = copy_index_key_recursively(source_store, target_store, k, new_version_id);
        new_data_keys.emplace_back(std::move(new_key));
    }
    // Write new MULTI_KEY

    // The aggregator's callback fires during commit() below and captures the
    // write future here; makeEmpty() is the placeholder until then.
    folly::Future<VariantKey> multi_key_fut = folly::Future<VariantKey>::makeEmpty();
    IndexAggregator<RowCountIndex> multi_index_agg(index_key.id(), [&new_version_id, &index_key, &multi_key_fut, &target_store](auto &&segment) {
        // NOTE(review): std::forward<SegmentInMemory> on a deduced auto&&
        // always casts to rvalue (effectively a move) — confirm intended.
        multi_key_fut = target_store->write(KeyType::MULTI_KEY,
            new_version_id.value_or(index_key.version_id()), // version_id
            index_key.id(),
            0, // start_index
            0, // end_index
            std::forward<SegmentInMemory>(segment)).wait();
    });
    for (auto &key: new_data_keys) {
        multi_index_agg.add_key(to_atom(key));
    }
    // Preserve any user metadata attached to the source MULTI_KEY segment.
    if (index_seg.has_metadata()) {
        google::protobuf::Any metadata = *index_seg.metadata();
        multi_index_agg.set_metadata(std::move(metadata));
    }
    if (index_seg.has_index_descriptor()) {
        multi_index_agg.set_timeseries_descriptor(index_seg.index_descriptor());
    }
    // Triggers the callback above, which performs (and waits on) the write.
    multi_index_agg.commit();
    return to_atom(multi_key_fut.value());
}

// Entry point for copying an index tree between stores. Dispatches on the
// key type: MULTI_KEY recurses into its child indexes, TABLE_INDEX copies
// its data tree directly. Any other key type is a logic error and raises.
// Returns the atom key of the copy written to target_store.
AtomKey copy_index_key_recursively(
        const std::shared_ptr<Store>& source_store,
        const std::shared_ptr<Store>& target_store,
        const AtomKey& index_key,
        std::optional<VersionId> new_version_id) {
    ARCTICDB_SAMPLE(RecurseIndexKey, 0)
    const auto key_type = index_key.type();
    if (key_type == KeyType::MULTI_KEY)
        return copy_multi_key_from_source_to_target(source_store, target_store, index_key, new_version_id);
    if (key_type == KeyType::TABLE_INDEX)
        return write_table_index_tree_from_source_to_target(source_store, target_store, index_key, new_version_id);
    internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Cannot copy index recursively. Unsupported index key type {}", index_key.type());
}

}
41 changes: 8 additions & 33 deletions cpp/arcticdb/storage/storage_utils.hpp
Original file line number Diff line number Diff line change
@@ -18,38 +18,13 @@ inline auto stream_id_prefix_matcher(const std::string &prefix) {
std::get<std::string>(id).compare(0u, prefix.size(), prefix) == 0); };
}

inline std::vector<VariantKey> filter_keys_on_existence(
const std::vector<VariantKey>& keys,
const std::shared_ptr<Store>& store,
bool pred
){
auto key_existence = folly::collect(store->batch_key_exists(keys)).get();
std::vector<VariantKey> res;
for (size_t i = 0; i != keys.size(); i++) {
if (key_existence[i] == pred) {
res.push_back(keys[i]);
}
}
return res;
}

inline void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred) {
std::vector<VariantKey> var_vector;
var_vector.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(var_vector),
[](auto&& k) { return VariantKey(std::move(k)); });

auto key_existence = store->batch_key_exists(var_vector);

auto keys_itr = keys.begin();
for (size_t i = 0; i != var_vector.size(); i++) {
bool resolved = key_existence[i].wait().value();
if (resolved == pred) {
*keys_itr = std::move(std::get<AtomKey>(var_vector[i]));
++keys_itr;
}
}
keys.erase(keys_itr, keys.end());
}
std::vector<VariantKey> filter_keys_on_existence(const std::vector<VariantKey>& keys, const std::shared_ptr<Store>& store, bool pred);
void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred);

AtomKey copy_index_key_recursively(
const std::shared_ptr<Store>& source_store,
const std::shared_ptr<Store>& target_store,
const AtomKey& index_key,
std::optional<VersionId> new_version_id);

} //namespace arcticdb
8 changes: 8 additions & 0 deletions cpp/arcticdb/stream/index_aggregator.hpp
Original file line number Diff line number Diff line change
@@ -60,6 +60,10 @@ class FlatIndexingPolicy {
segment_.set_timeseries_descriptor(timeseries_descriptor);
}

// Attaches arbitrary protobuf metadata to the segment being built;
// `metadata` is consumed (moved into the segment).
void set_metadata(google::protobuf::Any&& metadata) {
    segment_.set_metadata(std::move(metadata));
}

private:
Callback callback_;
FixedSchema schema_;
@@ -89,6 +93,10 @@ class IndexAggregator {
indexing_policy_.set_timeseries_descriptor(timeseries_descriptor);
}

// Forwards metadata to the underlying indexing policy, which stores it on
// the in-progress segment; `metadata` is consumed (moved).
void set_metadata(google::protobuf::Any&& metadata) {
    indexing_policy_.set_metadata(std::move(metadata));
}

private:
IndexingPolicy indexing_policy_;
};
1 change: 0 additions & 1 deletion cpp/arcticdb/stream/segment_aggregator.hpp
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@
#include <arcticdb/pipeline/frame_utils.hpp>
#include <arcticdb/util/format_date.hpp>
#include <arcticdb/util/memory_tracing.hpp>
#include <arcticdb/pipeline/filter_segment.hpp>
#include <arcticdb/stream/merge_utils.hpp>

namespace arcticdb::stream {
Loading

Unchanged files with check annotations Beta