Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: New serialization/deserialization for DataTypeString #9608

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions dbms/src/Core/tests/gtest_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ try
"Decimal(40,2)",
"MyDate",
"MyDateTime",
"String",
DataTypeString::getDefaultName(),
"FixedString(10)"};
for (auto & type_name : all_types)
{
Expand All @@ -80,7 +80,11 @@ try
ArenaPtr pool = std::make_shared<Arena>();
pool->alloc(1024 * 1024);
/// case 1, agg function not allocate memory in arena
std::vector<String> types{"Int64", "String", "Nullable(Int64)", "Nullable(String)"};
std::vector<String> types{
"Int64",
DataTypeString::getDefaultName(),
"Nullable(Int64)",
DataTypeString::getNullableDefaultName()};
std::vector<size_t> data_size{
16,
ColumnString::APPROX_STRING_SIZE * 2,
Expand Down Expand Up @@ -139,7 +143,7 @@ try
String long_str(ColumnString::APPROX_STRING_SIZE * 5, 'a');
String short_str(std::max(1, ColumnString::APPROX_STRING_SIZE / 10), 'a');
std::vector<String> string_values{short_str, long_str};
std::vector<String> types{"String", "Nullable(String)"};
std::vector<String> types{DataTypeString::getDefaultName(), DataTypeString::getNullableDefaultName()};
for (const auto & string_value : string_values)
{
for (const auto & type_string : types)
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Core/tests/gtest_spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -940,8 +940,8 @@ TEST_F(SpillerTest, SpillAndRestoreStringEnumData)
try
{
NamesAndTypes spiller_schema;
spiller_schema.emplace_back("col0", DataTypeFactory::instance().get("String"));
spiller_schema.emplace_back("col1", DataTypeFactory::instance().get("Nullable(String)"));
spiller_schema.emplace_back("col0", DataTypeFactory::instance().get(DataTypeString::getDefaultName()));
spiller_schema.emplace_back("col1", DataTypeFactory::instance().get(DataTypeString::getNullableDefaultName()));
spiller_schema.emplace_back("col2", DataTypeFactory::instance().get("Enum8('a' = 0,'b' = 1,'c' = 2)"));
spiller_schema.emplace_back("col3", DataTypeFactory::instance().get("Nullable(Enum8('a' = 0,'b' = 1,'c' = 2))"));
spiller_schema.emplace_back("col4", DataTypeFactory::instance().get("Enum16('a' = 0,'b' = 1,'c' = 2)"));
Expand Down Expand Up @@ -969,4 +969,4 @@ try
CATCH

} // namespace tests
} // namespace DB
} // namespace DB
194 changes: 192 additions & 2 deletions dbms/src/DataTypes/DataTypeString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,15 @@ bool DataTypeString::equals(const IDataType & rhs) const

void registerDataTypeString(DataTypeFactory & factory)
{
auto creator = static_cast<DataTypePtr (*)()>([] { return DataTypePtr(std::make_shared<DataTypeString>()); });
std::function<DataTypePtr()> legacy_creator = [] {
return std::make_shared<DataTypeString>(DataTypeString::SerdesFormat::SizePrefix);
};
factory.registerSimpleDataType(DataTypeString::LegacyName, legacy_creator);

factory.registerSimpleDataType("String", creator);
std::function<DataTypePtr()> creator = [] {
return std::make_shared<DataTypeString>(DataTypeString::SerdesFormat::SeparateSizeAndChars);
};
factory.registerSimpleDataType(DataTypeString::NameV1, creator);

/// These synonims are added for compatibility.

Expand All @@ -310,4 +316,188 @@ void registerDataTypeString(DataTypeFactory & factory)
factory.registerSimpleDataType("LONGBLOB", creator, DataTypeFactory::CaseInsensitive);
}

namespace
{

using Offset = ColumnString::Offset;

// Returns <offsets_stream, chars_stream>.
template <typename B, typename G>
std::pair<B *, B *> getStream(const G & getter, IDataType::SubstreamPath & path)
{
auto * chars_stream = getter(path);
path.emplace_back(IDataType::Substream::StringSizes);
auto * offsets_stream = getter(path);
return {offsets_stream, chars_stream};
}

PaddedPODArray<Offset> offsetToStrSize(
const ColumnString::Offsets & chars_offsets,
const size_t begin,
const size_t end)
{
assert(!chars_offsets.empty());
// The class PODArrayBase ensure chars_offsets[-1] is well defined as 0.
// For details, check the `pad_left` argument in PODArrayBase.
// In the for loop code below, when `begin` and `i` are 0:
// str_sizes[0] = chars_offsets[0] - chars_offsets[-1];
assert(chars_offsets[-1] == 0);

PaddedPODArray<Offset> str_sizes(end - begin);
auto chars_offsets_pos = chars_offsets.begin() + begin;

// clang-format off
#pragma clang loop vectorize(enable)
// clang-format on
for (ssize_t i = 0; i < static_cast<ssize_t>(str_sizes.size()); ++i)
{
str_sizes[i] = chars_offsets_pos[i] - chars_offsets_pos[i - 1];
}
return str_sizes;
}

void strSizeToOffset(const PaddedPODArray<Offset> & str_sizes, ColumnString::Offsets & chars_offsets)
{
assert(!str_sizes.empty());
assert(chars_offsets[-1] == 0);
const auto initial_size = chars_offsets.size();
chars_offsets.resize(initial_size + str_sizes.size());
auto chars_offsets_pos = chars_offsets.begin() + initial_size;
// Cannot be vectorize by compiler because chars_offsets[i] depends on chars_offsets[i-1]
// #pragma clang loop vectorize(enable)
for (ssize_t i = 0; i < static_cast<ssize_t>(str_sizes.size()); ++i)
{
chars_offsets_pos[i] = str_sizes[i] + chars_offsets_pos[i - 1];
}
}

std::pair<size_t, size_t> serializeOffsetsBinary(
const ColumnString::Offsets & chars_offsets,
WriteBuffer & ostr,
size_t offset,
size_t limit)
{
// [begin, end) is the range that need to be serialized of `chars_offsets`.
const auto begin = offset;
const auto end = limit != 0 && offset + limit < chars_offsets.size() ? offset + limit : chars_offsets.size();

PaddedPODArray<Offset> sizes = offsetToStrSize(chars_offsets, begin, end);
ostr.write(reinterpret_cast<const char *>(sizes.data()), sizeof(Offset) * sizes.size());

// [chars_begin, chars_end) is the range that need to be serialized of `chars`.
const auto chars_begin = begin == 0 ? 0 : chars_offsets[begin - 1];
const auto chars_end = chars_offsets[end - 1];
return {chars_begin, chars_end};
}

void serializeCharsBinary(const ColumnString::Chars_t & chars, WriteBuffer & ostr, size_t begin, size_t end)
{
ostr.write(reinterpret_cast<const char *>(&chars[begin]), end - begin);
}

size_t deserializeOffsetsBinary(ColumnString::Offsets & chars_offsets, ReadBuffer & istr, size_t limit)
{
PaddedPODArray<Offset> str_sizes(limit);
const auto size = istr.readBig(reinterpret_cast<char *>(str_sizes.data()), sizeof(Offset) * limit);
str_sizes.resize(size / sizeof(Offset));
strSizeToOffset(str_sizes, chars_offsets);
return std::accumulate(str_sizes.begin(), str_sizes.end(), 0uz);
}

void deserializeCharsBinary(ColumnString::Chars_t & chars, ReadBuffer & istr, size_t bytes)
{
const auto initial_size = chars.size();
chars.resize(initial_size + bytes);
istr.readStrict(reinterpret_cast<char *>(&chars[initial_size]), bytes);
}

void serializeBinaryBulkV2(
const IColumn & column,
WriteBuffer & offsets_stream,
WriteBuffer & chars_stream,
size_t offset,
size_t limit)
{
if (column.empty())
return;
const auto & column_string = typeid_cast<const ColumnString &>(column);
const auto & chars = column_string.getChars();
const auto & offsets = column_string.getOffsets();
auto [chars_begin, chars_end] = serializeOffsetsBinary(offsets, offsets_stream, offset, limit);
serializeCharsBinary(chars, chars_stream, chars_begin, chars_end);
}

void deserializeBinaryBulkV2(IColumn & column, ReadBuffer & offsets_stream, ReadBuffer & chars_stream, size_t limit)
{
if (limit == 0)
return;
auto & column_string = typeid_cast<ColumnString &>(column);
auto & chars = column_string.getChars();
auto & offsets = column_string.getOffsets();
auto bytes = deserializeOffsetsBinary(offsets, offsets_stream, limit);
deserializeCharsBinary(chars, chars_stream, bytes);
}

} // namespace

void DataTypeString::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path);
if (serdes_fmt == SerdesFormat::SeparateSizeAndChars)
{
path.emplace_back(Substream::StringSizes);
callback(path);
}
}

