Skip to content

Commit

Permalink
Correctly skip buffer in pre-V5 metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
CurtHagenlocher committed Sep 23, 2023
1 parent 5ebefcf commit 143a469
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ protected RecordBatch CreateArrowObjectFromMessage(
break;
case Flatbuf.MessageHeader.DictionaryBatch:
Flatbuf.DictionaryBatch dictionaryBatch = message.Header<Flatbuf.DictionaryBatch>().Value;
ReadDictionaryBatch(dictionaryBatch, bodyByteBuffer, memoryOwner);
ReadDictionaryBatch(message.Version, dictionaryBatch, bodyByteBuffer, memoryOwner);
break;
case Flatbuf.MessageHeader.RecordBatch:
Flatbuf.RecordBatch rb = message.Header<Flatbuf.RecordBatch>().Value;
List<IArrowArray> arrays = BuildArrays(Schema, bodyByteBuffer, rb);
List<IArrowArray> arrays = BuildArrays(message.Version, Schema, bodyByteBuffer, rb);
return new RecordBatch(Schema, memoryOwner, arrays, (int)rb.Length);
default:
// NOTE: Skip unsupported message type
Expand All @@ -136,7 +136,11 @@ internal static ByteBuffer CreateByteBuffer(ReadOnlyMemory<byte> buffer)
return new ByteBuffer(new ReadOnlyMemoryBufferAllocator(buffer), 0);
}

private void ReadDictionaryBatch(Flatbuf.DictionaryBatch dictionaryBatch, ByteBuffer bodyByteBuffer, IMemoryOwner<byte> memoryOwner)
private void ReadDictionaryBatch(
MetadataVersion version,
Flatbuf.DictionaryBatch dictionaryBatch,
ByteBuffer bodyByteBuffer,
IMemoryOwner<byte> memoryOwner)
{
long id = dictionaryBatch.Id;
IArrowType valueType = DictionaryMemo.GetDictionaryType(id);
Expand All @@ -149,7 +153,7 @@ private void ReadDictionaryBatch(Flatbuf.DictionaryBatch dictionaryBatch, ByteBu

Field valueField = new Field("dummy", valueType, true);
var schema = new Schema(new[] { valueField }, default);
IList<IArrowArray> arrays = BuildArrays(schema, bodyByteBuffer, recordBatch.Value);
IList<IArrowArray> arrays = BuildArrays(version, schema, bodyByteBuffer, recordBatch.Value);

if (arrays.Count != 1)
{
Expand All @@ -167,6 +171,7 @@ private void ReadDictionaryBatch(Flatbuf.DictionaryBatch dictionaryBatch, ByteBu
}

private List<IArrowArray> BuildArrays(
MetadataVersion version,
Schema schema,
ByteBuffer messageBuffer,
Flatbuf.RecordBatch recordBatchMessage)
Expand All @@ -187,8 +192,8 @@ private List<IArrowArray> BuildArrays(
Flatbuf.FieldNode fieldNode = recordBatchEnumerator.CurrentNode;

ArrayData arrayData = field.DataType.IsFixedPrimitive()
? LoadPrimitiveField(ref recordBatchEnumerator, field, in fieldNode, messageBuffer, bufferCreator)
: LoadVariableField(ref recordBatchEnumerator, field, in fieldNode, messageBuffer, bufferCreator);
? LoadPrimitiveField(version, ref recordBatchEnumerator, field, in fieldNode, messageBuffer, bufferCreator)
: LoadVariableField(version, ref recordBatchEnumerator, field, in fieldNode, messageBuffer, bufferCreator);

arrays.Add(ArrowArrayFactory.BuildArray(arrayData));
} while (recordBatchEnumerator.MoveNextNode());
Expand Down Expand Up @@ -225,6 +230,7 @@ private IBufferCreator GetBufferCreator(BodyCompression? compression)
}

private ArrayData LoadPrimitiveField(
MetadataVersion version,
ref RecordBatchEnumerator recordBatchEnumerator,
Field field,
in Flatbuf.FieldNode fieldNode,
Expand All @@ -251,13 +257,16 @@ private ArrayData LoadPrimitiveField(
case ArrowTypeId.Null:
return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, System.Array.Empty<ArrowBuffer>());
case ArrowTypeId.Union:
if (fieldNullCount > 0)
if (version < MetadataVersion.V5)
{
if (recordBatchEnumerator.CurrentBuffer.Length > 0)
if (fieldNullCount > 0)
{
// With V4 metadata we can get a validity bitmap. Fixing up union data is hard,
// so we will just quit.
throw new NotSupportedException("Cannot read pre-1.0.0 Union array with top-level validity bitmap");
if (recordBatchEnumerator.CurrentBuffer.Length > 0)
{
// With older metadata we can get a validity bitmap. Fixing up union data is hard,
// so we will just quit.
throw new NotSupportedException("Cannot read pre-1.0.0 Union array with top-level validity bitmap");
}
}
recordBatchEnumerator.MoveNextBuffer();
}
Expand All @@ -279,7 +288,7 @@ private ArrayData LoadPrimitiveField(
recordBatchEnumerator.MoveNextBuffer();
}

ArrayData[] children = GetChildren(ref recordBatchEnumerator, field, bodyData, bufferCreator);
ArrayData[] children = GetChildren(version, ref recordBatchEnumerator, field, bodyData, bufferCreator);

IArrowArray dictionary = null;
if (field.DataType.TypeId == ArrowTypeId.Dictionary)
Expand All @@ -292,6 +301,7 @@ private ArrayData LoadPrimitiveField(
}

private ArrayData LoadVariableField(
MetadataVersion version,
ref RecordBatchEnumerator recordBatchEnumerator,
Field field,
in Flatbuf.FieldNode fieldNode,
Expand Down Expand Up @@ -326,7 +336,7 @@ private ArrayData LoadVariableField(
}

ArrowBuffer[] arrowBuff = new[] { nullArrowBuffer, offsetArrowBuffer, valueArrowBuffer };
ArrayData[] children = GetChildren(ref recordBatchEnumerator, field, bodyData, bufferCreator);
ArrayData[] children = GetChildren(version, ref recordBatchEnumerator, field, bodyData, bufferCreator);

IArrowArray dictionary = null;
if (field.DataType.TypeId == ArrowTypeId.Dictionary)
Expand All @@ -339,6 +349,7 @@ private ArrayData LoadVariableField(
}

private ArrayData[] GetChildren(
MetadataVersion version,
ref RecordBatchEnumerator recordBatchEnumerator,
Field field,
ByteBuffer bodyData,
Expand All @@ -355,8 +366,8 @@ private ArrayData[] GetChildren(

Field childField = type.Fields[index];
ArrayData child = childField.DataType.IsFixedPrimitive()
? LoadPrimitiveField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData, bufferCreator)
: LoadVariableField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData, bufferCreator);
? LoadPrimitiveField(version, ref recordBatchEnumerator, childField, in childFieldNode, bodyData, bufferCreator)
: LoadVariableField(version, ref recordBatchEnumerator, childField, in childFieldNode, bodyData, bufferCreator);

children[index] = child;
}
Expand Down

0 comments on commit 143a469

Please sign in to comment.