Commit

Finish tests
willdealtry committed Jan 10, 2025
1 parent f77a722 commit 2856226
Showing 10 changed files with 124 additions and 8 deletions.
3 changes: 3 additions & 0 deletions cpp/arcticdb/codec/codec.cpp
@@ -467,6 +467,8 @@ void decode_v2(const Segment& segment,
auto& col = res.column(static_cast<position_t>(*col_index));

data += decode_field(res.field(*col_index).type(), *encoded_field, data, col, col.opt_sparse_map(), hdr.encoding_version());
+col.set_statistics(encoded_field->get_statistics());
+
seg_row_count = std::max(seg_row_count, calculate_last_row(col));
} else {
data += encoding_sizes::field_compressed_size(*encoded_field) + sizeof(ColumnMagic);
@@ -533,6 +535,7 @@ void decode_v1(const Segment& segment,
hdr.encoding_version()
);
seg_row_count = std::max(seg_row_count, calculate_last_row(col));
+col.set_statistics(field.get_statistics());
ARCTICDB_TRACE(log::codec(), "Decoded column {} to position {}", i, data - begin);
} else {
data += encoding_sizes::field_compressed_size(field);
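For context: both decode paths now copy each encoded field's statistics onto the decoded column. A minimal self-contained sketch of that pattern, with simplified stand-in types (the real FieldStats, EncodedField and Column definitions live in statistics.hpp and encoded_field.hpp below; the members here are illustrative assumptions):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Simplified stand-ins for the ArcticDB types touched above.
    struct FieldStats {
        std::uint64_t min_ = 0, max_ = 0, unique_count_ = 0;
    };

    struct EncodedField {
        FieldStats stats_;
        FieldStats get_statistics() const { return stats_; }
    };

    struct Column {
        FieldStats stats_;
        void set_statistics(FieldStats stats) { stats_ = stats; }
    };

    // Decode-side pattern: after a field's blocks are decoded, its statistics
    // ride along onto the in-memory column, so readers can consult
    // min/max/unique counts without rescanning the data.
    void attach_statistics(const std::vector<EncodedField>& fields, std::vector<Column>& columns) {
        for (std::size_t i = 0; i < fields.size(); ++i)
            columns[i].set_statistics(fields[i].get_statistics());
    }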
4 changes: 3 additions & 1 deletion cpp/arcticdb/codec/encode_v1.cpp
@@ -137,7 +137,8 @@ namespace arcticdb {
encoded_fields.reserve(encoded_buffer_size, in_mem_seg.num_columns());
ARCTICDB_TRACE(log::codec(), "Encoding fields");
for (std::size_t column_index = 0; column_index < in_mem_seg.num_columns(); ++column_index) {
-auto column_data = in_mem_seg.column_data(column_index);
+const auto& column = in_mem_seg.column(column_index);
+auto column_data = column.data();
auto* column_field = encoded_fields.add_field(column_data.num_blocks());
if(column_data.num_blocks() > 0) {
encoder.encode(codec_opts, column_data, *column_field, *out_buffer, pos);
@@ -147,6 +148,7 @@ namespace arcticdb {
auto* ndarray = column_field->mutable_ndarray();
ndarray->set_items_count(0);
}
+column_field->set_statistics(column.get_statistics());
}
encode_string_pool<EncodingPolicyV1>(in_mem_seg, segment_header, codec_opts, *out_buffer, pos);
}
6 changes: 5 additions & 1 deletion cpp/arcticdb/codec/encode_v2.cpp
@@ -356,8 +356,12 @@ static void encode_encoded_fields(
ARCTICDB_TRACE(log::codec(), "Encoding fields");
for (std::size_t column_index = 0; column_index < in_mem_seg.num_columns(); ++column_index) {
write_magic<ColumnMagic>(*out_buffer, pos);
-auto column_data = in_mem_seg.column_data(column_index);
+const auto& column = in_mem_seg.column(column_index);
+auto column_data = column.data();
auto* column_field = encoded_fields.add_field(column_data.num_blocks());
+if(column.has_statistics())
+column_field->set_statistics(column.get_statistics());
+
ARCTICDB_TRACE(log::codec(),"Beginning encoding of column {}: ({}) to position {}", column_index, in_mem_seg.descriptor().field(column_index).name(), pos);

if(column_data.num_blocks() > 0) {
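Note the asymmetry with the V1 encoder above: V2 serializes statistics only when the column actually has some. A small sketch of that guard, using std::optional as a stand-in for however Column tracks whether statistics were computed (an assumption; the real mechanism is not shown in this commit):

    #include <cstdint>
    #include <iostream>
    #include <optional>

    struct FieldStats { std::uint64_t min_ = 0, max_ = 0; };

    // Stand-in column: statistics exist only if calculate_statistics() ran.
    struct Column {
        std::optional<FieldStats> stats_;
        bool has_statistics() const { return stats_.has_value(); }
        FieldStats get_statistics() const { return stats_.value_or(FieldStats{}); }
    };

    int main() {
        Column with_stats{FieldStats{0, 9}};
        Column without_stats{};
        for (const Column& column : {with_stats, without_stats}) {
            // V2 encode path: attach statistics only when present, so segments
            // written without the config flag carry no statistics payload.
            if (column.has_statistics())
                std::cout << "stats [" << column.get_statistics().min_ << ", "
                          << column.get_statistics().max_ << "]\n";
            else
                std::cout << "no stats\n";
        }
    }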
8 changes: 8 additions & 0 deletions cpp/arcticdb/codec/encoded_field.hpp
@@ -367,6 +367,14 @@ struct EncodedFieldImpl : public EncodedField {
sparse_map_bytes_ = bytes;
}

+void set_statistics(FieldStats stats) {
+stats_ = stats;
+}
+
+FieldStats get_statistics() const {
+return stats_;
+}
+
EncodedBlock *add_values(EncodingVersion encoding_version) {
const bool old_style = encoding_version == EncodingVersion::V1;
size_t pos;
5 changes: 5 additions & 0 deletions cpp/arcticdb/codec/protobuf_mappings.cpp
@@ -10,6 +10,7 @@
#include "arcticdb/storage/memory_layout.hpp"
#include <arcticdb/codec/segment_header.hpp>
#include <arcticdb/codec/protobuf_mappings.hpp>
+#include <arcticdb/stream/protobuf_mappings.hpp>
#include <folly/container/Enumerate.h>

namespace arcticdb {
@@ -78,6 +79,8 @@ void encoded_field_from_proto(const arcticdb::proto::encoding::EncodedField& inp
auto* value_block = output_ndarray->add_values(EncodingVersion::V1);
block_from_proto(input_ndarray.values(i), *value_block, false);
}
+
+output.set_statistics(create_from_proto(input.stats()));
}

void copy_encoded_field_to_proto(const EncodedFieldImpl& input, arcticdb::proto::encoding::EncodedField& output) {
@@ -97,6 +100,8 @@ void copy_encoded_field_to_proto(const EncodedFieldImpl& input, arcticdb::proto:
auto* value_block = output_ndarray->add_values();
proto_from_block(input_ndarray.values(i), *value_block);
}
+
+field_stats_to_proto(input.get_statistics(), *output.mutable_stats());
}

size_t num_blocks(const arcticdb::proto::encoding::EncodedField& field) {
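The proto mapping mirrors the in-memory accessors: field_stats_to_proto flattens the statistics into the protobuf EncodedField on write, and create_from_proto rebuilds them on read. A hedged sketch of that shape, with plain structs standing in for the generated protobuf types (all field names here are assumptions):

    #include <cassert>
    #include <cstdint>

    struct FieldStats {
        std::uint64_t min_ = 0, max_ = 0, unique_count_ = 0;
    };

    // Stand-in for the generated stats message on the wire.
    struct ProtoStats {
        std::uint64_t min = 0, max = 0, unique_count = 0;
    };

    // Mirrors field_stats_to_proto: copy each statistic into the wire message.
    void stats_to_proto(const FieldStats& stats, ProtoStats& out) {
        out.min = stats.min_;
        out.max = stats.max_;
        out.unique_count = stats.unique_count_;
    }

    // Mirrors create_from_proto: rebuild the in-memory stats on decode.
    FieldStats stats_from_proto(const ProtoStats& in) {
        return FieldStats{in.min, in.max, in.unique_count};
    }

    int main() {
        const FieldStats original{0, 9, 10};
        ProtoStats wire;
        stats_to_proto(original, wire);
        const FieldStats roundtripped = stats_from_proto(wire);
        assert(roundtripped.max_ == 9 && roundtripped.unique_count_ == 10);
    }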
80 changes: 80 additions & 0 deletions cpp/arcticdb/codec/test/test_codec.cpp
@@ -512,6 +512,86 @@ TEST(Segment, RoundtripTimeseriesDescriptorWriteToBufferV2) {
ASSERT_EQ(decoded, copy);
}

+TEST(Segment, RoundtripStatisticsV1) {
+ScopedConfig generate_stats("Statistics.GenerateOnWrite", 1);
+const auto stream_desc = stream_descriptor(StreamId{"thing"}, RowCountIndex{}, {
+scalar_field(DataType::UINT8, "int8"),
+scalar_field(DataType::FLOAT64, "doubles")
+});
+
+SegmentInMemory in_mem_seg{stream_desc.clone()};
+constexpr size_t num_rows = 10;
+for(auto i = 0UL; i < num_rows; ++i) {
+in_mem_seg.set_scalar<uint8_t>(0, static_cast<uint8_t>(i));
+in_mem_seg.set_scalar<double>(1, static_cast<double>(i * 2));
+in_mem_seg.end_row();
+}
+in_mem_seg.calculate_statistics();
+auto copy = in_mem_seg.clone();
+auto seg = encode_v1(std::move(in_mem_seg), codec::default_lz4_codec());
+std::vector<uint8_t> vec;
+const auto bytes = seg.calculate_size();
+vec.resize(bytes);
+seg.write_to(vec.data());
+auto unserialized = Segment::from_bytes(vec.data(), bytes);
+SegmentInMemory decoded{stream_desc.clone()};
+decode_v1(unserialized, unserialized.header(), decoded, unserialized.descriptor());
+auto col1_stats = decoded.column(0).get_statistics();
+ASSERT_TRUE(col1_stats.has_max());
+ASSERT_EQ(col1_stats.get_max<uint8_t>(), 9);
+ASSERT_TRUE(col1_stats.has_min());
+ASSERT_EQ(col1_stats.get_min<uint8_t>(), 0);
+ASSERT_TRUE(col1_stats.has_unique());
+ASSERT_EQ(col1_stats.get_unique_count(), 10);
+auto col2_stats = decoded.column(1).get_statistics();
+ASSERT_TRUE(col2_stats.has_max());
+ASSERT_EQ(col2_stats.get_max<double>(), 18.0);
+ASSERT_TRUE(col2_stats.has_min());
+ASSERT_EQ(col2_stats.get_min<double>(), 0.0);
+ASSERT_TRUE(col2_stats.has_unique());
+ASSERT_EQ(col2_stats.get_unique_count(), 10);
+}
+
+TEST(Segment, RoundtripStatisticsV2) {
+ScopedConfig generate_stats("Statistics.GenerateOnWrite", 1);
+const auto stream_desc = stream_descriptor(StreamId{"thing"}, RowCountIndex{}, {
+scalar_field(DataType::UINT8, "int8"),
+scalar_field(DataType::FLOAT64, "doubles")
+});
+
+SegmentInMemory in_mem_seg{stream_desc.clone()};
+constexpr size_t num_rows = 10;
+for(auto i = 0UL; i < num_rows; ++i) {
+in_mem_seg.set_scalar<uint8_t>(0, static_cast<uint8_t>(i));
+in_mem_seg.set_scalar<double>(1, static_cast<double>(i * 2));
+in_mem_seg.end_row();
+}
+in_mem_seg.calculate_statistics();
+auto copy = in_mem_seg.clone();
+auto seg = encode_v2(std::move(in_mem_seg), codec::default_lz4_codec());
+std::vector<uint8_t> vec;
+const auto bytes = seg.calculate_size();
+vec.resize(bytes);
+seg.write_to(vec.data());
+auto unserialized = Segment::from_bytes(vec.data(), bytes);
+SegmentInMemory decoded{stream_desc.clone()};
+decode_v2(unserialized, unserialized.header(), decoded, unserialized.descriptor());
+auto col1_stats = decoded.column(0).get_statistics();
+ASSERT_TRUE(col1_stats.has_max());
+ASSERT_EQ(col1_stats.get_max<uint8_t>(), 9);
+ASSERT_TRUE(col1_stats.has_min());
+ASSERT_EQ(col1_stats.get_min<uint8_t>(), 0);
+ASSERT_TRUE(col1_stats.has_unique());
+ASSERT_EQ(col1_stats.get_unique_count(), 10);
+auto col2_stats = decoded.column(1).get_statistics();
+ASSERT_TRUE(col2_stats.has_max());
+ASSERT_EQ(col2_stats.get_max<double>(), 18.0);
+ASSERT_TRUE(col2_stats.has_min());
+ASSERT_EQ(col2_stats.get_min<double>(), 0.0);
+ASSERT_TRUE(col2_stats.has_unique());
+ASSERT_EQ(col2_stats.get_unique_count(), 10);
+}
+
TEST(Segment, ColumnNamesProduceDifferentHashes) {
const auto stream_desc_1 = stream_descriptor(StreamId{"thing"}, RowCountIndex{}, {
scalar_field(DataType::UINT8, "ints1"),
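The expected values in these tests follow directly from the data: column 0 holds 0..9 (min 0, max 9, 10 distinct values) and column 1 holds 0, 2, ..., 18 (min 0.0, max 18.0, 10 distinct values). A self-contained sketch of how such statistics could be computed, standing in for calculate_statistics(), whose implementation this commit does not show:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>
    #include <set>
    #include <vector>

    template <typename T>
    struct Stats { T min; T max; std::size_t unique_count; };

    // Single pass for min/max plus an exact distinct count via std::set; the
    // real implementation may approximate (note unique_count_precision_ below).
    template <typename T>
    Stats<T> compute_stats(const std::vector<T>& values) {
        const auto [lo, hi] = std::minmax_element(values.begin(), values.end());
        return {*lo, *hi, std::set<T>(values.begin(), values.end()).size()};
    }

    int main() {
        std::vector<double> doubles;
        for (int i = 0; i < 10; ++i)
            doubles.push_back(i * 2.0);  // 0.0, 2.0, ..., 18.0, as in the tests
        const auto stats = compute_stats(doubles);
        assert(stats.min == 0.0 && stats.max == 18.0 && stats.unique_count == 10);
    }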
12 changes: 12 additions & 0 deletions cpp/arcticdb/column_store/statistics.hpp
@@ -81,6 +81,18 @@ struct FieldStatsImpl : public FieldStats {
return value;
}

+size_t get_unique_count() const {
+return unique_count_;
+}
+
+FieldStatsImpl(FieldStats base) {
+min_ = base.min_;
+max_ = base.max_;
+unique_count_ = base.unique_count_;
+unique_count_precision_ = base.unique_count_precision_;
+set_ = base.set_;
+}
+
template<typename T>
FieldStatsImpl(
T min,
5 changes: 4 additions & 1 deletion cpp/arcticdb/pipeline/write_frame.cpp
@@ -104,7 +104,10 @@ std::tuple<stream::StreamSink::PartialKey, SegmentInMemory, FrameSlice> WriteToS
}

agg.end_block_write(rows_to_write);
-agg.segment().calculate_statistics();
+
+if(ConfigsMap::instance()->get_int("Statistics.GenerateOnWrite", 0) == 1)
+agg.segment().calculate_statistics();
+
agg.finalize();
return output;
});
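With this change statistics generation is opt-in: writes compute them only when Statistics.GenerateOnWrite is set to 1, which is exactly what ScopedConfig arranges for the duration of the new tests. A minimal sketch of that gating with a stand-in config store (the real ConfigsMap and ScopedConfig are ArcticDB's own; this map and its defaulting behavior are assumptions):

    #include <cstdint>
    #include <map>
    #include <string>
    #include <utility>

    // Stand-in config store: get_int falls back to the supplied default,
    // matching the get_int("Statistics.GenerateOnWrite", 0) call above.
    std::map<std::string, std::int64_t> g_config;

    std::int64_t get_int(const std::string& key, std::int64_t default_value) {
        const auto it = g_config.find(key);
        return it == g_config.end() ? default_value : it->second;
    }

    // RAII helper in the spirit of the tests' ScopedConfig: set a value for the
    // current scope, restore the old one on exit (simplified: an absent key is
    // restored as 0 rather than erased).
    struct ScopedConfig {
        std::string key_;
        std::int64_t previous_;
        ScopedConfig(std::string key, std::int64_t value)
            : key_(std::move(key)), previous_(get_int(key_, 0)) {
            g_config[key_] = value;
        }
        ~ScopedConfig() { g_config[key_] = previous_; }
    };

    int main() {
        ScopedConfig enable("Statistics.GenerateOnWrite", 1);
        // Inside this scope, the write path's gate evaluates true:
        return get_int("Statistics.GenerateOnWrite", 0) == 1 ? 0 : 1;
    }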
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/test/test_version_store.cpp
@@ -852,4 +852,4 @@ TEST(VersionStore, TestWriteAppendMapHead) {
auto [next_key, total_rows] = read_head(version_store._test_get_store(), symbol);
ASSERT_EQ(next_key, key);
ASSERT_EQ(total_rows, num_rows);
-}
\ No newline at end of file
+}
7 changes: 3 additions & 4 deletions cpp/arcticdb/version/version_store_api.hpp
@@ -42,9 +42,8 @@ using namespace arcticdb::entity;
namespace as = arcticdb::stream;

/**
-* PythonVersionStore contains all the Python cruft that isn't portable, as well as non-essential features that are
-* part of the backwards-compatibility with Arctic Python but that we think are random/a bad idea and aren't part of
-* the main product.
+* The purpose of this class is to perform Python-specific translations into either native C++ or protobuf objects
+* so that the LocalVersionedEngine contains only portable C++ code.
*/
class PythonVersionStore : public LocalVersionedEngine {

@@ -73,7 +72,7 @@ class PythonVersionStore : public LocalVersionedEngine {
VersionedItem write_versioned_composite_data(
const StreamId& stream_id,
const py::object &metastruct,
-const std::vector<StreamId> &sub_keys, // TODO: make this optional?
+const std::vector<StreamId> &sub_keys,
const std::vector<py::tuple> &items,
const std::vector<py::object> &norm_metas,
const py::object &user_meta,