void DataTypeString::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
const OutputStreamGetter & getter,
size_t offset,
size_t limit,
bool /*position_independent_encoding*/,
SubstreamPath & path) const
{
if (serdes_fmt == SerdesFormat::SeparateSizeAndChars)
{
auto [offsets_stream, chars_stream] = getStream<WriteBuffer, IDataType::OutputStreamGetter>(getter, path);
serializeBinaryBulkV2(column, *offsets_stream, *chars_stream, offset, limit);
}
else
{
serializeBinaryBulk(column, *getter(path), offset, limit);
}
}

void DataTypeString::deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
const InputStreamGetter & getter,
size_t limit,
double avg_value_size_hint,
bool /*position_independent_encoding*/,
SubstreamPath & path) const
{
if (serdes_fmt == SerdesFormat::SeparateSizeAndChars)
{
auto [offsets_stream, chars_stream] = getStream<ReadBuffer, IDataType::InputStreamGetter>(getter, path);
deserializeBinaryBulkV2(column, *offsets_stream, *chars_stream, limit);
}
else
{
deserializeBinaryBulk(column, *getter(path), limit, avg_value_size_hint);
}
}

String DataTypeString::getDefaultName()
{
if constexpr (DefaultSerdesFormat == SerdesFormat::SeparateSizeAndChars)
return NameV1;
return LegacyName;
}

