Resampling using dynamic schema [Part 1] #2201

Open · wants to merge 11 commits into base: master
1 change: 0 additions & 1 deletion cpp/arcticdb/column_store/column.hpp
@@ -25,7 +25,6 @@
#ifdef __APPLE__
#include <cstdio>
#endif
#include <folly/Function.h>
#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>

15 changes: 8 additions & 7 deletions cpp/arcticdb/column_store/column_data.hpp
vasil-pashov (Collaborator, Author) commented on Feb 25, 2025:

inline is the default when the implementation is inside a class/struct body; typename is not needed in front of std::conditional_t, because the _t alias already names a type.
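
A minimal standalone sketch of both points (hypothetical Widget/Counter names, illustrative rather than the PR's code):

```cpp
#include <type_traits>

struct Widget {
    int v_{0};

    // Defined inside the class body, so it is implicitly inline;
    // writing `inline` here is redundant.
    int value() const { return v_; }
};

// std::conditional_t is itself an alias that already names a type,
// so a leading `typename` in the alias declaration is redundant.
template <bool Big>
using Counter = std::conditional_t<Big, long long, int>;

static_assert(std::is_same_v<Counter<true>, long long>);
static_assert(std::is_same_v<Counter<false>, int>);
```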

@@ -170,16 +170,18 @@ struct ColumnData {
ssize_t idx_{0};
RawType* ptr_{nullptr};

[[nodiscard]] inline ssize_t idx() const {
Enumeration() = default;

[[nodiscard]] ssize_t idx() const {
return idx_;
}

inline RawType& value() {
RawType& value() {
debug::check<ErrorCode::E_ASSERTION_FAILURE>(ptr_ != nullptr, "Dereferencing nullptr in enumerating ColumnDataIterator");
return *ptr_;
};

inline const RawType& value() const {
const RawType& value() const {
debug::check<ErrorCode::E_ASSERTION_FAILURE>(ptr_ != nullptr, "Dereferencing nullptr in enumerating ColumnDataIterator");
return *ptr_;
};
@@ -192,14 +194,14 @@
};

template <class T, IteratorType iterator_type>
using IteratorValueType_t = typename std::conditional_t<
using IteratorValueType_t = std::conditional_t<
iterator_type == IteratorType::ENUMERATED,
Enumeration<T>,
PointerWrapper<T>
>;

template <class T, IteratorType iterator_type, bool constant>
using IteratorReferenceType_t = typename std::conditional_t<
using IteratorReferenceType_t = std::conditional_t<
iterator_type == IteratorType::ENUMERATED,
std::conditional_t<constant, const Enumeration<T>, Enumeration<T>>,
std::conditional_t<constant, const T, T>
@@ -221,8 +223,7 @@
using base_type = base_iterator_type<TDT, iterator_type, iterator_density, constant>;
using RawType = typename TDT::DataTypeTag::raw_type;
public:
ColumnDataIterator() = delete;

ColumnDataIterator() = default;
vasil-pashov (Collaborator, Author) commented:

This was an intentional change. If ColumnDataIterator has a default constructor, it satisfies the iterator concepts required by most of the algorithms and classes in std::ranges. I used this in particular to create a std::ranges::subrange in sorted_aggregation.cpp and avoid passing iterators around.
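
A minimal sketch of that property, with a hypothetical BlockIterator standing in for ColumnDataIterator (names are illustrative, not the PR's code):

```cpp
#include <iostream>
#include <iterator>
#include <ranges>
#include <vector>

// A C++20 forward_iterator must be default-constructible; with that in
// place the type can be handed to std::ranges::subrange and to most
// std::ranges algorithms as a single range object.
template <class T>
class BlockIterator {
public:
    using value_type = T;
    using difference_type = std::ptrdiff_t;

    BlockIterator() = default;               // the property the PR relies on
    explicit BlockIterator(T* p) : p_(p) {}

    T& operator*() const { return *p_; }
    BlockIterator& operator++() { ++p_; return *this; }
    BlockIterator operator++(int) { auto tmp = *this; ++p_; return tmp; }
    bool operator==(const BlockIterator&) const = default;

private:
    T* p_{nullptr};
};

static_assert(std::forward_iterator<BlockIterator<int>>);

int main() {
    std::vector<int> data{1, 2, 3, 4};
    // One range object instead of a begin/end iterator pair.
    std::ranges::subrange rng(BlockIterator<int>(data.data()),
                              BlockIterator<int>(data.data() + data.size()));
    for (int x : rng)
        std::cout << x << ' ';
}
```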

// Used to construct [c]begin iterators
explicit ColumnDataIterator(ColumnData* parent):
parent_(parent)
44 changes: 44 additions & 0 deletions cpp/arcticdb/entity/types.cpp
@@ -56,4 +56,48 @@ std::size_t data_type_size(const TypeDescriptor& td, OutputFormat output_format,
return mode == DataTypeMode::EXTERNAL ? external_data_type_size(td, output_format) : internal_data_type_size(td);
}

TypeDescriptor FieldRef::type() const {
return type_;
}

std::string_view FieldRef::name() const {
return name_;
}

bool operator==(const FieldRef &left, const FieldRef &right) {
return left.type_ == right.type_ && left.name_ == right.name_;
}

FieldWrapper::FieldWrapper(TypeDescriptor type, std::string_view name) :
data_(Field::calc_size(name)) {
mutable_field().set(type, name);
}

const Field& FieldWrapper::field() const {
return *reinterpret_cast<const Field *>(data_.data());
}

const TypeDescriptor& FieldWrapper::type() const {
return field().type();
}

std::string_view FieldWrapper::name() const {
return field().name();
}

Field& FieldWrapper::mutable_field() {
return *reinterpret_cast<Field *>(data_.data());
}

FieldRef scalar_field(DataType type, std::string_view name) {
return {TypeDescriptor{type, Dimension::Dim0}, name};
}

bool operator==(const Field &l, const Field &r) {
return l.type() == r.type() && l.name() == r.name();
}

bool operator!=(const Field &l, const Field &r) {
return !(l == r);
}
} // namespace arcticdb
52 changes: 11 additions & 41 deletions cpp/arcticdb/entity/types.hpp
@@ -611,17 +611,9 @@ struct FieldRef {
TypeDescriptor type_;
std::string_view name_;

[[nodiscard]] TypeDescriptor type() const {
return type_;
}

[[nodiscard]] std::string_view name() const {
return name_;
}

friend bool operator==(const FieldRef &left, const FieldRef &right) {
return left.type_ == right.type_ && left.name_ == right.name_;
}
[[nodiscard]] TypeDescriptor type() const;
[[nodiscard]] std::string_view name() const;
friend bool operator==(const FieldRef &left, const FieldRef &right);
};

#pragma pack(push, 1)
@@ -694,45 +686,23 @@ struct Field {

struct FieldWrapper {
std::vector<uint8_t> data_;
FieldWrapper(TypeDescriptor type, std::string_view name) :
data_(Field::calc_size(name)) {
mutable_field().set(type, name);
}

const Field &field() const {
return *reinterpret_cast<const Field *>(data_.data());
}

const TypeDescriptor& type() const {
return field().type();
}

const std::string_view name() const {
return field().name();
}

FieldWrapper(TypeDescriptor type, std::string_view name);
const Field &field() const;
const TypeDescriptor& type() const;
std::string_view name() const;
private:
Field &mutable_field() {
return *reinterpret_cast<Field *>(data_.data());
}
Field &mutable_field();
};

inline FieldRef scalar_field(DataType type, std::string_view name) {
return {TypeDescriptor{type, Dimension::Dim0}, name};
}
FieldRef scalar_field(DataType type, std::string_view name);

template<typename Callable>
auto visit_field(const Field &field, Callable &&c) {
return field.type().visit_tag(std::forward<Callable>(c));
}

inline bool operator==(const Field &l, const Field &r) {
return l.type() == r.type() && l.name() == r.name();
}

inline bool operator!=(const Field &l, const Field &r) {
return !(l == r);
}
bool operator==(const Field &l, const Field &r);
bool operator!=(const Field &l, const Field &r);

} // namespace entity

30 changes: 20 additions & 10 deletions cpp/arcticdb/processing/clause.cpp
@@ -19,10 +19,9 @@
#include <arcticdb/stream/segment_aggregator.hpp>
#include <arcticdb/util/test/random_throw.hpp>
#include <ankerl/unordered_dense.h>
#include <arcticdb/util/movable_priority_queue.hpp>
#include <ranges>



namespace arcticdb {

namespace ranges = std::ranges;
@@ -589,7 +588,7 @@ std::vector<std::vector<EntityId>> ResampleClause<closed_boundary>::structure_fo
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
idx < expected_fetch_counts.size(),
"Index {} in new_structure_offsets out of bounds >{}", idx, expected_fetch_counts.size() - 1);
expected_fetch_counts[idx]++;
++expected_fetch_counts[idx];
}
}
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
@@ -621,7 +620,7 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit
// slice is being computed by the call to process dealing with the row slices above these. Otherwise, this call
// should do it
const auto& front_slice = row_slices.front();
bool responsible_for_first_overlapping_bucket = front_slice.entity_fetch_count_->at(0) == 1;
const bool responsible_for_first_overlapping_bucket = front_slice.entity_fetch_count_->at(0) == 1;
// Find the iterators into bucket_boundaries_ of the start of the first and the end of the last bucket this call to process is
// responsible for calculating
// All segments in a given row slice contain the same index column, so just grab info from the first one
@@ -653,26 +652,37 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit

ARCTICDB_DEBUG_THROW(5)
for (const auto& aggregator: aggregators_) {
std::vector<std::optional<ColumnWithStrings>> input_agg_columns;
std::vector<ColumnWithStrings> input_agg_columns;
input_agg_columns.reserve(row_slices.size());
size_t slice_index = 0;
bm::bvector<> existing_columns;
existing_columns.init();
existing_columns.resize(row_slices.size());
for (auto& row_slice: row_slices) {
auto variant_data = row_slice.get(aggregator.get_input_column_name());
util::variant_match(variant_data,
[&input_agg_columns](const ColumnWithStrings& column_with_strings) {
[&input_agg_columns, &existing_columns, &slice_index](const ColumnWithStrings& column_with_strings) {
input_agg_columns.emplace_back(column_with_strings);
existing_columns.set(slice_index);
},
[&input_agg_columns](const EmptyResult&) {
[](const EmptyResult&) {
// Dynamic schema, missing column from this row-slice
// Not currently supported, but will be, hence the argument to aggregate being a vector of optionals
input_agg_columns.emplace_back();
},
[](const auto&) {
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unexpected return type from ProcessingUnit::get, expected column-like");
}
);
++slice_index;
}
if (!input_agg_columns.empty()) {
const bool has_missing_columns = !existing_columns.is_all_one_range(0, row_slices.size() - 1);
auto aggregated_column = has_missing_columns ?
std::make_shared<Column>(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool, existing_columns)) :
std::make_shared<Column>(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool));
const auto field = scalar_field(aggregated_column->type().data_type(), aggregator.get_output_column_name().value);
seg.add_column(field, std::move(aggregated_column));
}
auto aggregated_column = std::make_shared<Column>(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool));
seg.add_column(scalar_field(aggregated_column->type().data_type(), aggregator.get_output_column_name().value), aggregated_column);
}
seg.set_row_data(output_index_column->row_count() - 1);
return push_entities(*component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range)));
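As an aside for readers unfamiliar with BitMagic: a standalone sketch of the presence-tracking pattern the new clause.cpp code uses (illustrative values; the include path depends on how BitMagic is vendored):

```cpp
#include <bm.h>   // BitMagic bvector; header path may vary by install
#include <cstddef>
#include <iostream>

int main() {
    const std::size_t num_slices = 4;

    // Mark which row slices actually contain the aggregation input column.
    bm::bvector<> existing_columns;
    existing_columns.init();
    existing_columns.resize(num_slices);
    existing_columns.set(0);
    existing_columns.set(1);
    existing_columns.set(3);   // slice 2 is missing the column

    // Same test the PR uses: all bits set <=> no slice is missing the column.
    const bool has_missing_columns =
        !existing_columns.is_all_one_range(0, num_slices - 1);
    std::cout << std::boolalpha << has_missing_columns << '\n';  // true
}
```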
9 changes: 1 addition & 8 deletions cpp/arcticdb/processing/clause.hpp
@@ -7,10 +7,8 @@

#pragma once

#include <arcticdb/entity/key.hpp>
#include <arcticdb/column_store/column.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/pipeline/value.hpp>
#include <arcticdb/processing/expression_context.hpp>
#include <arcticdb/processing/expression_node.hpp>
#include <arcticdb/entity/types.hpp>
@@ -19,20 +17,15 @@
#include <arcticdb/processing/aggregation_interface.hpp>
#include <arcticdb/processing/processing_unit.hpp>
#include <arcticdb/processing/sorted_aggregation.hpp>
#include <arcticdb/processing/grouper.hpp>
#include <arcticdb/stream/aggregator.hpp>
#include <arcticdb/util/movable_priority_queue.hpp>
#include <arcticdb/pipeline/index_utils.hpp>

#include <folly/Poly.h>
#include <folly/futures/Future.h>

#include <vector>
#include <unordered_map>
#include <string>
#include <variant>
#include <memory>
#include <atomic>


namespace arcticdb {

3 changes: 0 additions & 3 deletions cpp/arcticdb/processing/processing_unit.hpp
@@ -19,9 +19,6 @@
#include <arcticdb/processing/expression_node.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/pipeline/filter_segment.hpp>
#include <arcticdb/util/composite.hpp>
#include <arcticdb/util/string_utils.hpp>
#include <arcticdb/util/variant.hpp>

namespace arcticdb {
enum class PipelineOptimisation : uint8_t {
2 changes: 2 additions & 0 deletions cpp/arcticdb/processing/query_planner.hpp
@@ -9,8 +9,10 @@

#include <variant>
#include <vector>
#include <memory>

#include <arcticdb/processing/clause.hpp>
#include <arcticdb/processing/grouper.hpp>

namespace arcticdb {
