Commit: Post rebase fixes
willdealtry committed Jan 14, 2025
1 parent bf2f952 commit eeebb76
Showing 17 changed files with 100 additions and 125 deletions.
7 changes: 3 additions & 4 deletions cpp/arcticdb/CMakeLists.txt
@@ -27,6 +27,7 @@ find_package(prometheus-cpp CONFIG REQUIRED)
find_package(Threads REQUIRED)
find_package(double-conversion REQUIRED)
find_package(unordered_dense CONFIG REQUIRED)
#find_package(sparrow REQUIRED)

if(NOT WIN32 AND ${SSL_LINK})
find_package(Kerberos REQUIRED)
@@ -42,7 +43,6 @@ find_package(zstd CONFIG REQUIRED) # "CONFIG" bypasses our cpp/CMake/FindZstd.c

find_package(azure-identity-cpp CONFIG REQUIRED)
find_package(azure-storage-blobs-cpp CONFIG REQUIRED)
#find_package(sparrow REQUIRED)

if(${BUILD_WITH_REMOTERY})
add_compile_definitions(USE_REMOTERY)
@@ -499,7 +499,6 @@ set(arcticdb_srcs
stream/protobuf_mappings.cpp
toolbox/library_tool.cpp
util/allocator.cpp
util/buffer_holder.cpp
util/allocation_tracing.cpp
util/buffer_pool.cpp
util/configs_map.cpp
@@ -764,8 +763,8 @@ if (SSL_LINK)
find_package(OpenSSL REQUIRED)
list(APPEND arcticdb_core_libraries OpenSSL::SSL)
if (NOT WIN32)
list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
#list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
#list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
endif()
endif ()
target_link_libraries(arcticdb_core_object PUBLIC ${arcticdb_core_libraries})
6 changes: 5 additions & 1 deletion cpp/arcticdb/arrow/arrow_data.hpp
@@ -7,6 +7,8 @@

#pragma once

#ifdef ARCTICDB_ARROW_SUPPORT

#include <arcticdb/column_store/memory_segment.hpp>
#include <sparrow/arrow_interface/arrow_schema/smart_pointers.hpp>
#include <sparrow/arrow_interface/arrow_array/smart_pointers.hpp>
@@ -27,4 +29,6 @@ struct ArrowData {
sparrow::arrow_schema_unique_ptr schema_;
};

} // namespace arcticdb
} // namespace arcticdb

#endif
29 changes: 28 additions & 1 deletion cpp/arcticdb/arrow/arrow_output_frame.cpp
@@ -8,9 +8,12 @@

#include <arcticdb/arrow/arrow_utils.hpp>
#include <arcticdb/arrow/arrow_data.hpp>
#include <arcticdb/arrow/arrow_output_frame.hpp>

#include <vector>

#ifdef ARCTICDB_ARROW_SUPPORT

namespace arcticdb {

ArrowOutputFrame::ArrowOutputFrame(
@@ -61,4 +64,28 @@ std::vector<std::string> ArrowOutputFrame::names() const {
return names_;
}

} // namespace arcticdb
} // namespace arcticdb

#else

namespace arcticdb {

std::vector<std::vector<uintptr_t>> ArrowOutputFrame::arrays() {
return {};
}

std::vector<std::vector<uintptr_t>> ArrowOutputFrame::schemas() {
return {};
}

size_t ArrowOutputFrame::num_blocks() const {
return 0;
}

std::vector<std::string> ArrowOutputFrame::names() const {
return {};
}

}

#endif
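
A side note on the pattern in this file: the implementation is compiled only when ARCTICDB_ARROW_SUPPORT is defined, and the fallback branch gives the same methods empty stub bodies so callers still compile and link without Arrow. Below is a minimal, self-contained sketch of that technique; DemoOutputFrame and DEMO_ARROW_SUPPORT are invented stand-ins, not ArcticDB code.

#include <cstdint>
#include <string>
#include <vector>

// Hypothetical feature flag standing in for ARCTICDB_ARROW_SUPPORT.
// #define DEMO_ARROW_SUPPORT

struct DemoOutputFrame {
#ifdef DEMO_ARROW_SUPPORT
    // The real build would convert stored column data to Arrow arrays here.
    std::vector<std::vector<uintptr_t>> arrays();
    std::vector<std::string> names() const;
#else
    // Stubs keep every caller compiling and linking when Arrow is disabled.
    std::vector<std::vector<uintptr_t>> arrays() { return {}; }
    std::vector<std::string> names() const { return {}; }
#endif
};

int main() {
    DemoOutputFrame frame;
    // With the flag undefined, the stubs simply report an empty frame.
    return frame.arrays().empty() && frame.names().empty() ? 0 : 1;
}
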
2 changes: 2 additions & 0 deletions cpp/arcticdb/arrow/arrow_output_frame.hpp
@@ -15,6 +15,8 @@ namespace arcticdb {
struct ArrowData;

struct ArrowOutputFrame {
ArrowOutputFrame() = default;

ArrowOutputFrame(
std::vector<std::vector<ArrowData>>&& data,
std::vector<std::string>&& names);
10 changes: 6 additions & 4 deletions cpp/arcticdb/arrow/arrow_utils.cpp
@@ -4,10 +4,10 @@
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
#ifdef ARCTICDB_ARROW_SUPPORT

#include <arcticdb/arrow/arrow_utils.hpp>
#include <sparrow/array.hpp>
#include <sparrow/arrow_interface/arrow_schema.hpp>
#include <sparrow/arrow_interface/array_data_to_arrow_array_converters.hpp>
#include <sparrow/sparrow.hpp>
#include <arcticdb/arrow/arrow_data.hpp>
#include <arcticdb/entity/frame_and_descriptor.hpp>
#include <arcticdb/column_store/memory_segment.hpp>
@@ -113,4 +113,6 @@ ArrowReadResult create_arrow_read_result(
return {version, std::move(arrow_frame), desc_proto.user_meta()};
}

} // namespace arcticdb
} // namespace arcticdb

#endif
4 changes: 3 additions & 1 deletion cpp/arcticdb/arrow/arrow_utils.hpp
@@ -17,12 +17,14 @@ namespace arcticdb {

class SegmentInMemory;
struct ArrowData;
class Column;
struct FrameAndDescriptor;
struct DecodePathData;
class Column;

struct ArrowReadResult {

ArrowReadResult() = default;

ArrowReadResult(
const VersionedItem& versioned_item,
ArrowOutputFrame&& frame_data,
8 changes: 7 additions & 1 deletion cpp/arcticdb/arrow/test/test_arrow.cpp
@@ -7,10 +7,14 @@

#include <gtest/gtest.h>

#ifdef ARCTICDB_ARROW_SUPPORT

#include <arcticdb/util/test/generators.hpp>
#include <arcticdb/arrow/arrow_utils.hpp>
#include <arcticdb/arrow/arrow_data.hpp>



TEST(Arrow, ConvertColumn) {
using namespace arcticdb;
using TDT = TypeDescriptorTag<DataTypeTag<DataType::UINT16>, DimensionTag<Dimension ::Dim0>>;
@@ -42,4 +46,6 @@ TEST(Arrow, ConvertSegment) {

auto vec = segment_to_arrow_data(segment);
ASSERT_EQ(vec.size(), 2);
}
}

#endif
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/chunked_buffer.hpp
@@ -548,8 +548,8 @@ class ChunkedBufferImpl {
#else
std::vector<BlockType*> blocks_;
std::vector<size_t> block_offsets_;
entity::AllocationType allocation_type_ = entity::AllocationType::DYNAMIC;
#endif
entity::AllocationType allocation_type_ = entity::AllocationType::DYNAMIC;
};

constexpr size_t PageSize = 4096;
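
The change above moves allocation_type_ below the #endif so the member exists in every build configuration; a data member declared only inside one branch of an #ifdef disappears in the other build, and any unconditional use of it then fails to compile. A small sketch of the idea with invented names (DemoBuffer, DEMO_DETACHABLE_BLOCKS), not the real ChunkedBufferImpl:

#include <cstddef>

enum class AllocationType { DYNAMIC, PRESIZED };

// Hypothetical container illustrating the member-placement fix above.
class DemoBuffer {
#ifdef DEMO_DETACHABLE_BLOCKS
    std::size_t magic_ = 0;           // present only in one configuration
#else
    std::size_t block_offset_ = 0;    // present only in the other
#endif
    // Declared after the #endif, so both configurations have it and the
    // unconditional accessor below always compiles.
    AllocationType allocation_type_ = AllocationType::DYNAMIC;

public:
    AllocationType allocation_type() const { return allocation_type_; }
    std::size_t offset() const {
#ifdef DEMO_DETACHABLE_BLOCKS
        return magic_;
#else
        return block_offset_;
#endif
    }
};

int main() {
    DemoBuffer buffer;
    return buffer.allocation_type() == AllocationType::DYNAMIC ? static_cast<int>(buffer.offset()) : 1;
}
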
4 changes: 2 additions & 2 deletions cpp/arcticdb/pipeline/read_frame.cpp
@@ -835,7 +835,7 @@ folly::Future<folly::Unit> reduce_and_fix_columns(
std::vector<size_t> fields_to_reduce;
for (size_t idx=0; idx<frame.descriptor().fields().size(); ++idx) {
const auto& frame_field = frame.field(idx);
if (dynamic_schema ||
if (read_options.dynamic_schema_ ||
(slice_map->columns_.contains(frame_field.name()) && is_sequence_type(frame_field.type().data_type()))) {
fields_to_reduce.emplace_back(idx);
}
@@ -845,7 +845,7 @@
static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100);
return folly::collect(
folly::window(std::move(fields_to_reduce),
[context, frame, slice_map, shared_data, dynamic_schema, &handler_data] (size_t field) mutable {
[context, frame, slice_map, shared_data, read_options, &handler_data] (size_t field) mutable {
return async::submit_cpu_task(ReduceColumnTask(frame, field, slice_map, context, shared_data, handler_data, read_options));
}, batch_size)).via(&async::io_executor()).unit();
}
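
In the hunk above, the per-field lambda now captures the whole read_options object instead of a pre-extracted dynamic_schema flag, so the deferred ReduceColumnTask reads every setting it needs from one place. A tiny sketch of that capture style, with DemoReadOptions invented for illustration (the real code schedules work through folly futures rather than plain std::function):

#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

// Hypothetical stand-in for ReadOptions.
struct DemoReadOptions {
    bool dynamic_schema_ = false;
    int output_format_ = 0;
};

int main() {
    DemoReadOptions read_options;
    read_options.dynamic_schema_ = true;

    // Capture the options object by value, as the lambda above now does,
    // instead of a single bool copied out before the tasks were built.
    std::vector<std::function<void(std::size_t)>> tasks;
    tasks.emplace_back([read_options](std::size_t field) {
        if (read_options.dynamic_schema_)
            std::cout << "reduce field " << field
                      << " (output format " << read_options.output_format_ << ")\n";
    });

    for (std::size_t field = 0; field < 3; ++field)
        tasks.front()(field);
    return 0;
}
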
2 changes: 1 addition & 1 deletion cpp/arcticdb/python/adapt_read_dataframe.hpp
@@ -20,7 +20,7 @@ inline auto adapt_read_df = [](ReadResult && ret) -> py::tuple{
return py::make_tuple(ret.item, std::move(ret.frame_data), pynorm, pyuser_meta, multi_key_meta, ret.multi_keys);
};

auto adapt_arrow_df = [](ArrowReadResult && ret) -> py::tuple{
inline auto adapt_arrow_df = [](ArrowReadResult && ret) -> py::tuple{
auto pyuser_meta = python_util::pb_to_python(ret.user_meta_);
return py::make_tuple(ret.versioned_item_, std::move(ret.frame_), pyuser_meta);
};
3 changes: 1 addition & 2 deletions cpp/arcticdb/storage/file/file_store.hpp
@@ -127,7 +127,6 @@ version_store::ReadVersionOutput read_dataframe_from_file_internal(
const auto header_offset = key_data.key_offset_ + key_data.key_size_;
ARCTICDB_DEBUG(log::storage(), "Got header offset at {}", header_offset);
single_file_storage->load_header(header_offset, data_end - header_offset);
auto frame_and_descriptor = version_store::read_frame_for_version(store, versioned_item, read_query, read_options, handler_data).get();
return {std::move(versioned_item), std::move(frame_and_descriptor)};
return version_store::read_frame_for_version(store, versioned_item, read_query, read_options, handler_data).get();
}
} //namespace arcticdb
4 changes: 2 additions & 2 deletions cpp/arcticdb/util/decode_path_data.hpp
@@ -35,11 +35,11 @@ struct DecodePathDataImpl {

struct DecodePathData {
public:
const std::shared_ptr<UniqueStringMapType>& unique_string_map() const {
[[nodiscard]] const std::shared_ptr<UniqueStringMapType>& unique_string_map() const {
return data_->unique_string_map_.instance();
}

bool optimize_for_memory() const {
[[nodiscard]] bool optimize_for_memory() const {
return data_->optimize_for_memory_;
}

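The accessors above gain [[nodiscard]], so a caller that silently drops the returned value now gets a compiler warning. A minimal sketch of the effect on an invented type (DemoDecodeData is not the ArcticDB struct):

#include <memory>
#include <string>

// Invented type showing the effect of the [[nodiscard]] annotations above.
struct DemoDecodeData {
    std::shared_ptr<std::string> unique_strings_ = std::make_shared<std::string>();
    bool optimize_for_memory_ = false;

    [[nodiscard]] const std::shared_ptr<std::string>& unique_string_map() const {
        return unique_strings_;
    }

    [[nodiscard]] bool optimize_for_memory() const {
        return optimize_for_memory_;
    }
};

int main() {
    DemoDecodeData data;
    // data.optimize_for_memory();   // if uncommented, the discarded result triggers a warning
    return data.optimize_for_memory() || data.unique_string_map()->empty() ? 0 : 1;
}
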
10 changes: 5 additions & 5 deletions cpp/arcticdb/version/python_bindings.cpp
@@ -232,7 +232,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept

version.def("write_dataframe_to_file", &write_dataframe_to_file);
version.def("read_dataframe_from_file",
[] (StreamId sid, const std::string(path), ReadQuery& read_query, const ReadOptions& read_options){
[] (StreamId sid, const std::string(path), std::shared_ptr<ReadQuery>& read_query, const ReadOptions& read_options){
return adapt_read_df(read_dataframe_from_file(sid, path, read_query, read_options));
});

@@ -675,14 +675,14 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
&PythonVersionStore::write_dataframe_specific_version,
py::call_guard<SingleThreadMutexHolder>(), "Write a specific version of this dataframe to the store")
.def("read_dataframe_version",
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, ReadQuery& read_query, const ReadOptions& read_options) {
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, const std::shared_ptr<ReadQuery>& read_query, const ReadOptions& read_options) {
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
return adapt_read_df(v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data));
},
py::call_guard<SingleThreadMutexHolder>(),
"Read the specified version of the dataframe from the store")
.def("read_dataframe_version_arrow",
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, ReadQuery& read_query, const ReadOptions& read_options) {
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, const std::shared_ptr<ReadQuery>& read_query, const ReadOptions& read_options) {
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
return adapt_arrow_df(v.read_dataframe_version_arrow(sid, version_query, read_query, read_options, handler_data));
},
@@ -781,8 +781,8 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
const std::vector<VersionQuery>& version_queries,
std::vector<std::shared_ptr<ReadQuery>>& read_queries,
const ReadOptions& read_options){
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data();
return python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options, handler_data));
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
return python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options));
},
py::call_guard<SingleThreadMutexHolder>(), "Read a dataframe from the store")
.def("batch_read_keys",
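
The read bindings above now take the ReadQuery as a std::shared_ptr rather than a plain reference. Below is a minimal pybind11 sketch of that signature style; DemoReadQuery, read_with_query, and the demo_bindings module are invented for illustration and are not ArcticDB's API.

#include <pybind11/pybind11.h>

#include <memory>
#include <string>

namespace py = pybind11;

// Hypothetical stand-in for ReadQuery.
struct DemoReadQuery {
    std::string column_filter;
};

PYBIND11_MODULE(demo_bindings, m) {
    // Register the type with a shared_ptr holder so Python-side objects can be
    // passed straight into functions taking std::shared_ptr<DemoReadQuery>.
    py::class_<DemoReadQuery, std::shared_ptr<DemoReadQuery>>(m, "DemoReadQuery")
        .def(py::init<>())
        .def_readwrite("column_filter", &DemoReadQuery::column_filter);

    // Mirrors the binding style above: the lambda takes the query by shared_ptr,
    // so the same object can be handed on to asynchronous read paths.
    m.def("read_with_query",
          [](const std::string& symbol, const std::shared_ptr<DemoReadQuery>& query) {
              return symbol + ":" + (query ? query->column_filter : std::string{});
          });
}

From Python this sketch would be driven as demo_bindings.read_with_query("sym", demo_bindings.DemoReadQuery()).
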
4 changes: 2 additions & 2 deletions cpp/arcticdb/version/test/test_version_store.cpp
@@ -366,9 +366,9 @@ TEST_F(VersionStoreTest, StressBatchReadUncompressed) {
std::vector<std::shared_ptr<ReadQuery>> read_queries;
ReadOptions read_options;
register_native_handler_data_factory();
auto handler_data = get_type_handler_data();
auto handler_data = get_type_handler_data(read_options.output_format());
read_options.set_batch_throw_on_error(true);
auto latest_versions = test_store_->batch_read(symbols, std::vector<VersionQuery>(10), read_queries, read_options, handler_data);
auto latest_versions = test_store_->batch_read(symbols, std::vector<VersionQuery>(10), read_queries, read_options);
for(auto&& [idx, version] : folly::enumerate(latest_versions)) {
auto expected = get_test_simple_frame(std::get<ReadResult>(version).item.symbol(), 10, idx);
bool equal = expected.segment_ == std::get<ReadResult>(version).frame_data.frame();