String DataTypeString::getNullableDefaultName()
{
return fmt::format("Nullable({})", getDefaultName());
}

} // namespace DB
43 changes: 42 additions & 1 deletion dbms/src/DataTypes/DataTypeString.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <DataTypes/IDataType.h>


namespace DB
{
class DataTypeString final : public IDataType
Expand All @@ -27,6 +26,8 @@ class DataTypeString final : public IDataType

const char * getFamilyName() const override { return "String"; }

String getName() const override { return serdes_fmt == SerdesFormat::SeparateSizeAndChars ? NameV1 : LegacyName; }

TypeIndex getTypeId() const override { return TypeIndex::String; }

void serializeBinary(const Field & field, WriteBuffer & ostr) const override;
Expand Down Expand Up @@ -64,6 +65,46 @@ class DataTypeString final : public IDataType
bool isString() const override { return true; }
bool isCategorial() const override { return true; }
bool canBeInsideNullable() const override { return true; }

void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;

void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
const OutputStreamGetter & getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath & path) const override;

void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
const InputStreamGetter & getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath & path) const override;

enum class SerdesFormat
{
SizePrefix = 0, // Legacy format, corresponding to `LegacyName`
SeparateSizeAndChars = 1, // New format, corresponding to `NameV1`
};

inline static constexpr auto DefaultSerdesFormat = SerdesFormat::SizePrefix;

inline static const String LegacyName{"String"}; // For compatibility of size-prefix format.
inline static const String NameV1{"StringV1"}; // The separate size and chars format.

// Both getDefaultName and getNullableDefaultName are unit-tests helpers.
static String getDefaultName();
static String getNullableDefaultName();

explicit DataTypeString(SerdesFormat serdes_fmt_ = DefaultSerdesFormat)
: serdes_fmt(serdes_fmt_)
{}

private:
const SerdesFormat serdes_fmt;
};

} // namespace DB
9 changes: 9 additions & 0 deletions dbms/src/DataTypes/IDataType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ bool IDataType::isArraySizes(const SubstreamPath & path)
return false;
}

bool IDataType::isStringSizes(const SubstreamPath & path)
{
return std::any_of(path.cbegin(), path.cend(), [](const auto & elem) {
return elem.type == IDataType::Substream::StringSizes;
});
}

String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
{
String nested_table_name = Nested::extractTableName(column_name);
Expand All @@ -127,6 +134,8 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
/// and name is encoded as a whole.
stream_name += "%2E" + escapeForFileName(elem.tuple_element_name);
}
else if (elem.type == Substream::StringSizes)
stream_name += ".size";
}
return stream_name;
}
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/DataTypes/IDataType.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class IDataType : private boost::noncopyable
NullMap,

TupleElement,

StringSizes,
};
Type type;

Expand Down Expand Up @@ -421,6 +423,7 @@ class IDataType : private boost::noncopyable

static bool isNullMap(const SubstreamPath & path);
static bool isArraySizes(const SubstreamPath & path);
static bool isStringSizes(const SubstreamPath & path);
};


Expand Down
Loading