Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Enhancement 8277989680: symbol concatenation poc #2142

Draft
wants to merge 36 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e4d94e3
Minimal class and methods defined
alexowens90 Jan 20, 2025
cc89694
Remove C++ changes
alexowens90 Jan 20, 2025
b533622
Reworked Python layer
alexowens90 Jan 20, 2025
28eda08
Revert whitespace change
alexowens90 Jan 20, 2025
e71b5eb
Sketch out API for reading multiple symbols with a join
alexowens90 Jan 20, 2025
08126c3
Wiring down to local_version_engine.cpp
alexowens90 Jan 21, 2025
d9107e5
Use Poly Clause from VersionStoreApi level downwards
alexowens90 Jan 21, 2025
634247a
Batch read and get set of entity IDs on a single ComponentManager for…
alexowens90 Jan 21, 2025
4aafe67
Fix schedule_clause_processing when there are zero clauses
alexowens90 Jan 22, 2025
ec189c7
Produce one SegmentInMemory from output of multiple symbols' pipelines
alexowens90 Jan 22, 2025
b5e10a0
First, most basic, test passing
alexowens90 Jan 22, 2025
400facf
Refactor schedule_remaining_iterations into own method
alexowens90 Jan 22, 2025
82b2290
Added ConcatClause
alexowens90 Jan 24, 2025
727cd34
Fix ConcatClause::structure_for_processing
alexowens90 Jan 24, 2025
139f3af
First end-to-end test passing!
alexowens90 Jan 24, 2025
548e5b7
Removed TODOs
alexowens90 Jan 27, 2025
f466fe2
Verify that clause for join is multi-symbol
alexowens90 Jan 27, 2025
096ff09
Added failing test with column slicing
alexowens90 Jan 27, 2025
4b4cad6
Simplify column slicing test
alexowens90 Jan 27, 2025
d8de667
Remove unneeded segments in ConcatClause::structure_for_processing
alexowens90 Jan 27, 2025
7718bec
Column slicing test passing
alexowens90 Jan 27, 2025
f089c7c
Refactor to only call ComponentManager::replace_entities once
alexowens90 Jan 27, 2025
fb46e03
Test with row slicing
alexowens90 Jan 27, 2025
70a67fa
Use folly::enumerate over index-based loop
alexowens90 Jan 27, 2025
0fd27e3
Improve schema checking
alexowens90 Jan 27, 2025
91c0d57
Extra tests for non-identical columns between symbols
alexowens90 Jan 29, 2025
cf13b8f
Test exception thrown with mixed index types
alexowens90 Jan 29, 2025
3dc2020
Test concatenating multiindexed data
alexowens90 Jan 29, 2025
e9f5eeb
Test pickled data behaviour
alexowens90 Jan 29, 2025
9ed8d7c
Test date_range provided via read arg and via clause
alexowens90 Jan 29, 2025
7c7e52f
Add failing test with column slicing
alexowens90 Jan 29, 2025
4280c56
Return list of symbol/version pairs of input symbols when joining
alexowens90 Jan 30, 2025
24b62e2
Per-symbol metadatas returned to user
alexowens90 Jan 30, 2025
98b261f
Refactor to not separate ConcatClause from other clauses
alexowens90 Jan 30, 2025
7722ebb
Add QueryBuilder syntax
alexowens90 Jan 30, 2025
831b364
Unpin threadpool core counts
alexowens90 Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions cpp/arcticdb/entity/read_result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ namespace arcticdb {

struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
ReadResult(
const VersionedItem& versioned_item,
const std::variant<VersionedItem, std::vector<VersionedItem>>& versioned_item,
pipelines::PythonOutputFrame&& frame_data,
const arcticdb::proto::descriptors::NormalizationMetadata& norm_meta,
const arcticdb::proto::descriptors::UserDefinedMetadata& user_meta,
const std::variant<arcticdb::proto::descriptors::UserDefinedMetadata, std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>& user_meta,
const arcticdb::proto::descriptors::UserDefinedMetadata& multi_key_meta,
std::vector<entity::AtomKey>&& multi_keys) :
item(versioned_item),
Expand All @@ -35,19 +35,20 @@ struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
multi_keys(std::move(multi_keys)) {

}
VersionedItem item;
std::variant<VersionedItem, std::vector<VersionedItem>> item;
pipelines::PythonOutputFrame frame_data;
arcticdb::proto::descriptors::NormalizationMetadata norm_meta;
arcticdb::proto::descriptors::UserDefinedMetadata user_meta;
std::variant<arcticdb::proto::descriptors::UserDefinedMetadata, std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>> user_meta;
arcticdb::proto::descriptors::UserDefinedMetadata multi_key_meta;
std::vector <entity::AtomKey> multi_keys;

ARCTICDB_MOVE_ONLY_DEFAULT(ReadResult)
};

