Vector comparisons #2119

Draft
wants to merge 5 commits into master
Changes from all commits
19 changes: 15 additions & 4 deletions cpp/arcticdb/CMakeLists.txt
@@ -381,6 +381,11 @@ set(arcticdb_srcs
util/lazy.hpp
util/type_traits.hpp
util/variant.hpp
util/min_max_integer.hpp
util/mean.hpp
util/min_max_float.hpp
util/sum.hpp
util/vector_common.hpp
version/de_dup_map.hpp
version/op_log.hpp
version/schema_checks.hpp
@@ -416,6 +421,7 @@ set(arcticdb_srcs
column_store/key_segment.cpp
column_store/memory_segment.cpp
column_store/memory_segment_impl.cpp
column_store/statistics.hpp
column_store/string_pool.cpp
entity/data_error.cpp
entity/field_collection.cpp
@@ -522,7 +528,7 @@ set(arcticdb_srcs
version/version_core.cpp
version/version_store_api.cpp
version/version_utils.cpp
version/version_map_batch_methods.cpp)
version/version_map_batch_methods.cpp )

add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

@@ -910,6 +916,7 @@ if(${TEST})
column_store/test/test_column_data_random_accessor.cpp
column_store/test/test_index_filtering.cpp
column_store/test/test_memory_segment.cpp
column_store/test/test_statistics.cpp
entity/test/test_atom_key.cpp
entity/test/test_key_serialization.cpp
entity/test/test_metrics.cpp
@@ -938,10 +945,12 @@ if(${TEST})
storage/test/test_storage_factory.cpp
storage/test/test_storage_exceptions.cpp
storage/test/test_azure_storage.cpp
storage/test/common.hpp
storage/test/test_storage_operations.cpp
stream/test/stream_test_common.cpp
stream/test/test_aggregator.cpp
stream/test/test_append_map.cpp
stream/test/test_protobuf_mappings.cpp
stream/test/test_row_builder.cpp
stream/test/test_segment_aggregator.cpp
stream/test/test_types.cpp
@@ -964,6 +973,9 @@ if(${TEST})
util/test/test_storage_lock.cpp
util/test/test_string_pool.cpp
util/test/test_string_utils.cpp
util/test/test_min_max_float.cpp
util/test/test_sum.cpp
util/test/test_mean.cpp
util/test/test_tracing_allocator.cpp
version/test/test_append.cpp
version/test/test_key_block.cpp
@@ -976,8 +988,7 @@ if(${TEST})
version/test/test_version_map_batch.cpp
version/test/test_version_store.cpp
version/test/version_map_model.hpp
python/python_handlers.cpp
storage/test/common.hpp)
python/python_handlers.cpp)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

@@ -1081,7 +1092,7 @@ if(${TEST})
util/test/rapidcheck_string_pool.cpp
util/test/rapidcheck_main.cpp
util/test/rapidcheck_lru_cache.cpp
version/test/rapidcheck_version_map.cpp)
version/test/rapidcheck_version_map.cpp
util/test/test_min_max_integer.cpp)

add_executable(arcticdb_rapidcheck_tests ${rapidcheck_srcs})
install(TARGETS arcticdb_rapidcheck_tests RUNTIME
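The new util/min_max_integer.hpp, min_max_float.hpp, sum.hpp, mean.hpp and vector_common.hpp headers registered above supply the vectorized kernels behind the column statistics. As a rough illustration only (names and signature are assumptions, not this PR's API), such a kernel reduces a contiguous block in a single branch-light pass that a compiler can auto-vectorize:

#include <cstddef>
#include <cstdint>
#include <limits>

// Hypothetical sketch of a min/max reduction kernel; the real headers
// likely use explicit SIMD intrinsics rather than relying on auto-vectorization.
struct MinMaxResult {
    int64_t min = std::numeric_limits<int64_t>::max();
    int64_t max = std::numeric_limits<int64_t>::lowest();
};

inline MinMaxResult min_max_scan(const int64_t* data, size_t count) {
    MinMaxResult result;
    for (size_t i = 0; i < count; ++i) {
        result.min = data[i] < result.min ? data[i] : result.min;  // maps to a vector min
        result.max = data[i] > result.max ? data[i] : result.max;  // maps to a vector max
    }
    return result;
}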
3 changes: 3 additions & 0 deletions cpp/arcticdb/codec/codec.cpp
@@ -467,6 +467,8 @@ void decode_v2(const Segment& segment,
auto& col = res.column(static_cast<position_t>(*col_index));

data += decode_field(res.field(*col_index).type(), *encoded_field, data, col, col.opt_sparse_map(), hdr.encoding_version());
col.set_statistics(encoded_field->get_statistics());

seg_row_count = std::max(seg_row_count, calculate_last_row(col));
} else {
data += encoding_sizes::field_compressed_size(*encoded_field) + sizeof(ColumnMagic);
@@ -533,6 +535,7 @@ void decode_v1(const Segment& segment,
hdr.encoding_version()
);
seg_row_count = std::max(seg_row_count, calculate_last_row(col));
col.set_statistics(field.get_statistics());
ARCTICDB_TRACE(log::codec(), "Decoded column {} to position {}", i, data - begin);
} else {
data += encoding_sizes::field_compressed_size(field);
4 changes: 3 additions & 1 deletion cpp/arcticdb/codec/encode_v1.cpp
@@ -137,7 +137,8 @@ namespace arcticdb {
encoded_fields.reserve(encoded_buffer_size, in_mem_seg.num_columns());
ARCTICDB_TRACE(log::codec(), "Encoding fields");
for (std::size_t column_index = 0; column_index < in_mem_seg.num_columns(); ++column_index) {
auto column_data = in_mem_seg.column_data(column_index);
const auto& column = in_mem_seg.column(column_index);
auto column_data = column.data();
auto* column_field = encoded_fields.add_field(column_data.num_blocks());
if(column_data.num_blocks() > 0) {
encoder.encode(codec_opts, column_data, *column_field, *out_buffer, pos);
@@ -147,6 +148,7 @@
auto* ndarray = column_field->mutable_ndarray();
ndarray->set_items_count(0);
}
column_field->set_statistics(column.get_statistics());
}
encode_string_pool<EncodingPolicyV1>(in_mem_seg, segment_header, codec_opts, *out_buffer, pos);
}
6 changes: 5 additions & 1 deletion cpp/arcticdb/codec/encode_v2.cpp
@@ -356,8 +356,12 @@ static void encode_encoded_fields(
ARCTICDB_TRACE(log::codec(), "Encoding fields");
for (std::size_t column_index = 0; column_index < in_mem_seg.num_columns(); ++column_index) {
write_magic<ColumnMagic>(*out_buffer, pos);
auto column_data = in_mem_seg.column_data(column_index);
const auto& column = in_mem_seg.column(column_index);
auto column_data = column.data();
auto* column_field = encoded_fields.add_field(column_data.num_blocks());
if(column.has_statistics())
column_field->set_statistics(column.get_statistics());

ARCTICDB_TRACE(log::codec(),"Beginning encoding of column {}: ({}) to position {}", column_index, in_mem_seg.descriptor().field(column_index).name(), pos);

if(column_data.num_blocks() > 0) {
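Note the asymmetry with encode_v1 above: the V1 path copies the column statistics into the encoded field unconditionally, while the V2 path guards on column.has_statistics(). Both appear safe — FieldStatsImpl carries a set_ flag recording whether it was populated — but the guard spares V2 a redundant copy for columns that never had statistics generated.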
11 changes: 10 additions & 1 deletion cpp/arcticdb/codec/encoded_field.hpp
@@ -163,7 +163,8 @@ struct EncodedFieldImpl : public EncodedField {
sizeof(values_count_) +
sizeof(sparse_map_bytes_) +
sizeof(items_count_) +
sizeof(format_);
sizeof(format_) +
sizeof(stats_);

EncodedFieldImpl() = default;

@@ -366,6 +367,14 @@ struct EncodedFieldImpl : public EncodedField {
sparse_map_bytes_ = bytes;
}

void set_statistics(FieldStats stats) {
stats_ = stats;
}

FieldStats get_statistics() const {
return stats_;
}

EncodedBlock *add_values(EncodingVersion encoding_version) {
const bool old_style = encoding_version == EncodingVersion::V1;
size_t pos;
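FieldStats itself is defined outside this diff; judging by the accessors exercised in test_codec.cpp below (has_min/has_max/has_unique, typed get_min/get_max, get_unique_count) and the fact that sizeof(stats_) is folded into FixedHeaderSize, it is plausibly a small trivially-copyable struct along these lines — a sketch under those assumptions, not the actual definition:

#include <cstdint>
#include <cstring>
#include <type_traits>

// Hypothetical reconstruction of FieldStats for illustration; the real type
// must be trivially copyable since it is serialized inline in the fixed header.
struct FieldStatsSketch {
    uint64_t min_ = 0;          // bit pattern of the minimum value
    uint64_t max_ = 0;          // bit pattern of the maximum value
    uint32_t unique_count_ = 0;
    uint8_t set_ = 0;           // bitmask recording which stats are populated

    bool has_min() const { return set_ & 1; }
    bool has_max() const { return set_ & 2; }
    bool has_unique() const { return set_ & 4; }

    template <typename T>
    T get_max() const {         // reinterpret the stored bits as T (little-endian assumption)
        T value;
        std::memcpy(&value, &max_, sizeof(T));
        return value;
    }
};

static_assert(std::is_trivially_copyable_v<FieldStatsSketch>);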
5 changes: 5 additions & 0 deletions cpp/arcticdb/codec/protobuf_mappings.cpp
@@ -10,6 +10,7 @@
#include "arcticdb/storage/memory_layout.hpp"
#include <arcticdb/codec/segment_header.hpp>
#include <arcticdb/codec/protobuf_mappings.hpp>
#include <arcticdb/stream/protobuf_mappings.hpp>
#include <folly/container/Enumerate.h>

namespace arcticdb {
@@ -78,6 +79,8 @@ void encoded_field_from_proto(const arcticdb::proto::encoding::EncodedField& inp
auto* value_block = output_ndarray->add_values(EncodingVersion::V1);
block_from_proto(input_ndarray.values(i), *value_block, false);
}

output.set_statistics(create_from_proto(input.stats()));
}

void copy_encoded_field_to_proto(const EncodedFieldImpl& input, arcticdb::proto::encoding::EncodedField& output) {
@@ -97,6 +100,8 @@ void copy_encoded_field_to_proto(const EncodedFieldImpl& input, arcticdb::proto:
auto* value_block = output_ndarray->add_values();
proto_from_block(input_ndarray.values(i), *value_block);
}

field_stats_to_proto(input.get_statistics(), *output.mutable_stats());
}

size_t num_blocks(const arcticdb::proto::encoding::EncodedField& field) {
80 changes: 80 additions & 0 deletions cpp/arcticdb/codec/test/test_codec.cpp
@@ -512,6 +512,86 @@ TEST(Segment, RoundtripTimeseriesDescriptorWriteToBufferV2) {
ASSERT_EQ(decoded, copy);
}

TEST(Segment, RoundtripStatisticsV1) {
ScopedConfig generate_stats_on_write("Statistics.GenerateOnWrite", 1);
const auto stream_desc = stream_descriptor(StreamId{"thing"}, RowCountIndex{}, {
scalar_field(DataType::UINT8, "int8"),
scalar_field(DataType::FLOAT64, "doubles")
});

SegmentInMemory in_mem_seg{stream_desc.clone()};
constexpr size_t num_rows = 10;
for(auto i = 0UL; i < num_rows; ++i) {
in_mem_seg.set_scalar<uint8_t>(0, static_cast<uint8_t>(i));
in_mem_seg.set_scalar<double>(1, static_cast<double>(i * 2));
in_mem_seg.end_row();
}
in_mem_seg.calculate_statistics();
auto copy = in_mem_seg.clone();
auto seg = encode_v1(std::move(in_mem_seg), codec::default_lz4_codec());
std::vector<uint8_t> vec;
const auto bytes = seg.calculate_size();
vec.resize(bytes);
seg.write_to(vec.data());
auto unserialized = Segment::from_bytes(vec.data(), bytes);
SegmentInMemory decoded{stream_desc.clone()};
decode_v1(unserialized, unserialized.header(), decoded, unserialized.descriptor());
auto col1_stats = decoded.column(0).get_statistics();
ASSERT_TRUE(col1_stats.has_max());
ASSERT_EQ(col1_stats.get_max<uint8_t>(), 9);
ASSERT_TRUE(col1_stats.has_min());
ASSERT_EQ(col1_stats.get_min<uint8_t>(), 0);
ASSERT_TRUE(col1_stats.has_unique());
ASSERT_EQ(col1_stats.get_unique_count(), 10);
auto col2_stats = decoded.column(1).get_statistics();
ASSERT_TRUE(col2_stats.has_max());
ASSERT_EQ(col2_stats.get_max<double>(), 18.0);
ASSERT_TRUE(col2_stats.has_min());
ASSERT_EQ(col2_stats.get_min<double>(), 0.0);
ASSERT_TRUE(col2_stats.has_unique());
ASSERT_EQ(col2_stats.get_unique_count(), 10);
}

TEST(Segment, RoundtripStatisticsV2) {
ScopedConfig generate_stats_on_write("Statistics.GenerateOnWrite", 1);
const auto stream_desc = stream_descriptor(StreamId{"thing"}, RowCountIndex{}, {
scalar_field(DataType::UINT8, "int8"),
scalar_field(DataType::FLOAT64, "doubles")
});

SegmentInMemory in_mem_seg{stream_desc.clone()};
constexpr size_t num_rows = 10;
for(auto i = 0UL; i < num_rows; ++i) {
in_mem_seg.set_scalar<uint8_t>(0, static_cast<uint8_t>(i));
in_mem_seg.set_scalar<double>(1, static_cast<double>(i * 2));
in_mem_seg.end_row();
}
in_mem_seg.calculate_statistics();
auto copy = in_mem_seg.clone();
auto seg = encode_v2(std::move(in_mem_seg), codec::default_lz4_codec());
std::vector<uint8_t> vec;
const auto bytes = seg.calculate_size();
vec.resize(bytes);
seg.write_to(vec.data());
auto unserialized = Segment::from_bytes(vec.data(), bytes);
SegmentInMemory decoded{stream_desc.clone()};
decode_v2(unserialized, unserialized.header(), decoded, unserialized.descriptor());
auto col1_stats = decoded.column(0).get_statistics();
ASSERT_TRUE(col1_stats.has_max());
ASSERT_EQ(col1_stats.get_max<uint8_t>(), 9);
ASSERT_TRUE(col1_stats.has_min());
ASSERT_EQ(col1_stats.get_min<uint8_t>(), 0);
ASSERT_TRUE(col1_stats.has_unique());
ASSERT_EQ(col1_stats.get_unique_count(), 10);
auto col2_stats = decoded.column(1).get_statistics();
ASSERT_TRUE(col2_stats.has_max());
ASSERT_EQ(col2_stats.get_max<double>(), 18.0);
ASSERT_TRUE(col2_stats.has_min());
ASSERT_EQ(col2_stats.get_min<double>(), 0.0);
ASSERT_TRUE(col2_stats.has_unique());
ASSERT_EQ(col2_stats.get_unique_count(), 10);
}

TEST(Segment, ColumnNamesProduceDifferentHashes) {
const auto stream_desc_1 = stream_descriptor(StreamId{"thing"}, RowCountIndex{}, {
scalar_field(DataType::UINT8, "ints1"),
21 changes: 12 additions & 9 deletions cpp/arcticdb/column_store/block.hpp
@@ -15,7 +15,6 @@
namespace arcticdb {

struct MemBlock {
static const size_t Align = 128;
static const size_t MinSize = 64;
using magic_t = arcticdb::util::MagicNum<'M', 'e', 'm', 'b'>;
magic_t magic_;
@@ -136,17 +135,21 @@ struct MemBlock {
bool owns_external_data_ = false;

static const size_t HeaderDataSize =
sizeof(magic_) + // 8 bytes
sizeof(bytes_) + // 8 bytes
sizeof(capacity_) + // 8 bytes
sizeof(magic_) +
sizeof(bytes_) +
sizeof(capacity_) +
sizeof(external_data_) +
sizeof(offset_) +
sizeof(timestamp_) +
sizeof(timestamp_) +
sizeof(owns_external_data_);

uint8_t pad[Align - HeaderDataSize];
static const size_t HeaderSize = HeaderDataSize + sizeof(pad);
static_assert(HeaderSize == Align);
uint8_t data_[MinSize];
static const size_t DataAlignment = 64;
static const size_t PadSize = (DataAlignment - (HeaderDataSize % DataAlignment)) % DataAlignment;

uint8_t pad[PadSize];
static const size_t HeaderSize = HeaderDataSize + PadSize;
static_assert(HeaderSize % DataAlignment == 0, "Header size must be aligned to 64 bytes");

alignas(DataAlignment) uint8_t data_[MinSize];
};
}
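The rewritten layout derives the padding from the actual header size instead of hard-coding a 128-byte Align, halving the per-block header overhead while still starting data_ on a 64-byte (cache-line) boundary. With an 8-byte magic, three 8-byte size/pointer fields, an 8-byte offset, an 8-byte timestamp and a 1-byte bool, the arithmetic works out as follows (field widths assumed for a typical LP64 platform):

#include <cstddef>

// Re-derivation of the MemBlock header padding, assuming 8-byte
// magic_/bytes_/capacity_/external_data_/offset_/timestamp_ and a 1-byte bool.
constexpr std::size_t header_data_size = 6 * 8 + 1;  // 49 bytes
constexpr std::size_t data_alignment = 64;
constexpr std::size_t pad_size =
    (data_alignment - (header_data_size % data_alignment)) % data_alignment;

static_assert(pad_size == 15);
static_assert(header_data_size + pad_size == 64);  // header occupies exactly one cache line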
4 changes: 2 additions & 2 deletions cpp/arcticdb/column_store/chunked_buffer.cpp
@@ -68,7 +68,7 @@ std::vector<ChunkedBufferImpl<BlockSize>> split(const ChunkedBufferImpl<BlockSiz
}

template std::vector<ChunkedBufferImpl<64>> split(const ChunkedBufferImpl<64>& input, size_t nbytes);
template std::vector<ChunkedBufferImpl<3968>> split(const ChunkedBufferImpl<3968>& input, size_t nbytes);
template std::vector<ChunkedBufferImpl<4032ul>> split(const ChunkedBufferImpl<4032ul>& input, size_t nbytes);

// Inclusive of start_byte, exclusive of end_byte
template <size_t BlockSize>
@@ -112,6 +112,6 @@ ChunkedBufferImpl<BlockSize> truncate(const ChunkedBufferImpl<BlockSize>& input,
}

template ChunkedBufferImpl<64> truncate(const ChunkedBufferImpl<64>& input, size_t start_byte, size_t end_byte);
template ChunkedBufferImpl<3968> truncate(const ChunkedBufferImpl<3968>& input, size_t start_byte, size_t end_byte);
template ChunkedBufferImpl<4032ul> truncate(const ChunkedBufferImpl<4032ul>& input, size_t start_byte, size_t end_byte);

} //namespace arcticdb
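The new instantiation size tracks the header change above: with the MemBlock header shrunk from 128 to 64 bytes, a 4032-byte data payload keeps header plus data on exactly one 4 KiB page, just as 3968 did against the old 128-byte header. The invariant in code form:

static_assert(128 + 3968 == 4096, "old: header + default block data = one 4 KiB page");
static_assert( 64 + 4032 == 4096, "new: header + default block data = one 4 KiB page");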
1 change: 0 additions & 1 deletion cpp/arcticdb/column_store/chunked_buffer.hpp
@@ -39,7 +39,6 @@ class ChunkedBufferImpl {

using BlockType = MemBlock;

static_assert(sizeof(BlockType) == BlockType::Align + BlockType::MinSize);
static_assert(DefaultBlockSize >= BlockType::MinSize);

public:
14 changes: 14 additions & 0 deletions cpp/arcticdb/column_store/column.hpp
@@ -9,6 +9,7 @@

#include <arcticdb/column_store/chunked_buffer.hpp>
#include <arcticdb/column_store/column_data.hpp>
#include <arcticdb/column_store/statistics.hpp>
#include <arcticdb/column_store/column_data_random_accessor.hpp>
#include <arcticdb/entity/native_tensor.hpp>
#include <arcticdb/entity/performance_tracing.hpp>
@@ -251,6 +252,18 @@ class Column {

bool sparse_permitted() const;

void set_statistics(FieldStatsImpl stats) {
stats_ = stats;
}

bool has_statistics() const {
return stats_.set_;
}

FieldStatsImpl get_statistics() const {
return stats_;
}

void backfill_sparse_map(ssize_t to_row) {
ARCTICDB_TRACE(log::version(), "Backfilling sparse map to position {}", to_row);
// Initialise the optional to an empty bitset if it has not been created yet
@@ -936,6 +949,7 @@ class Column {
Sparsity allow_sparse_ = Sparsity::NOT_PERMITTED;

std::optional<util::BitMagic> sparse_map_;
FieldStatsImpl stats_;
util::MagicNum<'D', 'C', 'o', 'l'> magic_;
};

4 changes: 4 additions & 0 deletions cpp/arcticdb/column_store/memory_segment.hpp
@@ -235,6 +235,10 @@ class SegmentInMemory {
impl_->reset_timeseries_descriptor();
}

void calculate_statistics() {
impl_->calculate_statistics();
}

[[nodiscard]] size_t num_columns() const { return impl_->num_columns(); }

[[nodiscard]] size_t row_count() const { return impl_->row_count(); }
14 changes: 14 additions & 0 deletions cpp/arcticdb/column_store/memory_segment_impl.cpp
@@ -700,6 +700,20 @@ void SegmentInMemoryImpl::reset_timeseries_descriptor() {
tsd_.reset();
}

void SegmentInMemoryImpl::calculate_statistics() {
for(auto& column : columns_) {
if(column->type().dimension() == Dimension::Dim0) {
const auto type = column->type();
if(is_numeric_type(type.data_type()) || is_sequence_type(type.data_type())) {
type.visit_tag([&column] (auto tdt) {
using TagType = std::decay_t<decltype(tdt)>;
column->set_statistics(generate_column_statistics<TagType>(column->data()));
});
}
}
}
}

void SegmentInMemoryImpl::reset_metadata() {
metadata_.reset();
}
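calculate_statistics visits each dense scalar column through its runtime type tag and delegates to generate_column_statistics, which lives in the new column_store/statistics.hpp (not shown in this diff). A minimal single-pass stand-in for what such a generator computes, under the assumption that it produces the min/max/unique-count triple the tests check (the real one operates on ColumnData blocks and is presumably vectorized):

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for generate_column_statistics, illustration only.
template <typename T>
struct ColumnStatsSketch {
    T min;
    T max;
    std::size_t unique_count;
};

template <typename T>
ColumnStatsSketch<T> generate_stats(const std::vector<T>& values) {
    assert(!values.empty());  // min/max are undefined for an empty column
    const auto [min_it, max_it] = std::minmax_element(values.begin(), values.end());
    const std::unordered_set<T> uniques(values.begin(), values.end());
    return {*min_it, *max_it, uniques.size()};
}

// Matches the expectations in test_codec.cpp: for the doubles column
// 0, 2, 4, ..., 18 this yields min 0, max 18 and a unique count of 10.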