Skip to content

Commit 08d14b3

Browse files
authored
fix: Accommodate IPC messages without continuation bytes (#629)
Before arrow 0.15, encapsulated messages started with just the length instead of with a continuation token
1 parent 57acfbe commit 08d14b3

File tree

2 files changed

+61
-52
lines changed

2 files changed

+61
-52
lines changed

src/nanoarrow/ipc/decoder.c

+43-45
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@
5252
#define ENODATA 120
5353
#endif
5454

55-
// A more readable expression way to refer to the fact that there are 8 bytes
56-
// at the beginning of every message header.
57-
static const int32_t kMessageHeaderPrefixSize = 8;
58-
5955
#define NANOARROW_IPC_MAGIC "ARROW1"
6056

6157
// Internal representation of a parsed "Field" from flatbuffers. This
@@ -272,14 +268,6 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
272268
}
273269
}
274270

275-
static inline uint32_t ArrowIpcReadContinuationBytes(struct ArrowBufferView* data) {
276-
uint32_t value;
277-
memcpy(&value, data->data.as_uint8, sizeof(uint32_t));
278-
data->data.as_uint8 += sizeof(uint32_t);
279-
data->size_bytes -= sizeof(uint32_t);
280-
return value;
281-
}
282-
283271
static inline int32_t ArrowIpcReadInt32LE(struct ArrowBufferView* data, int swap_endian) {
284272
int32_t value;
285273
memcpy(&value, data->data.as_uint8, sizeof(int32_t));
@@ -984,40 +972,49 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct ArrowIpcDecoder* decode
984972

985973
// Returns NANOARROW_OK if data is large enough to read the first 8 bytes
986974
// of the message header, ESPIPE if reading more data might help, or EINVAL if the content
987-
// is not valid. Advances the input ArrowBufferView by 8 bytes.
975+
// is not valid. Advances the input ArrowBufferView by prefix_size (8 bytes or 4 bytes if
976+
// the message is pre-0.15 and has no continuation). Sets decoder->header_size_bytes
977+
// to the flatbuffers length plus the prefix_size.
988978
static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* decoder,
989979
struct ArrowBufferView* data_mut,
990-
int32_t* message_size_bytes,
980+
int32_t* prefix_size_bytes,
991981
struct ArrowError* error) {
992982
struct ArrowIpcDecoderPrivate* private_data =
993983
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
994984

995-
if (data_mut->size_bytes < kMessageHeaderPrefixSize) {
985+
if (data_mut->size_bytes < 8) {
996986
ArrowErrorSet(error,
997987
"Expected data of at least 8 bytes but only %" PRId64 " bytes remain",
998988
data_mut->size_bytes);
999989
return ESPIPE;
1000990
}
1001991

1002-
uint32_t continuation = ArrowIpcReadContinuationBytes(data_mut);
1003-
if (continuation != 0xFFFFFFFF) {
1004-
ArrowErrorSet(error, "Expected 0xFFFFFFFF at start of message but found 0x%08X",
1005-
(unsigned int)continuation);
1006-
return EINVAL;
992+
int swap_endian = private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG;
993+
int32_t continuation = ArrowIpcReadInt32LE(data_mut, swap_endian);
994+
int32_t length;
995+
if ((uint32_t)continuation != 0xFFFFFFFF) {
996+
if (continuation < 0) {
997+
ArrowErrorSet(error, "Expected 0xFFFFFFFF at start of message but found 0x%08X",
998+
(unsigned int)continuation);
999+
return EINVAL;
1000+
}
1001+
// Tolerate pre-0.15 encapsulated messages which only had the length prefix
1002+
length = continuation;
1003+
*prefix_size_bytes = sizeof(length);
1004+
} else {
1005+
length = ArrowIpcReadInt32LE(data_mut, swap_endian);
1006+
*prefix_size_bytes = sizeof(continuation) + sizeof(length);
10071007
}
1008+
decoder->header_size_bytes = *prefix_size_bytes + length;
10081009

1009-
int swap_endian = private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG;
1010-
int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
1011-
*message_size_bytes = header_body_size_bytes + kMessageHeaderPrefixSize;
1012-
if (header_body_size_bytes < 0) {
1010+
if (length < 0) {
10131011
ArrowErrorSet(error,
1014-
"Expected message body size > 0 but found message body size of %" PRId32
1015-
" bytes",
1016-
header_body_size_bytes);
1012+
"Expected message size > 0 but found message size of %" PRId32 " bytes",
1013+
length);
10171014
return EINVAL;
10181015
}
10191016

1020-
if (header_body_size_bytes == 0) {
1017+
if (length == 0) {
10211018
ArrowErrorSet(error, "End of Arrow stream");
10221019
return ENODATA;
10231020
}
@@ -1029,8 +1026,10 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
10291026
struct ArrowBufferView data,
10301027
struct ArrowError* error) {
10311028
ArrowIpcDecoderResetHeaderInfo(decoder);
1032-
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
1033-
decoder, &data, &decoder->header_size_bytes, error));
1029+
int32_t prefix_size_bytes;
1030+
NANOARROW_RETURN_NOT_OK(
1031+
ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes, error));
1032+
NANOARROW_UNUSED(prefix_size_bytes);
10341033
return NANOARROW_OK;
10351034
}
10361035

@@ -1041,24 +1040,24 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
10411040
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
10421041