inline ReadResult create_python_read_result(
const VersionedItem& version,
FrameAndDescriptor&& fd) {
const std::variant<VersionedItem, std::vector<VersionedItem>>& version,
FrameAndDescriptor&& fd,
std::optional<std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>&& user_meta = std::nullopt) {
auto result = std::move(fd);
// Very old (pre Nov-2020) PandasIndex protobuf messages had no "start" or "step" fields. If is_physically_stored
// (renamed from is_not_range_index) was false, the index was always RangeIndex(num_rows, 1)
Expand All @@ -73,7 +74,13 @@ inline ReadResult create_python_read_result(
util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__);

const auto& desc_proto = result.desc_.proto();
std::variant<arcticdb::proto::descriptors::UserDefinedMetadata, std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>> metadata;
if (user_meta.has_value()) {
metadata = *user_meta;
} else {
metadata = desc_proto.user_meta();
}
return {version, std::move(python_frame), desc_proto.normalization(),
desc_proto.user_meta(), desc_proto.multi_key_meta(), std::move(result.keys_)};
metadata, desc_proto.multi_key_meta(), std::move(result.keys_)};
}
} //namespace arcticdb
54 changes: 53 additions & 1 deletion cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit
SegmentInMemory seg;
RowRange output_row_range(row_slices.front().row_ranges_->at(0)->start(),
row_slices.front().row_ranges_->at(0)->start() + output_index_column->row_count());
ColRange output_col_range(1, aggregators_.size() + 1);
seg.add_column(scalar_field(DataType::NANOSECONDS_UTC64, index_column_name), output_index_column);
seg.descriptor().set_index(IndexDescriptorImpl(1, IndexDescriptor::Type::TIMESTAMP));
auto& string_pool = seg.string_pool();
Expand Down Expand Up @@ -675,7 +676,7 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit
seg.add_column(scalar_field(aggregated_column->type().data_type(), aggregator.get_output_column_name().value), aggregated_column);
}
seg.set_row_data(output_index_column->row_count() - 1);
return push_entities(*component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range)));
return push_entities(*component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range), std::move(output_col_range)));
}

template<ResampleBoundary closed_boundary>
Expand Down Expand Up @@ -1217,4 +1218,55 @@ std::string DateRangeClause::to_string() const {
return fmt::format("DATE RANGE {} - {}", start_, end_);
}

ConcatClause::ConcatClause() {
    // Flag this clause as a multi-symbol join: it consumes the outputs of
    // several per-symbol pipelines rather than a single symbol's entities.
    clause_info_.multi_symbol_ = true;
    clause_info_.input_structure_ = ProcessingStructure::MULTI_SYMBOL;
}

