Skip to content

Commit

Permalink
fix: Unaligned memory access in ByteStream and PrestoSerializer (face…
Browse files Browse the repository at this point in the history
…bookincubator#11574)

Summary:

We have a few places in the serialization code that do unaligned memory
access.  This is causing crashes in `ByteOutputStream::append<int128>` and we
clean them up in this change.

Reviewed By: yuandagits

Differential Revision: D66125705
  • Loading branch information
Yuhta authored and facebook-github-bot committed Nov 19, 2024
1 parent 03deeaf commit c44d65d
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 117 deletions.
51 changes: 28 additions & 23 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/
#pragma once

#include <folly/io/IOBuf.h>
#include "velox/common/base/Scratch.h"
#include "velox/common/memory/StreamArena.h"
#include "velox/type/Type.h"

#include <folly/Bits.h>
#include <folly/io/IOBuf.h>

#include <memory>

namespace facebook::velox {
Expand Down Expand Up @@ -122,20 +123,24 @@ class ByteInputStream {

template <typename T>
T read() {
static_assert(std::is_trivially_copyable_v<T>);
if (current_->position + sizeof(T) <= current_->size) {
auto* source = current_->buffer + current_->position;
current_->position += sizeof(T);
return *reinterpret_cast<const T*>(
current_->buffer + current_->position - sizeof(T));
return folly::loadUnaligned<T>(source);
}
// The number straddles two buffers. We read byte by byte and make a
// little-endian uint64_t. The bytes can be cast to any integer or floating
// point type since the wire format has the machine byte order.
static_assert(sizeof(T) <= sizeof(uint64_t));
uint64_t value = 0;
union {
uint64_t bits;
T typed;
} value{};
for (int32_t i = 0; i < sizeof(T); ++i) {
value |= static_cast<uint64_t>(readByte()) << (i * 8);
value.bits |= static_cast<uint64_t>(readByte()) << (i * 8);
}
return *reinterpret_cast<const T*>(&value);
return value.typed;
}

template <typename Char>
Expand Down Expand Up @@ -288,21 +293,15 @@ class ByteOutputStream {

template <typename T>
void append(folly::Range<const T*> values) {
static_assert(std::is_trivially_copyable_v<T>);
if (current_->position + sizeof(T) * values.size() > current_->size) {
appendStringView(std::string_view(
reinterpret_cast<const char*>(&values[0]),
values.size() * sizeof(T)));
return;
}

auto* target = reinterpret_cast<T*>(current_->buffer + current_->position);
const auto* end = target + values.size();
auto* valuePtr = &values[0];
while (target != end) {
*target = *valuePtr;
++target;
++valuePtr;
}
auto* target = current_->buffer + current_->position;
memcpy(target, values.data(), values.size() * sizeof(T));
current_->position += sizeof(T) * values.size();
}

Expand All @@ -317,10 +316,11 @@ class ByteOutputStream {
// There must be 8 bytes writable. If available is 56, there are 7, so >.
if (available > 56) {
const auto offset = position & 7;
uint64_t* buffer =
reinterpret_cast<uint64_t*>(current_->buffer + (position >> 3));
const auto mask = bits::lowMask(offset);
*buffer = (*buffer & mask) | (bits[0] << offset);
auto* buffer = current_->buffer + (position >> 3);
auto value = folly::loadUnaligned<uint64_t>(buffer);
value = (value & mask) | (bits[0] << offset);
folly::storeUnaligned(buffer, value);
current_->position += end;
return;
}
Expand Down Expand Up @@ -362,23 +362,22 @@ class ByteOutputStream {
// Returns a range of 'size' items of T. If there is no contiguous space in
// 'this', uses 'scratch' to make a temp block that is appended to 'this' in
template <typename T>
T* getAppendWindow(int32_t size, ScratchPtr<T>& scratchPtr) {
uint8_t* getAppendWindow(int32_t size, ScratchPtr<T>& scratchPtr) {
const int32_t bytes = sizeof(T) * size;
if (!current_) {
extend(bytes);
}
auto available = current_->size - current_->position;
if (available >= bytes) {
current_->position += bytes;
return reinterpret_cast<T*>(
current_->buffer + current_->position - bytes);
return current_->buffer + current_->position - bytes;
}
// If the tail is not large enough, make temp of the right size
// in scratch. Extend the stream so that there is guaranteed space to copy
// the scratch to the stream. This copy takes place in destruction of
// AppendWindow and must not allocate so that it is noexcept.
ensureSpace(bytes);
return scratchPtr.get(size);
return reinterpret_cast<uint8_t*>(scratchPtr.get(size));
}

void extend(int32_t bytes);
Expand Down Expand Up @@ -425,6 +424,12 @@ class ByteOutputStream {
friend class AppendWindow;
};

template <>
inline void ByteOutputStream::append(
folly::Range<const std::shared_ptr<void>*> /*values*/) {
VELOX_FAIL("Cannot serialize OPAQUE data");
}

/// A scoped wrapper that provides 'size' T's of writable space in 'stream'.
/// Normally gives an address into 'stream's buffer but can use 'scratch' to
/// make a contiguous piece if stream does not have a suitable run.
Expand All @@ -448,7 +453,7 @@ class AppendWindow {
}
}

T* get(int32_t size) {
uint8_t* get(int32_t size) {
return stream_.getAppendWindow(size, scratchPtr_);
}

Expand Down
12 changes: 12 additions & 0 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,18 @@ TEST_F(ByteStreamTest, reuse) {
}
}

TEST_F(ByteStreamTest, unalignedWrite) {
constexpr int kSize = 1 + sizeof(int128_t);
auto arena = newArena();
ByteOutputStream stream(arena.get());
stream.startWrite(kSize);
stream.appendStringView(std::string_view("x"));
int128_t data{};
// This only crashes in opt mode.
stream.append<int128_t>(folly::Range(&data, 1));
ASSERT_EQ(stream.size(), kSize);
}

class InputByteStreamTest : public ByteStreamTest,
public testing::WithParamInterface<bool> {
protected:
Expand Down
Loading

0 comments on commit c44d65d

Please sign in to comment.