Resampling using dynamic schema [Part 1] #2201

Open

vasil-pashov wants to merge 11 commits into master from vasil.pashov/resampling-dynamic-schema

Conversation

vasil-pashov (Collaborator) commented Feb 25, 2025

Reference Issues/PRs

This enables using the QueryBuilder to resample libraries where dynamic schema is enabled. It requires handling two cases that differ from static schema:

  1. Type propagation. It turns out this works without any functional changes to the code.
  2. Missing columns. Dynamic schema allows segments to contain only a subset of the symbol's columns. This case needed additional care.

Handling missing columns
Two changes make this possible. The first is in std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<EntityId>&& entity_ids): it now builds a bitmap per column. The bitmap's length equals the number of index segments (we hold it as an invariant that the index cannot be sparse), and a bit is set to 1 if the column appears in the corresponding segment. The vector of segments passed for aggregation has length equal to the number of set bits in the bitmap. If all bits are set to 1 we take the static-schema path, which is a bit more efficient; otherwise we take the "not all segments have the column" path.
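A minimal sketch of this bookkeeping, assuming an illustrative segment type with a hypothetical has_column helper (the real code works with ArcticDB's segment entities); bm::bvector<> is the BitMagic bitvector already used elsewhere in this PR:

```cpp
#include <bm.h>  // BitMagic bitvector (bm::bvector<>); include path may differ by setup
#include <algorithm>
#include <string>
#include <vector>

// Illustrative stand-in for a row slice/segment; has_column is hypothetical.
struct SegmentSketch {
    std::vector<std::string> columns;
    bool has_column(const std::string& name) const {
        return std::find(columns.begin(), columns.end(), name) != columns.end();
    }
};

// One bitmap per aggregated column: bit i is set iff the i-th index segment
// contains the column. The index cannot be sparse, so the number of index
// segments is well defined.
bm::bvector<> column_presence(const std::vector<SegmentSketch>& index_segments,
                              const std::string& column_name) {
    bm::bvector<> presence;
    presence.resize(index_segments.size());
    for (size_t i = 0; i < index_segments.size(); ++i) {
        if (index_segments[i].has_column(column_name))
            presence.set_bit(static_cast<bm::id_t>(i));
    }
    return presence;
}
```

If presence.count() equals the number of index segments, every segment has the column and the dense static-schema path is taken; otherwise the sparse path runs.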

The second change is in Column SortedAggregator<aggregation_operator, closed_boundary>::aggregate, which now has two overloads. One takes a dense vector of columns, where the i-th column corresponds to the i-th index segment. The other takes a vector of columns whose size is less than the number of index segments, together with a bitmap whose length equals the number of index segments; we then iterate over the set bits in the bitmap.
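A hedged sketch of the two overloads (simplified signatures, not the exact ArcticDB declarations) and of walking the set bits with BitMagic's enumerator:

```cpp
// Dense overload: input_agg_columns[i] pairs with the i-th index segment.
Column aggregate(const std::vector<ColumnWithStrings>& input_agg_columns /*, ... */);

// Sparse overload: input_agg_columns.size() < number of index segments;
// existing_columns has one bit per index segment, set where the segment
// contains the aggregated column.
Column aggregate(const std::vector<ColumnWithStrings>& input_agg_columns,
                 const bm::bvector<>& existing_columns /*, ... */);

// Pairing dense columns with their segments via BitMagic's enumerator:
size_t dense_idx = 0;
for (auto en = existing_columns.first(); en.valid(); ++en) {
    const auto segment_idx = *en;                       // index segment that has the column
    const auto& col = input_agg_columns[dense_idx++];   // matching dense column
    // ... aggregate col against index segment segment_idx ...
}
```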

The initial idea was to use std::vector<std::optional>, but the bitmap approach makes the "dense" codepath a bit better.

Code refactoring
There is also some code refactoring, mostly moving things from sorted_aggregation.hpp to sorted_aggregation.cpp. Some of the functions did not require the state of SortedAggregator and were made free functions.

Missing behavior
The current implementation has flaws related to using date_range:

  1. If date_range is passed to the QueryBuilder and no segment in that range contains some column (which is otherwise in the dataframe), we will return either an empty dataframe (if no other columns are being aggregated) or a dataframe without that column. More sensible behavior would be to backfill the missing buckets with a default value based on the dtype and the aggregator.
  2. Type propagation takes into account only the segments used in the aggregation and the dtypes from the segment descriptors. This means that different date ranges can result in dataframes whose columns have different dtypes. More sensible behavior would be to use the timeseries descriptor.

@alexowens90 is working on making the TSD (timeseries descriptor) easily available. Both of the above can be implemented after that.

What does this implement or fix?

Change Type (Required)

  • Patch (Bug fix or non-breaking improvement)
  • Minor (New feature, but backward compatible)
  • Major (Breaking changes)
  • Cherry pick

Any other comments?

Checklist

Checklist for code changes...
  • Have you updated the relevant docstrings, documentation and copyright notice?
  • Is this contribution tested against all ArcticDB's features?
  • Do all exceptions introduced raise appropriate error messages?
  • Are API changes highlighted in the PR description?
  • Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

@@ -217,8 +219,7 @@ struct ColumnData {
    using base_type = base_iterator_type<TDT, iterator_type, iterator_density, constant>;
    using RawType = typename TDT::DataTypeTag::raw_type;
public:
-   ColumnDataIterator() = delete;
+   ColumnDataIterator() = default;
vasil-pashov (Collaborator, Author) commented:

This was an intentional change. If ColumnDataIterator has a default constructor it models the concepts needed by most of the algorithms and classes in std::ranges. I used this in particular to create a std::ranges::subrange in sorted_aggregation.cpp and avoid passing iterators around.
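A small standalone illustration (not ArcticDB code) of the constraint chain involved: std::forward_iterator requires std::incrementable, which requires std::regular, then std::semiregular, then std::default_initializable, and std::ranges algorithms and std::ranges::subrange constrain on these concepts:

```cpp
#include <concepts>

struct DeletedCtor  { DeletedCtor() = delete; };
struct DefaultCtor  { DefaultCtor() = default; };

// A deleted default constructor fails std::default_initializable, and with it
// std::semiregular, std::regular, std::incrementable and std::forward_iterator.
static_assert(!std::default_initializable<DeletedCtor>);
static_assert(std::default_initializable<DefaultCtor>);
```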

vasil-pashov (Collaborator, Author) commented Feb 25, 2025:

inline is implied when the implementation is inside a class/struct body, and typename is not needed here.
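A generic illustration of both points (not the PR's code); the typename relaxation is C++20's P0634, which allows omitting typename in contexts where only a type can appear, such as an alias-declaration:

```cpp
template <typename T>
struct Example {
    // Defined inside the class body, so implicitly inline; writing "inline" is redundant.
    int get() const { return 0; }

    // C++20 (P0634): no "typename" needed on the right-hand side here,
    // even though T::type is a dependent name.
    using value_type = T::type;
};
```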

vasil-pashov (Collaborator, Author) commented:

No functional changes here. Copy/paste from types.hpp

vasil-pashov (Collaborator, Author) commented:

Moved to types.cpp

};

using SortedAggregatorInterface = folly::Poly<ISortedAggregator>;

template<ResampleBoundary closed_boundary>
class Bucket {
public:
Bucket(timestamp start, timestamp end):
vasil-pashov (Collaborator, Author) commented:

Moved to sorted_aggregation.cpp

Comment on lines -365 to -439
[[nodiscard]] DataType generate_output_data_type(DataType common_input_data_type) const;
[[nodiscard]] bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const;

template<DataType input_data_type, typename Aggregator, typename T>
void push_to_aggregator(Aggregator& bucket_aggregator, T value, ARCTICDB_UNUSED const ColumnWithStrings& column_with_strings) const {
if constexpr(is_time_type(input_data_type) && aggregation_operator == AggregationOperator::COUNT) {
bucket_aggregator.template push<timestamp, true>(value);
} else if constexpr (is_numeric_type(input_data_type) || is_bool_type(input_data_type)) {
bucket_aggregator.push(value);
} else if constexpr (is_sequence_type(input_data_type)) {
bucket_aggregator.push(column_with_strings.string_at_offset(value));
}
}

template<DataType output_data_type, typename Aggregator>
[[nodiscard]] auto finalize_aggregator(Aggregator& bucket_aggregator, ARCTICDB_UNUSED StringPool& string_pool) const {
if constexpr (is_numeric_type(output_data_type) || is_bool_type(output_data_type) || aggregation_operator == AggregationOperator::COUNT) {
return bucket_aggregator.finalize();
} else if constexpr (is_sequence_type(output_data_type)) {
auto opt_string_view = bucket_aggregator.finalize();
if (ARCTICDB_LIKELY(opt_string_view.has_value())) {
return string_pool.get(*opt_string_view).offset();
} else {
return string_none;
}
}
}

template<typename scalar_type_info>
[[nodiscard]] auto get_bucket_aggregator() const {
if constexpr (aggregation_operator == AggregationOperator::SUM) {
if constexpr (is_bool_type(scalar_type_info::data_type)) {
// Sum of bool column is just the count of true values
return SumAggregatorSorted<uint64_t>();
} else {
return SumAggregatorSorted<typename scalar_type_info::RawType>();
}
} else if constexpr (aggregation_operator == AggregationOperator::MEAN) {
if constexpr (is_time_type(scalar_type_info::data_type)) {
return MeanAggregatorSorted<typename scalar_type_info::RawType, true>();
} else {
return MeanAggregatorSorted<typename scalar_type_info::RawType>();
}
} else if constexpr (aggregation_operator == AggregationOperator::MIN) {
if constexpr (is_time_type(scalar_type_info::data_type)) {
return MinAggregatorSorted<typename scalar_type_info::RawType, true>();
} else {
return MinAggregatorSorted<typename scalar_type_info::RawType>();
}
} else if constexpr (aggregation_operator == AggregationOperator::MAX) {
if constexpr (is_time_type(scalar_type_info::data_type)) {
return MaxAggregatorSorted<typename scalar_type_info::RawType, true>();
} else {
return MaxAggregatorSorted<typename scalar_type_info::RawType>();
}
} else if constexpr (aggregation_operator == AggregationOperator::FIRST) {
if constexpr (is_time_type(scalar_type_info::data_type)) {
return FirstAggregatorSorted<typename scalar_type_info::RawType, true>();
} else if constexpr (is_numeric_type(scalar_type_info::data_type) || is_bool_type(scalar_type_info::data_type)) {
return FirstAggregatorSorted<typename scalar_type_info::RawType>();
} else if constexpr (is_sequence_type(scalar_type_info::data_type)) {
return FirstAggregatorSorted<std::optional<std::string_view>>();
}
} else if constexpr (aggregation_operator == AggregationOperator::LAST) {
if constexpr (is_time_type(scalar_type_info::data_type)) {
return LastAggregatorSorted<typename scalar_type_info::RawType, true>();
} else if constexpr (is_numeric_type(scalar_type_info::data_type) || is_bool_type(scalar_type_info::data_type)) {
return LastAggregatorSorted<typename scalar_type_info::RawType>();
} else if constexpr (is_sequence_type(scalar_type_info::data_type)) {
return LastAggregatorSorted<std::optional<std::string_view>>();
}
} else if constexpr (aggregation_operator == AggregationOperator::COUNT) {
return CountAggregatorSorted();
}
}
vasil-pashov (Collaborator, Author) commented:

Moved to sorted_aggregation.cpp

vasil-pashov force-pushed the vasil.pashov/resampling-dynamic-schema branch from 1748441 to 6baedc5 on February 25, 2025 16:18
vasil-pashov marked this pull request as ready for review February 25, 2025 16:19
auto bucket_start_it = bucket_boundaries.begin();
auto bucket_end_it = std::next(bucket_start_it);
const auto bucket_boundaries_end = bucket_boundaries.end();
Bucket<closed_boundary> current_bucket(*bucket_start_it, *bucket_end_it);
Collaborator commented:

A lot of this code was outside the loop over input index/agg columns before, why is it here now?

Collaborator commented:

For row slices after the first one, won't this result in us having to advance the current bucket iterators repeatedly until we find the first bucket with an index value for this row slice?

const auto index_cend = index_data.cend<IndexTDT>();
auto agg_data = agg_column.column_->data();
auto agg_it = agg_data.cbegin<typename input_type_info::TDT>();
bool bucket_has_values = false;
Collaborator commented:

This can't be correct for buckets that span multiple input row slices?

}
if constexpr (is_output_type_allowed<aggregation_operator, output_type_info>()) {
auto output_data = res.data();
ranges::subrange output(output_data.begin<typename output_type_info::TDT>(), output_data.end<typename output_type_info::TDT>());
Collaborator commented:

The previous code was very aggressively optimised to avoid all unnecessary branching. e.g. output_end_it was precalculated, rather than repeatedly calling output_data.end<typename output_type_info::TDT>(), as it was found empirically to be faster. Can you guarantee these optimisations are still in place with this refactor to use ranges::subrange?
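For reference, a sketch of the precalculated-end pattern the comment describes, reusing the diff's names illustratively:

```cpp
// Hoist the end iterator out of the loop so it is computed exactly once,
// instead of calling output_data.end<...>() on every comparison.
auto output_it = output_data.begin<typename output_type_info::TDT>();
const auto output_end_it = output_data.end<typename output_type_info::TDT>();
while (output_it != output_end_it) {
    // ... write the aggregated value for the current bucket ...
    ++output_it;
}
```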

const std::vector<timestamp>& bucket_boundaries,
const Column& output_index_column,
StringPool& string_pool,
const bm::bvector<>& existing_columns
Collaborator commented:

I don't really understand why this implementation is cleaner than input_agg_columns being a vector of optionals, as it seems like this method is extremely similar to the one above. Let's discuss in person