// Restructure the per-symbol entity lists into one contiguous, concatenated
// row-range space: symbol N's first row slice is renumbered to start where
// symbol N-1's last row slice ended, then everything is re-bucketed by row
// slice for downstream clauses.
std::vector<std::vector<EntityId>> ConcatClause::structure_for_processing(std::vector<std::vector<EntityId>>&& entity_ids_vec) {
    // Similar logic to RowRangeClause::structure_for_processing but as input row ranges come from multiple symbols it is slightly different
    std::vector<RangesAndEntity> ranges_and_entities;
    std::vector<std::shared_ptr<RowRange>> new_row_ranges;
    bool first_range{true};
    // Exclusive end of the most recently assigned output row range; the next
    // row slice (possibly from the next symbol) starts here.
    size_t prev_range_end{0};
    for (const auto& entity_ids: entity_ids_vec) {
        auto [old_row_ranges, col_ranges] = component_manager_->get_entities<std::shared_ptr<RowRange>, std::shared_ptr<ColRange>>(entity_ids, false);
        // Map from old row ranges WITHIN THIS SYMBOL to new ones
        std::map<RowRange, RowRange> row_range_mapping;
        for (const auto& row_range: old_row_ranges) {
            // Value is same as key initially
            // (insert also dedups: column slices of one row slice share a key)
            row_range_mapping.insert({*row_range, *row_range});
        }
        // std::map iterates in key order, so row slices are renumbered from the
        // lowest old row range upwards, preserving each symbol's internal order.
        for (auto& [old_range, new_range]: row_range_mapping) {
            if (first_range) {
                // Make the first row-range start from zero
                new_range.first = 0;
                new_range.second = old_range.diff();
                first_range = false;
            } else {
                new_range.first = prev_range_end;
                new_range.second = new_range.first + old_range.diff();
            }
            prev_range_end = new_range.second;
        }

        // Attach the remapped row range to every entity of this symbol; each
        // entity gets its own shared_ptr copy of the remapped range.
        for (size_t idx=0; idx<entity_ids.size(); ++idx) {
            auto new_row_range = std::make_shared<RowRange>(row_range_mapping.at(*old_row_ranges[idx]));
            ranges_and_entities.emplace_back(entity_ids[idx], new_row_range, col_ranges[idx]);
            new_row_ranges.emplace_back(std::move(new_row_range));
        }
    }
    // new_row_ranges was built in the same flattened order as entity_ids_vec,
    // matching the order replace_entities expects.
    component_manager_->replace_entities<std::shared_ptr<RowRange>>(flatten_entities(std::move(entity_ids_vec)), new_row_ranges);
    auto new_structure_offsets = structure_by_row_slice(ranges_and_entities);
    return offsets_to_entity_ids(new_structure_offsets, ranges_and_entities);
}

std::vector<EntityId> ConcatClause::process(std::vector<EntityId>&& entity_ids) const {
    // All of the concatenation work happens in structure_for_processing; each
    // processing unit is forwarded through unchanged.
    auto passthrough = std::move(entity_ids);
    return passthrough;
}

std::string ConcatClause::to_string() const {
    // Human-readable tag used when rendering query plans.
    return std::string{"CONCAT"};
}

}
33 changes: 33 additions & 0 deletions cpp/arcticdb/processing/clause.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,4 +669,37 @@ struct DateRangeClause {
[[nodiscard]] std::string to_string() const;
};

// Clause that concatenates the outputs of multiple per-symbol processing
// pipelines into a single logical stream of processing units (multi-symbol
// join). It must never be the first clause in a pipeline: it consumes entity
// IDs produced by the upstream per-symbol reads.
struct ConcatClause {

    ClauseInfo clause_info_;
    std::shared_ptr<ComponentManager> component_manager_;

    ConcatClause();

    ARCTICDB_MOVE_COPY_DEFAULT(ConcatClause)

    // RangesAndKey overload is only valid for the first clause in a pipeline,
    // which ConcatClause can never be, so this always raises.
    [[nodiscard]] std::vector<std::vector<size_t>> structure_for_processing(ARCTICDB_UNUSED std::vector<RangesAndKey>& ranges_and_keys) {
        // TODO: This isn't a long term requirement, for simplicity at the moment we force all input symbols through their own
        // processing pipeline right now, even if they have no clauses
        internal::raise<ErrorCode::E_ASSERTION_FAILURE>("ConcatClause should never be first in the pipeline");
    }

    // Renumbers per-symbol row ranges into one contiguous space and re-buckets
    // the entities by row slice.
    [[nodiscard]] std::vector<std::vector<EntityId>> structure_for_processing(std::vector<std::vector<EntityId>>&& entity_ids_vec);

    // Pass-through: the concatenation itself happens in structure_for_processing.
    [[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

    [[nodiscard]] const ClauseInfo& clause_info() const {
        return clause_info_;
    }

    void set_processing_config(ARCTICDB_UNUSED const ProcessingConfig& processing_config) {
    }

    void set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
        // Sink parameter: move instead of copy to avoid an atomic refcount bump.
        component_manager_ = std::move(component_manager);
    }

    [[nodiscard]] std::string to_string() const;
};

}//namespace arcticdb
5 changes: 4 additions & 1 deletion cpp/arcticdb/processing/clause_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ enum class ProcessingStructure {
ROW_SLICE,
TIME_BUCKETED,
HASH_BUCKETED,
ALL
ALL,
MULTI_SYMBOL
};

