diff --git a/cpp/arcticdb/entity/read_result.hpp b/cpp/arcticdb/entity/read_result.hpp
index 526407f073..33dfb64af7 100644
--- a/cpp/arcticdb/entity/read_result.hpp
+++ b/cpp/arcticdb/entity/read_result.hpp
@@ -21,10 +21,10 @@ namespace arcticdb {
 struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
     ReadResult(
-        const VersionedItem& versioned_item,
+        const std::variant<VersionedItem, std::vector<VersionedItem>>& versioned_item,
         pipelines::PythonOutputFrame&& frame_data,
         const arcticdb::proto::descriptors::NormalizationMetadata& norm_meta,
-        const arcticdb::proto::descriptors::UserDefinedMetadata& user_meta,
+        const std::variant<arcticdb::proto::descriptors::UserDefinedMetadata, std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>& user_meta,
         const arcticdb::proto::descriptors::UserDefinedMetadata& multi_key_meta,
         std::vector<AtomKey>&& multi_keys) :
         item(versioned_item),
@@ -35,10 +35,10 @@ struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
         multi_keys(std::move(multi_keys)) {
     }
-    VersionedItem item;
+    std::variant<VersionedItem, std::vector<VersionedItem>> item;
     pipelines::PythonOutputFrame frame_data;
     arcticdb::proto::descriptors::NormalizationMetadata norm_meta;
-    arcticdb::proto::descriptors::UserDefinedMetadata user_meta;
+    std::variant<arcticdb::proto::descriptors::UserDefinedMetadata, std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>> user_meta;
     arcticdb::proto::descriptors::UserDefinedMetadata multi_key_meta;
     std::vector<AtomKey> multi_keys;
@@ -46,8 +46,9 @@ struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
 };
 
 inline ReadResult create_python_read_result(
-    const VersionedItem& version,
-    FrameAndDescriptor&& fd) {
+    const std::variant<VersionedItem, std::vector<VersionedItem>>& version,
+    FrameAndDescriptor&& fd,
+    std::optional<std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>&& user_meta = std::nullopt) {
     auto result = std::move(fd);
     // Very old (pre Nov-2020) PandasIndex protobuf messages had no "start" or "step" fields. If is_physically_stored
     // (renamed from is_not_range_index) was false, the index was always RangeIndex(num_rows, 1)
@@ -73,7 +74,13 @@ inline ReadResult create_python_read_result(
     util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__);
 
     const auto& desc_proto = result.desc_.proto();
+    std::variant<arcticdb::proto::descriptors::UserDefinedMetadata, std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>> metadata;
+    if (user_meta.has_value()) {
+        metadata = *user_meta;
+    } else {
+        metadata = desc_proto.user_meta();
+    }
     return {version, std::move(python_frame), desc_proto.normalization(),
-            desc_proto.user_meta(), desc_proto.multi_key_meta(), std::move(result.keys_)};
+            metadata, desc_proto.multi_key_meta(), std::move(result.keys_)};
 }
 } //namespace arcticdb
\ No newline at end of file
diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp
index a69d9a671a..76fa60831c 100644
--- a/cpp/arcticdb/processing/clause.cpp
+++ b/cpp/arcticdb/processing/clause.cpp
@@ -647,6 +647,7 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<EntityId>&& entity_ids) const {
     RowRange output_row_range(row_slices.front().row_ranges_->at(0)->start(),
                               row_slices.front().row_ranges_->at(0)->start() + output_index_column->row_count());
+    ColRange output_col_range(1, aggregators_.size() + 1);
     seg.add_column(scalar_field(DataType::NANOSECONDS_UTC64, index_column_name), output_index_column);
     seg.descriptor().set_index(IndexDescriptorImpl(1, IndexDescriptor::Type::TIMESTAMP));
     auto& string_pool = seg.string_pool();
@@ -675,7 +676,7 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<EntityId>&& entity_ids) const {
         seg.add_column(scalar_field(aggregated_column->type().data_type(), aggregator.get_output_column_name().value), aggregated_column);
     }
     seg.set_row_data(output_index_column->row_count() - 1);
-    return push_entities(*component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range)));
+    return push_entities(*component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range), std::move(output_col_range)));
 }
 
 template
@@ -1217,4 +1218,55 @@ std::string DateRangeClause::to_string() const {
     return fmt::format("DATE RANGE {} - {}", start_, end_);
 }
 
+ConcatClause::ConcatClause() {
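+    // ConcatClause is the entry point of a multi-symbol pipeline: with a MULTI_SYMBOL input
+    // structure, structure_for_processing receives one vector of entity IDs per input symbol,
+    // and multi_symbol_ lets the bindings verify it is the first clause of a join query.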
clause_info_.input_structure_ = ProcessingStructure::MULTI_SYMBOL; + clause_info_.multi_symbol_ = true; +} + +std::vector> ConcatClause::structure_for_processing(std::vector>&& entity_ids_vec) { + // Similar logic to RowRangeClause::structure_for_processing but as input row ranges come from multiple symbols it is slightly different + std::vector ranges_and_entities; + std::vector> new_row_ranges; + bool first_range{true}; + size_t prev_range_end{0}; + for (const auto& entity_ids: entity_ids_vec) { + auto [old_row_ranges, col_ranges] = component_manager_->get_entities, std::shared_ptr>(entity_ids, false); + // Map from old row ranges WITHIN THIS SYMBOL to new ones + std::map row_range_mapping; + for (const auto& row_range: old_row_ranges) { + // Value is same as key initially + row_range_mapping.insert({*row_range, *row_range}); + } + for (auto& [old_range, new_range]: row_range_mapping) { + if (first_range) { + // Make the first row-range start from zero + new_range.first = 0; + new_range.second = old_range.diff(); + first_range = false; + } else { + new_range.first = prev_range_end; + new_range.second = new_range.first + old_range.diff(); + } + prev_range_end = new_range.second; + } + + for (size_t idx=0; idx(row_range_mapping.at(*old_row_ranges[idx])); + ranges_and_entities.emplace_back(entity_ids[idx], new_row_range, col_ranges[idx]); + new_row_ranges.emplace_back(std::move(new_row_range)); + } + } + component_manager_->replace_entities>(flatten_entities(std::move(entity_ids_vec)), new_row_ranges); + auto new_structure_offsets = structure_by_row_slice(ranges_and_entities); + return offsets_to_entity_ids(new_structure_offsets, ranges_and_entities); +} + +std::vector ConcatClause::process(std::vector&& entity_ids) const { + return std::move(entity_ids); +} + +std::string ConcatClause::to_string() const { + return "CONCAT"; +} + } diff --git a/cpp/arcticdb/processing/clause.hpp b/cpp/arcticdb/processing/clause.hpp index 134780d8db..51f69b2640 100644 --- a/cpp/arcticdb/processing/clause.hpp +++ b/cpp/arcticdb/processing/clause.hpp @@ -669,4 +669,37 @@ struct DateRangeClause { [[nodiscard]] std::string to_string() const; }; +struct ConcatClause { + + ClauseInfo clause_info_; + std::shared_ptr component_manager_; + + ConcatClause(); + + ARCTICDB_MOVE_COPY_DEFAULT(ConcatClause) + + [[nodiscard]] std::vector> structure_for_processing(ARCTICDB_UNUSED std::vector& ranges_and_keys) { + // TODO: This isn't a long term requirement, for simplicity at the moment we force all input symbols through their own + // processing pipeline right now, even if they have no clauses + internal::raise("ConcatClause should never be first in the pipeline"); + } + + [[nodiscard]] std::vector> structure_for_processing(std::vector>&& entity_ids_vec); + + [[nodiscard]] std::vector process(std::vector&& entity_ids) const; + + [[nodiscard]] const ClauseInfo& clause_info() const { + return clause_info_; + } + + void set_processing_config(ARCTICDB_UNUSED const ProcessingConfig& processing_config) { + } + + void set_component_manager(std::shared_ptr component_manager) { + component_manager_ = component_manager; + } + + [[nodiscard]] std::string to_string() const; +}; + }//namespace arcticdb diff --git a/cpp/arcticdb/processing/clause_utils.hpp b/cpp/arcticdb/processing/clause_utils.hpp index aa3ef0c1d1..84951730f7 100644 --- a/cpp/arcticdb/processing/clause_utils.hpp +++ b/cpp/arcticdb/processing/clause_utils.hpp @@ -28,7 +28,8 @@ enum class ProcessingStructure { ROW_SLICE, TIME_BUCKETED, HASH_BUCKETED, - ALL + ALL, + 
MULTI_SYMBOL }; struct KeepCurrentIndex{}; @@ -52,6 +53,8 @@ struct ClauseInfo { std::variant index_{KeepCurrentIndex()}; // Whether this clause modifies the output descriptor bool modifies_output_descriptor_{false}; + // Whether this clause operates on one or multiple symbols + bool multi_symbol_{false}; }; // Changes how the clause behaves based on information only available after it is constructed diff --git a/cpp/arcticdb/processing/query_planner.hpp b/cpp/arcticdb/processing/query_planner.hpp index 99106654e1..b16b79fb40 100644 --- a/cpp/arcticdb/processing/query_planner.hpp +++ b/cpp/arcticdb/processing/query_planner.hpp @@ -22,7 +22,8 @@ using ClauseVariant = std::variant, std::shared_ptr>, std::shared_ptr>, std::shared_ptr, - std::shared_ptr>; + std::shared_ptr, + std::shared_ptr>; std::vector plan_query(std::vector&& clauses); diff --git a/cpp/arcticdb/python/adapt_read_dataframe.hpp b/cpp/arcticdb/python/adapt_read_dataframe.hpp index 65f59b7658..b6594c3bfa 100644 --- a/cpp/arcticdb/python/adapt_read_dataframe.hpp +++ b/cpp/arcticdb/python/adapt_read_dataframe.hpp @@ -14,7 +14,18 @@ namespace arcticdb { inline auto adapt_read_df = [](ReadResult && ret) -> py::tuple{ auto pynorm = python_util::pb_to_python(ret.norm_meta); - auto pyuser_meta = python_util::pb_to_python(ret.user_meta); + auto pyuser_meta = util::variant_match( + ret.user_meta, + [](const arcticdb::proto::descriptors::UserDefinedMetadata& metadata) -> py::object { + return python_util::pb_to_python(metadata); + }, + [](const std::vector& metadatas) -> py::object { + py::list py_metadatas; + for (const auto& metadata: metadatas) { + py_metadatas.append(python_util::pb_to_python(metadata)); + } + return py_metadatas; + }); auto multi_key_meta = python_util::pb_to_python(ret.multi_key_meta); return py::make_tuple(ret.item, std::move(ret.frame_data), pynorm, pyuser_meta, multi_key_meta, ret.multi_keys); }; diff --git a/cpp/arcticdb/python/python_utils.hpp b/cpp/arcticdb/python/python_utils.hpp index f24228b1cb..f9e803944c 100644 --- a/cpp/arcticdb/python/python_utils.hpp +++ b/cpp/arcticdb/python/python_utils.hpp @@ -252,7 +252,7 @@ inline py::list adapt_read_dfs(std::vector>& res, [&lst] (ReadResult& read_result) { auto pynorm = python_util::pb_to_python(read_result.norm_meta); - auto pyuser_meta = python_util::pb_to_python(read_result.user_meta); + auto pyuser_meta = python_util::pb_to_python(std::get(read_result.user_meta)); auto multi_key_meta = python_util::pb_to_python(read_result.multi_key_meta); lst.append(py::make_tuple(read_result.item, std::move(read_result.frame_data), pynorm, pyuser_meta, multi_key_meta, read_result.multi_keys)); diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 99c1a245af..c109631f88 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -1151,6 +1151,124 @@ std::vector> LocalVersionedEngine::ba return read_versions_or_errors; } +StreamDescriptor descriptor_from_segments(std::vector& slice_and_keys) { + internal::check(!slice_and_keys.empty(), "No slice and keys in batch_read_with_join_internal"); + std::sort(std::begin(slice_and_keys), std::end(slice_and_keys), [] (const auto& left, const auto& right) { + return std::tie(left.slice_.row_range, left.slice_.col_range) < std::tie(right.slice_.row_range, right.slice_.col_range); + }); + // TODO: Move this check to ::structure_for_processing, as it could be different for different clauses + // TODO: Make more memory 
efficient by only keeping first element in memory, and comparing others on the fly + // Track the field collections for each row-slice + std::vector field_collections; + RowRange current_row_range(std::numeric_limits::max(), std::numeric_limits::max()); + for (const auto& slice_and_key: slice_and_keys) { + const auto& desc = slice_and_key.segment_->descriptor(); + if (slice_and_key.slice().rows() != current_row_range) { + field_collections.emplace_back(); + for (size_t field_idx = 0; field_idx < desc.data().index().field_count(); ++field_idx) { + field_collections.back().add(desc.fields().ref_at(field_idx)); + } + current_row_range = slice_and_key.slice().rows(); + } + for (size_t field_idx = desc.data().index().field_count(); field_idx < desc.field_count(); ++field_idx) { + field_collections.back().add(desc.fields().ref_at(field_idx)); + } + } + // Ensure they are all the same + // TODO: Relax this requirement: + // - columns in different orders + // - columns of different but promotable types + // - extra/missing columns in different row slices + for (size_t idx = 1; idx < field_collections.size(); ++idx) { + schema::check( + field_collections[0] == field_collections[idx], + "Mismatching fields in multi-symbol join: {} != {}", + field_collections[0], + field_collections[idx] + ); + } + + // Set the output descriptor based on the first row slice + StreamDescriptor res = slice_and_keys[0].segment_->descriptor().clone(); + res.fields_ = std::make_shared(std::move(field_collections[0])); + + return res; +} + +MultiSymbolReadOutput LocalVersionedEngine::batch_read_with_join_internal( + const std::vector& stream_ids, + const std::vector& version_queries, + std::vector>& read_queries, + const ReadOptions& read_options, + std::vector>&& clauses, + std::any& handler_data) { + py::gil_scoped_release release_gil; + auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); + std::vector>> symbol_entities_futs; + symbol_entities_futs.reserve(opt_index_key_futs.size()); + auto component_manager = std::make_shared(); + auto pipeline_context = std::make_shared(); + auto norm_meta_mtx = std::make_shared(); + // TODO: Generate these in a less hacky way + auto res_versioned_items = std::make_shared>(stream_ids.size()); + auto res_metadatas = std::make_shared>(stream_ids.size()); + DecodePathData shared_data; + for (auto&& [idx, opt_index_key_fut]: folly::enumerate(opt_index_key_futs)) { + symbol_entities_futs.emplace_back( + std::move(opt_index_key_fut).thenValue([store = store(), + read_query = read_queries.empty() ? 
std::make_shared(): read_queries[idx], + idx, + &read_options, + &component_manager, + pipeline_context, + norm_meta_mtx, + res_versioned_items, + res_metadatas](auto&& opt_index_key) mutable { + std::variant version_info; + // TODO: Add support for symbols that only have incomplete segments + internal::check(opt_index_key.has_value(), "batch_read_with_join_internal not supported with non-indexed data"); + // TODO: Only read the index segment once + auto index_key_seg = store->read_sync(*opt_index_key).second; + { + std::lock_guard lock(*norm_meta_mtx); + // TODO: Construct this at the end of the pipeline, instead of reusing input data + pipeline_context->norm_meta_ = std::make_unique(std::move(*index_key_seg.mutable_index_descriptor().mutable_proto().mutable_normalization())); + } + // Shouldn't need mutex protecting as vectors are presized and each element is only accessed once + res_versioned_items->at(idx) = VersionedItem(*opt_index_key); + res_metadatas->at(idx) = index_key_seg.mutable_index_descriptor().user_metadata(); + version_info = VersionedItem(std::move(*opt_index_key)); + return read_entity_ids_for_version(store, version_info, read_query, read_options, component_manager); + }) + ); + } + auto entity_ids_vec_fut = folly::collect(symbol_entities_futs).via(&async::io_executor()); + for (auto& clause: clauses) { + clause->set_component_manager(component_manager); + } + auto clauses_ptr = std::make_shared>>(std::move(clauses)); + + return schedule_remaining_iterations(std::move(entity_ids_vec_fut), clauses_ptr, true) + .thenValueInline([component_manager](std::vector&& processed_entity_ids) { + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); + return collect_segments(std::move(proc)); + }).thenValueInline([store=store(), &handler_data, pipeline_context](std::vector&& slice_and_keys) { + // TODO: Make constructing the output descriptor a standard end of pipeline operation + pipeline_context->set_descriptor(descriptor_from_segments(slice_and_keys)); + return prepare_output_frame(std::move(slice_and_keys), pipeline_context, store, ReadOptions{}, handler_data); + }).thenValueInline([&handler_data, pipeline_context, shared_data, res_versioned_items, res_metadatas](SegmentInMemory&& frame) mutable { + return reduce_and_fix_columns(pipeline_context, frame, ReadOptions{}, handler_data) + .thenValue([pipeline_context, frame, shared_data, res_versioned_items, res_metadatas](auto&&) mutable { + return MultiSymbolReadOutput{std::move(*res_versioned_items), + std::move(*res_metadatas), + {frame, + timeseries_descriptor_from_pipeline_context(pipeline_context, {}, pipeline_context->bucketize_dynamic_), + {}, + shared_data.buffers()}}; + }); + }).get(); +} + void LocalVersionedEngine::write_version_and_prune_previous( bool prune_previous_versions, const AtomKey& new_version, diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index da140ceec4..c3aea05425 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -286,6 +286,14 @@ class LocalVersionedEngine : public VersionedEngine { const ReadOptions& read_options, std::any& handler_data); + MultiSymbolReadOutput batch_read_with_join_internal( + const std::vector& stream_ids, + const std::vector& version_queries, + std::vector>& read_queries, + const ReadOptions& read_options, + std::vector>&& clauses, + std::any& handler_data); + std::vector> 
batch_read_descriptor_internal( const std::vector& stream_ids, const std::vector& version_queries, diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index b839159843..b8f4dd2317 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -402,6 +402,10 @@ void register_bindings(py::module &version, py::exception>(version, "ConcatClause") + .def(py::init<>()) + .def("__str__", &ConcatClause::to_string); + py::class_>(version, "PythonVersionStoreReadQuery") .def(py::init()) .def_readwrite("columns",&ReadQuery::columns) @@ -410,15 +414,7 @@ void register_bindings(py::module &version, py::exception, - std::shared_ptr, - std::shared_ptr, - std::shared_ptr, - std::shared_ptr>, - std::shared_ptr>, - std::shared_ptr, - std::shared_ptr>> clauses) { + [](ReadQuery& self, std::vector clauses) { clauses = plan_query(std::move(clauses)); std::vector> _clauses; self.needs_post_processing = false; @@ -767,6 +763,43 @@ void register_bindings(py::module &version, py::exception(), "Read a dataframe from the store") + .def("batch_read_with_join", + [&](PythonVersionStore& v, + const std::vector &stream_ids, + const std::vector& version_queries, + std::vector>& read_queries, + const ReadOptions& read_options, + std::vector clauses + ){ + user_input::check(!clauses.empty(), "batch_read_with_join called with no clauses"); + clauses = plan_query(std::move(clauses)); + std::vector> _clauses; + bool first_clause{true}; + for (auto&& clause: clauses) { + util::variant_match( + clause, + [&](auto&& clause) { + if (first_clause) { + user_input::check( + clause->clause_info().multi_symbol_, + "Single-symbol clause cannot be used to join multiple symbols together"); + _clauses.emplace_back(std::make_shared(*clause)); + first_clause = false; + } else { + // TODO: Add this check to ReadQuery.add_clauses + user_input::check( + !clause->clause_info().multi_symbol_, + "Multi-symbol clause cannot be used on a single symbol"); + _clauses.emplace_back(std::make_shared(*clause)); + } + _clauses.emplace_back(std::make_shared(*clause)); + } + ); + } + auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); + return adapt_read_df(v.batch_read_with_join(stream_ids, version_queries, read_queries, read_options, std::move(_clauses), handler_data)); + }, + py::call_guard(), "Read a dataframe from the store") .def("batch_read_keys", [&](PythonVersionStore& v, std::vector atom_keys) { return python_util::adapt_read_dfs(frame_to_read_result(v.batch_read_keys(atom_keys))); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index e81118a16b..3012e325d5 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -528,6 +528,9 @@ void add_slice_to_component_manager( } size_t num_scheduling_iterations(const std::vector>& clauses) { + if (clauses.empty()) { + return 0; + } size_t res = 1UL; auto it = std::next(clauses.cbegin()); while (it != clauses.cend()) { @@ -637,6 +640,44 @@ std::shared_ptr>>> schedule_firs return futures; } +folly::Future> schedule_remaining_iterations( + folly::Future>>&& entity_ids_vec_fut, + std::shared_ptr>> clauses, + const bool from_join + ) { + auto scheduling_iterations = num_scheduling_iterations(*clauses); + if (from_join) { + ++scheduling_iterations; + } + for (auto i = 1UL; i < scheduling_iterations; ++i) { + entity_ids_vec_fut = std::move(entity_ids_vec_fut).thenValue([clauses, scheduling_iterations, i, from_join] 
(std::vector>&& entity_id_vectors) { + ARCTICDB_RUNTIME_DEBUG(log::memory(), "Scheduling iteration {} of {}", i, scheduling_iterations); + + util::check(!clauses->empty(), "Scheduling iteration {} has no clauses to process", scheduling_iterations); + if (i == 1) { + if (!from_join) { + remove_processed_clauses(*clauses); + } + } else { + remove_processed_clauses(*clauses); + } + auto next_units_of_work = clauses->front()->structure_for_processing(std::move(entity_id_vectors)); + + std::vector>> work_futures; + for(auto&& unit_of_work : next_units_of_work) { + ARCTICDB_RUNTIME_DEBUG(log::memory(), "Scheduling work for entity ids: {}", unit_of_work); + work_futures.emplace_back(async::submit_cpu_task(async::MemSegmentProcessingTask{*clauses, std::move(unit_of_work)})); + } + + return folly::collect(work_futures).via(&async::io_executor()); + }); + } + + return std::move(entity_ids_vec_fut).thenValueInline([](std::vector>&& entity_id_vectors) { + return flatten_entities(std::move(entity_id_vectors)); + }); +} + folly::Future> schedule_clause_processing( std::shared_ptr component_manager, std::vector>&& segment_and_slice_futures, @@ -669,29 +710,7 @@ folly::Future> schedule_clause_processing( clauses); auto entity_ids_vec_fut = folly::collect(*futures).via(&async::io_executor()); - - const auto scheduling_iterations = num_scheduling_iterations(*clauses); - for (auto i = 1UL; i < scheduling_iterations; ++i) { - entity_ids_vec_fut = std::move(entity_ids_vec_fut).thenValue([clauses, scheduling_iterations, i] (std::vector>&& entity_id_vectors) { - ARCTICDB_RUNTIME_DEBUG(log::memory(), "Scheduling iteration {} of {}", i, scheduling_iterations); - - util::check(!clauses->empty(), "Scheduling iteration {} has no clauses to process", scheduling_iterations); - remove_processed_clauses(*clauses); - auto next_units_of_work = clauses->front()->structure_for_processing(std::move(entity_id_vectors)); - - std::vector>> work_futures; - for(auto&& unit_of_work : next_units_of_work) { - ARCTICDB_RUNTIME_DEBUG(log::memory(), "Scheduling work for entity ids: {}", unit_of_work); - work_futures.emplace_back(async::submit_cpu_task(async::MemSegmentProcessingTask{*clauses, std::move(unit_of_work)})); - } - - return folly::collect(work_futures).via(&async::io_executor()); - }); - } - - return std::move(entity_ids_vec_fut).thenValueInline([](std::vector>&& entity_id_vectors) { - return flatten_entities(std::move(entity_id_vectors)); - }); + return schedule_remaining_iterations(std::move(entity_ids_vec_fut), clauses); } void set_output_descriptors( @@ -922,6 +941,43 @@ folly::Future> read_and_process( }); } +folly::Future> read_and_process_2( + const std::shared_ptr& store, + const std::shared_ptr& pipeline_context, + const std::shared_ptr& read_query, + const ReadOptions& read_options, + std::shared_ptr component_manager + ) { + ProcessingConfig processing_config{opt_false(read_options.dynamic_schema_), pipeline_context->rows_}; + for (auto& clause: read_query->clauses_) { + clause->set_processing_config(processing_config); + clause->set_component_manager(component_manager); + } + + auto ranges_and_keys = generate_ranges_and_keys(*pipeline_context); + + // Each element of the vector corresponds to one processing unit containing the list of indexes in ranges_and_keys required for that processing unit + // i.e. 
if the first processing unit needs ranges_and_keys[0] and ranges_and_keys[1], and the second needs ranges_and_keys[2] and ranges_and_keys[3] + // then the structure will be {{0, 1}, {2, 3}} + std::vector> processing_unit_indexes; + if (!read_query->clauses_.empty()) { + processing_unit_indexes = read_query->clauses_[0]->structure_for_processing( + ranges_and_keys); + } else { + processing_unit_indexes = structure_by_row_slice(ranges_and_keys); + } + + // Start reading as early as possible + auto segment_and_slice_futures = generate_segment_and_slice_futures(store, pipeline_context, processing_config, std::move(ranges_and_keys)); + + return schedule_clause_processing( + component_manager, + std::move(segment_and_slice_futures), + std::move(processing_unit_indexes), + std::make_shared>>(read_query->clauses_)) + .via(&async::cpu_executor()); +} + void add_index_columns_to_query(const ReadQuery& read_query, const TimeseriesDescriptor& desc) { if (read_query.columns.has_value()) { auto index_columns = stream::get_index_columns_from_descriptor(desc); @@ -1264,7 +1320,7 @@ void copy_frame_data_to_buffer( } struct CopyToBufferTask : async::BaseTask { - SegmentInMemory&& source_segment_; + SegmentInMemory source_segment_; SegmentInMemory target_segment_; FrameSlice frame_slice_; DecodePathData shared_data_; @@ -1322,11 +1378,10 @@ folly::Future copy_segments_to_frame( DecodePathData shared_data; for (auto context_row : folly::enumerate(*pipeline_context)) { auto &slice_and_key = context_row->slice_and_key(); - auto &segment = slice_and_key.segment(store); copy_tasks.emplace_back(async::submit_cpu_task( CopyToBufferTask{ - std::move(segment), + slice_and_key.release_segment(store), frame, context_row->slice_and_key().slice(), shared_data, @@ -1535,6 +1590,16 @@ folly::Future do_direct_read_or_process( } } +folly::Future> do_process( + const std::shared_ptr& store, + const std::shared_ptr& read_query, + const ReadOptions& read_options, + const std::shared_ptr& pipeline_context, + std::shared_ptr component_manager) { + schema::check(!pipeline_context->is_pickled(),"Cannot perform multi-symbol join on pickled data"); + return read_and_process_2(store, pipeline_context, read_query, read_options, component_manager); +} + VersionedItem collate_and_write( const std::shared_ptr& store, const std::shared_ptr& pipeline_context, @@ -2027,6 +2092,44 @@ folly::Future read_frame_for_version( }); }); } + +folly::Future> read_entity_ids_for_version( + const std::shared_ptr& store, + const std::variant& version_info, + const std::shared_ptr& read_query , + const ReadOptions& read_options, + std::shared_ptr component_manager) { + using namespace arcticdb::pipelines; + auto pipeline_context = std::make_shared(); + VersionedItem res_versioned_item; + + if(std::holds_alternative(version_info)) { + pipeline_context->stream_id_ = std::get(version_info); + } else { + pipeline_context->stream_id_ = std::get(version_info).key_.id(); + read_indexed_keys_to_pipeline(store, pipeline_context, std::get(version_info), *read_query, read_options); + } + + user_input::check(!pipeline_context->multi_key_, "Multi-symbol joins not supported with recursively normalized data"); + + if(opt_false(read_options.incompletes_)) { + util::check(std::holds_alternative(read_query->row_filter), "Streaming read requires date range filter"); + const auto& query_range = std::get(read_query->row_filter); + const auto existing_range = pipeline_context->index_range(); + if(!existing_range.specified_ || query_range.end_ > existing_range.end_) + 
read_incompletes_to_pipeline(store, pipeline_context, *read_query, read_options, false, false, false, opt_false(read_options.dynamic_schema_)); + } + + if(std::holds_alternative(version_info) && !pipeline_context->incompletes_after_) { + return std::vector{}; + } + + modify_descriptor(pipeline_context, read_options); + generate_filtered_field_descriptors(pipeline_context, read_query->columns); + ARCTICDB_DEBUG(log::version(), "Fetching data to frame"); + + return do_process(store, read_query, read_options, pipeline_context, component_manager); +} } //namespace arcticdb::version_store namespace arcticdb { diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index f9e0ae9294..296e56491b 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -53,6 +53,23 @@ struct ReadVersionOutput { FrameAndDescriptor frame_and_descriptor_; }; +struct MultiSymbolReadOutput { + MultiSymbolReadOutput() = delete; + MultiSymbolReadOutput( + std::vector&& versioned_items, + std::vector&& metadatas, + FrameAndDescriptor&& frame_and_descriptor): + versioned_items_(std::move(versioned_items)), + metadatas_(std::move(metadatas)), + frame_and_descriptor_(std::move(frame_and_descriptor)) {} + + ARCTICDB_MOVE_ONLY_DEFAULT(MultiSymbolReadOutput) + + std::vector versioned_items_; + std::vector metadatas_; + FrameAndDescriptor frame_and_descriptor_; +}; + VersionedItem write_dataframe_impl( const std::shared_ptr& store, VersionId version_id, @@ -132,6 +149,11 @@ folly::Future read_multi_key( const SegmentInMemory& index_key_seg, std::any& handler_data); +folly::Future> schedule_remaining_iterations( + folly::Future>>&& entity_ids_vec_fut, + std::shared_ptr>> clauses, + const bool from_join = false); + folly::Future> schedule_clause_processing( std::shared_ptr component_manager, std::vector>&& segment_and_slice_futures, @@ -211,6 +233,14 @@ folly::Future read_frame_for_version( std::any& handler_data ); +folly::Future> read_entity_ids_for_version( + const std::shared_ptr& store, + const std::variant& version_info, + const std::shared_ptr& read_query, + const ReadOptions& read_options, + std::shared_ptr component_manager +); + class DeleteIncompleteKeysOnExit { public: DeleteIncompleteKeysOnExit( @@ -239,6 +269,13 @@ std::optional get_delete_keys_on_failure( const std::shared_ptr& store, const CompactIncompleteOptions& options); +folly::Future prepare_output_frame( + std::vector&& items, + const std::shared_ptr& pipeline_context, + const std::shared_ptr& store, + const ReadOptions& read_options, + std::any& handler_data); + } //namespace arcticdb::version_store namespace arcticdb { diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index ab8c369371..6ed2ad7b96 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -787,6 +787,21 @@ std::vector> PythonVersionStore::batch_read( return res; } +ReadResult PythonVersionStore::batch_read_with_join( + const std::vector& stream_ids, + const std::vector& version_queries, + std::vector>& read_queries, + const ReadOptions& read_options, + std::vector>&& clauses, + std::any& handler_data) { + auto versions_and_frame = batch_read_with_join_internal(stream_ids, version_queries, read_queries, read_options, std::move(clauses), handler_data); + return create_python_read_result( + versions_and_frame.versioned_items_, + std::move(versions_and_frame.frame_and_descriptor_), + 
std::move(versions_and_frame.metadatas_) + ); +} + void PythonVersionStore::delete_snapshot(const SnapshotId& snap_name) { ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: delete_snapshot"); auto opt_snapshot = get_snapshot(store(), snap_name); diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp index e8d3057e34..e24f38113a 100644 --- a/cpp/arcticdb/version/version_store_api.hpp +++ b/cpp/arcticdb/version/version_store_api.hpp @@ -300,6 +300,14 @@ class PythonVersionStore : public LocalVersionedEngine { const ReadOptions& read_options, std::any& handler_data); + ReadResult batch_read_with_join( + const std::vector& stream_ids, + const std::vector& version_queries, + std::vector>& read_queries, + const ReadOptions& read_options, + std::vector>&& clauses, + std::any& handler_data); + std::vector, DataError>> batch_read_metadata( const std::vector& stream_ids, const std::vector& version_queries, diff --git a/python/arcticdb/__init__.py b/python/arcticdb/__init__.py index 2a1f78fc78..da7472be27 100644 --- a/python/arcticdb/__init__.py +++ b/python/arcticdb/__init__.py @@ -17,6 +17,7 @@ col, LazyDataFrame, LazyDataFrameCollection, + concat, StagedDataFinalizeMethod, WriteMetadataPayload ) diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 4a45c24fb7..777a610b1c 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -7,6 +7,7 @@ """ import copy +from dataclasses import dataclass import datetime import os import sys @@ -124,6 +125,12 @@ def __iter__(self): # Backwards compatible with the old NamedTuple implementati return iter(attr.astuple(self)) +@dataclass +class VersionedItemWithJoin: + versions: List[VersionedItem] + data: Any + + def _env_config_from_lib_config(lib_cfg, env): cfg = EnvironmentConfigsMap() e = cfg.env_by_id[env] @@ -1065,6 +1072,21 @@ def _batch_read_to_versioned_items( versioned_items.append(vitem) return versioned_items + def _batch_read_with_join( + self, symbols, as_ofs, date_ranges, row_ranges, columns, per_symbol_query_builders, query_builder + ): + implement_read_index = True + if columns: + columns = [self._resolve_empty_columns(c, implement_read_index) for c in columns] + version_queries = self._get_version_queries(len(symbols), as_ofs, iterate_snapshots_if_tombstoned=False) + # Take a copy as _get_read_queries can modify the input argument, which makes reusing the input counter-intuitive + per_symbol_query_builders = copy.deepcopy(per_symbol_query_builders) + # TODO: Less hacky way for date ranges (and most likely row ranges) as part of ReadRequests to work + force_ranges_to_queries = True + read_queries = self._get_read_queries(len(symbols), date_ranges, row_ranges, columns, per_symbol_query_builders, force_ranges_to_queries) + read_options = self._get_read_options(iterate_snapshots_if_tombstoned=False) + return self._adapt_read_res(ReadResult(*self.version_store.batch_read_with_join(symbols, version_queries, read_queries, read_options, query_builder.clauses))) + def batch_read_metadata( self, symbols: List[str], as_ofs: Optional[List[VersionQueryInput]] = None, **kwargs ) -> Dict[str, VersionedItem]: @@ -1601,6 +1623,7 @@ def _get_read_queries( row_ranges: Optional[List[Optional[Tuple[int, int]]]], columns: Optional[List[List[str]]], query_builder: Optional[Union[QueryBuilder, List[QueryBuilder]]], + force_ranges_to_queries: bool = False, ): read_queries = [] @@ -1647,6 +1670,9 @@ def _get_read_queries( if query_builder 
is not None: query = copy.deepcopy(query_builder) if isinstance(query_builder, QueryBuilder) else query_builder[idx] + if query is None and force_ranges_to_queries: + query = QueryBuilder() + read_query = self._get_read_query( date_range=date_range, row_range=row_range, @@ -2081,22 +2107,40 @@ def _get_index_columns_from_descriptor(descriptor): return index_columns - def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem: + def _adapt_read_res(self, read_result: ReadResult) -> Union[VersionedItem, VersionedItemWithJoin]: frame_data = FrameData.from_cpp(read_result.frame_data) - meta = denormalize_user_metadata(read_result.udm, self._normalizer) data = self._normalizer.denormalize(frame_data, read_result.norm) if read_result.norm.HasField("custom"): data = self._custom_normalizer.denormalize(data, read_result.norm.custom) - return VersionedItem( - symbol=read_result.version.symbol, - library=self._library.library_path, - data=data, - version=read_result.version.version, - metadata=meta, - host=self.env, - timestamp=read_result.version.timestamp, - ) + if isinstance(read_result.version, list): + versions = [] + for idx in range(len(read_result.version)): + versions.append( + VersionedItem( + symbol=read_result.version[idx].symbol, + library=self._library.library_path, + data=None, + version=read_result.version[idx].version, + metadata=denormalize_user_metadata(read_result.udm[idx], self._normalizer), + host=self.env, + timestamp=read_result.version[idx].timestamp, + ) + ) + return VersionedItemWithJoin( + versions=versions, + data=data, + ) + else: + return VersionedItem( + symbol=read_result.version.symbol, + library=self._library.library_path, + data=data, + version=read_result.version.version, + metadata=denormalize_user_metadata(read_result.udm, self._normalizer), + host=self.env, + timestamp=read_result.version.timestamp, + ) def list_versions( self, diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index ddb070f09b..c3378ea702 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -20,7 +20,7 @@ from arcticdb.util._versions import IS_PANDAS_TWO from arcticdb.version_store.processing import ExpressionNode, QueryBuilder -from arcticdb.version_store._store import NativeVersionStore, VersionedItem, VersionQueryInput +from arcticdb.version_store._store import NativeVersionStore, VersionedItem, VersionedItemWithJoin, VersionQueryInput from arcticdb_ext.exceptions import ArcticException from arcticdb_ext.version_store import DataError import pandas as pd @@ -478,6 +478,37 @@ def __repr__(self) -> str: return self.__str__() +class LazyDataFrameAfterJoin(QueryBuilder): + def __init__( + self, + lazy_dataframes: LazyDataFrameCollection, + join: QueryBuilder, + ): + super().__init__() + self._lazy_dataframes = lazy_dataframes + self.then(join) + + def collect(self) -> VersionedItemWithJoin: + if not len(self._lazy_dataframes._lazy_dataframes): + return [] + else: + lib = self._lazy_dataframes._lib + return lib.read_batch_with_join(self._lazy_dataframes._read_requests(), self) + + def __str__(self) -> str: + query_builder_repr = super().__str__() + return f"LazyDataFrameAfterJoin({self._lazy_dataframes._lazy_dataframes} | {query_builder_repr})" + + def __repr__(self) -> str: + return self.__str__() + + +def concat(lazy_dataframes: Union[List[LazyDataFrame], LazyDataFrameCollection]) -> LazyDataFrameAfterJoin: + if not isinstance(lazy_dataframes, LazyDataFrameCollection): + lazy_dataframes = 
LazyDataFrameCollection(lazy_dataframes) + return LazyDataFrameAfterJoin(lazy_dataframes, QueryBuilder().concat()) + + def col(name: str) -> ExpressionNode: """ Placeholder for referencing columns by name in lazy dataframe operations before the underlying object has been @@ -1643,6 +1674,36 @@ def handle_symbol(s_): iterate_snapshots_if_tombstoned=False, ) + def read_batch_with_join( + self, + read_requests: List[ReadRequest], + query_builder: Optional[QueryBuilder] = None, + ) -> VersionedItemWithJoin: + symbol_strings = [] + as_ofs = [] + date_ranges = [] + row_ranges = [] + columns = [] + per_symbol_query_builders = [] + + for r in read_requests: + symbol_strings.append(r.symbol) + as_ofs.append(r.as_of) + date_ranges.append(r.date_range) + row_ranges.append(r.row_range) + columns.append(r.columns) + per_symbol_query_builders.append(r.query_builder) + + return self._nvs._batch_read_with_join( + symbol_strings, + as_ofs, + date_ranges, + row_ranges, + columns, + per_symbol_query_builders, + query_builder, + ) + def read_metadata(self, symbol: str, as_of: Optional[AsOf] = None) -> VersionedItem: """ Return the metadata saved for a symbol. This method is faster than read as it only loads the metadata, not the diff --git a/python/arcticdb/version_store/processing.py b/python/arcticdb/version_store/processing.py index f58944376f..993e3d9929 100644 --- a/python/arcticdb/version_store/processing.py +++ b/python/arcticdb/version_store/processing.py @@ -33,6 +33,7 @@ from arcticdb_ext.version_store import ResampleBoundary as _ResampleBoundary from arcticdb_ext.version_store import RowRangeClause as _RowRangeClause from arcticdb_ext.version_store import DateRangeClause as _DateRangeClause +from arcticdb_ext.version_store import ConcatClause as _ConcatClause from arcticdb_ext.version_store import RowRangeType as _RowRangeType from arcticdb_ext.version_store import ExpressionName as _ExpressionName from arcticdb_ext.version_store import ColumnName as _ColumnName @@ -323,6 +324,12 @@ class PythonResampleClause: origin: Union[str, pd.Timestamp] = "epoch" +# TODO: Test pickling of this +@dataclass +class PythonConcatClause: + pass + + class QueryBuilder: """ Build a query to process read results with. 
Syntax is designed to be similar to Pandas: @@ -904,6 +911,11 @@ def date_range(self, date_range: DateRangeInput): self._python_clauses = self._python_clauses + [PythonDateRangeClause(start.value, end.value)] return self + def concat(self): + self.clauses = self.clauses + [_ConcatClause()] + self._python_clauses = self._python_clauses + [PythonConcatClause()] + return self + def __eq__(self, right): if not isinstance(right, QueryBuilder): return False @@ -965,6 +977,8 @@ def __setstate__(self, state): self.clauses = self.clauses + [_RowRangeClause(python_clause.row_range_type, python_clause.n)] elif isinstance(python_clause, PythonDateRangeClause): self.clauses = self.clauses + [_DateRangeClause(python_clause.start, python_clause.end)] + elif isinstance(python_clause, PythonConcatClause): + self.clauses = self.clauses + [_ConcatClause()] else: raise ArcticNativeException( f"Unrecognised clause type {type(python_clause)} when unpickling QueryBuilder" diff --git a/python/tests/conftest.py b/python/tests/conftest.py index c9e3f4a8d5..a198dbaf9d 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -136,6 +136,13 @@ def lmdb_library_static_dynamic(request): yield request.getfixturevalue(request.param) +@pytest.fixture +def lmdb_library_factory(lmdb_storage, lib_name): + def f(library_options: LibraryOptions = LibraryOptions()): + return lmdb_storage.create_arctic().create_library(lib_name, library_options=library_options) + return f + + # ssl is enabled by default to maximize test coverage as ssl is enabled most of the times in real world @pytest.fixture(scope="session") def s3_storage_factory() -> Generator[MotoS3StorageFixtureFactory, None, None]: diff --git a/python/tests/unit/arcticdb/version_store/test_symbol_concatenation.py b/python/tests/unit/arcticdb/version_store/test_symbol_concatenation.py new file mode 100644 index 0000000000..7ec3954f00 --- /dev/null +++ b/python/tests/unit/arcticdb/version_store/test_symbol_concatenation.py @@ -0,0 +1,309 @@ +""" +Copyright 2025 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+""" +import numpy as np +import pandas as pd +import pytest + +from arcticdb import col, concat, LazyDataFrame, LazyDataFrameCollection, QueryBuilder, ReadRequest +from arcticdb.exceptions import SchemaException +from arcticdb.options import LibraryOptions +from arcticdb.util.test import assert_frame_equal + +pytestmark = pytest.mark.pipeline + + +@pytest.mark.parametrize("rows_per_segment", [2, 100_000]) +@pytest.mark.parametrize("columns_per_segment", [2, 100_000]) +@pytest.mark.parametrize("index", [None, pd.date_range("2025-01-01", periods=12)]) +def test_symbol_concat_basic(lmdb_library_factory, rows_per_segment, columns_per_segment, index): + lib = lmdb_library_factory(LibraryOptions(rows_per_segment=rows_per_segment, columns_per_segment=columns_per_segment)) + df_0 = pd.DataFrame( + { + "col1": np.arange(3, dtype=np.int64), + "col2": np.arange(100, 103, dtype=np.int64), + "col3": np.arange(1000, 1003, dtype=np.int64), + }, + index=index[:3] if index is not None else None, + ) + df_1 = pd.DataFrame( + { + "col1": np.arange(4, dtype=np.int64), + "col2": np.arange(200, 204, dtype=np.int64), + "col3": np.arange(2000, 2004, dtype=np.int64), + }, + index=index[3:7] if index is not None else None, + ) + df_2 = pd.DataFrame( + { + "col1": np.arange(5, dtype=np.int64), + "col2": np.arange(300, 305, dtype=np.int64), + "col3": np.arange(3000, 3005, dtype=np.int64), + }, + index=index[7:] if index is not None else None, + ) + lib.write("sym0", df_0, metadata=0) + lib.write("sym1", df_1) + lib.write("sym2", df_2, metadata=2) + + received = concat(lib.read_batch(["sym0", "sym1", "sym2"], lazy=True)).collect() + expected = pd.concat([df_0, df_1, df_2]) + if index is None: + expected.index = pd.RangeIndex(len(expected)) + assert_frame_equal(expected, received.data) + for idx, version in enumerate(received.versions): + assert version.symbol == f"sym{idx}" + assert version.version == 0 + assert version.data is None + assert version.metadata == (None if idx == 1 else idx) + + +# TODO: Get working with column slicing +@pytest.mark.xfail(reason="Not yet working with column slicing") +@pytest.mark.parametrize("rows_per_segment", [2, 100_000]) +@pytest.mark.parametrize("columns_per_segment", [2, 100_000]) +@pytest.mark.parametrize("columns", [["col1"], ["col2"], ["col3"], ["col1", "col2"], ["col1", "col3"], ["col2", "col3"]]) +def test_symbol_concat_column_slicing(lmdb_library_factory, rows_per_segment, columns_per_segment, columns): + lib = lmdb_library_factory(LibraryOptions(rows_per_segment=rows_per_segment, columns_per_segment=columns_per_segment)) + df_0 = pd.DataFrame( + { + "col1": np.arange(3, dtype=np.int64), + "col2": np.arange(100, 103, dtype=np.int64), + "col3": np.arange(1000, 1003, dtype=np.int64), + }, + ) + df_1 = pd.DataFrame( + { + "col0": np.arange(10, 14, dtype=np.int64), + "col1": np.arange(4, dtype=np.int64), + "col2": np.arange(200, 204, dtype=np.int64), + "col3": np.arange(2000, 2004, dtype=np.int64), + }, + ) + lib.write("sym0", df_0) + lib.write("sym1", df_1) + + lazy_df_0 = lib.read("sym0", columns=columns, lazy=True) + lazy_df_1 = lib.read("sym1", columns=columns, lazy=True) + + received = concat([lazy_df_0, lazy_df_1]).collect().data + expected = pd.concat([df_0.loc[:, columns], df_1.loc[:, columns]]) + expected.index = pd.RangeIndex(len(expected)) + assert_frame_equal(expected, received) + + +@pytest.mark.parametrize("rows_per_segment", [2, 100_000]) +@pytest.mark.parametrize("columns_per_segment", [2, 100_000]) +def test_symbol_concat_multiindex(lmdb_library_factory, 
rows_per_segment, columns_per_segment): + lib = lmdb_library_factory(LibraryOptions(rows_per_segment=rows_per_segment, columns_per_segment=columns_per_segment)) + df = pd.DataFrame( + { + "col1": np.arange(12, dtype=np.int64), + "col2": np.arange(100, 112, dtype=np.int64), + "col3": np.arange(1000, 1012, dtype=np.int64), + }, + index=pd.MultiIndex.from_product([pd.date_range("2025-01-01", periods=4), [0, 1, 2]], names=["datetime", "level"]), + ) + lib.write("sym0", df[:3]) + lib.write("sym1", df[3:7]) + lib.write("sym2", df[7:]) + + received = concat(lib.read_batch(["sym0", "sym1", "sym2"], lazy=True)).collect().data + assert_frame_equal(df, received) + + +def test_symbol_concat_with_date_range(lmdb_library): + lib = lmdb_library + df_0 = pd.DataFrame( + { + "col1": np.arange(3, dtype=np.int64), + }, + index=pd.date_range(pd.Timestamp(0), freq="1000ns", periods=3), + ) + df_1 = pd.DataFrame( + { + "col1": np.arange(4, dtype=np.int64), + }, + index=pd.date_range(pd.Timestamp(1000), freq="1000ns", periods=4), + ) + lib.write("sym0", df_0) + lib.write("sym1", df_1) + + # Use date_range arg to trim last row from sym0 + lazy_df_0 = lib.read("sym0", date_range=(None, pd.Timestamp(1000)), lazy=True) + # Use date_range clause to trim first row from sym1 + lazy_df_1 = lib.read("sym1", lazy=True) + lazy_df_1 = lazy_df_1.date_range((pd.Timestamp(2000), None)) + + received = concat([lazy_df_0, lazy_df_1]).collect().data + expected = pd.concat([df_0[:2], df_1[1:]]) + assert_frame_equal(expected, received) + + +@pytest.mark.parametrize("rows_per_segment", [2, 100_000]) +@pytest.mark.parametrize("columns_per_segment", [2, 100_000]) +def test_symbol_concat_complex(lmdb_library_factory, rows_per_segment, columns_per_segment): + lib = lmdb_library_factory(LibraryOptions(rows_per_segment=rows_per_segment, columns_per_segment=columns_per_segment)) + df_0 = pd.DataFrame( + { + "col1": np.arange(3, dtype=np.int64), + "col2": np.arange(100, 103, dtype=np.int64), + "col3": np.arange(1000, 1003, dtype=np.int64), + }, + index=pd.date_range(pd.Timestamp(0), freq="1000ns", periods=3), + ) + df_1 = pd.DataFrame( + { + "col1": np.arange(4, dtype=np.int64), + "col2": np.arange(200, 204, dtype=np.int64), + "col3": np.arange(2000, 2004, dtype=np.int64), + }, + index=pd.date_range(pd.Timestamp(2000), freq="1000ns", periods=4), + ) + df_2 = pd.DataFrame( + { + "col1": np.arange(5, dtype=np.int64), + "col2": np.arange(300, 305, dtype=np.int64), + "col3": np.arange(3000, 3005, dtype=np.int64), + }, + index=pd.date_range(pd.Timestamp(6000), freq="1000ns", periods=5), + ) + lib.write("sym0", df_0) + lib.write("sym1", df_1) + lib.write("sym2", df_2) + + lazy_df_0 = lib.read("sym0", lazy=True) + lazy_df_1 = lib.read("sym1", lazy=True) + lazy_df_1 = lazy_df_1.date_range((pd.Timestamp(pd.Timestamp(3000)), None)) + lazy_df_2 = lib.read("sym2", date_range=(None, pd.Timestamp(9000)), lazy=True) + + lazy_df = concat([lazy_df_0, lazy_df_1, lazy_df_2]) + + lazy_df.resample("2000ns").agg({"col1": "sum", "col2": "mean", "col3": "min"}) + + received = lazy_df.collect().data + received = received.reindex(columns=sorted(received.columns)) + expected = pd.concat([df_0, df_1[1:], df_2[:4]]).resample("2000ns").agg({"col1": "sum", "col2": "mean", "col3": "min"}) + assert_frame_equal(expected, received) + + +def test_symbol_concat_querybuilder_syntax(lmdb_library): + lib = lmdb_library + df_0 = pd.DataFrame( + { + "col1": np.arange(3, dtype=np.int64), + "col2": np.arange(100, 103, dtype=np.int64), + "col3": np.arange(1000, 1003, dtype=np.int64), + 
}, + index=pd.date_range(pd.Timestamp(0), freq="1000ns", periods=3), + ) + df_1 = pd.DataFrame( + { + "col1": np.arange(4, dtype=np.int64), + "col2": np.arange(200, 204, dtype=np.int64), + "col3": np.arange(2000, 2004, dtype=np.int64), + }, + index=pd.date_range(pd.Timestamp(2000), freq="1000ns", periods=4), + ) + df_2 = pd.DataFrame( + { + "col1": np.arange(5, dtype=np.int64), + "col2": np.arange(300, 305, dtype=np.int64), + "col3": np.arange(3000, 3005, dtype=np.int64), + }, + index=pd.date_range(pd.Timestamp(6000), freq="1000ns", periods=5), + ) + lib.write("sym0", df_0) + lib.write("sym1", df_1) + lib.write("sym2", df_2) + + read_request_0 = ReadRequest("sym0") + qb1 = QueryBuilder().date_range((pd.Timestamp(pd.Timestamp(3000)), None)) + read_request_1 = ReadRequest("sym1", query_builder=qb1) + read_request_2 = ReadRequest("sym2", date_range=(None, pd.Timestamp(9000))) + + q = QueryBuilder().concat().resample("2000ns").agg({"col1": "sum", "col2": "mean", "col3": "min"}) + received = lib.read_batch_with_join([read_request_0, read_request_1, read_request_2], query_builder=q).data + + received = received.reindex(columns=sorted(received.columns)) + expected = pd.concat([df_0, df_1[1:], df_2[:4]]).resample("2000ns").agg({"col1": "sum", "col2": "mean", "col3": "min"}) + assert_frame_equal(expected, received) + + +@pytest.mark.parametrize("index", [None, [pd.Timestamp(0)]]) +def test_symbol_concat_symbols_with_different_columns(lmdb_library_factory, index): + lib = lmdb_library_factory(LibraryOptions(columns_per_segment=2)) + df_0 = pd.DataFrame({"col1": [0], "col3": [0]}, index=index) + df_1 = pd.DataFrame({"col2": [0], "col3": [0]}, index=index) + df_2 = pd.DataFrame({"col1": [0], "col4": [0]}, index=index) + df_3 = pd.DataFrame({"col1": [0], "col3": [0], "col5": [0], "col6": [0]}, index=index) + df_4 = pd.DataFrame({"col1": [0], "col3": [0], "col5": [0], "col7": [0]}, index=index) + lib.write("sym0", df_0) + lib.write("sym1", df_1) + lib.write("sym2", df_2) + lib.write("sym3", df_3) + lib.write("sym4", df_4) + + # First column different + with pytest.raises(SchemaException): + concat(lib.read_batch(["sym0", "sym1"], lazy=True)).collect() + # Second column different + with pytest.raises(SchemaException): + concat(lib.read_batch(["sym0", "sym2"], lazy=True)).collect() + # First row slice with extra column slice + with pytest.raises(SchemaException): + concat(lib.read_batch(["sym3", "sym0"], lazy=True)).collect() + # Second row slice with extra column slice + with pytest.raises(SchemaException): + concat(lib.read_batch(["sym0", "sym3"], lazy=True)).collect() + # Row slices differ only in second column slice + with pytest.raises(SchemaException): + concat(lib.read_batch(["sym3", "sym4"], lazy=True)).collect() + + +def test_symbol_concat_symbols_with_different_indexes(lmdb_library): + lib = lmdb_library + df_0 = pd.DataFrame({"col": [0]}, index=pd.RangeIndex(1)) + df_1 = pd.DataFrame({"col": [0]}, index=[pd.Timestamp(0)]) + dt1 = pd.Timestamp(0) + dt2 = pd.Timestamp(1) + arr1 = [dt1, dt1, dt2, dt2] + arr2 = [0, 1, 0, 1] + df_2 = pd.DataFrame({"col": [0]}, index=pd.MultiIndex.from_arrays([arr1, arr2], names=["datetime", "level"])) + + lib.write("range_index_sym", df_0) + lib.write("timestamp_index_sym", df_1) + lib.write("multiindex_sym", df_2) + + with pytest.raises(SchemaException): + concat(lib.read_batch(["range_index_sym", "timestamp_index_sym"], lazy=True)).collect() + + with pytest.raises(SchemaException): + concat(lib.read_batch(["timestamp_index_sym", "range_index_sym"], 
lazy=True)).collect()
+
+    with pytest.raises(SchemaException):
+        concat(lib.read_batch(["range_index_sym", "multiindex_sym"], lazy=True)).collect()
+
+    with pytest.raises(SchemaException):
+        concat(lib.read_batch(["multiindex_sym", "range_index_sym"], lazy=True)).collect()
+
+    with pytest.raises(SchemaException):
+        concat(lib.read_batch(["timestamp_index_sym", "multiindex_sym"], lazy=True)).collect()
+
+    with pytest.raises(SchemaException):
+        concat(lib.read_batch(["timestamp_index_sym", "multiindex_sym"], lazy=True)).collect()
+
+
+def test_symbol_concat_pickled_data(lmdb_library):
+    lib = lmdb_library
+    df = pd.DataFrame({"bytes": np.arange(10, dtype=np.uint64)})
+    pickled_data = {"hi", "there"}
+    lib.write("sym0", df)
+    lib.write_pickle("sym1", pickled_data)
+
+    with pytest.raises(SchemaException):
+        concat(lib.read_batch(["sym0", "sym1"], lazy=True)).collect()
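
For reference, a minimal usage sketch of the API this patch adds, showing both entry points. It assumes an Arctic Library object named lib that already holds three schema-compatible, datetime-indexed symbols; the symbol names, column name and resample parameters are illustrative only.

    from arcticdb import QueryBuilder, ReadRequest, concat

    # Lazy-dataframe syntax: read the symbols lazily, join them with concat(), optionally
    # chain further processing on the joined result, then collect().
    lazy_frames = lib.read_batch(["sym0", "sym1", "sym2"], lazy=True)
    joined = concat(lazy_frames)                              # LazyDataFrameAfterJoin
    joined = joined.resample("2000ns").agg({"col1": "sum"})
    result = joined.collect()                                 # VersionedItemWithJoin
    df = result.data            # single concatenated (and resampled) DataFrame
    versions = result.versions  # one VersionedItem per input symbol (data=None, metadata set)

    # QueryBuilder syntax: the multi-symbol clause must be the first clause of the query
    # passed to read_batch_with_join; per-symbol clauses go on the individual ReadRequests.
    q = QueryBuilder().concat().resample("2000ns").agg({"col1": "sum"})
    result = lib.read_batch_with_join(
        [ReadRequest("sym0"), ReadRequest("sym1"), ReadRequest("sym2")],
        query_builder=q,
    )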