10431042
ArrowIpcDecoderResetHeaderInfo(decoder);
1044-
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
1045-
decoder, &data, &decoder->header_size_bytes, error));
1043+
int32_t prefix_size_bytes;
1044+
NANOARROW_RETURN_NOT_OK(
1045+
ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes, error));
10461046

10471047
// Check that data contains at least the entire header (return ESPIPE to signal
10481048
// that reading more data may help).
1049-
int64_t message_body_size = decoder->header_size_bytes - kMessageHeaderPrefixSize;
1050-
if (data.size_bytes < message_body_size) {
1049+
if (data.size_bytes < decoder->header_size_bytes - prefix_size_bytes) {
10511050
ArrowErrorSet(error,
1052-
"Expected >= %" PRId64 " bytes of remaining data but found %" PRId64
1051+
"Expected >= %d bytes of remaining data but found %" PRId64
10531052
" bytes in buffer",
1054-
message_body_size + kMessageHeaderPrefixSize,
1055-
data.size_bytes + kMessageHeaderPrefixSize);
1053+
decoder->header_size_bytes, data.size_bytes + prefix_size_bytes);
10561054
return ESPIPE;
10571055
}
10581056

10591057
// Run flatbuffers verification
10601058
enum flatcc_verify_error_no verify_error =
1061-
ns(Message_verify_as_root(data.data.as_uint8, message_body_size);
1059+
ns(Message_verify_as_root(data.data.as_uint8,
1060+
decoder->header_size_bytes - prefix_size_bytes);
10621061
if (verify_error != flatcc_verify_ok)) {
10631062
ArrowErrorSet(error, "Message flatbuffer verification failed (%d) %s",
10641063
(int)verify_error, flatcc_verify_error_string(verify_error));
@@ -1163,18 +1162,17 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
11631162
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
11641163

11651164
ArrowIpcDecoderResetHeaderInfo(decoder);
1166-
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
1167-
decoder, &data, &decoder->header_size_bytes, error));
1165+
int32_t prefix_size_bytes;
1166+
NANOARROW_RETURN_NOT_OK(
1167+
ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes, error));
11681168

11691169
// Check that data contains at least the entire header (return ESPIPE to signal
11701170
// that reading more data may help).
1171-
int64_t message_body_size = decoder->header_size_bytes - kMessageHeaderPrefixSize;
1172-
if (data.size_bytes < message_body_size) {
1171+
if (data.size_bytes < decoder->header_size_bytes - prefix_size_bytes) {
11731172
ArrowErrorSet(error,
1174-
"Expected >= %" PRId64 " bytes of remaining data but found %" PRId64
1173+
"Expected >= %d bytes of remaining data but found %" PRId64
11751174
" bytes in buffer",
1176-
message_body_size + kMessageHeaderPrefixSize,
1177-
data.size_bytes + kMessageHeaderPrefixSize);
1175+
decoder->header_size_bytes, data.size_bytes + prefix_size_bytes);
11781176
return ESPIPE;
11791177
}
11801178

src/nanoarrow/ipc/decoder_test.cc

+18-7
Original file line numberDiff line numberDiff line change
@@ -129,28 +129,28 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
129129
EXPECT_STREQ(error.message,
130130
"Expected data of at least 8 bytes but only 1 bytes remain");
131131

132-
uint32_t eight_bad_bytes[] = {0, 0};
133-
data.data.as_uint8 = reinterpret_cast<uint8_t*>(eight_bad_bytes);
134-
data.size_bytes = 8;
132+
uint32_t eight_bad_bytes[] = {negative_one_le * 256, 999};
133+
data.data.as_uint32 = eight_bad_bytes;
134+
data.size_bytes = sizeof(eight_bad_bytes);
135135
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
136136
EXPECT_STREQ(error.message,
137-
"Expected 0xFFFFFFFF at start of message but found 0x00000000");
137+
"Expected 0xFFFFFFFF at start of message but found 0xFFFFFF00");
138138

139139
ArrowErrorInit(&error);
140140
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
141141
EXPECT_STREQ(error.message,
142-
"Expected 0xFFFFFFFF at start of message but found 0x00000000");
142+
"Expected 0xFFFFFFFF at start of message but found 0xFFFFFF00");
143143

144144
eight_bad_bytes[0] = 0xFFFFFFFF;
145145
eight_bad_bytes[1] = negative_one_le;
146146
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
147147
EXPECT_STREQ(error.message,
148-
"Expected message body size > 0 but found message body size of -1 bytes");
148+
"Expected message size > 0 but found message size of -1 bytes");
149149

150150
ArrowErrorInit(&error);
151151
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
152152
EXPECT_STREQ(error.message,
153-
"Expected message body size > 0 but found message body size of -1 bytes");
153+
"Expected message size > 0 but found message size of -1 bytes");
154154

155155
eight_bad_bytes[1] = one_le;
156156
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
@@ -169,6 +169,17 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
169169
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ENODATA);
170170
EXPECT_STREQ(error.message, "End of Arrow stream");
171171

172+
uint32_t pre_continuation[] = {0, 0};
173+
data.data.as_uint32 = pre_continuation;
174+
data.size_bytes = sizeof(pre_continuation);
175+
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ENODATA);
176+
EXPECT_STREQ(error.message, "End of Arrow stream");
177+
178+
pre_continuation[0] = one_le << 3;
179+
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
180+
EXPECT_STREQ(error.message,
181+
"Expected >= 12 bytes of remaining data but found 8 bytes in buffer");
182+
172183
ArrowIpcDecoderReset(&decoder);
173184
}
174185

0 commit comments

Comments
 (0)