struct KeepCurrentIndex{};
Expand All @@ -52,6 +53,8 @@ struct ClauseInfo {
std::variant<KeepCurrentIndex, KeepCurrentTopLevelIndex, NewIndex> index_{KeepCurrentIndex()};
// Whether this clause modifies the output descriptor
bool modifies_output_descriptor_{false};
// Whether this clause operates on one or multiple symbols
bool multi_symbol_{false};
};

// Changes how the clause behaves based on information only available after it is constructed
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/processing/query_planner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ using ClauseVariant = std::variant<std::shared_ptr<FilterClause>,
std::shared_ptr<ResampleClause<ResampleBoundary::LEFT>>,
std::shared_ptr<ResampleClause<ResampleBoundary::RIGHT>>,
std::shared_ptr<RowRangeClause>,
std::shared_ptr<DateRangeClause>>;
std::shared_ptr<DateRangeClause>,
std::shared_ptr<ConcatClause>>;

std::vector<ClauseVariant> plan_query(std::vector<ClauseVariant>&& clauses);

Expand Down
13 changes: 12 additions & 1 deletion cpp/arcticdb/python/adapt_read_dataframe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@ namespace arcticdb {

// Convert a ReadResult into the tuple shape consumed by the Python layer:
// (versioned item(s), frame data, normalization meta, user meta, multi-key meta,
// multi keys).
inline auto adapt_read_df = [](ReadResult && ret) -> py::tuple{
    auto norm_meta_py = python_util::pb_to_python(ret.norm_meta);
    auto multi_key_meta_py = python_util::pb_to_python(ret.multi_key_meta);
    // A single-symbol read carries one user-metadata proto; a multi-symbol join
    // carries one per input symbol, surfaced to Python as a list.
    auto user_meta_py = util::variant_match(
            ret.user_meta,
            [](const arcticdb::proto::descriptors::UserDefinedMetadata& single) -> py::object {
                return python_util::pb_to_python(single);
            },
            [](const std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>& many) -> py::object {
                py::list converted;
                for (const auto& meta: many) {
                    converted.append(python_util::pb_to_python(meta));
                }
                return converted;
            });
    return py::make_tuple(ret.item, std::move(ret.frame_data), norm_meta_py, user_meta_py, multi_key_meta_py, ret.multi_keys);
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/python/python_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ inline py::list adapt_read_dfs(std::vector<std::variant<ReadResult, DataError>>&
res,
[&lst] (ReadResult& read_result) {
auto pynorm = python_util::pb_to_python(read_result.norm_meta);
auto pyuser_meta = python_util::pb_to_python(read_result.user_meta);
auto pyuser_meta = python_util::pb_to_python(std::get<arcticdb::proto::descriptors::UserDefinedMetadata>(read_result.user_meta));
auto multi_key_meta = python_util::pb_to_python(read_result.multi_key_meta);
lst.append(py::make_tuple(read_result.item, std::move(read_result.frame_data), pynorm, pyuser_meta, multi_key_meta,
read_result.multi_keys));
Expand Down
118 changes: 118 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,124 @@ std::vector<std::variant<ReadVersionOutput, DataError>> LocalVersionedEngine::ba
return read_versions_or_errors;
}

// Build the output StreamDescriptor for a multi-symbol join from the row/column
// slices of all input symbols. Every row slice must expose an identical set of
// fields; otherwise throws with E_DESCRIPTOR_MISMATCH. Sorts slice_and_keys in
// place as a side effect.
StreamDescriptor descriptor_from_segments(std::vector<SliceAndKey>& slice_and_keys) {
    internal::check<ErrorCode::E_ASSERTION_FAILURE>(!slice_and_keys.empty(), "No slice and keys in batch_read_with_join_internal");
    // Order by row slice then column slice so that all segments belonging to one
    // row slice are visited consecutively in the loop below.
    std::sort(std::begin(slice_and_keys), std::end(slice_and_keys), [] (const auto& left, const auto& right) {
        return std::tie(left.slice_.row_range, left.slice_.col_range) < std::tie(right.slice_.row_range, right.slice_.col_range);
    });
    // TODO: Move this check to <join clause>::structure_for_processing, as it could be different for different clauses
    // TODO: Make more memory efficient by only keeping first element in memory, and comparing others on the fly
    // Track the field collections for each row-slice
    std::vector<FieldCollection> field_collections;
    // Sentinel that cannot match any real row range, so the first iteration
    // always starts a new field collection.
    RowRange current_row_range(std::numeric_limits<size_t>::max(), std::numeric_limits<size_t>::max());
    for (const auto& slice_and_key: slice_and_keys) {
        const auto& desc = slice_and_key.segment_->descriptor();
        if (slice_and_key.slice().rows() != current_row_range) {
            // First column slice of a new row slice: start a fresh collection,
            // beginning with the index field(s).
            field_collections.emplace_back();
            for (size_t field_idx = 0; field_idx < desc.data().index().field_count(); ++field_idx) {
                field_collections.back().add(desc.fields().ref_at(field_idx));
            }
            current_row_range = slice_and_key.slice().rows();
        }
        // Append this column slice's non-index fields to the current row slice's
        // collection.
        for (size_t field_idx = desc.data().index().field_count(); field_idx < desc.field_count(); ++field_idx) {
            field_collections.back().add(desc.fields().ref_at(field_idx));
        }
    }
    // Ensure they are all the same
    // TODO: Relax this requirement:
    // - columns in different orders
    // - columns of different but promotable types
    // - extra/missing columns in different row slices
    for (size_t idx = 1; idx < field_collections.size(); ++idx) {
        schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
            field_collections[0] == field_collections[idx],
            "Mismatching fields in multi-symbol join: {} != {}",
            field_collections[0],
            field_collections[idx]
        );
    }

    // Set the output descriptor based on the first row slice
    StreamDescriptor res = slice_and_keys[0].segment_->descriptor().clone();
    res.fields_ = std::make_shared<FieldCollection>(std::move(field_collections[0]));

    return res;
}

// Read multiple symbols, run each through its own processing pipeline, then join
// the results (via the supplied multi-symbol clauses, e.g. ConcatClause) into a
// single output frame. Blocks until the whole pipeline completes (.get() at the
// end), returning the joined frame plus per-symbol versioned items and user
// metadata.
MultiSymbolReadOutput LocalVersionedEngine::batch_read_with_join_internal(
    const std::vector<StreamId>& stream_ids,
    const std::vector<VersionQuery>& version_queries,
    std::vector<std::shared_ptr<ReadQuery>>& read_queries,
    const ReadOptions& read_options,
    std::vector<std::shared_ptr<Clause>>&& clauses,
    std::any& handler_data) {
    py::gil_scoped_release release_gil;
    auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries);
    std::vector<folly::Future<std::vector<EntityId>>> symbol_entities_futs;
    symbol_entities_futs.reserve(opt_index_key_futs.size());
    auto component_manager = std::make_shared<ComponentManager>();
    auto pipeline_context = std::make_shared<PipelineContext>();
    auto norm_meta_mtx = std::make_shared<std::mutex>();
    // TODO: Generate these in a less hacky way
    // Presized so each async continuation can write its own slot without locking.
    auto res_versioned_items = std::make_shared<std::vector<VersionedItem>>(stream_ids.size());
    auto res_metadatas = std::make_shared<std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>(stream_ids.size());
    DecodePathData shared_data;
    // Kick off one per-symbol read pipeline; each future resolves to the entity
    // IDs that symbol contributed to the shared ComponentManager.
    // NOTE(review): read_options, component_manager and handler_data are captured
    // by reference in the continuations below; this is safe only because .get()
    // at the end keeps this frame alive until everything completes — confirm if
    // this is ever made non-blocking.
    for (auto&& [idx, opt_index_key_fut]: folly::enumerate(opt_index_key_futs)) {
        symbol_entities_futs.emplace_back(
            std::move(opt_index_key_fut).thenValue([store = store(),
                read_query = read_queries.empty() ? std::make_shared<ReadQuery>(): read_queries[idx],
                idx,
                &read_options,
                &component_manager,
                pipeline_context,
                norm_meta_mtx,
                res_versioned_items,
                res_metadatas](auto&& opt_index_key) mutable {
                std::variant<VersionedItem, StreamId> version_info;
                // TODO: Add support for symbols that only have incomplete segments
                internal::check<ErrorCode::E_ASSERTION_FAILURE>(opt_index_key.has_value(), "batch_read_with_join_internal not supported with non-indexed data");
                // TODO: Only read the index segment once
                auto index_key_seg = store->read_sync(*opt_index_key).second;
                {
                    // norm_meta_ is shared across symbols; last writer wins.
                    std::lock_guard<std::mutex> lock(*norm_meta_mtx);
                    // TODO: Construct this at the end of the pipeline, instead of reusing input data
                    pipeline_context->norm_meta_ = std::make_unique<arcticdb::proto::descriptors::NormalizationMetadata>(std::move(*index_key_seg.mutable_index_descriptor().mutable_proto().mutable_normalization()));
                }
                // Shouldn't need mutex protecting as vectors are presized and each element is only accessed once
                res_versioned_items->at(idx) = VersionedItem(*opt_index_key);
                res_metadatas->at(idx) = index_key_seg.mutable_index_descriptor().user_metadata();
                version_info = VersionedItem(std::move(*opt_index_key));
                return read_entity_ids_for_version(store, version_info, read_query, read_options, component_manager);
            })
        );
    }
    auto entity_ids_vec_fut = folly::collect(symbol_entities_futs).via(&async::io_executor());
    for (auto& clause: clauses) {
        clause->set_component_manager(component_manager);
    }
    auto clauses_ptr = std::make_shared<std::vector<std::shared_ptr<Clause>>>(std::move(clauses));

    // Run the (multi-symbol) clause pipeline, then materialize the joined frame.
    return schedule_remaining_iterations(std::move(entity_ids_vec_fut), clauses_ptr, true)
    .thenValueInline([component_manager](std::vector<EntityId>&& processed_entity_ids) {
        auto proc = gather_entities<std::shared_ptr<SegmentInMemory>, std::shared_ptr<RowRange>, std::shared_ptr<ColRange>>(*component_manager, std::move(processed_entity_ids));
        return collect_segments(std::move(proc));
    }).thenValueInline([store=store(), &handler_data, pipeline_context](std::vector<SliceAndKey>&& slice_and_keys) {
        // TODO: Make constructing the output descriptor a standard end of pipeline operation
        pipeline_context->set_descriptor(descriptor_from_segments(slice_and_keys));
        return prepare_output_frame(std::move(slice_and_keys), pipeline_context, store, ReadOptions{}, handler_data);
    }).thenValueInline([&handler_data, pipeline_context, shared_data, res_versioned_items, res_metadatas](SegmentInMemory&& frame) mutable {
        return reduce_and_fix_columns(pipeline_context, frame, ReadOptions{}, handler_data)
        .thenValue([pipeline_context, frame, shared_data, res_versioned_items, res_metadatas](auto&&) mutable {
            // Bundle per-symbol results with the single joined frame.
            return MultiSymbolReadOutput{std::move(*res_versioned_items),
                std::move(*res_metadatas),
                {frame,
                 timeseries_descriptor_from_pipeline_context(pipeline_context, {}, pipeline_context->bucketize_dynamic_),
                 {},
                 shared_data.buffers()}};
        });
    }).get();
}

void LocalVersionedEngine::write_version_and_prune_previous(
bool prune_previous_versions,
const AtomKey& new_version,
Expand Down
8 changes: 8 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ class LocalVersionedEngine : public VersionedEngine {
const ReadOptions& read_options,
std::any& handler_data);

// Read several symbols and join their processing-pipeline outputs into one
// frame using the supplied clauses (e.g. a concat join). Returns the joined
// frame together with per-symbol versioned items and user metadata.
MultiSymbolReadOutput batch_read_with_join_internal(
    const std::vector<StreamId>& stream_ids,
    const std::vector<VersionQuery>& version_queries,
    std::vector<std::shared_ptr<ReadQuery>>& read_queries,
    const ReadOptions& read_options,
    std::vector<std::shared_ptr<Clause>>&& clauses,
    std::any& handler_data);

std::vector<std::variant<DescriptorItem, DataError>> batch_read_descriptor_internal(
const std::vector<StreamId>& stream_ids,
const std::vector<VersionQuery>& version_queries,
Expand Down
Loading
Loading