Skip to content

Commit

Permalink
Add function serializeSingleColumn to PrestoVectorSerde (#10657)
Browse files Browse the repository at this point in the history
Summary:
prestodb/presto#23331 adds a native expression optimizer to delegate expression evaluation to the native sidecar. This is used to constant fold expressions on the presto native sidecar, instead of on the presto java coordinator (which is the current behavior). prestodb/presto#22927 implements a proxygen endpoint to accept `RowExpression`s from `NativeSidecarExpressionInterpreter`, optimize them if possible (rewrite special form expressions), and compile the `RowExpression` to a velox expression with constant folding enabled. This velox expression is then converted back to a `RowExpression` and returned by the sidecar to the coordinator.

When the constant folded velox expression is of type `velox::exec::ConstantExpr`, we need to return a `RowExpression` of type `ConstantExpression`. This requires us to serialize the constant value from `velox::exec::ConstantExpr` into `protocol::ConstantExpression::valueBlock`. This can be done by serializing the constant value vector to presto SerializedPage::column format, followed by base 64 encoding the result (reverse engineering the logic from `Base64Util.cpp::readBlock`).

This PR adds a new function, `serializeSingleColumn`, to `PrestoVectorSerde`. This can be used to serialize input data from vectors containing a single element into a single PrestoPage column format (without the PrestoPage header).
This function is not added to `PrestoBatchVectorSerializer` alongside the existing `serialize` function since that would require adding it as a virtual function in `BatchVectorSerializer` as well, and this is not desirable since the `PrestoPage` format is not relevant in this base class. There is an existing function `deserializeSingleColumn` in `PrestoVectorSerde` which is used to deserialize data from a single column, since `serializeSingleColumn` performs the inverse operation to this function, it is added alongside it in `PrestoVectorSerde`.

Pull Request resolved: #10657

Reviewed By: amitkdutta

Differential Revision: D66044754

Pulled By: pedroerp

fbshipit-source-id: e509605067920f8207e5a3fa67552badc2ce0eba
  • Loading branch information
pramodsatya authored and facebook-github-bot committed Nov 16, 2024
1 parent 3265e79 commit 00e8149
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
28 changes: 28 additions & 0 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4272,6 +4272,34 @@ void PrestoVectorSerde::deserializeSingleColumn(
*result = row->childAt(0);
}

void PrestoVectorSerde::serializeSingleColumn(
const VectorPtr& vector,
const Options* opts,
memory::MemoryPool* pool,
std::ostream* output) {
const auto prestoOptions = toPrestoOptions(opts);
VELOX_USER_CHECK_EQ(
prestoOptions.compressionKind,
common::CompressionKind::CompressionKind_NONE);
VELOX_USER_CHECK_EQ(prestoOptions.nullsFirst, false);

const IndexRange range{0, vector->size()};
const auto arena = std::make_unique<StreamArena>(pool);
auto stream = std::make_unique<VectorStream>(
vector->type(),
std::nullopt,
std::nullopt,
arena.get(),
vector->size(),
prestoOptions);
Scratch scratch;
serializeColumn(vector, folly::Range(&range, 1), stream.get(), scratch);

PrestoOutputStreamListener listener;
OStreamOutputStream outputStream(output, &listener);
stream->flush(&outputStream);
}

namespace {
void initBitsToMapOnce() {
static folly::once_flag initOnceFlag;
Expand Down
16 changes: 16 additions & 0 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ namespace facebook::velox::serializer::presto {
/// 2. To serialize a single RowVector, one can use the BatchVectorSerializer
/// returned by createBatchSerializer(). Since it serializes a single RowVector,
/// it tries to preserve the encodings of the input data.
///
/// 3. To serialize data from a vector into a single column, adhering to
/// PrestoPage's column format and excluding the PrestoPage header, one can use
/// serializeSingleColumn() directly.
class PrestoVectorSerde : public VectorSerde {
public:
// Input options that the serializer recognizes.
Expand Down Expand Up @@ -145,6 +149,18 @@ class PrestoVectorSerde : public VectorSerde {
VectorPtr* result,
const Options* options);

/// This function is used to serialize data from input vector into a single
/// column that conforms to PrestoPage's column format. The serialized binary
/// data is uncompressed and starts at the column header since the PrestoPage
/// header is not included. The deserializeSingleColumn function can be used
/// to deserialize the serialized binary data returned by this function back
/// to the input vector.
void serializeSingleColumn(
const VectorPtr& vector,
const Options* opts,
memory::MemoryPool* pool,
std::ostream* output);

enum class TokenType {
HEADER,
NUM_COLUMNS,
Expand Down
50 changes: 37 additions & 13 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,16 @@ class PrestoSerializerTest
makeMapVector<StringView, int32_t>({})});
}

void testDeserializeSingleColumn(
const std::string& serializedData,
const VectorPtr& expected) {
auto byteStream = toByteStream(serializedData);
VectorPtr deserialized;
serde_->deserializeSingleColumn(
byteStream.get(), pool(), expected->type(), &deserialized, nullptr);
assertEqualVectors(expected, deserialized);
}

std::unique_ptr<serializer::presto::PrestoVectorSerde> serde_;
folly::Random::DefaultGenerator rng_;
};
Expand Down Expand Up @@ -1536,14 +1546,16 @@ INSTANTIATE_TEST_SUITE_P(
common::CompressionKind::CompressionKind_LZ4,
common::CompressionKind::CompressionKind_GZIP));

TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
// Verify that deserializeSingleColumn API can handle all supported types.
static const size_t kPrestoPageHeaderBytes = 21;
static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t);
static const size_t kBytesToTrim =
kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes;
TEST_F(PrestoSerializerTest, serdeSingleColumn) {
// The difference between serialized data obtained from
// PrestoIterativeVectorSerializer and serializeSingleColumn() is the
// PrestoPage header and number of columns section in the serialized data.
auto testSerializeRoundTrip = [&](const VectorPtr& vector) {
static const size_t kPrestoPageHeaderBytes = 21;
static const size_t kNumOfColumnsSerializedBytes = sizeof(int32_t);
static const size_t kBytesToTrim =
kPrestoPageHeaderBytes + kNumOfColumnsSerializedBytes;

auto testRoundTripSingleColumn = [&](const VectorPtr& vector) {
auto rowVector = makeRowVector({vector});
// Serialize to PrestoPage format.
std::ostringstream output;
Expand All @@ -1562,14 +1574,17 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
// Remove the PrestoPage header and Number of columns section from the
// serialized data.
std::string input = output.str().substr(kBytesToTrim);
testDeserializeSingleColumn(input, vector);
};

auto byteStream = toByteStream(input);
VectorPtr deserialized;
serde_->deserializeSingleColumn(
byteStream.get(), pool(), vector->type(), &deserialized, nullptr);
assertEqualVectors(vector, deserialized);
auto testSerializeSingleColumnRoundTrip = [&](const VectorPtr& vector) {
std::ostringstream output;
serde_->serializeSingleColumn(vector, nullptr, pool_.get(), &output);
const auto serialized = output.str();
testDeserializeSingleColumn(serialized, vector);
};

// Verify that (de)serializeSingleColumn API can handle all supported types.
std::vector<TypePtr> typesToTest = {
BOOLEAN(),
TINYINT(),
Expand Down Expand Up @@ -1607,7 +1622,16 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
for (const auto& type : typesToTest) {
SCOPED_TRACE(fmt::format("Type: {}", type->toString()));
auto data = fuzzer.fuzz(type);
testRoundTripSingleColumn(data);

// Test deserializeSingleColumn() round trip with serialized data obtained
// by PrestoIterativeVectorSerializer. This serialized data includes the
// PrestoPage header and number of columns, which is removed for testing.
testSerializeRoundTrip(data);

// Test serializeSingleColumn() round trip with deserializeSingleColumn(),
// both of these functions do not consider the PrestoPage header and number
// of columns when (de)serializing the data.
testSerializeSingleColumnRoundTrip(data);
}
}

Expand Down

0 comments on commit 00e8149

Please sign in to comment.