From 2405d7de727a1e3acf11abe2ab935121db20f9fe Mon Sep 17 00:00:00 2001
From: Vasil Pashov
Date: Tue, 4 Feb 2025 17:47:03 +0200
Subject: [PATCH 01/10] Move things to .cpp

---
 .../processing/sorted_aggregation.cpp | 149 ++++++++++++++++--
 .../processing/sorted_aggregation.hpp | 102 +-----------
 2 files changed, 141 insertions(+), 110 deletions(-)

diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp
index 21c8e0f881..37fac20e4c 100644
--- a/cpp/arcticdb/processing/sorted_aggregation.cpp
+++ b/cpp/arcticdb/processing/sorted_aggregation.cpp
@@ -8,9 +8,115 @@
 #include
 #include
 #include
+#include
 
 namespace arcticdb {
 
+template
+void push_to_aggregator(Aggregator& bucket_aggregator, T value, ARCTICDB_UNUSED const ColumnWithStrings& column_with_strings) {
+    if constexpr(is_time_type(input_data_type) && aggregation_operator == AggregationOperator::COUNT) {
+        bucket_aggregator.template push(value);
+    } else if constexpr (is_numeric_type(input_data_type) || is_bool_type(input_data_type)) {
+        bucket_aggregator.push(value);
+    } else if constexpr (is_sequence_type(input_data_type)) {
+        bucket_aggregator.push(column_with_strings.string_at_offset(value));
+    }
+}
+
+template
+requires arcticdb::util::is_instantiation_of_v
+[[nodiscard]] auto get_bucket_aggregator() {
+    if constexpr (aggregation_operator == AggregationOperator::SUM) {
+        if constexpr (is_bool_type(scalar_type_info::data_type)) {
+            // Sum of bool column is just the count of true values
+            return SumAggregatorSorted();
+        } else {
+            return SumAggregatorSorted();
+        }
+    } else if constexpr (aggregation_operator == AggregationOperator::MEAN) {
+        if constexpr (is_time_type(scalar_type_info::data_type)) {
+            return MeanAggregatorSorted();
+        } else {
+            return MeanAggregatorSorted();
+        }
+    } else if constexpr (aggregation_operator == AggregationOperator::MIN) {
+        if constexpr (is_time_type(scalar_type_info::data_type)) {
+            return MinAggregatorSorted();
+        } else {
+            return MinAggregatorSorted();
+        }
+    } else if constexpr (aggregation_operator == AggregationOperator::MAX) {
+        if constexpr (is_time_type(scalar_type_info::data_type)) {
+            return MaxAggregatorSorted();
+        } else {
+            return MaxAggregatorSorted();
+        }
+    } else if constexpr (aggregation_operator == AggregationOperator::FIRST) {
+        if constexpr (is_time_type(scalar_type_info::data_type)) {
+            return FirstAggregatorSorted();
+        } else if constexpr (is_numeric_type(scalar_type_info::data_type) || is_bool_type(scalar_type_info::data_type)) {
+            return FirstAggregatorSorted();
+        } else if constexpr (is_sequence_type(scalar_type_info::data_type)) {
+            return FirstAggregatorSorted>();
+        }
+    } else if constexpr (aggregation_operator == AggregationOperator::LAST) {
+        if constexpr (is_time_type(scalar_type_info::data_type)) {
+            return LastAggregatorSorted();
+        } else if constexpr (is_numeric_type(scalar_type_info::data_type) || is_bool_type(scalar_type_info::data_type)) {
+            return LastAggregatorSorted();
+        } else if constexpr (is_sequence_type(scalar_type_info::data_type)) {
+            return LastAggregatorSorted>();
+        }
+    } else if constexpr (aggregation_operator == AggregationOperator::COUNT) {
+        return CountAggregatorSorted();
+    }
+}
+
+template
+requires arcticdb::util::is_instantiation_of_v
+consteval bool is_output_type_allowed() {
+    return is_numeric_type(output_type_info::data_type) ||
+           is_bool_type(output_type_info::data_type) ||
+           (is_sequence_type(output_type_info::data_type) && (aggregation_operator == AggregationOperator::FIRST || aggregation_operator
== AggregationOperator::LAST)); +} + +template +requires arcticdb::util::is_instantiation_of_v && arcticdb::util::is_instantiation_of_v +consteval bool are_input_output_operation_allowed() { + return (is_numeric_type(input_type_info::data_type) && is_numeric_type(output_type_info::data_type)) || + (is_sequence_type(input_type_info::data_type) && (is_sequence_type(output_type_info::data_type) || aggregation_operator == AggregationOperator::COUNT)) || + (is_bool_type(input_type_info::data_type) && (is_bool_type(output_type_info::data_type) || is_numeric_type(output_type_info::data_type))); +} + +template +SortedAggregator::SortedAggregator(ColumnName input_column_name, ColumnName output_column_name) + : input_column_name_(std::move(input_column_name)) + , output_column_name_(std::move(output_column_name)) +{} + +template +ColumnName SortedAggregator::get_input_column_name() const { return input_column_name_; } + +template +ColumnName SortedAggregator::get_output_column_name() const { return output_column_name_; } + +template +auto finalize_aggregator( + Aggregator& bucket_aggregator, + [[maybe_unused]] StringPool& string_pool +) { + if constexpr (is_numeric_type(output_data_type) || is_bool_type(output_data_type) || aggregation_operator == AggregationOperator::COUNT) { + return bucket_aggregator.finalize(); + } else if constexpr (is_sequence_type(output_data_type)) { + auto opt_string_view = bucket_aggregator.finalize(); + if (ARCTICDB_LIKELY(opt_string_view.has_value())) { + return string_pool.get(*opt_string_view).offset(); + } else { + return string_none; + } + } +} + template Column SortedAggregator::aggregate(const std::vector>& input_index_columns, const std::vector>& input_agg_columns, @@ -33,13 +139,8 @@ Column SortedAggregator::aggregate(const auto output_it = output_data.begin(); auto output_end_it = output_data.end(); // Need this here to only generate valid get_bucket_aggregator code, exception will have been thrown earlier at runtime - constexpr bool supported_aggregation_type_combo = is_numeric_type(output_type_info::data_type) || - is_bool_type(output_type_info::data_type) || - (is_sequence_type(output_type_info::data_type) && - (aggregation_operator == AggregationOperator::FIRST || - aggregation_operator == AggregationOperator::LAST)); - if constexpr (supported_aggregation_type_combo) { - auto bucket_aggregator = get_bucket_aggregator(); + if constexpr (is_output_type_allowed()) { + auto bucket_aggregator = get_bucket_aggregator(); bool reached_end_of_buckets{false}; auto bucket_start_it = bucket_boundaries.cbegin(); auto bucket_end_it = std::next(bucket_start_it); @@ -64,9 +165,7 @@ Column SortedAggregator::aggregate(const &reached_end_of_buckets](auto input_type_desc_tag) { using input_type_info = ScalarTypeInfo; // Again, only needed to generate valid code below, exception will have been thrown earlier at runtime - if constexpr ((is_numeric_type(input_type_info::data_type) && is_numeric_type(output_type_info::data_type)) || - (is_sequence_type(input_type_info::data_type) && (is_sequence_type(output_type_info::data_type) || aggregation_operator == AggregationOperator::COUNT)) || - (is_bool_type(input_type_info::data_type) && (is_bool_type(output_type_info::data_type) || is_numeric_type(output_type_info::data_type)))) { + if constexpr (are_input_output_operation_allowed()) { schema::check( !agg_column.column_->is_sparse() && agg_column.column_->row_count() == input_index_column->row_count(), "Resample: Cannot aggregate column '{}' as it is sparse", @@ -78,11 +177,11 @@ 
Column SortedAggregator::aggregate(const bool bucket_has_values = false; for (auto index_it = index_data.template cbegin(); index_it != index_cend && !reached_end_of_buckets; ++index_it, ++agg_it) { if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator(bucket_aggregator, *agg_it, agg_column); + push_to_aggregator(bucket_aggregator, *agg_it, agg_column); bucket_has_values = true; } else if (ARCTICDB_LIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it)) && output_it != output_end_it) { if (bucket_has_values) { - *output_it++ = finalize_aggregator(bucket_aggregator, string_pool); + *output_it++ = finalize_aggregator(bucket_aggregator, string_pool); } // The following code is equivalent to: // if constexpr (closed_boundary == ResampleBoundary::LEFT) { @@ -110,7 +209,7 @@ Column SortedAggregator::aggregate(const bucket_has_values = false; current_bucket.set_boundaries(*bucket_start_it, *bucket_end_it); if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator(bucket_aggregator, *agg_it, agg_column); + push_to_aggregator(bucket_aggregator, *agg_it, agg_column); bucket_has_values = true; } } @@ -123,7 +222,7 @@ Column SortedAggregator::aggregate(const } // We were in the middle of aggregating a bucket when we ran out of index values if (output_it != output_end_it) { - *output_it++ = finalize_aggregator(bucket_aggregator, string_pool); + *output_it++ = finalize_aggregator(bucket_aggregator, string_pool); } } } @@ -215,4 +314,26 @@ template class SortedAggregator; template class SortedAggregator; +template +Bucket::Bucket(timestamp start, timestamp end): + start_(start), end_(end){} + +template +void Bucket::set_boundaries(timestamp start, timestamp end) { + start_ = start; + end_ = end; +} + +template +[[nodiscard]] bool Bucket::contains(timestamp ts) const { + if constexpr (closed_boundary == ResampleBoundary::LEFT) { + return ts >= start_ && ts < end_; + } else { + // closed_boundary == ResampleBoundary::RIGHT + return ts > start_ && ts <= end_; + } +} + +template class Bucket; +template class Bucket; } \ No newline at end of file diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index 440d58cfd7..aadf12c663 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -46,23 +46,9 @@ using SortedAggregatorInterface = folly::Poly; template class Bucket { public: - Bucket(timestamp start, timestamp end): - start_(start), end_(end){} - - void set_boundaries(timestamp start, timestamp end) { - start_ = start; - end_ = end; - } - - bool contains(timestamp ts) const { - if constexpr (closed_boundary == ResampleBoundary::LEFT) { - return ts >= start_ && ts < end_; - } else { - // closed_boundary == ResampleBoundary::RIGHT - return ts > start_ && ts <= end_; - } - } - + Bucket(timestamp start, timestamp end); + void set_boundaries(timestamp start, timestamp end); + [[nodiscard]] bool contains(timestamp ts) const; private: timestamp start_; timestamp end_; @@ -345,14 +331,11 @@ class SortedAggregator { public: - explicit SortedAggregator(ColumnName input_column_name, ColumnName output_column_name) - : input_column_name_(std::move(input_column_name)) - , output_column_name_(std::move(output_column_name)) - {} + explicit SortedAggregator(ColumnName input_column_name, ColumnName output_column_name); ARCTICDB_MOVE_COPY_DEFAULT(SortedAggregator) - [[nodiscard]] ColumnName get_input_column_name() const { return input_column_name_; } - 
[[nodiscard]] ColumnName get_output_column_name() const { return output_column_name_; } + [[nodiscard]] ColumnName get_input_column_name() const; + [[nodiscard]] ColumnName get_output_column_name() const; [[nodiscard]] Column aggregate(const std::vector>& input_index_columns, const std::vector>& input_agg_columns, @@ -365,79 +348,6 @@ class SortedAggregator [[nodiscard]] DataType generate_output_data_type(DataType common_input_data_type) const; [[nodiscard]] bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const; - template - void push_to_aggregator(Aggregator& bucket_aggregator, T value, ARCTICDB_UNUSED const ColumnWithStrings& column_with_strings) const { - if constexpr(is_time_type(input_data_type) && aggregation_operator == AggregationOperator::COUNT) { - bucket_aggregator.template push(value); - } else if constexpr (is_numeric_type(input_data_type) || is_bool_type(input_data_type)) { - bucket_aggregator.push(value); - } else if constexpr (is_sequence_type(input_data_type)) { - bucket_aggregator.push(column_with_strings.string_at_offset(value)); - } - } - - template - [[nodiscard]] auto finalize_aggregator(Aggregator& bucket_aggregator, ARCTICDB_UNUSED StringPool& string_pool) const { - if constexpr (is_numeric_type(output_data_type) || is_bool_type(output_data_type) || aggregation_operator == AggregationOperator::COUNT) { - return bucket_aggregator.finalize(); - } else if constexpr (is_sequence_type(output_data_type)) { - auto opt_string_view = bucket_aggregator.finalize(); - if (ARCTICDB_LIKELY(opt_string_view.has_value())) { - return string_pool.get(*opt_string_view).offset(); - } else { - return string_none; - } - } - } - - template - [[nodiscard]] auto get_bucket_aggregator() const { - if constexpr (aggregation_operator == AggregationOperator::SUM) { - if constexpr (is_bool_type(scalar_type_info::data_type)) { - // Sum of bool column is just the count of true values - return SumAggregatorSorted(); - } else { - return SumAggregatorSorted(); - } - } else if constexpr (aggregation_operator == AggregationOperator::MEAN) { - if constexpr (is_time_type(scalar_type_info::data_type)) { - return MeanAggregatorSorted(); - } else { - return MeanAggregatorSorted(); - } - } else if constexpr (aggregation_operator == AggregationOperator::MIN) { - if constexpr (is_time_type(scalar_type_info::data_type)) { - return MinAggregatorSorted(); - } else { - return MinAggregatorSorted(); - } - } else if constexpr (aggregation_operator == AggregationOperator::MAX) { - if constexpr (is_time_type(scalar_type_info::data_type)) { - return MaxAggregatorSorted(); - } else { - return MaxAggregatorSorted(); - } - } else if constexpr (aggregation_operator == AggregationOperator::FIRST) { - if constexpr (is_time_type(scalar_type_info::data_type)) { - return FirstAggregatorSorted(); - } else if constexpr (is_numeric_type(scalar_type_info::data_type) || is_bool_type(scalar_type_info::data_type)) { - return FirstAggregatorSorted(); - } else if constexpr (is_sequence_type(scalar_type_info::data_type)) { - return FirstAggregatorSorted>(); - } - } else if constexpr (aggregation_operator == AggregationOperator::LAST) { - if constexpr (is_time_type(scalar_type_info::data_type)) { - return LastAggregatorSorted(); - } else if constexpr (is_numeric_type(scalar_type_info::data_type) || is_bool_type(scalar_type_info::data_type)) { - return LastAggregatorSorted(); - } else if constexpr (is_sequence_type(scalar_type_info::data_type)) { - return LastAggregatorSorted>(); - } - } else if constexpr 
(aggregation_operator == AggregationOperator::COUNT) { - return CountAggregatorSorted(); - } - } - ColumnName input_column_name_; ColumnName output_column_name_; }; From b0c6da9e96ff4482f117a2a2175c3f9f7d759ebf Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Tue, 4 Feb 2025 19:33:12 +0200 Subject: [PATCH 02/10] Add unit test for dynamic schema --- .../processing/sorted_aggregation.cpp | 70 +++++++++++++------ .../processing/sorted_aggregation.hpp | 18 ++++- .../arcticdb/version_store/test_resample.py | 21 +++++- 3 files changed, 85 insertions(+), 24 deletions(-) diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index 37fac20e4c..f889c87243 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -118,13 +118,15 @@ auto finalize_aggregator( } template -Column SortedAggregator::aggregate(const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, - const std::vector& bucket_boundaries, - const Column& output_index_column, - StringPool& string_pool) const { +[[nodiscard]] Column SortedAggregator::aggregate_static_schema( + std::span> input_index_columns, + std::span> input_agg_columns, + std::span bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool, + DataType common_input_type +) const { using IndexTDT = ScalarTagType>; - auto common_input_type = generate_common_input_type(input_agg_columns); Column res(TypeDescriptor(generate_output_data_type(common_input_type), Dimension::Dim0), output_index_column.row_count(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED); details::visit_type( res.type().data_type(), @@ -142,10 +144,10 @@ Column SortedAggregator::aggregate(const if constexpr (is_output_type_allowed()) { auto bucket_aggregator = get_bucket_aggregator(); bool reached_end_of_buckets{false}; - auto bucket_start_it = bucket_boundaries.cbegin(); + auto bucket_start_it = bucket_boundaries.begin(); auto bucket_end_it = std::next(bucket_start_it); Bucket current_bucket(*bucket_start_it, *bucket_end_it); - const auto bucket_boundaries_end = bucket_boundaries.cend(); + const auto bucket_boundaries_end = bucket_boundaries.end(); for (auto [idx, input_agg_column]: folly::enumerate(input_agg_columns)) { // Always true right now due to earlier check if (input_agg_column.has_value()) { @@ -156,7 +158,7 @@ Column SortedAggregator::aggregate(const &output_end_it, &bucket_aggregator, &agg_column = *input_agg_column, - &input_index_column = input_index_columns.at(idx), + &input_index_column = input_index_columns[idx], &bucket_boundaries_end, &string_pool, &bucket_start_it, @@ -231,26 +233,52 @@ Column SortedAggregator::aggregate(const } template -DataType SortedAggregator::generate_common_input_type( +[[nodiscard]] Column SortedAggregator::aggregate_dynamic_schema( + [[maybe_unused]] std::span> input_index_columns, + [[maybe_unused]] std::span> input_agg_columns, + [[maybe_unused]] std::span bucket_boundaries, + [[maybe_unused]] const Column& output_index_column, + [[maybe_unused]] StringPool& string_pool +) const { + return {}; +} + +template +Column SortedAggregator::aggregate(const std::vector>& input_index_columns, + const std::vector>& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool) const { + if (const std::optional common_input_type = generate_common_input_type(input_agg_columns)) { + return aggregate_static_schema( + input_index_columns, + 
input_agg_columns, + bucket_boundaries, + output_index_column, + string_pool, + *common_input_type); + } + return aggregate_dynamic_schema(input_index_columns, + input_agg_columns, + bucket_boundaries, + output_index_column, + string_pool); + +} + +template +std::optional SortedAggregator::generate_common_input_type( const std::vector>& input_agg_columns - ) const { +) const { std::optional common_input_type; for (const auto& opt_input_agg_column: input_agg_columns) { if (opt_input_agg_column.has_value()) { - auto input_data_type = opt_input_agg_column->column_->type().data_type(); + const auto input_data_type = opt_input_agg_column->column_->type().data_type(); check_aggregator_supported_with_data_type(input_data_type); add_data_type_impl(input_data_type, common_input_type); - } else { - // Column is missing from this row-slice due to dynamic schema, currently unsupported - schema::raise("Resample: Cannot aggregate column '{}' as it is missing from some row slices", - get_input_column_name().value); } } - // Column is missing from all row-slices due to dynamic schema, currently unsupported - schema::check(common_input_type.has_value(), - "Resample: Cannot aggregate column '{}' as it is missing from some row slices", - get_input_column_name().value); - return *common_input_type; + return common_input_type; } template diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index aadf12c663..f32b365c76 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -7,9 +7,10 @@ #pragma once -#include +#include #include #include +#include #include @@ -343,7 +344,20 @@ class SortedAggregator const Column& output_index_column, StringPool& string_pool) const; private: - [[nodiscard]] DataType generate_common_input_type(const std::vector>& input_agg_columns) const; + [[nodiscard]] Column aggregate_static_schema( + std::span> input_index_columns, + std::span> input_agg_columns, + std::span bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool, + DataType common_input_type) const; + [[nodiscard]] Column aggregate_dynamic_schema( + std::span> input_index_columns, + std::span> input_agg_columns, + std::span bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool) const; + [[nodiscard]] std::optional generate_common_input_type(const std::vector>& input_agg_columns) const; void check_aggregator_supported_with_data_type(DataType data_type) const; [[nodiscard]] DataType generate_output_data_type(DataType common_input_data_type) const; [[nodiscard]] bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const; diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py b/python/tests/unit/arcticdb/version_store/test_resample.py index 6b41090f87..bb1d5afe9b 100644 --- a/python/tests/unit/arcticdb/version_store/test_resample.py +++ b/python/tests/unit/arcticdb/version_store/test_resample.py @@ -843,4 +843,23 @@ def test_min_with_one_infinity_element(lmdb_version_store_v1): lib.write(sym, pd.DataFrame({"col": [-np.inf]}, index=pd.DatetimeIndex([pd.Timestamp("2024-01-01")]))) q = QueryBuilder() q = q.resample('1min').agg({"col_min":("col", "min")}) - assert np.isneginf(lib.read(sym, query_builder=q).data['col_min'][0]) \ No newline at end of file + assert np.isneginf(lib.read(sym, query_builder=q).data['col_min'][0]) + +class TestDynamicSchema: + def test_missing_column_segment_does_not_cross_bucket(self, 
lmdb_version_store_dynamic_schema_v1): + lib = lmdb_version_store_dynamic_schema_v1 + sym = "sym" + + idx = pd.date_range(pd.Timestamp(0), periods=20, freq='ns') + lib.write(sym, pd.DataFrame({"a": range(len(idx))}, index=idx)) + + idx = pd.date_range(pd.Timestamp(20), periods=20, freq='ns') + lib.append(sym, pd.DataFrame({"a": range(len(idx)), "b": np.array(range(len(idx)), dtype=np.float32)}, index=idx)) + + data = lib.read(sym).data + print(data) + #res = data.resample("10ns").agg(None, **{"col_count":("b", "count")}) + #print(res) + q = QueryBuilder() + q = q.resample('10ns').agg({"b_count": ("b", "min")}) + lib.read(sym, query_builder=q) \ No newline at end of file From e828294ee9f4377d9aa9c4b2a474129c7f66a7dc Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Wed, 5 Feb 2025 17:30:24 +0200 Subject: [PATCH 03/10] Minor refactor --- cpp/arcticdb/processing/clause.cpp | 7 ++-- cpp/arcticdb/processing/clause.hpp | 9 +--- cpp/arcticdb/processing/query_planner.hpp | 2 + .../processing/sorted_aggregation.cpp | 42 ++++++++++++++++--- .../processing/sorted_aggregation.hpp | 28 +------------ .../processing/test/benchmark_clause.cpp | 3 +- cpp/arcticdb/processing/test/test_clause.cpp | 1 + cpp/arcticdb/version/version_core.cpp | 9 ++-- 8 files changed, 52 insertions(+), 49 deletions(-) diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index a69d9a671a..1d324b004d 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -19,10 +19,9 @@ #include #include #include +#include #include - - namespace arcticdb { namespace ranges = std::ranges; @@ -589,7 +588,7 @@ std::vector> ResampleClause::structure_fo internal::check( idx < expected_fetch_counts.size(), "Index {} in new_structure_offsets out of bounds >{}", idx, expected_fetch_counts.size() - 1); - expected_fetch_counts[idx]++; + ++expected_fetch_counts[idx]; } } internal::check( @@ -621,7 +620,7 @@ std::vector ResampleClause::process(std::vectorat(0) == 1; + const bool responsible_for_first_overlapping_bucket = front_slice.entity_fetch_count_->at(0) == 1; // Find the iterators into bucket_boundaries_ of the start of the first and the end of the last bucket this call to process is // responsible for calculating // All segments in a given row slice contain the same index column, so just grab info from the first one diff --git a/cpp/arcticdb/processing/clause.hpp b/cpp/arcticdb/processing/clause.hpp index 134780d8db..8b2a54c47b 100644 --- a/cpp/arcticdb/processing/clause.hpp +++ b/cpp/arcticdb/processing/clause.hpp @@ -7,10 +7,8 @@ #pragma once -#include #include #include -#include #include #include #include @@ -19,20 +17,15 @@ #include #include #include -#include #include -#include -#include #include -#include #include -#include #include #include #include -#include + namespace arcticdb { diff --git a/cpp/arcticdb/processing/query_planner.hpp b/cpp/arcticdb/processing/query_planner.hpp index 99106654e1..2844663046 100644 --- a/cpp/arcticdb/processing/query_planner.hpp +++ b/cpp/arcticdb/processing/query_planner.hpp @@ -9,8 +9,10 @@ #include #include +#include #include +#include namespace arcticdb { diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index f889c87243..733eedc9ce 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -127,7 +127,12 @@ template>; - Column res(TypeDescriptor(generate_output_data_type(common_input_type), Dimension::Dim0), 
output_index_column.row_count(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED); + Column res( + TypeDescriptor(generate_output_data_type(common_input_type), Dimension::Dim0), + output_index_column.row_count(), + AllocationType::PRESIZED, + Sparsity::NOT_PERMITTED + ); details::visit_type( res.type().data_type(), [this, @@ -173,11 +178,11 @@ templatedata(); - const auto index_cend = index_data.template cend(); + const auto index_cend = index_data.cend(); auto agg_data = agg_column.column_->data(); - auto agg_it = agg_data.template cbegin(); + auto agg_it = agg_data.cbegin(); bool bucket_has_values = false; - for (auto index_it = index_data.template cbegin(); index_it != index_cend && !reached_end_of_buckets; ++index_it, ++agg_it) { + for (auto index_it = index_data.cbegin(); index_it != index_cend && !reached_end_of_buckets; ++index_it, ++agg_it) { if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { push_to_aggregator(bucket_aggregator, *agg_it, agg_column); bucket_has_values = true; @@ -364,4 +369,31 @@ template template class Bucket; template class Bucket; -} \ No newline at end of file +} + +template<> +struct fmt::formatter { + template + constexpr auto parse(ParseContext &ctx) { return ctx.begin(); } + + template + auto format(const arcticdb::AggregationOperator& agg, FormatContext &ctx) const { + switch(agg) { + case arcticdb::AggregationOperator::SUM: + return fmt::format_to(ctx.out(), "SUM"); + case arcticdb::AggregationOperator::MEAN: + return fmt::format_to(ctx.out(), "MEAN"); + case arcticdb::AggregationOperator::MIN: + return fmt::format_to(ctx.out(), "MIN"); + case arcticdb::AggregationOperator::MAX: + return fmt::format_to(ctx.out(), "MAX"); + case arcticdb::AggregationOperator::FIRST: + return fmt::format_to(ctx.out(), "FIRST"); + case arcticdb::AggregationOperator::LAST: + return fmt::format_to(ctx.out(), "LAST"); + case arcticdb::AggregationOperator::COUNT: + default: + return fmt::format_to(ctx.out(), "COUNT"); + } + } +}; \ No newline at end of file diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index f32b365c76..9cb49ea729 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -368,31 +368,5 @@ class SortedAggregator } // namespace arcticdb -namespace fmt { template<> -struct formatter { - template - constexpr auto parse(ParseContext &ctx) { return ctx.begin(); } - - template - auto format(const arcticdb::AggregationOperator& agg, FormatContext &ctx) const { - switch(agg) { - case arcticdb::AggregationOperator::SUM: - return fmt::format_to(ctx.out(), "SUM"); - case arcticdb::AggregationOperator::MEAN: - return fmt::format_to(ctx.out(), "MEAN"); - case arcticdb::AggregationOperator::MIN: - return fmt::format_to(ctx.out(), "MIN"); - case arcticdb::AggregationOperator::MAX: - return fmt::format_to(ctx.out(), "MAX"); - case arcticdb::AggregationOperator::FIRST: - return fmt::format_to(ctx.out(), "FIRST"); - case arcticdb::AggregationOperator::LAST: - return fmt::format_to(ctx.out(), "LAST"); - case arcticdb::AggregationOperator::COUNT: - default: - return fmt::format_to(ctx.out(), "COUNT"); - } - } -}; -} //namespace fmt \ No newline at end of file +struct fmt::formatter; \ No newline at end of file diff --git a/cpp/arcticdb/processing/test/benchmark_clause.cpp b/cpp/arcticdb/processing/test/benchmark_clause.cpp index d306e234f3..d9e792423c 100644 --- a/cpp/arcticdb/processing/test/benchmark_clause.cpp +++ 
b/cpp/arcticdb/processing/test/benchmark_clause.cpp
@@ -12,8 +12,7 @@
 #include
 #include
 #include
-#include
-#include
+#include
 
 using namespace arcticdb;
 
diff --git a/cpp/arcticdb/processing/test/test_clause.cpp b/cpp/arcticdb/processing/test/test_clause.cpp
index e21847503a..22827d6e01 100644
--- a/cpp/arcticdb/processing/test/test_clause.cpp
+++ b/cpp/arcticdb/processing/test/test_clause.cpp
@@ -11,6 +11,7 @@
 #include
 #include
 #include
+#include
 
 template
 void segment_scalar_assert_all_values_equal(const arcticdb::ProcessingUnit& proc_unit, const arcticdb::ColumnName& name, const std::unordered_set& expected, size_t expected_row_count) {
diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp
index 1d3097f349..eb442671f9 100644
--- a/cpp/arcticdb/version/version_core.cpp
+++ b/cpp/arcticdb/version/version_core.cpp
@@ -27,9 +27,12 @@
 #include
 #include
 #include
+#include
 
 namespace arcticdb::version_store {
 
+namespace ranges = std::ranges;
+
 void modify_descriptor(const std::shared_ptr& pipeline_context, const ReadOptions& read_options) {
     if (opt_false(read_options.force_strings_to_object_) ||
         opt_false(read_options.force_strings_to_fixed_))
@@ -895,8 +898,8 @@ folly::Future> read_and_process(
     const ReadOptions& read_options
     ) {
     auto component_manager = std::make_shared();
-    ProcessingConfig processing_config{opt_false(read_options.dynamic_schema_), pipeline_context->rows_};
-    for (auto& clause: read_query->clauses_) {
+    const ProcessingConfig processing_config{opt_false(read_options.dynamic_schema_), pipeline_context->rows_};
+    for (const auto& clause: read_query->clauses_) {
         clause->set_processing_config(processing_config);
         clause->set_component_manager(component_manager);
     }
@@ -920,7 +923,7 @@ folly::Future> read_and_process(
         .thenValue([component_manager, read_query, pipeline_context](std::vector&& processed_entity_ids) {
             auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids));
-            if (std::any_of(read_query->clauses_.begin(), read_query->clauses_.end(), [](const std::shared_ptr& clause) {
+            if (ranges::any_of(read_query->clauses_, [](const std::shared_ptr& clause) {
                 return clause->clause_info().modifies_output_descriptor_;
             })) {
                 set_output_descriptors(proc, read_query->clauses_, pipeline_context);

From 0bd300d166e2708125b85a005707aff18532c8b9 Mon Sep 17 00:00:00 2001
From: Vasil Pashov
Date: Thu, 6 Feb 2025 12:38:44 +0200
Subject: [PATCH 04/10] Cleanup code and extend testing for other types

Test for (un)signed ints and floats
---
 cpp/arcticdb/entity/types.cpp | 44 +++++
 cpp/arcticdb/entity/types.hpp | 52 ++---
 cpp/arcticdb/pipeline/read_frame.cpp | 2 +-
 cpp/arcticdb/processing/clause.cpp | 12 +-
 cpp/arcticdb/processing/processing_unit.hpp | 3 -
 .../processing/sorted_aggregation.cpp | 187 +++++++-----------
 .../processing/sorted_aggregation.hpp | 19 +-
 cpp/arcticdb/util/sparse_utils.hpp | 6 +-
 cpp/arcticdb/version/version_core.cpp | 7 +-
 .../arcticdb/version_store/test_resample.py | 52 ++++-
 10 files changed, 188 insertions(+), 196 deletions(-)

diff --git a/cpp/arcticdb/entity/types.cpp b/cpp/arcticdb/entity/types.cpp
index 57901e7a35..a7b0ad2eaa 100644
--- a/cpp/arcticdb/entity/types.cpp
+++ b/cpp/arcticdb/entity/types.cpp
@@ -56,4 +56,48 @@ std::size_t data_type_size(const TypeDescriptor& td, DataTypeMode mode) {
     return mode == DataTypeMode::EXTERNAL ?
external_data_type_size(td) : internal_data_type_size(td); } +TypeDescriptor FieldRef::type() const { + return type_; +} + +std::string_view FieldRef::name() const { + return name_; +} + +bool operator==(const FieldRef &left, const FieldRef &right) { + return left.type_ == right.type_ && left.name_ == right.name_; +} + +FieldWrapper::FieldWrapper(TypeDescriptor type, std::string_view name) : + data_(Field::calc_size(name)) { + mutable_field().set(type, name); +} + +const Field& FieldWrapper::field() const { + return *reinterpret_cast(data_.data()); +} + +const TypeDescriptor& FieldWrapper::type() const { + return field().type(); +} + +std::string_view FieldWrapper::name() const { + return field().name(); +} + +Field& FieldWrapper::mutable_field() { + return *reinterpret_cast(data_.data()); +} + +FieldRef scalar_field(DataType type, std::string_view name) { + return {TypeDescriptor{type, Dimension::Dim0}, name}; +} + +bool operator==(const Field &l, const Field &r) { + return l.type() == r.type() && l.name() == r.name(); +} + +bool operator!=(const Field &l, const Field &r) { + return !(l == r); +} } // namespace arcticdb diff --git a/cpp/arcticdb/entity/types.hpp b/cpp/arcticdb/entity/types.hpp index 8dd5e6c099..f367bc5e41 100644 --- a/cpp/arcticdb/entity/types.hpp +++ b/cpp/arcticdb/entity/types.hpp @@ -607,17 +607,9 @@ struct FieldRef { TypeDescriptor type_; std::string_view name_; - [[nodiscard]] TypeDescriptor type() const { - return type_; - } - - [[nodiscard]] std::string_view name() const { - return name_; - } - - friend bool operator==(const FieldRef &left, const FieldRef &right) { - return left.type_ == right.type_ && left.name_ == right.name_; - } + [[nodiscard]] TypeDescriptor type() const; + [[nodiscard]] std::string_view name() const; + friend bool operator==(const FieldRef &left, const FieldRef &right); }; #pragma pack(push, 1) @@ -690,45 +682,23 @@ struct Field { struct FieldWrapper { std::vector data_; - FieldWrapper(TypeDescriptor type, std::string_view name) : - data_(Field::calc_size(name)) { - mutable_field().set(type, name); - } - - const Field &field() const { - return *reinterpret_cast(data_.data()); - } - - const TypeDescriptor& type() const { - return field().type(); - } - - const std::string_view name() const { - return field().name(); - } - + FieldWrapper(TypeDescriptor type, std::string_view name); + const Field &field() const; + const TypeDescriptor& type() const; + std::string_view name() const; private: - Field &mutable_field() { - return *reinterpret_cast(data_.data()); - } + Field &mutable_field(); }; -inline FieldRef scalar_field(DataType type, std::string_view name) { - return {TypeDescriptor{type, Dimension::Dim0}, name}; -} +FieldRef scalar_field(DataType type, std::string_view name); template auto visit_field(const Field &field, Callable &&c) { return field.type().visit_tag(std::forward(c)); } -inline bool operator==(const Field &l, const Field &r) { - return l.type() == r.type() && l.name() == r.name(); -} - -inline bool operator!=(const Field &l, const Field &r) { - return !(l == r); -} +bool operator==(const Field &l, const Field &r); +bool operator!=(const Field &l, const Field &r); } // namespace entity diff --git a/cpp/arcticdb/pipeline/read_frame.cpp b/cpp/arcticdb/pipeline/read_frame.cpp index de65763748..ba8ab15912 100644 --- a/cpp/arcticdb/pipeline/read_frame.cpp +++ b/cpp/arcticdb/pipeline/read_frame.cpp @@ -641,7 +641,7 @@ folly::Future reduce_and_fix_columns( if(frame.empty()) return folly::Unit{}; - bool dynamic_schema = 
opt_false(read_options.dynamic_schema_); + const bool dynamic_schema = opt_false(read_options.dynamic_schema_); auto slice_map = std::make_shared(context, dynamic_schema); DecodePathData shared_data; diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index 1d324b004d..f82914a938 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -652,7 +652,7 @@ std::vector ResampleClause::process(std::vector> input_agg_columns; + std::vector input_agg_columns; input_agg_columns.reserve(row_slices.size()); for (auto& row_slice: row_slices) { auto variant_data = row_slice.get(aggregator.get_input_column_name()); @@ -660,18 +660,20 @@ std::vector ResampleClause::process(std::vector("Unexpected return type from ProcessingUnit::get, expected column-like"); } ); } - auto aggregated_column = std::make_shared(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool)); - seg.add_column(scalar_field(aggregated_column->type().data_type(), aggregator.get_output_column_name().value), aggregated_column); + if (!input_agg_columns.empty()) { + auto aggregated_column = std::make_shared(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool)); + const auto field = scalar_field(aggregated_column->type().data_type(), aggregator.get_output_column_name().value); + seg.add_column(field, std::move(aggregated_column)); + } } seg.set_row_data(output_index_column->row_count() - 1); return push_entities(*component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range))); diff --git a/cpp/arcticdb/processing/processing_unit.hpp b/cpp/arcticdb/processing/processing_unit.hpp index 98892afac9..66b1a1664d 100644 --- a/cpp/arcticdb/processing/processing_unit.hpp +++ b/cpp/arcticdb/processing/processing_unit.hpp @@ -19,9 +19,6 @@ #include #include #include -#include -#include -#include namespace arcticdb { enum class PipelineOptimisation : uint8_t { diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index 733eedc9ce..e38c8ad2e1 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -118,17 +118,15 @@ auto finalize_aggregator( } template -[[nodiscard]] Column SortedAggregator::aggregate_static_schema( - std::span> input_index_columns, - std::span> input_agg_columns, - std::span bucket_boundaries, - const Column& output_index_column, - StringPool& string_pool, - DataType common_input_type -) const { +Column SortedAggregator::aggregate(const std::vector>& input_index_columns, + const std::vector& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool) const { + std::optional common_input_type = generate_common_input_type(input_agg_columns); using IndexTDT = ScalarTagType>; Column res( - TypeDescriptor(generate_output_data_type(common_input_type), Dimension::Dim0), + TypeDescriptor(generate_output_data_type(*common_input_type), Dimension::Dim0), output_index_column.row_count(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED @@ -154,78 +152,75 @@ template current_bucket(*bucket_start_it, *bucket_end_it); const auto bucket_boundaries_end = bucket_boundaries.end(); for (auto [idx, input_agg_column]: folly::enumerate(input_agg_columns)) { - // Always true right now due to earlier check - if (input_agg_column.has_value()) { - details::visit_type( - 
input_agg_column->column_->type().data_type(), - [this, - &output_it, - &output_end_it, - &bucket_aggregator, - &agg_column = *input_agg_column, - &input_index_column = input_index_columns[idx], - &bucket_boundaries_end, - &string_pool, - &bucket_start_it, - &bucket_end_it, - ¤t_bucket, - &reached_end_of_buckets](auto input_type_desc_tag) { - using input_type_info = ScalarTypeInfo; - // Again, only needed to generate valid code below, exception will have been thrown earlier at runtime - if constexpr (are_input_output_operation_allowed()) { - schema::check( - !agg_column.column_->is_sparse() && agg_column.column_->row_count() == input_index_column->row_count(), - "Resample: Cannot aggregate column '{}' as it is sparse", - get_input_column_name().value); - auto index_data = input_index_column->data(); - const auto index_cend = index_data.cend(); - auto agg_data = agg_column.column_->data(); - auto agg_it = agg_data.cbegin(); - bool bucket_has_values = false; - for (auto index_it = index_data.cbegin(); index_it != index_cend && !reached_end_of_buckets; ++index_it, ++agg_it) { - if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator(bucket_aggregator, *agg_it, agg_column); - bucket_has_values = true; - } else if (ARCTICDB_LIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it)) && output_it != output_end_it) { - if (bucket_has_values) { - *output_it++ = finalize_aggregator(bucket_aggregator, string_pool); - } - // The following code is equivalent to: - // if constexpr (closed_boundary == ResampleBoundary::LEFT) { - // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it); - // } else { - // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it, std::less_equal{}); - // } - // bucket_start_it = std::prev(bucket_end_it); - // reached_end_of_buckets = bucket_end_it == bucket_boundaries_end; - // The above code will be more performant when the vast majority of buckets are empty - // See comment in ResampleClause::advance_boundary_past_value for mathematical and experimental bounds - ++bucket_start_it; - if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { - reached_end_of_buckets = true; - } else { - while (ARCTICDB_UNLIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it))) { - ++bucket_start_it; - if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { - reached_end_of_buckets = true; - break; - } + details::visit_type( + input_agg_column.column_->type().data_type(), + [this, + &output_it, + &output_end_it, + &bucket_aggregator, + &agg_column = input_agg_column, + &input_index_column = input_index_columns[idx], + &bucket_boundaries_end, + &string_pool, + &bucket_start_it, + &bucket_end_it, + ¤t_bucket, + &reached_end_of_buckets](auto input_type_desc_tag) { + using input_type_info = ScalarTypeInfo; + // Again, only needed to generate valid code below, exception will have been thrown earlier at runtime + if constexpr (are_input_output_operation_allowed()) { + schema::check( + !agg_column.column_->is_sparse() && agg_column.column_->row_count() == input_index_column->row_count(), + "Resample: Cannot aggregate column '{}' as it is sparse", + get_input_column_name().value); + auto index_data = input_index_column->data(); + const auto index_cend = index_data.cend(); + auto agg_data = agg_column.column_->data(); + auto agg_it = agg_data.cbegin(); + bool bucket_has_values = false; + for (auto index_it = index_data.cbegin(); index_it != index_cend && !reached_end_of_buckets; 
++index_it, ++agg_it) { + if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { + push_to_aggregator(bucket_aggregator, *agg_it, agg_column); + bucket_has_values = true; + } else if (ARCTICDB_LIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it)) && output_it != output_end_it) { + if (bucket_has_values) { + *output_it++ = finalize_aggregator(bucket_aggregator, string_pool); + } + // The following code is equivalent to: + // if constexpr (closed_boundary == ResampleBoundary::LEFT) { + // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it); + // } else { + // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it, std::less_equal{}); + // } + // bucket_start_it = std::prev(bucket_end_it); + // reached_end_of_buckets = bucket_end_it == bucket_boundaries_end; + // The above code will be more performant when the vast majority of buckets are empty + // See comment in ResampleClause::advance_boundary_past_value for mathematical and experimental bounds + ++bucket_start_it; + if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { + reached_end_of_buckets = true; + } else { + while (ARCTICDB_UNLIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it))) { + ++bucket_start_it; + if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { + reached_end_of_buckets = true; + break; } } - if (ARCTICDB_LIKELY(!reached_end_of_buckets)) { - bucket_has_values = false; - current_bucket.set_boundaries(*bucket_start_it, *bucket_end_it); - if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator(bucket_aggregator, *agg_it, agg_column); - bucket_has_values = true; - } + } + if (ARCTICDB_LIKELY(!reached_end_of_buckets)) { + bucket_has_values = false; + current_bucket.set_boundaries(*bucket_start_it, *bucket_end_it); + if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { + push_to_aggregator(bucket_aggregator, *agg_it, agg_column); + bucket_has_values = true; } } } } } - ); - } + } + ); } // We were in the middle of aggregating a bucket when we ran out of index values if (output_it != output_end_it) { @@ -237,51 +232,15 @@ template -[[nodiscard]] Column SortedAggregator::aggregate_dynamic_schema( - [[maybe_unused]] std::span> input_index_columns, - [[maybe_unused]] std::span> input_agg_columns, - [[maybe_unused]] std::span bucket_boundaries, - [[maybe_unused]] const Column& output_index_column, - [[maybe_unused]] StringPool& string_pool -) const { - return {}; -} - -template -Column SortedAggregator::aggregate(const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, - const std::vector& bucket_boundaries, - const Column& output_index_column, - StringPool& string_pool) const { - if (const std::optional common_input_type = generate_common_input_type(input_agg_columns)) { - return aggregate_static_schema( - input_index_columns, - input_agg_columns, - bucket_boundaries, - output_index_column, - string_pool, - *common_input_type); - } - return aggregate_dynamic_schema(input_index_columns, - input_agg_columns, - bucket_boundaries, - output_index_column, - string_pool); - -} - template std::optional SortedAggregator::generate_common_input_type( - const std::vector>& input_agg_columns + std::span input_agg_columns ) const { std::optional common_input_type; for (const auto& opt_input_agg_column: input_agg_columns) { - if (opt_input_agg_column.has_value()) { - const auto input_data_type = opt_input_agg_column->column_->type().data_type(); - 
check_aggregator_supported_with_data_type(input_data_type); - add_data_type_impl(input_data_type, common_input_type); - } + const auto input_data_type = opt_input_agg_column.column_->type().data_type(); + check_aggregator_supported_with_data_type(input_data_type); + add_data_type_impl(input_data_type, common_input_type); } return common_input_type; } diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index 9cb49ea729..fa23eae24e 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -30,7 +30,7 @@ struct ISortedAggregator { [[nodiscard]] ColumnName get_input_column_name() const { return folly::poly_call<0>(*this); }; [[nodiscard]] ColumnName get_output_column_name() const { return folly::poly_call<1>(*this); }; [[nodiscard]] Column aggregate(const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, + const std::vector& input_agg_columns, const std::vector& bucket_boundaries, const Column& output_index_column, StringPool& string_pool) const { @@ -339,25 +339,12 @@ class SortedAggregator [[nodiscard]] ColumnName get_output_column_name() const; [[nodiscard]] Column aggregate(const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, + const std::vector& input_agg_columns, const std::vector& bucket_boundaries, const Column& output_index_column, StringPool& string_pool) const; private: - [[nodiscard]] Column aggregate_static_schema( - std::span> input_index_columns, - std::span> input_agg_columns, - std::span bucket_boundaries, - const Column& output_index_column, - StringPool& string_pool, - DataType common_input_type) const; - [[nodiscard]] Column aggregate_dynamic_schema( - std::span> input_index_columns, - std::span> input_agg_columns, - std::span bucket_boundaries, - const Column& output_index_column, - StringPool& string_pool) const; - [[nodiscard]] std::optional generate_common_input_type(const std::vector>& input_agg_columns) const; + [[nodiscard]] std::optional generate_common_input_type(std::span input_agg_columns) const; void check_aggregator_supported_with_data_type(DataType data_type) const; [[nodiscard]] DataType generate_output_data_type(DataType common_input_data_type) const; [[nodiscard]] bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const; diff --git a/cpp/arcticdb/util/sparse_utils.hpp b/cpp/arcticdb/util/sparse_utils.hpp index 29c29fcaf2..1ba2bc05e4 100644 --- a/cpp/arcticdb/util/sparse_utils.hpp +++ b/cpp/arcticdb/util/sparse_utils.hpp @@ -70,11 +70,11 @@ inline void expand_dense_buffer_using_bitmap(const util::BitMagic &bv, const uin } template -inline void default_initialize(uint8_t* data, size_t bytes) { +void default_initialize(uint8_t* data, size_t bytes) { using RawType = typename TagType::DataTypeTag::raw_type; - const auto num_rows ARCTICDB_UNUSED = bytes / sizeof(RawType); + [[maybe_unused]] const auto num_rows = bytes / sizeof(RawType); constexpr auto data_type = TagType::DataTypeTag::data_type; - auto type_ptr ARCTICDB_UNUSED = reinterpret_cast(data); + [[maybe_unused]] auto type_ptr = reinterpret_cast(data); if constexpr (is_sequence_type(data_type)) { std::fill_n(type_ptr, num_rows, not_a_string()); } else if constexpr (is_floating_point_type(data_type)) { diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index eb442671f9..5c4449e1e1 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ 
b/cpp/arcticdb/version/version_core.cpp @@ -921,8 +921,9 @@ folly::Future> read_and_process( std::make_shared>>(read_query->clauses_)) .via(&async::cpu_executor()) .thenValue([component_manager, read_query, pipeline_context](std::vector&& processed_entity_ids) { - auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); - + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>( + *component_manager, + std::move(processed_entity_ids)); if (ranges::any_of(read_query->clauses_, [](const std::shared_ptr& clause) { return clause->clause_info().modifies_output_descriptor_; })) { @@ -940,7 +941,7 @@ void add_index_columns_to_query(const ReadQuery& read_query, const TimeseriesDes std::vector index_columns_to_add; for(const auto& index_column : index_columns) { - if(std::find(std::begin(*read_query.columns), std::end(*read_query.columns), index_column) == std::end(*read_query.columns)) + if(ranges::find(*read_query.columns, index_column) == std::end(*read_query.columns)) index_columns_to_add.emplace_back(index_column); } read_query.columns->insert(std::begin(*read_query.columns), std::begin(index_columns_to_add), std::end(index_columns_to_add)); diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py b/python/tests/unit/arcticdb/version_store/test_resample.py index bb1d5afe9b..9b45baedd7 100644 --- a/python/tests/unit/arcticdb/version_store/test_resample.py +++ b/python/tests/unit/arcticdb/version_store/test_resample.py @@ -6,6 +6,8 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ from functools import partial +from typing import Union + import numpy as np import pandas as pd import datetime as dt @@ -17,12 +19,26 @@ from packaging.version import Version from arcticdb.util._versions import IS_PANDAS_TWO, PANDAS_VERSION import itertools +from pandas.api.types import is_float_dtype pytestmark = pytest.mark.pipeline ALL_AGGREGATIONS = ["sum", "mean", "min", "max", "first", "last", "count"] +def default_aggregation_value(aggregation: str, dtype: Union[np.dtype, str]): + assert aggregation in ALL_AGGREGATIONS + if is_float_dtype(dtype): + return 0 if aggregation == "count" else np.nan + elif np.issubdtype(dtype, np.integer): + return np.nan if aggregation == "mean" else 0 + elif np.issubdtype(dtype, np.datetime64): + pass + elif np.issubdtype(dtype, np.str_): + pass + else: + raise "Unknown dtype" + def all_aggregations_dict(col): return {f"to_{agg}": (col, agg) for agg in ALL_AGGREGATIONS} @@ -846,20 +862,36 @@ def test_min_with_one_infinity_element(lmdb_version_store_v1): assert np.isneginf(lib.read(sym, query_builder=q).data['col_min'][0]) class TestDynamicSchema: - def test_missing_column_segment_does_not_cross_bucket(self, lmdb_version_store_dynamic_schema_v1): + @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64]) + def test_missing_column_segment_does_not_cross_bucket(self, lmdb_version_store_dynamic_schema_v1, dtype): + pd.set_option('display.max_columns', None) + pd.set_option('display.max_rows', None) + pd.set_option('display.width', None) + pd.set_option('display.max_columns', None) + lib = lmdb_version_store_dynamic_schema_v1 sym = "sym" idx = pd.date_range(pd.Timestamp(0), periods=20, freq='ns') - lib.write(sym, pd.DataFrame({"a": range(len(idx))}, index=idx)) + initial_df = pd.DataFrame({"a": range(len(idx))}, index=idx) + lib.write(sym, 
initial_df) - idx = pd.date_range(pd.Timestamp(20), periods=20, freq='ns') - lib.append(sym, pd.DataFrame({"a": range(len(idx)), "b": np.array(range(len(idx)), dtype=np.float32)}, index=idx)) + idx_to_append = pd.date_range(pd.Timestamp(40), periods=20, freq='ns') + df_to_append = pd.DataFrame({"a": range(len(idx)), "b": np.array(range(len(idx)), dtype=dtype)}, index=idx_to_append) + lib.append(sym, df_to_append) - data = lib.read(sym).data - print(data) - #res = data.resample("10ns").agg(None, **{"col_count":("b", "count")}) - #print(res) q = QueryBuilder() - q = q.resample('10ns').agg({"b_count": ("b", "min")}) - lib.read(sym, query_builder=q) \ No newline at end of file + q = q.resample('10ns').agg(all_aggregations_dict("b")) + arctic_resampled = lib.read(sym, query_builder=q).data + arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) + + expected_slice_with_missing_column_idx = [pd.Timestamp(0), pd.Timestamp(10)] + expected_slice_with_missing_column = pd.DataFrame({ + f"to_{agg}": [default_aggregation_value(agg, dtype) for _ in range(len(expected_slice_with_missing_column_idx))] for agg in ALL_AGGREGATIONS + }, index=expected_slice_with_missing_column_idx) + expected_slice_containing_column = df_to_append.resample('10ns').agg(None, **all_aggregations_dict("b")) + expected = pd.concat([expected_slice_with_missing_column, expected_slice_containing_column]) + expected = expected.reindex(columns=sorted(expected.columns)) + assert_frame_equal(arctic_resampled, expected, check_dtype=False) + + From baf4308fb4d26aa18a06028d789e12d5eeba574e Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Fri, 7 Feb 2025 12:47:03 +0200 Subject: [PATCH 05/10] Update python tests --- .../arcticdb/version_store/test_resample.py | 94 +++++++++++++++++-- 1 file changed, 86 insertions(+), 8 deletions(-) diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py b/python/tests/unit/arcticdb/version_store/test_resample.py index 9b45baedd7..3469d94a63 100644 --- a/python/tests/unit/arcticdb/version_store/test_resample.py +++ b/python/tests/unit/arcticdb/version_store/test_resample.py @@ -25,6 +25,7 @@ ALL_AGGREGATIONS = ["sum", "mean", "min", "max", "first", "last", "count"] +DATETIME_AGGREGATIONS = ["mean", "min", "max", "first", "last", "count"] def default_aggregation_value(aggregation: str, dtype: Union[np.dtype, str]): assert aggregation in ALL_AGGREGATIONS @@ -33,15 +34,20 @@ def default_aggregation_value(aggregation: str, dtype: Union[np.dtype, str]): elif np.issubdtype(dtype, np.integer): return np.nan if aggregation == "mean" else 0 elif np.issubdtype(dtype, np.datetime64): - pass + return 0 if aggregation == "count" else pd.NaT elif np.issubdtype(dtype, np.str_): pass + elif pd.api.types.is_bool_dtype(dtype): + return np.nan if aggregation == "mean" else 0 else: raise "Unknown dtype" def all_aggregations_dict(col): return {f"to_{agg}": (col, agg) for agg in ALL_AGGREGATIONS} +def datetime_aggregations_dict(col): + return {f"to_{agg}": (col, agg) for agg in DATETIME_AGGREGATIONS} + # Pandas recommended way to resample and exclude buckets with no index values, which is our behaviour # See https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#sparse-resampling def round(t, freq): @@ -862,13 +868,8 @@ def test_min_with_one_infinity_element(lmdb_version_store_v1): assert np.isneginf(lib.read(sym, query_builder=q).data['col_min'][0]) class TestDynamicSchema: - @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64]) + 
@pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool]) def test_missing_column_segment_does_not_cross_bucket(self, lmdb_version_store_dynamic_schema_v1, dtype): - pd.set_option('display.max_columns', None) - pd.set_option('display.max_rows', None) - pd.set_option('display.width', None) - pd.set_option('display.max_columns', None) - lib = lmdb_version_store_dynamic_schema_v1 sym = "sym" @@ -877,7 +878,8 @@ def test_missing_column_segment_does_not_cross_bucket(self, lmdb_version_store_d lib.write(sym, initial_df) idx_to_append = pd.date_range(pd.Timestamp(40), periods=20, freq='ns') - df_to_append = pd.DataFrame({"a": range(len(idx)), "b": np.array(range(len(idx)), dtype=dtype)}, index=idx_to_append) + data_to_append = {"a": range(len(idx_to_append)), "b": np.array(range(len(idx_to_append)), dtype=dtype)} + df_to_append = pd.DataFrame(data_to_append, index=idx_to_append) lib.append(sym, df_to_append) q = QueryBuilder() @@ -894,4 +896,80 @@ def test_missing_column_segment_does_not_cross_bucket(self, lmdb_version_store_d expected = expected.reindex(columns=sorted(expected.columns)) assert_frame_equal(arctic_resampled, expected, check_dtype=False) + def test_missing_column_segment_does_not_cross_bucket_date(self, lmdb_version_store_dynamic_schema_v1): + # Datetime types do not support all aggregation types that is why they are separated in a separate test + lib = lmdb_version_store_dynamic_schema_v1 + sym = "sym" + + idx = pd.date_range(pd.Timestamp(0), periods=20, freq='ns') + initial_df = pd.DataFrame({"a": range(len(idx))}, index=idx) + lib.write(sym, initial_df) + + idx_to_append = pd.date_range(pd.Timestamp(40), periods=20, freq='ns') + data_to_append = {"a": range(len(idx_to_append)), "b": np.array([pd.Timestamp(i) for i in range(len(idx_to_append))])} + df_to_append = pd.DataFrame(data_to_append, index=idx_to_append) + lib.append(sym, df_to_append) + + q = QueryBuilder() + q = q.resample('10ns').agg(datetime_aggregations_dict("b")) + arctic_resampled = lib.read(sym, query_builder=q).data + arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) + + expected_slice_with_missing_column_idx = [pd.Timestamp(0), pd.Timestamp(10)] + expected_slice_with_missing_column = pd.DataFrame({ + f"to_{agg}": [default_aggregation_value(agg, np.datetime64) for _ in range(len(expected_slice_with_missing_column_idx))] for agg in DATETIME_AGGREGATIONS + }, index=expected_slice_with_missing_column_idx) + expected_slice_containing_column = df_to_append.resample('10ns').agg(None, **datetime_aggregations_dict("b")) + expected = pd.concat([expected_slice_with_missing_column, expected_slice_containing_column]) + expected = expected.reindex(columns=sorted(expected.columns)) + assert_frame_equal(arctic_resampled, expected, check_dtype=False) + @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool]) + def test_date_range_select_missing_column(self, lmdb_version_store_dynamic_schema_v1, dtype): + lib = lmdb_version_store_dynamic_schema_v1 + sym = "sym" + + idx = pd.date_range(pd.Timestamp(0), periods=20, freq='ns') + initial_df = pd.DataFrame({"a": range(len(idx))}, index=idx) + lib.write(sym, initial_df) + + idx_to_append = pd.date_range(pd.Timestamp(40), periods=20, freq='ns') + data_to_append = {"a": range(len(idx_to_append)), "b": np.array(range(len(idx_to_append)), dtype=dtype)} + df_to_append = pd.DataFrame(data_to_append, index=idx_to_append) + lib.append(sym, df_to_append) + q = QueryBuilder() + q = 
q.resample('10ns').agg(all_aggregations_dict("b")) + arctic_resampled = lib.read(sym, query_builder=q, date_range=(None, pd.Timestamp(20))).data + arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) + expected = pd.DataFrame({}, index=pd.DatetimeIndex([pd.Timestamp(0), pd.Timestamp(10)])) + assert_frame_equal(arctic_resampled, expected) + + @pytest.mark.parametrize("dtype", [np.float32]) + def test_bucket_spans_two_segments(self, lmdb_version_store_dynamic_schema_v1, dtype): + + pd.set_option('display.max_rows', None) + pd.set_option('display.max_columns', None) + pd.set_option('display.width', None) + pd.set_option('display.max_colwidth', None) + + lib = lmdb_version_store_dynamic_schema_v1 + sym = "sym" + + idx = pd.date_range(pd.Timestamp(1), periods=5, freq='ns') + initial_df = pd.DataFrame({"a": range(len(idx))}, index=idx) + lib.write(sym, initial_df) + + idx_to_append = pd.date_range(pd.Timestamp(6), periods=4, freq='ns') + data_to_append = {"a": range(len(idx_to_append)), "b": np.array(range(-len(idx_to_append),0), dtype=dtype)} + df_to_append = pd.DataFrame(data_to_append, index=idx_to_append) + lib.append(sym, df_to_append) + + print() + print(lib.read(sym).data) + + q = QueryBuilder() + q = q.resample('10ns').agg({"mx": ("b", "max")}) + arctic_resampled = lib.read(sym, query_builder=q).data + arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) + print() + print(arctic_resampled) \ No newline at end of file From d31d33e786710dfa1f65266597c6557d737f0f3d Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Fri, 7 Feb 2025 17:38:05 +0200 Subject: [PATCH 06/10] Add comments. Split the sparse and dense aggregations (by sparse I mean aggregations where a column is missing in some slices). TODO: make it actually work. --- cpp/arcticdb/processing/clause.cpp | 13 +++++- .../processing/sorted_aggregation.cpp | 38 +++++++++++++---- .../processing/sorted_aggregation.hpp | 42 ++++++++++++++++++- 3 files changed, 81 insertions(+), 12 deletions(-) diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index f82914a938..93138653b6 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -654,11 +654,16 @@ std::vector ResampleClause::process(std::vector input_agg_columns; input_agg_columns.reserve(row_slices.size()); + size_t slice_index = 0; + bm::bvector<> existing_columns; + existing_columns.init(); + existing_columns.resize(row_slices.size()); for (auto& row_slice: row_slices) { auto variant_data = row_slice.get(aggregator.get_input_column_name()); util::variant_match(variant_data, - [&input_agg_columns](const ColumnWithStrings& column_with_strings) { + [&input_agg_columns, &existing_columns, &slice_index](const ColumnWithStrings& column_with_strings) { input_agg_columns.emplace_back(column_with_strings); + existing_columns.set(slice_index); }, [](const EmptyResult&) { // Dynamic schema, missing column from this row-slice @@ -668,9 +673,13 @@ std::vector ResampleClause::process(std::vector("Unexpected return type from ProcessingUnit::get, expected column-like"); } ); + ++slice_index; } if (!input_agg_columns.empty()) { - auto aggregated_column = std::make_shared(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool)); + const bool has_missing_columns = existing_columns.is_all_one_range(0, row_slices.size()); + auto aggregated_column = has_missing_columns ? 
+ std::make_shared(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool, existing_columns)) : + std::make_shared(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool)); const auto field = scalar_field(aggregated_column->type().data_type(), aggregator.get_output_column_name().value); seg.add_column(field, std::move(aggregated_column)); } diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index e38c8ad2e1..3046d8d89e 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -118,11 +118,25 @@ auto finalize_aggregator( } template -Column SortedAggregator::aggregate(const std::vector>& input_index_columns, - const std::vector& input_agg_columns, - const std::vector& bucket_boundaries, - const Column& output_index_column, - StringPool& string_pool) const { +Column SortedAggregator::aggregate( + [[maybe_unused]] const std::vector>& input_index_columns, + [[maybe_unused]] const std::vector& input_agg_columns, + [[maybe_unused]] const std::vector& bucket_boundaries, + [[maybe_unused]] const Column& output_index_column, + [[maybe_unused]] StringPool& string_pool +) const { + return {}; +} + +template +Column SortedAggregator::aggregate( + const std::vector>& input_index_columns, + const std::vector& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool, + const bm::bvector<>& existing_columns +) const { std::optional common_input_type = generate_common_input_type(input_agg_columns); using IndexTDT = ScalarTagType>; Column res( @@ -138,6 +152,7 @@ Column SortedAggregator::aggregate(const &input_agg_columns, &bucket_boundaries, &string_pool, + &existing_columns, &res](auto output_type_desc_tag) { using output_type_info = ScalarTypeInfo; auto output_data = res.data(); @@ -151,15 +166,18 @@ Column SortedAggregator::aggregate(const auto bucket_end_it = std::next(bucket_start_it); Bucket current_bucket(*bucket_start_it, *bucket_end_it); const auto bucket_boundaries_end = bucket_boundaries.end(); - for (auto [idx, input_agg_column]: folly::enumerate(input_agg_columns)) { + auto existing_columns_begin = existing_columns.first(); + auto existing_columns_end = existing_columns.end(); + auto input_agg_column_it = input_agg_columns.begin(); + while (existing_columns_begin != existing_columns_end) { details::visit_type( - input_agg_column.column_->type().data_type(), + input_agg_column_it->column_->type().data_type(), [this, &output_it, &output_end_it, &bucket_aggregator, - &agg_column = input_agg_column, - &input_index_column = input_index_columns[idx], + &agg_column = *input_agg_column_it, + &input_index_column = input_index_columns[*existing_columns_begin], &bucket_boundaries_end, &string_pool, &bucket_start_it, @@ -221,6 +239,8 @@ Column SortedAggregator::aggregate(const } } ); + ++input_agg_column_it; + ++existing_columns_begin; } // We were in the middle of aggregating a bucket when we ran out of index values if (output_it != output_end_it) { diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index fa23eae24e..a52cd65798 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -36,10 +36,32 @@ struct ISortedAggregator { StringPool& string_pool) const { return folly::poly_call<2>(*this, 
input_index_columns, input_agg_columns, bucket_boundaries, output_index_column, string_pool); } + [[nodiscard]] Column aggregate(const std::vector>& input_index_columns, + const std::vector& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool, + const bm::bvector<>& existing_columns) const { + return folly::poly_call<3>(*this, input_index_columns, input_agg_columns, bucket_boundaries, output_index_column, string_pool, existing_columns); + } }; template - using Members = folly::PolyMembers<&T::get_input_column_name, &T::get_output_column_name, &T::aggregate>; + using Members = folly::PolyMembers< + &T::get_input_column_name, + &T::get_output_column_name, + folly::sig>& input_index_columns, + const std::vector& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool)>(&T::aggregate), + folly::sig>& input_index_columns, + const std::vector& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool, + const bm::bvector<>& existing_columns)>(&T::aggregate) + >; }; using SortedAggregatorInterface = folly::Poly; @@ -338,6 +360,24 @@ class SortedAggregator [[nodiscard]] ColumnName get_input_column_name() const; [[nodiscard]] ColumnName get_output_column_name() const; + /// @brief Aggregate a single column over many row slices using a single aggregator defined by aggregation_operator + /// @param input_index_columns The index column for each segment. There will always be one column per segment + /// @param input_agg_columns The column to be aggregated. For static schema there will be one column per + /// segment, thus the size of @p input_index_columns will be the same as @p input_agg_columns and the i-th element + /// of @p input_index_columns will be the index for the i-th element of @p input_agg_columns. In the case of dynamic + /// schema there might be fewer @p input_agg_columns than @p input_index_columns; in that case + /// @p existing_columns is used to pair the existing columns with their indexes + /// @param bucket_boundaries List of bucket boundaries where the i-th bucket is defined by bucket_boundaries[i] and + /// bucket_boundaries[i+1] + /// @param existing_columns Bitset with the same size as @p input_index_columns; the i-th bit is 1 if the column + /// exists in the i-th segment.
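+ /// For example (an illustration of the pairing described above): with three row slices where the aggregated
+ /// column is present only in slices 0 and 2, bits 0 and 2 of @p existing_columns are set, @p input_agg_columns
+ /// holds two columns, and they pair with input_index_columns[0] and input_index_columns[2] respectively.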
The count of bits with value 1 is same as the size of @p input_agg_columns + [[nodiscard]] Column aggregate(const std::vector>& input_index_columns, + const std::vector& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool, + const bm::bvector<>& existing_columns) const; + [[nodiscard]] Column aggregate(const std::vector>& input_index_columns, const std::vector& input_agg_columns, const std::vector& bucket_boundaries, From 1c6ee0928d99bbbb06a9f8bec873123a30d5ba42 Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Tue, 11 Feb 2025 18:25:40 +0200 Subject: [PATCH 07/10] Refactor code so that the actual aggregation of a column is moved in a separate function --- cpp/arcticdb/column_store/column.hpp | 2 +- cpp/arcticdb/column_store/column_data.hpp | 15 +- .../processing/sorted_aggregation.cpp | 196 ++++++++++-------- .../processing/sorted_aggregation.hpp | 1 - 4 files changed, 114 insertions(+), 100 deletions(-) diff --git a/cpp/arcticdb/column_store/column.hpp b/cpp/arcticdb/column_store/column.hpp index 1d89cb3107..b7d2554626 100644 --- a/cpp/arcticdb/column_store/column.hpp +++ b/cpp/arcticdb/column_store/column.hpp @@ -497,7 +497,7 @@ class Column { return data_.bytes(); } - ColumnData data() const { + ColumnData data() const { return ColumnData(&data_.buffer(), &shapes_.buffer(), type_, sparse_map_ ? &*sparse_map_ : nullptr); } diff --git a/cpp/arcticdb/column_store/column_data.hpp b/cpp/arcticdb/column_store/column_data.hpp index 0270c4e39f..a7cad526de 100644 --- a/cpp/arcticdb/column_store/column_data.hpp +++ b/cpp/arcticdb/column_store/column_data.hpp @@ -166,16 +166,18 @@ struct ColumnData { ssize_t idx_{0}; RawType* ptr_{nullptr}; - [[nodiscard]] inline ssize_t idx() const { + Enumeration() = default; + + [[nodiscard]] ssize_t idx() const { return idx_; } - inline RawType& value() { + RawType& value() { debug::check(ptr_ != nullptr, "Dereferencing nullptr in enumerating ColumnDataIterator"); return *ptr_; }; - inline const RawType& value() const { + const RawType& value() const { debug::check(ptr_ != nullptr, "Dereferencing nullptr in enumerating ColumnDataIterator"); return *ptr_; }; @@ -188,14 +190,14 @@ struct ColumnData { }; template - using IteratorValueType_t = typename std::conditional_t< + using IteratorValueType_t = std::conditional_t< iterator_type == IteratorType::ENUMERATED, Enumeration, PointerWrapper >; template - using IteratorReferenceType_t = typename std::conditional_t< + using IteratorReferenceType_t = std::conditional_t< iterator_type == IteratorType::ENUMERATED, std::conditional_t, Enumeration>, std::conditional_t @@ -217,8 +219,7 @@ struct ColumnData { using base_type = base_iterator_type; using RawType = typename TDT::DataTypeTag::raw_type; public: - ColumnDataIterator() = delete; - + ColumnDataIterator() = default; // Used to construct [c]begin iterators explicit ColumnDataIterator(ColumnData* parent): parent_(parent) diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index 3046d8d89e..be377aed10 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -9,9 +9,13 @@ #include #include #include +#include +#include namespace arcticdb { +namespace ranges = std::ranges; + template void push_to_aggregator(Aggregator& bucket_aggregator, T value, ARCTICDB_UNUSED const ColumnWithStrings& column_with_strings) { if constexpr(is_time_type(input_data_type) && aggregation_operator == 
AggregationOperator::COUNT) { @@ -117,6 +121,88 @@ auto finalize_aggregator( } } +template +bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) { + if constexpr (closed_boundary == ResampleBoundary::LEFT) { + return index_value >= bucket_end; + } else { + // closed_boundary == ResampleBoundary::RIGHT + return index_value > bucket_end; + } +} + +template +using ColumnSubrange = ranges::subrange().data().begin()), decltype(std::declval().data().end())>; + +template< + AggregationOperator aggregation_operator, + ResampleBoundary closed_boundary, + typename input_type_info, + typename output_type_info, + typename BucketAggregator = decltype(get_bucket_aggregator()) +> +void aggregate_column( + std::span bucket_boundaries, + Column& input_index_column, + const ColumnWithStrings& agg_column, + StringPool& string_pool, + BucketAggregator& bucket_aggregator, + ColumnSubrange& output +) { + using IndexTDT = ScalarTagType>; + bool reached_end_of_buckets{false}; + auto bucket_start_it = bucket_boundaries.begin(); + auto bucket_end_it = std::next(bucket_start_it); + const auto bucket_boundaries_end = bucket_boundaries.end(); + Bucket current_bucket(*bucket_start_it, *bucket_end_it); + auto index_data = input_index_column.data(); + const auto index_cend = index_data.cend(); + auto agg_data = agg_column.column_->data(); + auto agg_it = agg_data.cbegin(); + bool bucket_has_values = false; + for (auto index_it = index_data.cbegin(); index_it != index_cend && !reached_end_of_buckets; ++index_it, ++agg_it) { + if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { + push_to_aggregator(bucket_aggregator, *agg_it, agg_column); + bucket_has_values = true; + } else if (ARCTICDB_LIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it)) && !output.empty()) { + if (bucket_has_values) { + *output.begin() = finalize_aggregator(bucket_aggregator, string_pool); + output.advance(1); + } + // The following code is equivalent to: + // if constexpr (closed_boundary == ResampleBoundary::LEFT) { + // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it); + // } else { + // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it, std::less_equal{}); + // } + // bucket_start_it = std::prev(bucket_end_it); + // reached_end_of_buckets = bucket_end_it == bucket_boundaries_end; + // The above code will be more performant when the vast majority of buckets are empty + // See comment in ResampleClause::advance_boundary_past_value for mathematical and experimental bounds + ++bucket_start_it; + if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { + reached_end_of_buckets = true; + } else { + while (ARCTICDB_UNLIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it))) { + ++bucket_start_it; + if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { + reached_end_of_buckets = true; + break; + } + } + } + if (ARCTICDB_LIKELY(!reached_end_of_buckets)) { + bucket_has_values = false; + current_bucket.set_boundaries(*bucket_start_it, *bucket_end_it); + if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { + push_to_aggregator(bucket_aggregator, *agg_it, agg_column); + bucket_has_values = true; + } + } + } + } +} + template Column SortedAggregator::aggregate( [[maybe_unused]] const std::vector>& input_index_columns, @@ -138,7 +224,6 @@ Column SortedAggregator::aggregate( const bm::bvector<>& existing_columns ) const { std::optional common_input_type = generate_common_input_type(input_agg_columns); - using IndexTDT 
= ScalarTagType>; Column res( TypeDescriptor(generate_output_data_type(*common_input_type), Dimension::Dim0), output_index_column.row_count(), @@ -147,95 +232,33 @@ Column SortedAggregator::aggregate( ); details::visit_type( res.type().data_type(), - [this, - &input_index_columns, - &input_agg_columns, - &bucket_boundaries, - &string_pool, - &existing_columns, - &res](auto output_type_desc_tag) { + [&](auto output_type_desc_tag) { using output_type_info = ScalarTypeInfo; - auto output_data = res.data(); - auto output_it = output_data.begin(); - auto output_end_it = output_data.end(); // Need this here to only generate valid get_bucket_aggregator code, exception will have been thrown earlier at runtime if constexpr (is_output_type_allowed()) { + auto output_data = res.data(); + ranges::subrange output(output_data.begin(), output_data.end()); auto bucket_aggregator = get_bucket_aggregator(); - bool reached_end_of_buckets{false}; - auto bucket_start_it = bucket_boundaries.begin(); - auto bucket_end_it = std::next(bucket_start_it); - Bucket current_bucket(*bucket_start_it, *bucket_end_it); - const auto bucket_boundaries_end = bucket_boundaries.end(); auto existing_columns_begin = existing_columns.first(); - auto existing_columns_end = existing_columns.end(); + const auto existing_columns_end = existing_columns.end(); auto input_agg_column_it = input_agg_columns.begin(); while (existing_columns_begin != existing_columns_end) { + schema::check( + !input_agg_column_it->column_->is_sparse() && input_agg_column_it->column_->row_count() == input_index_columns[*existing_columns_begin]->row_count(), + "Resample: Cannot aggregate column '{}' as it is sparse", + get_input_column_name().value); details::visit_type( input_agg_column_it->column_->type().data_type(), - [this, - &output_it, - &output_end_it, - &bucket_aggregator, - &agg_column = *input_agg_column_it, - &input_index_column = input_index_columns[*existing_columns_begin], - &bucket_boundaries_end, - &string_pool, - &bucket_start_it, - &bucket_end_it, - ¤t_bucket, - &reached_end_of_buckets](auto input_type_desc_tag) { + [&, &agg_column=*input_agg_column_it, &input_index_column=input_index_columns[*existing_columns_begin]](auto input_type_desc_tag) { using input_type_info = ScalarTypeInfo; - // Again, only needed to generate valid code below, exception will have been thrown earlier at runtime if constexpr (are_input_output_operation_allowed()) { - schema::check( - !agg_column.column_->is_sparse() && agg_column.column_->row_count() == input_index_column->row_count(), - "Resample: Cannot aggregate column '{}' as it is sparse", - get_input_column_name().value); - auto index_data = input_index_column->data(); - const auto index_cend = index_data.cend(); - auto agg_data = agg_column.column_->data(); - auto agg_it = agg_data.cbegin(); - bool bucket_has_values = false; - for (auto index_it = index_data.cbegin(); index_it != index_cend && !reached_end_of_buckets; ++index_it, ++agg_it) { - if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator(bucket_aggregator, *agg_it, agg_column); - bucket_has_values = true; - } else if (ARCTICDB_LIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it)) && output_it != output_end_it) { - if (bucket_has_values) { - *output_it++ = finalize_aggregator(bucket_aggregator, string_pool); - } - // The following code is equivalent to: - // if constexpr (closed_boundary == ResampleBoundary::LEFT) { - // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it); - // } else 
{ - // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it, std::less_equal{}); - // } - // bucket_start_it = std::prev(bucket_end_it); - // reached_end_of_buckets = bucket_end_it == bucket_boundaries_end; - // The above code will be more performant when the vast majority of buckets are empty - // See comment in ResampleClause::advance_boundary_past_value for mathematical and experimental bounds - ++bucket_start_it; - if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { - reached_end_of_buckets = true; - } else { - while (ARCTICDB_UNLIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it))) { - ++bucket_start_it; - if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { - reached_end_of_buckets = true; - break; - } - } - } - if (ARCTICDB_LIKELY(!reached_end_of_buckets)) { - bucket_has_values = false; - current_bucket.set_boundaries(*bucket_start_it, *bucket_end_it); - if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator(bucket_aggregator, *agg_it, agg_column); - bucket_has_values = true; - } - } - } - } + aggregate_column( + bucket_boundaries, + *input_index_column, + agg_column, + string_pool, + bucket_aggregator, + output); } } ); @@ -243,8 +266,9 @@ Column SortedAggregator::aggregate( ++existing_columns_begin; } // We were in the middle of aggregating a bucket when we ran out of index values - if (output_it != output_end_it) { - *output_it++ = finalize_aggregator(bucket_aggregator, string_pool); + if (!output.empty()) { + *output.begin() = finalize_aggregator(bucket_aggregator, string_pool); + output.advance(1); } } } @@ -301,16 +325,6 @@ DataType SortedAggregator::generate_outpu return output_type; } -template -bool SortedAggregator::index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const { - if constexpr (closed_boundary == ResampleBoundary::LEFT) { - return index_value >= bucket_end; - } else { - // closed_boundary == ResampleBoundary::RIGHT - return index_value > bucket_end; - } -} - template class SortedAggregator; template class SortedAggregator; template class SortedAggregator; diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index a52cd65798..1811e5ccff 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -387,7 +387,6 @@ class SortedAggregator [[nodiscard]] std::optional generate_common_input_type(std::span input_agg_columns) const; void check_aggregator_supported_with_data_type(DataType data_type) const; [[nodiscard]] DataType generate_output_data_type(DataType common_input_data_type) const; - [[nodiscard]] bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const; ColumnName input_column_name_; ColumnName output_column_name_; From d40bf206f1cfab0322e7db24f252e3837b029eac Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Mon, 17 Feb 2025 10:59:22 +0200 Subject: [PATCH 08/10] Improve testing to include datetime and string for resampling with dynamic schema. 
Fix if statement deciding whether to do sparse or dense aggregation --- cpp/arcticdb/processing/clause.cpp | 2 +- .../processing/sorted_aggregation.cpp | 75 +++++++--- .../arcticdb/version_store/test_resample.py | 129 ++++++++---------- 3 files changed, 116 insertions(+), 90 deletions(-) diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index 93138653b6..db34436051 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -676,7 +676,7 @@ std::vector ResampleClause::process(std::vector(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool, existing_columns)) : std::make_shared(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool)); diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index be377aed10..9a951ddb00 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -143,7 +143,7 @@ template< > void aggregate_column( std::span bucket_boundaries, - Column& input_index_column, + const Column& input_index_column, const ColumnWithStrings& agg_column, StringPool& string_pool, BucketAggregator& bucket_aggregator, @@ -205,13 +205,56 @@ void aggregate_column( template Column SortedAggregator::aggregate( - [[maybe_unused]] const std::vector>& input_index_columns, - [[maybe_unused]] const std::vector& input_agg_columns, - [[maybe_unused]] const std::vector& bucket_boundaries, - [[maybe_unused]] const Column& output_index_column, - [[maybe_unused]] StringPool& string_pool + const std::vector>& input_index_columns, + const std::vector& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool ) const { - return {}; + const DataType common_input_type = generate_common_input_type(input_agg_columns).value(); + const TypeDescriptor td(generate_output_data_type(common_input_type), Dimension::Dim0); + Column res(td, output_index_column.row_count(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED); + details::visit_type( + res.type().data_type(), + [&](output_type_desc_tag) { + using output_type_info = ScalarTypeInfo; + // Need this here to only generate valid get_bucket_aggregator code, exception will have been thrown earlier at runtime + if constexpr (is_output_type_allowed()) { + auto output_data = res.data(); + ranges::subrange output(output_data.begin(), output_data.end()); + auto bucket_aggregator = get_bucket_aggregator(); + for (size_t i = 0; i < input_agg_columns.size(); ++i) { + const ColumnWithStrings& agg_column = input_agg_columns[i]; + const Column& input_index_column = *input_index_columns[i]; + schema::check( + !agg_column.column_->is_sparse() && agg_column.column_->row_count() == input_index_column.row_count(), + "Resample: Cannot aggregate column '{}' as it is sparse", + get_input_column_name().value); + details::visit_type( + agg_column.column_->type().data_type(), + [&](input_type_desc_tag) { + using input_type_info = ScalarTypeInfo; + if constexpr (are_input_output_operation_allowed()) { + aggregate_column( + bucket_boundaries, + input_index_column, + agg_column, + string_pool, + bucket_aggregator, + output); + } + } + ); + } + // We were in the middle of aggregating a bucket when we ran out of index values + if (!output.empty()) { + *output.begin() = finalize_aggregator(bucket_aggregator, string_pool); + 
output.advance(1); + } + } + } + ); + return res; } template @@ -223,17 +266,13 @@ Column SortedAggregator::aggregate( StringPool& string_pool, const bm::bvector<>& existing_columns ) const { - std::optional common_input_type = generate_common_input_type(input_agg_columns); - Column res( - TypeDescriptor(generate_output_data_type(*common_input_type), Dimension::Dim0), - output_index_column.row_count(), - AllocationType::PRESIZED, - Sparsity::NOT_PERMITTED - ); + const DataType common_input_type = generate_common_input_type(input_agg_columns).value(); + const TypeDescriptor td(generate_output_data_type(common_input_type), Dimension::Dim0); + Column res(td, output_index_column.row_count(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED); details::visit_type( res.type().data_type(), - [&](auto output_type_desc_tag) { - using output_type_info = ScalarTypeInfo; + [&](output_type_desc_tag) { + using output_type_info = ScalarTypeInfo; // Need this here to only generate valid get_bucket_aggregator code, exception will have been thrown earlier at runtime if constexpr (is_output_type_allowed()) { auto output_data = res.data(); @@ -249,8 +288,8 @@ Column SortedAggregator::aggregate( get_input_column_name().value); details::visit_type( input_agg_column_it->column_->type().data_type(), - [&, &agg_column=*input_agg_column_it, &input_index_column=input_index_columns[*existing_columns_begin]](auto input_type_desc_tag) { - using input_type_info = ScalarTypeInfo; + [&, &agg_column=*input_agg_column_it, &input_index_column=input_index_columns[*existing_columns_begin]](input_type_desc_tag) { + using input_type_info = ScalarTypeInfo; if constexpr (are_input_output_operation_allowed()) { aggregate_column( bucket_boundaries, diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py b/python/tests/unit/arcticdb/version_store/test_resample.py index 3469d94a63..95d836b992 100644 --- a/python/tests/unit/arcticdb/version_store/test_resample.py +++ b/python/tests/unit/arcticdb/version_store/test_resample.py @@ -19,13 +19,41 @@ from packaging.version import Version from arcticdb.util._versions import IS_PANDAS_TWO, PANDAS_VERSION import itertools -from pandas.api.types import is_float_dtype +from pandas.api.types import ( + is_float_dtype, + is_datetime64_ns_dtype as is_datetime_dtype, + is_integer_dtype, + is_string_dtype, + is_bool_dtype +) +import string pytestmark = pytest.mark.pipeline -ALL_AGGREGATIONS = ["sum", "mean", "min", "max", "first", "last", "count"] -DATETIME_AGGREGATIONS = ["mean", "min", "max", "first", "last", "count"] +ALL_AGGREGATIONS = {"sum", "mean", "min", "max", "first", "last", "count"} + +def rand_data(dtype, count): + rng = np.random.default_rng(12345) + if is_float_dtype(dtype) or is_integer_dtype(dtype) or is_datetime_dtype(dtype): + return rng.integers(0, 100, count).astype(dtype) + elif is_string_dtype(dtype): + return np.array([''.join(rng.choice(list(string.ascii_letters), size=10)) for _ in range(count)]) + elif is_bool_dtype(dtype): + return rng.choice([True, False], size=count).astype(dtype) + else: + raise BaseException(f"Unknown dtype {dtype}") + + +def all_aggregations(dtype=None): + result = ALL_AGGREGATIONS.copy() + if dtype is None: + return result + elif is_datetime_dtype(dtype): + result.remove("sum") + elif is_string_dtype(dtype): + result.difference_update({"sum", "mean", "min", "max"}) + return result def default_aggregation_value(aggregation: str, dtype: Union[np.dtype, str]): assert aggregation in ALL_AGGREGATIONS @@ -36,17 +64,15 @@ def 
default_aggregation_value(aggregation: str, dtype: Union[np.dtype, str]): elif np.issubdtype(dtype, np.datetime64): return 0 if aggregation == "count" else pd.NaT elif np.issubdtype(dtype, np.str_): - pass + return 0 if aggregation == "count" else None elif pd.api.types.is_bool_dtype(dtype): return np.nan if aggregation == "mean" else 0 else: raise "Unknown dtype" -def all_aggregations_dict(col): - return {f"to_{agg}": (col, agg) for agg in ALL_AGGREGATIONS} - -def datetime_aggregations_dict(col): - return {f"to_{agg}": (col, agg) for agg in DATETIME_AGGREGATIONS} +def all_aggregations_dict(col, dtype=None): + aggregations = all_aggregations(dtype) + return {f"to_{agg}": (col, agg) for agg in aggregations} # Pandas recommended way to resample and exclude buckets with no index values, which is our behaviour # See https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#sparse-resampling @@ -502,24 +528,6 @@ def test_resampling_unsupported_aggregation_type_combos(lmdb_version_store_v1): lib.read(sym, query_builder=q) -def test_resampling_dynamic_schema_missing_column(lmdb_version_store_dynamic_schema_v1): - lib = lmdb_version_store_dynamic_schema_v1 - sym = "test_resampling_dynamic_schema_missing_column" - - lib.write(sym, pd.DataFrame({"col_0": [0]}, index=[pd.Timestamp(0)])) - lib.append(sym, pd.DataFrame({"col_1": [1000]}, index=[pd.Timestamp(2000)])) - - # Schema exception should be thrown regardless of whether there are any buckets that span segments or not - q = QueryBuilder() - q = q.resample("us").agg({"col_0": "sum"}) - with pytest.raises(SchemaException): - lib.read(sym, query_builder=q) - - q = QueryBuilder() - q = q.resample("s").agg({"col_1": "sum"}) - with pytest.raises(SchemaException): - lib.read(sym, query_builder=q) - def test_resampling_sparse_data(lmdb_version_store_v1): lib = lmdb_version_store_v1 @@ -868,62 +876,36 @@ def test_min_with_one_infinity_element(lmdb_version_store_v1): assert np.isneginf(lib.read(sym, query_builder=q).data['col_min'][0]) class TestDynamicSchema: - @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool]) + @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool, "datetime64[ns]", str]) def test_missing_column_segment_does_not_cross_bucket(self, lmdb_version_store_dynamic_schema_v1, dtype): lib = lmdb_version_store_dynamic_schema_v1 sym = "sym" idx = pd.date_range(pd.Timestamp(0), periods=20, freq='ns') - initial_df = pd.DataFrame({"a": range(len(idx))}, index=idx) + initial_df = pd.DataFrame({"a": rand_data(dtype, len(idx))}, index=idx) lib.write(sym, initial_df) idx_to_append = pd.date_range(pd.Timestamp(40), periods=20, freq='ns') - data_to_append = {"a": range(len(idx_to_append)), "b": np.array(range(len(idx_to_append)), dtype=dtype)} + data_to_append = {"a": rand_data(dtype, len(idx)), "b": rand_data(dtype, len(idx))} df_to_append = pd.DataFrame(data_to_append, index=idx_to_append) lib.append(sym, df_to_append) q = QueryBuilder() - q = q.resample('10ns').agg(all_aggregations_dict("b")) + q = q.resample('10ns').agg(all_aggregations_dict("b", dtype)) arctic_resampled = lib.read(sym, query_builder=q).data arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) expected_slice_with_missing_column_idx = [pd.Timestamp(0), pd.Timestamp(10)] expected_slice_with_missing_column = pd.DataFrame({ - f"to_{agg}": [default_aggregation_value(agg, dtype) for _ in range(len(expected_slice_with_missing_column_idx))] for agg in 
ALL_AGGREGATIONS + f"to_{agg}": [default_aggregation_value(agg, dtype) for _ in range(len(expected_slice_with_missing_column_idx))] for agg in all_aggregations(dtype) }, index=expected_slice_with_missing_column_idx) - expected_slice_containing_column = df_to_append.resample('10ns').agg(None, **all_aggregations_dict("b")) + expected_slice_containing_column = df_to_append.resample('10ns').agg(None, **all_aggregations_dict("b", dtype)) expected = pd.concat([expected_slice_with_missing_column, expected_slice_containing_column]) expected = expected.reindex(columns=sorted(expected.columns)) assert_frame_equal(arctic_resampled, expected, check_dtype=False) - def test_missing_column_segment_does_not_cross_bucket_date(self, lmdb_version_store_dynamic_schema_v1): - # Datetime types do not support all aggregation types that is why they are separated in a separate test - lib = lmdb_version_store_dynamic_schema_v1 - sym = "sym" - - idx = pd.date_range(pd.Timestamp(0), periods=20, freq='ns') - initial_df = pd.DataFrame({"a": range(len(idx))}, index=idx) - lib.write(sym, initial_df) - - idx_to_append = pd.date_range(pd.Timestamp(40), periods=20, freq='ns') - data_to_append = {"a": range(len(idx_to_append)), "b": np.array([pd.Timestamp(i) for i in range(len(idx_to_append))])} - df_to_append = pd.DataFrame(data_to_append, index=idx_to_append) - lib.append(sym, df_to_append) - - q = QueryBuilder() - q = q.resample('10ns').agg(datetime_aggregations_dict("b")) - arctic_resampled = lib.read(sym, query_builder=q).data - arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) - - expected_slice_with_missing_column_idx = [pd.Timestamp(0), pd.Timestamp(10)] - expected_slice_with_missing_column = pd.DataFrame({ - f"to_{agg}": [default_aggregation_value(agg, np.datetime64) for _ in range(len(expected_slice_with_missing_column_idx))] for agg in DATETIME_AGGREGATIONS - }, index=expected_slice_with_missing_column_idx) - expected_slice_containing_column = df_to_append.resample('10ns').agg(None, **datetime_aggregations_dict("b")) - expected = pd.concat([expected_slice_with_missing_column, expected_slice_containing_column]) - expected = expected.reindex(columns=sorted(expected.columns)) - assert_frame_equal(arctic_resampled, expected, check_dtype=False) - @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool]) + @pytest.mark.xfail(reason="Not decided whether to backfill or to leave empty") + @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool, "datetime64[ns]", str]) def test_date_range_select_missing_column(self, lmdb_version_store_dynamic_schema_v1, dtype): lib = lmdb_version_store_dynamic_schema_v1 sym = "sym" @@ -944,7 +926,7 @@ def test_date_range_select_missing_column(self, lmdb_version_store_dynamic_schem expected = pd.DataFrame({}, index=pd.DatetimeIndex([pd.Timestamp(0), pd.Timestamp(10)])) assert_frame_equal(arctic_resampled, expected) - @pytest.mark.parametrize("dtype", [np.float32]) + @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool, "datetime64[ns]", str]) def test_bucket_spans_two_segments(self, lmdb_version_store_dynamic_schema_v1, dtype): pd.set_option('display.max_rows', None) @@ -955,21 +937,26 @@ def test_bucket_spans_two_segments(self, lmdb_version_store_dynamic_schema_v1, d lib = lmdb_version_store_dynamic_schema_v1 sym = "sym" - idx = pd.date_range(pd.Timestamp(1), periods=5, freq='ns') - initial_df = pd.DataFrame({"a": range(len(idx))}, 
index=idx) + idx = pd.date_range(pd.Timestamp(0), periods=5, freq='ns') + initial_df = pd.DataFrame({"a": rand_data(dtype, len(idx))}, index=idx) lib.write(sym, initial_df) idx_to_append = pd.date_range(pd.Timestamp(6), periods=4, freq='ns') - data_to_append = {"a": range(len(idx_to_append)), "b": np.array(range(-len(idx_to_append),0), dtype=dtype)} + data_to_append = {"a": rand_data(dtype, len(idx_to_append)), "b": rand_data(dtype, len(idx_to_append))} df_to_append = pd.DataFrame(data_to_append, index=idx_to_append) lib.append(sym, df_to_append) - print() - print(lib.read(sym).data) - q = QueryBuilder() - q = q.resample('10ns').agg({"mx": ("b", "max")}) + q = q.resample('10ns').agg(all_aggregations_dict("b", dtype)) arctic_resampled = lib.read(sym, query_builder=q).data arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) - print() - print(arctic_resampled) \ No newline at end of file + + # Resampling is performed on column b, which is introduced in the second segment; that is why the expected pandas + # result is computed over the second segment only. Another option would be to read the whole dataframe and run + # pandas resampling on it. However, this breaks for integer dtypes: when the column is missing from a segment it + # gets backfilled with 0, and pandas will then pick this 0 up as an existing value, which might affect all + # aggregations (e.g. pandas will count it while ArcticDB will ignore it in the count clause) + expected = df_to_append.resample('10ns').agg(None, **all_aggregations_dict("b", dtype)) + expected = expected.reindex(columns=sorted(expected.columns)) + + assert_frame_equal(arctic_resampled, expected, check_dtype=False) From 7b6b64dea718b394c34d652b2c810c1daf7c572a Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Tue, 25 Feb 2025 16:34:07 +0200 Subject: [PATCH 09/10] Extend tests for resampling with dynamic schema --- .../processing/sorted_aggregation.cpp | 53 ++++---- .../processing/sorted_aggregation.hpp | 1 - .../arcticdb/version_store/test_resample.py | 120 +++++++++++++----- 3 files changed, 114 insertions(+), 60 deletions(-) diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index 9a951ddb00..44834b6104 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -88,8 +88,29 @@ template && arcticdb::util::is_instantiation_of_v consteval bool are_input_output_operation_allowed() { return (is_numeric_type(input_type_info::data_type) && is_numeric_type(output_type_info::data_type)) || - (is_sequence_type(input_type_info::data_type) && (is_sequence_type(output_type_info::data_type) || aggregation_operator == AggregationOperator::COUNT)) || - (is_bool_type(input_type_info::data_type) && (is_bool_type(output_type_info::data_type) || is_numeric_type(output_type_info::data_type))); + (is_sequence_type(input_type_info::data_type) && (is_sequence_type(output_type_info::data_type) || aggregation_operator == AggregationOperator::COUNT)) || + (is_bool_type(input_type_info::data_type) && (is_bool_type(output_type_info::data_type) || is_numeric_type(output_type_info::data_type))); +} + +template +DataType generate_output_data_type(const DataType common_input_data_type) { + if constexpr (aggregation_operator == AggregationOperator::SUM) { + // Deal with overflow as best we can + if (is_unsigned_type(common_input_data_type) || is_bool_type(common_input_data_type)) { + return DataType::UINT64; + } else if (is_signed_type(common_input_data_type)) { + return
DataType::INT64; + } else if (is_floating_point_type(common_input_data_type)) { + return DataType::FLOAT64; + } + } else if constexpr (aggregation_operator == AggregationOperator::MEAN) { + if (!is_time_type(common_input_data_type)) { + return DataType::FLOAT64; + } + } else if constexpr (aggregation_operator == AggregationOperator::COUNT) { + return DataType::UINT64; + } + return common_input_data_type; } template @@ -212,7 +233,7 @@ Column SortedAggregator::aggregate( StringPool& string_pool ) const { const DataType common_input_type = generate_common_input_type(input_agg_columns).value(); - const TypeDescriptor td(generate_output_data_type(common_input_type), Dimension::Dim0); + const TypeDescriptor td(generate_output_data_type(common_input_type), Dimension::Dim0); Column res(td, output_index_column.row_count(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED); details::visit_type( res.type().data_type(), @@ -267,7 +288,7 @@ Column SortedAggregator::aggregate( const bm::bvector<>& existing_columns ) const { const DataType common_input_type = generate_common_input_type(input_agg_columns).value(); - const TypeDescriptor td(generate_output_data_type(common_input_type), Dimension::Dim0); + const TypeDescriptor td(generate_output_data_type(common_input_type), Dimension::Dim0); Column res(td, output_index_column.row_count(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED); details::visit_type( res.type().data_type(), @@ -317,7 +338,7 @@ Column SortedAggregator::aggregate( template std::optional SortedAggregator::generate_common_input_type( - std::span input_agg_columns + const std::span input_agg_columns ) const { std::optional common_input_type; for (const auto& opt_input_agg_column: input_agg_columns) { @@ -342,28 +363,6 @@ void SortedAggregator::check_aggregator_s aggregation_operator, get_input_column_name().value, data_type); } -template -DataType SortedAggregator::generate_output_data_type(DataType common_input_data_type) const { - DataType output_type{common_input_data_type}; - if constexpr (aggregation_operator == AggregationOperator::SUM) { - // Deal with overflow as best we can - if (is_unsigned_type(common_input_data_type) || is_bool_type(common_input_data_type)) { - output_type = DataType::UINT64; - } else if (is_signed_type(common_input_data_type)) { - output_type = DataType::INT64; - } else if (is_floating_point_type(common_input_data_type)) { - output_type = DataType::FLOAT64; - } - } else if constexpr (aggregation_operator == AggregationOperator::MEAN) { - if (!is_time_type(common_input_data_type)) { - output_type = DataType::FLOAT64; - } - } else if constexpr (aggregation_operator == AggregationOperator::COUNT) { - output_type = DataType::UINT64; - } - return output_type; -} - template class SortedAggregator; template class SortedAggregator; template class SortedAggregator; diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index 1811e5ccff..f0d118aa6f 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -386,7 +386,6 @@ class SortedAggregator private: [[nodiscard]] std::optional generate_common_input_type(std::span input_agg_columns) const; void check_aggregator_supported_with_data_type(DataType data_type) const; - [[nodiscard]] DataType generate_output_data_type(DataType common_input_data_type) const; ColumnName input_column_name_; ColumnName output_column_name_; diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py 
b/python/tests/unit/arcticdb/version_store/test_resample.py index 95d836b992..e502da3f4d 100644 --- a/python/tests/unit/arcticdb/version_store/test_resample.py +++ b/python/tests/unit/arcticdb/version_store/test_resample.py @@ -6,7 +6,7 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ from functools import partial -from typing import Union +from typing import Union, Optional import numpy as np import pandas as pd @@ -45,7 +45,16 @@ def rand_data(dtype, count): raise BaseException(f"Unknown dtype {dtype}") -def all_aggregations(dtype=None): +def all_aggregations(dtype: Union[np.dtype, str] = None) -> set: + """ + Get a set of all aggregation operations supported for a given dtype. + + Parameters + ---------- + dtype : Union[np.dtype, str] + Dtype to generate supported aggregations for. If None returns all aggregation operations. + """ + result = ALL_AGGREGATIONS.copy() if dtype is None: return result @@ -56,6 +65,13 @@ def all_aggregations(dtype=None): return result def default_aggregation_value(aggregation: str, dtype: Union[np.dtype, str]): + """ + Get the default value for an aggregation operation. This is used to back-fill Pandas dataframes when they are + compared against dynamic schema libraries. Dynamic schema allows for missing columns but Pandas does not. ArcticDB + will put these default values everywhere where we have an index value in a bucket but the segment does not contain + column data. + """ + assert aggregation in ALL_AGGREGATIONS if is_float_dtype(dtype): return 0 if aggregation == "count" else np.nan @@ -70,7 +86,21 @@ def default_aggregation_value(aggregation: str, dtype: Union[np.dtype, str]): else: raise "Unknown dtype" -def all_aggregations_dict(col, dtype=None): +def all_aggregations_dict(col: str, dtype: Optional[Union[str, np.dtype]] = None): + """ + Generate a dict that can be used as an argument to both QueryBuilder and Pandas resample methods. The dict will + contain all possible aggregation operations for a column + + Parameters + ---------- + + col: str + The name of the column to create aggregation dict for + dtype: Optional[Union[str, np.dtype]] + The dtype of the column to create aggregation operations dict for. Some dtypes support only a subset of the + aggregations. If None it creates a dict with all possible aggregations. 
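+
+    Examples
+    --------
+    A minimal illustrative sketch (key order is not guaranteed since the aggregation names come from a set):
+
+    >>> all_aggregations_dict("b", np.float64)  # doctest: +SKIP
+    {'to_sum': ('b', 'sum'), 'to_mean': ('b', 'mean'), 'to_min': ('b', 'min'), 'to_max': ('b', 'max'), 'to_first': ('b', 'first'), 'to_last': ('b', 'last'), 'to_count': ('b', 'count')}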
+ """ + aggregations = all_aggregations(dtype) return {f"to_{agg}": (col, agg) for agg in aggregations} @@ -896,44 +926,19 @@ def test_missing_column_segment_does_not_cross_bucket(self, lmdb_version_store_d arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) expected_slice_with_missing_column_idx = [pd.Timestamp(0), pd.Timestamp(10)] + # Pandas has no equivalent of dynamic schema and expects that the column will always be there, thus for the + # buckets where the column is missing we have to place the values manually by back-filling the column using the + # default value based on the type and the aggregator expected_slice_with_missing_column = pd.DataFrame({ - f"to_{agg}": [default_aggregation_value(agg, dtype) for _ in range(len(expected_slice_with_missing_column_idx))] for agg in all_aggregations(dtype) + f"to_{agg}": [default_aggregation_value(agg, dtype) for _ in expected_slice_with_missing_column_idx] for agg in all_aggregations(dtype) }, index=expected_slice_with_missing_column_idx) expected_slice_containing_column = df_to_append.resample('10ns').agg(None, **all_aggregations_dict("b", dtype)) expected = pd.concat([expected_slice_with_missing_column, expected_slice_containing_column]) expected = expected.reindex(columns=sorted(expected.columns)) assert_frame_equal(arctic_resampled, expected, check_dtype=False) - @pytest.mark.xfail(reason="Not decided whether to backfill or to leave empty") - @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool, "datetime64[ns]", str]) - def test_date_range_select_missing_column(self, lmdb_version_store_dynamic_schema_v1, dtype): - lib = lmdb_version_store_dynamic_schema_v1 - sym = "sym" - - idx = pd.date_range(pd.Timestamp(0), periods=20, freq='ns') - initial_df = pd.DataFrame({"a": range(len(idx))}, index=idx) - lib.write(sym, initial_df) - - idx_to_append = pd.date_range(pd.Timestamp(40), periods=20, freq='ns') - data_to_append = {"a": range(len(idx_to_append)), "b": np.array(range(len(idx_to_append)), dtype=dtype)} - df_to_append = pd.DataFrame(data_to_append, index=idx_to_append) - lib.append(sym, df_to_append) - - q = QueryBuilder() - q = q.resample('10ns').agg(all_aggregations_dict("b")) - arctic_resampled = lib.read(sym, query_builder=q, date_range=(None, pd.Timestamp(20))).data - arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) - expected = pd.DataFrame({}, index=pd.DatetimeIndex([pd.Timestamp(0), pd.Timestamp(10)])) - assert_frame_equal(arctic_resampled, expected) - @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool, "datetime64[ns]", str]) def test_bucket_spans_two_segments(self, lmdb_version_store_dynamic_schema_v1, dtype): - - pd.set_option('display.max_rows', None) - pd.set_option('display.max_columns', None) - pd.set_option('display.width', None) - pd.set_option('display.max_colwidth', None) - lib = lmdb_version_store_dynamic_schema_v1 sym = "sym" @@ -960,3 +965,54 @@ def test_bucket_spans_two_segments(self, lmdb_version_store_dynamic_schema_v1, d expected = expected.reindex(columns=sorted(expected.columns)) assert_frame_equal(arctic_resampled, expected, check_dtype=False) + + @pytest.mark.parametrize("dtype", [np.float32, np.float64, np.uint32, np.int32, np.int64, bool, "datetime64[ns]", str]) + def test_bucket_spans_multiple_segments(self, lmdb_version_store_dynamic_schema_v1, dtype): + lib = lmdb_version_store_dynamic_schema_v1 + sym = "sym" + + data = [ + pd.DataFrame({"a": 
rand_data(dtype, 1)}, index=pd.DatetimeIndex([pd.Timestamp(0)])), + pd.DataFrame({"b": rand_data(dtype, 1)}, index=pd.DatetimeIndex([pd.Timestamp(1)])), + pd.DataFrame({"a": rand_data(dtype, 1)}, index=pd.DatetimeIndex([pd.Timestamp(2)])), + pd.DataFrame({"a": rand_data(dtype, 1)}, index=pd.DatetimeIndex([pd.Timestamp(3)])), + pd.DataFrame({"b": rand_data(dtype, 1)}, index=pd.DatetimeIndex([pd.Timestamp(4)])), + pd.DataFrame({"b": rand_data(dtype, 1)}, index=pd.DatetimeIndex([pd.Timestamp(5)])), + pd.DataFrame({"a": rand_data(dtype, 3), "b": rand_data(dtype, 3)}, index=pd.date_range(pd.Timestamp(5), periods=3, freq='ns')) + ] + for df in data: + lib.append(sym, df) + + pandas_to_resample = pd.concat(filter(lambda df: "b" in df, data)) + expected = pandas_to_resample.resample('10ns').agg(None, **all_aggregations_dict("b", dtype)) + expected = expected.reindex(columns=sorted(expected.columns)) + + q = QueryBuilder() + q = q.resample('10ns').agg(all_aggregations_dict("b", dtype)) + arctic_resampled = lib.read(sym, query_builder=q).data + arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) + + assert_frame_equal(expected, arctic_resampled, check_dtype=False) + + def test_type_promotion(self, lmdb_version_store_dynamic_schema_v1): + lib = lmdb_version_store_dynamic_schema_v1 + sym = "sym" + data = [ + pd.DataFrame({"a": rand_data(np.int16, 15)}, pd.date_range(pd.Timestamp(0), periods=15, freq='ns')), + pd.DataFrame({"a": rand_data(np.int8, 15)}, pd.date_range(pd.Timestamp(15), periods=15, freq='ns')), + pd.DataFrame({"a": rand_data(np.int32, 15)}, pd.date_range(pd.Timestamp(30), periods=15, freq='ns')), + pd.DataFrame({"a": rand_data(np.float64, 15)}, pd.date_range(pd.Timestamp(45), periods=15, freq='ns')) + ] + for df in data: + lib.append(sym, df) + + q = QueryBuilder() + q = q.resample('10ns').agg(all_aggregations_dict("a")) + arctic_resampled = lib.read(sym, query_builder=q).data + arctic_resampled = arctic_resampled.reindex(columns=sorted(arctic_resampled.columns)) + + pandas_to_resample = lib.read(sym).data + expected = pandas_to_resample.resample('10ns').agg(None, **all_aggregations_dict("a")) + expected = expected.reindex(columns=sorted(expected.columns)) + + assert_frame_equal(arctic_resampled, expected, check_dtype=False) \ No newline at end of file From 6baedc5d798e5144dd740a28d6f232c63834883b Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Tue, 25 Feb 2025 18:16:06 +0200 Subject: [PATCH 10/10] Minor refactor --- cpp/arcticdb/column_store/column.hpp | 3 +-- cpp/arcticdb/storage/s3/s3_client_wrapper.cpp | 3 --- cpp/arcticdb/version/version_core.cpp | 2 +- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cpp/arcticdb/column_store/column.hpp b/cpp/arcticdb/column_store/column.hpp index 6360a927c1..8ecf32a465 100644 --- a/cpp/arcticdb/column_store/column.hpp +++ b/cpp/arcticdb/column_store/column.hpp @@ -25,7 +25,6 @@ #ifdef __APPLE__ #include #endif -#include #include #include @@ -515,7 +514,7 @@ class Column { return data_.bytes(); } - ColumnData data() const { + ColumnData data() const { return ColumnData(&data_.buffer(), &shapes_.buffer(), type_, sparse_map_ ? 
&*sparse_map_ : nullptr); } diff --git a/cpp/arcticdb/storage/s3/s3_client_wrapper.cpp b/cpp/arcticdb/storage/s3/s3_client_wrapper.cpp index a7e98427d2..085baa6bc7 100644 --- a/cpp/arcticdb/storage/s3/s3_client_wrapper.cpp +++ b/cpp/arcticdb/storage/s3/s3_client_wrapper.cpp @@ -136,9 +136,6 @@ folly::Future> S3ClientTestWrapper::delete_object( return actual_client_->delete_object(s3_object_name, bucket_name); } -// Using a fixed page size since it's only being used for simple tests. -// If we ever need to configure it we should move it to the s3 proto config instead. -constexpr auto page_size = 10; S3Result S3ClientTestWrapper::list_objects( const std::string& name_prefix, const std::string& bucket_name, diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index c1eda3dea4..4339492eca 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -899,7 +899,7 @@ folly::Future> read_and_process( const ReadOptions& read_options ) { auto component_manager = std::make_shared(); - const ProcessingConfig processing_config{opt_false(read_options.dynamic_schema_), pipeline_context->rows_}; + const ProcessingConfig processing_config{opt_false(read_options.dynamic_schema()), pipeline_context->rows_}; for (const auto& clause: read_query->clauses_) { clause->set_processing_config(processing_config); clause->set_component_manager(component_manager);