Skip to content

Commit

Permalink
Correct the Big Endian state serialization/deserialization for variou…
Browse files Browse the repository at this point in the history
…s aggregate functions.
  • Loading branch information
kothiga committed Oct 18, 2023
1 parent e4e9291 commit fcd4c4f
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 48 deletions.
4 changes: 4 additions & 0 deletions base/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ else ()
target_compile_definitions(common PUBLIC WITH_COVERAGE=0)
endif ()

if (TARGET ch_contrib::crc32_s390x)
target_link_libraries(common PUBLIC ch_contrib::crc32_s390x)
endif()

target_include_directories(common PUBLIC .. "${CMAKE_CURRENT_BINARY_DIR}/..")

target_link_libraries (common
Expand Down
16 changes: 10 additions & 6 deletions base/base/StringRef.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
#pragma clang diagnostic ignored "-Wreserved-identifier"
#endif

#if defined(__s390x__)
#include <base/crc32_s390x.h>
#define CRC_INT s390x_crc32
#endif

/**
* The std::string_view-like container to avoid creating strings to find substrings in the hash table.
Expand Down Expand Up @@ -264,8 +268,8 @@ inline size_t hashLessThan8(const char * data, size_t size)

if (size >= 4)
{
UInt64 a = unalignedLoad<uint32_t>(data);
return hashLen16(size + (a << 3), unalignedLoad<uint32_t>(data + size - 4));
UInt64 a = unalignedLoadLittleEndian<uint32_t>(data);
return hashLen16(size + (a << 3), unalignedLoadLittleEndian<uint32_t>(data + size - 4));
}

if (size > 0)
Expand All @@ -285,8 +289,8 @@ inline size_t hashLessThan16(const char * data, size_t size)
{
if (size > 8)
{
UInt64 a = unalignedLoad<UInt64>(data);
UInt64 b = unalignedLoad<UInt64>(data + size - 8);
UInt64 a = unalignedLoadLittleEndian<UInt64>(data);
UInt64 b = unalignedLoadLittleEndian<UInt64>(data + size - 8);
return hashLen16(a, rotateByAtLeast1(b + size, static_cast<UInt8>(size))) ^ b;
}

Expand Down Expand Up @@ -315,13 +319,13 @@ struct CRC32Hash

do
{
UInt64 word = unalignedLoad<UInt64>(pos);
UInt64 word = unalignedLoadLittleEndian<UInt64>(pos);
res = static_cast<unsigned>(CRC_INT(res, word));

pos += 8;
} while (pos + 8 < end);

UInt64 word = unalignedLoad<UInt64>(end - 8); /// I'm not sure if this is normal.
UInt64 word = unalignedLoadLittleEndian<UInt64>(end - 8); /// I'm not sure if this is normal.
res = static_cast<unsigned>(CRC_INT(res, word));

return res;
Expand Down
26 changes: 26 additions & 0 deletions base/base/crc32_s390x.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <crc32-s390x.h>

inline uint32_t s390x_crc32_u8(uint32_t crc, uint8_t v)
{
return crc32c_le_vx(crc, reinterpret_cast<unsigned char *>(&v), sizeof(v));
}

inline uint32_t s390x_crc32_u16(uint32_t crc, uint16_t v)
{
v = __builtin_bswap16(v);
return crc32c_le_vx(crc, reinterpret_cast<unsigned char *>(&v), sizeof(v));
}

inline uint32_t s390x_crc32_u32(uint32_t crc, uint32_t v)
{
v = __builtin_bswap32(v);
return crc32c_le_vx(crc, reinterpret_cast<unsigned char *>(&v), sizeof(v));
}

inline uint64_t s390x_crc32(uint64_t crc, uint64_t v)
{
v = __builtin_bswap64(v);
return crc32c_le_vx(static_cast<uint32_t>(crc), reinterpret_cast<unsigned char *>(&v), sizeof(uint64_t));
}
2 changes: 1 addition & 1 deletion src/AggregateFunctions/AggregateFunctionGroupUniqArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class AggregateFunctionGroupUniqArray
size_t size = set.size();
writeVarUInt(size, buf);
for (const auto & elem : set)
writeIntBinary(elem, buf);
writeBinaryLittleEndian(elem.key, buf);
}

void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
Expand Down
4 changes: 2 additions & 2 deletions src/AggregateFunctions/AggregateFunctionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ struct AggregateFunctionMapCombinatorData
using SearchType = KeyType;
std::unordered_map<KeyType, AggregateDataPtr> merged_maps;

static void writeKey(KeyType key, WriteBuffer & buf) { writeBinary(key, buf); }
static void readKey(KeyType & key, ReadBuffer & buf) { readBinary(key, buf); }
static void writeKey(KeyType key, WriteBuffer & buf) { writeBinaryLittleEndian(key, buf); }
static void readKey(KeyType & key, ReadBuffer & buf) { readBinaryLittleEndian(key, buf); }
};

template <>
Expand Down
8 changes: 4 additions & 4 deletions src/AggregateFunctions/AggregateFunctionMinMaxAny.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ struct SingleValueDataFixed
{
writeBinary(has(), buf);
if (has())
writeBinary(value, buf);
writeBinaryLittleEndian(value, buf);
}

void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena *)
{
readBinary(has_value, buf);
if (has())
readBinary(value, buf);
readBinaryLittleEndian(value, buf);
}


Expand Down Expand Up @@ -1275,13 +1275,13 @@ struct AggregateFunctionAnyHeavyData : Data
void write(WriteBuffer & buf, const ISerialization & serialization) const
{
Data::write(buf, serialization);
writeBinary(counter, buf);
writeBinaryLittleEndian(counter, buf);
}

void read(ReadBuffer & buf, const ISerialization & serialization, Arena * arena)
{
Data::read(buf, serialization, arena);
readBinary(counter, buf);
readBinaryLittleEndian(counter, buf);
}

static const char * name() { return "anyHeavy"; }
Expand Down
4 changes: 0 additions & 4 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,6 @@ if (TARGET ch_contrib::cpuid)
target_link_libraries(clickhouse_common_io PRIVATE ch_contrib::cpuid)
endif()

if (TARGET ch_contrib::crc32_s390x)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::crc32_s390x)
endif()

if (TARGET ch_contrib::crc32-vpmsum)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::crc32-vpmsum)
endif()
Expand Down
30 changes: 5 additions & 25 deletions src/Common/HashTable/Hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,7 @@ inline DB::UInt64 intHash64(DB::UInt64 x)
#endif

#if defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__
#include <crc32-s390x.h>

inline uint32_t s390x_crc32_u8(uint32_t crc, uint8_t v)
{
return crc32c_le_vx(crc, reinterpret_cast<unsigned char *>(&v), sizeof(v));
}

inline uint32_t s390x_crc32_u16(uint32_t crc, uint16_t v)
{
v = std::byteswap(v);
return crc32c_le_vx(crc, reinterpret_cast<unsigned char *>(&v), sizeof(v));
}

inline uint32_t s390x_crc32_u32(uint32_t crc, uint32_t v)
{
v = std::byteswap(v);
return crc32c_le_vx(crc, reinterpret_cast<unsigned char *>(&v), sizeof(v));
}

inline uint64_t s390x_crc32(uint64_t crc, uint64_t v)
{
v = std::byteswap(v);
return crc32c_le_vx(static_cast<uint32_t>(crc), reinterpret_cast<unsigned char *>(&v), sizeof(uint64_t));
}
#include <base/crc32_s390x.h>
#endif

/// NOTE: Intel intrinsic can be confusing.
Expand Down Expand Up @@ -557,7 +534,10 @@ struct IntHash32
else if constexpr (sizeof(T) <= sizeof(UInt64))
{
DB::UInt64 out {0};
std::memcpy(&out, &key, sizeof(T));
if constexpr (std::endian::native == std::endian::little)
std::memcpy(&out, &key, sizeof(T));
else
std::memcpy(reinterpret_cast<char*>(&out) + sizeof(DB::UInt64) - sizeof(T), &key, sizeof(T));
return intHash32<salt>(out);
}

Expand Down
44 changes: 42 additions & 2 deletions src/Common/HyperLogLogCounter.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <Common/HyperLogLogBiasEstimator.h>
#include <Common/CompactArray.h>
#include <Common/HashTable/Hash.h>
#include <Common/TransformEndianness.hpp>

#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
Expand Down Expand Up @@ -330,7 +331,26 @@ class HyperLogLogCounter : private Hash

void read(DB::ReadBuffer & in)
{
in.readStrict(reinterpret_cast<char *>(this), sizeof(*this));
if constexpr (std::endian::native == std::endian::little)
in.readStrict(reinterpret_cast<char *>(this), sizeof(*this));
else
{
in.readStrict(reinterpret_cast<char *>(&rank_store), sizeof(RankStore));

constexpr size_t denom_size = sizeof(DenominatorCalculatorType);
std::array<char, denom_size> denominator_copy;
in.readStrict(denominator_copy.begin(), denom_size);

for (size_t i = 0; i < denominator_copy.size(); i += (sizeof(UInt32) / sizeof(char)))
{
UInt32 * cur = reinterpret_cast<UInt32 *>(&denominator_copy[i]);
DB::transformEndianness<std::endian::native, std::endian::little>(*cur);
}
memcpy(reinterpret_cast<char *>(&denominator), denominator_copy.begin(), denom_size);

in.readStrict(reinterpret_cast<char *>(&zeros), sizeof(ZerosCounterType));
DB::transformEndianness<std::endian::native, std::endian::little>(zeros);
}
}

void readAndMerge(DB::ReadBuffer & in)
Expand All @@ -352,7 +372,27 @@ class HyperLogLogCounter : private Hash

void write(DB::WriteBuffer & out) const
{
out.write(reinterpret_cast<const char *>(this), sizeof(*this));
if constexpr (std::endian::native == std::endian::little)
out.write(reinterpret_cast<const char *>(this), sizeof(*this));
else
{
out.write(reinterpret_cast<const char *>(&rank_store), sizeof(RankStore));

constexpr size_t denom_size = sizeof(DenominatorCalculatorType);
std::array<char, denom_size> denominator_copy;
memcpy(denominator_copy.begin(), reinterpret_cast<const char *>(&denominator), denom_size);

for (size_t i = 0; i < denominator_copy.size(); i += (sizeof(UInt32) / sizeof(char)))
{
UInt32 * cur = reinterpret_cast<UInt32 *>(&denominator_copy[i]);
DB::transformEndianness<std::endian::little, std::endian::native>(*cur);
}
out.write(denominator_copy.begin(), denom_size);

auto zeros_copy = zeros;
DB::transformEndianness<std::endian::little, std::endian::native>(zeros_copy);
out.write(reinterpret_cast<const char *>(&zeros_copy), sizeof(ZerosCounterType));
}
}

/// Read and write in text mode is suboptimal (but compatible with OLAPServer and Metrage).
Expand Down
10 changes: 8 additions & 2 deletions src/Common/SpaceSaving.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,20 @@ class SpaceSaving

void write(WriteBuffer & wb) const
{
writeBinary(key, wb);
if constexpr(std::is_same_v<TKey, StringRef>)
writeBinary(key, wb);
else
writeBinaryLittleEndian(key, wb);
writeVarUInt(count, wb);
writeVarUInt(error, wb);
}

void read(ReadBuffer & rb)
{
readBinary(key, rb);
if constexpr(std::is_same_v<TKey, StringRef>)
readBinary(key, rb);
else
readBinaryLittleEndian(key, rb);
readVarUInt(count, rb);
readVarUInt(error, rb);
}
Expand Down
10 changes: 8 additions & 2 deletions src/DataTypes/Serializations/SerializationIPv4andIPv6.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,18 @@ class SerializationIP : public SimpleTextSerialization
void serializeBinary(const Field & field, WriteBuffer & ostr, const FormatSettings &) const override
{
IPv x = field.get<IPv>();
writeBinaryLittleEndian(x, ostr);
if constexpr (std::is_same_v<IPv, IPv6>)
writeBinary(x, ostr);
else
writeBinaryLittleEndian(x, ostr);
}
void deserializeBinary(Field & field, ReadBuffer & istr, const FormatSettings &) const override
{
IPv x;
readBinaryLittleEndian(x.toUnderType(), istr);
if constexpr (std::is_same_v<IPv, IPv6>)
readBinary(x, istr);
else
readBinaryLittleEndian(x, istr);
field = NearestFieldType<IPv>(x);
}
void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override
Expand Down

0 comments on commit fcd4c4f

Please sign in to comment.