From b823ccdcf1ab2814c5d9f2c3532d54992088efca Mon Sep 17 00:00:00 2001
From: Devin Smith
Date: Fri, 25 Oct 2024 11:04:00 -0700
Subject: [PATCH] feat: Add arrow Schema as supported type (#6285)

This is in support of #6023, which needs a way to encode Schema as
VARBINARY. This also serves as a potential hook point for implementing
something like #58.
---
 ...faultChunkInputStreamGeneratorFactory.java |   9 +-
 .../chunk/DefaultChunkReadingFactory.java     | 168 ++++++++++++++----
 .../extensions/barrage/util/ArrowIpcUtil.java |  31 ++++
 .../extensions/barrage/util/BarrageUtil.java  |   6 +-
 .../barrage/util/ArrowIpcUtilTest.java        |  44 +++++
 5 files changed, 218 insertions(+), 40 deletions(-)
 create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowIpcUtil.java
 create mode 100644 extensions/barrage/src/test/java/io/deephaven/extensions/barrage/util/ArrowIpcUtilTest.java

diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkInputStreamGeneratorFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkInputStreamGeneratorFactory.java
index 8255b870fc1..2d27195a4b5 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkInputStreamGeneratorFactory.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkInputStreamGeneratorFactory.java
@@ -10,9 +10,11 @@
 import io.deephaven.chunk.WritableLongChunk;
 import io.deephaven.chunk.attributes.Values;
 import io.deephaven.chunk.util.pools.PoolableChunk;
+import io.deephaven.extensions.barrage.util.ArrowIpcUtil;
 import io.deephaven.time.DateTimeUtils;
 import io.deephaven.util.QueryConstants;
 import io.deephaven.vector.Vector;
+import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -167,8 +169,13 @@ public ChunkInputStreamGenerator makeInputStreamGenerator(ChunkType chunkTyp
                         return nanoOfDay;
                     });
                 }
+                // TODO (core#58): add custom barrage serialization/deserialization support
+                // Migrate Schema to custom format when available.
+                if (type == Schema.class) {
+                    return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
+                            ArrowIpcUtil::serialize);
+                }
                 // TODO (core#936): support column conversion modes
-
                 return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
                         (out, item) -> out.write(item.toString().getBytes(Charsets.UTF_8)));
             default:
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
index dc1a7895aea..18b96bbc9a4 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
@@ -4,13 +4,19 @@
 package io.deephaven.extensions.barrage.chunk;
 
 import com.google.common.base.Charsets;
+import io.deephaven.chunk.WritableChunk;
+import io.deephaven.chunk.attributes.Values;
 import io.deephaven.extensions.barrage.ColumnConversionMode;
+import io.deephaven.extensions.barrage.util.ArrowIpcUtil;
 import io.deephaven.extensions.barrage.util.StreamReaderOptions;
 import io.deephaven.time.DateTimeUtils;
 import io.deephaven.util.QueryConstants;
 import io.deephaven.util.type.TypeUtils;
 import io.deephaven.vector.Vector;
+import org.apache.arrow.vector.types.pojo.Schema;
 
+import java.io.DataInput;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.time.Instant;
@@ -18,6 +24,8 @@
 import java.time.LocalTime;
 import java.time.ZonedDateTime;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
 
 import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MS_PER_DAY;
 
@@ -60,13 +68,7 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
             case Object:
                 if (typeInfo.type().isArray()) {
                     if (typeInfo.componentType() == byte.class) {
-                        return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
-                                totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
-                                        is,
-                                        fieldNodeIter,
-                                        bufferInfoIter,
-                                        (buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
-                                        outChunk, outOffset, totalRows);
+                        return ByteArrayChunkReader.BYTEARRAY_READER;
                     } else {
                         return new VarListChunkReader<>(options, typeInfo, this);
                     }
@@ -75,30 +77,10 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
                     return new VectorChunkReader(options, typeInfo, this);
                 }
                 if (typeInfo.type() == BigInteger.class) {
-                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
-                            totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
-                                    is,
-                                    fieldNodeIter,
-                                    bufferInfoIter,
-                                    BigInteger::new,
-                                    outChunk, outOffset, totalRows);
+                    return BigIntegerChunkReader.BIG_INTEGER_CHUNK_READER;
                 }
                 if (typeInfo.type() == BigDecimal.class) {
-                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
-                            totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
-                                    is,
-                                    fieldNodeIter,
-                                    bufferInfoIter,
-                                    (final byte[] buf, final int offset, final int length) -> {
-                                        // read the int scale value as little endian, arrow's endianness.
-                                        final byte b1 = buf[offset];
-                                        final byte b2 = buf[offset + 1];
-                                        final byte b3 = buf[offset + 2];
-                                        final byte b4 = buf[offset + 3];
-                                        final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
-                                        return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
-                                    },
-                                    outChunk, outOffset, totalRows);
+                    return BigDecimalChunkReader.BIG_DECIMAL_CHUNK_READER;
                 }
                 if (typeInfo.type() == Instant.class) {
                     return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
@@ -184,19 +166,131 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
                     return new LongChunkReader(options).transform(
                             value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value));
                 }
-                if (typeInfo.type() == String.class ||
-                        options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
-                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
-                            totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is,
-                                    fieldNodeIter,
-                                    bufferInfoIter,
-                                    (buf, off, len) -> new String(buf, off, len, Charsets.UTF_8), outChunk, outOffset,
-                                    totalRows);
+                if (typeInfo.type() == String.class) {
+                    return StringChunkReader.STRING_CHUNK_READER;
+                }
+                // TODO (core#58): add custom barrage serialization/deserialization support
+                // Migrate Schema to custom format when available.
+                if (typeInfo.type() == Schema.class) {
+                    return SchemaChunkReader.SCHEMA_CHUNK_READER;
                 }
+                // Note: this Stringify check should come last
+                if (options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
+                    return StringChunkReader.STRING_CHUNK_READER;
+                }
+                // TODO (core#936): support column conversion modes
                 throw new UnsupportedOperationException(
                         "Do not yet support column conversion mode: " + options.columnConversionMode());
             default:
                 throw new UnsupportedOperationException();
         }
     }
+
+    private enum ByteArrayChunkReader implements ChunkReader {
+        BYTEARRAY_READER;
+
+        @Override
+        public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+                PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+                int totalRows) throws IOException {
+            return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                    is,
+                    fieldNodeIter,
+                    bufferInfoIter,
+                    ByteArrayChunkReader::readBytes,
+                    outChunk,
+                    outOffset,
+                    totalRows);
+        }
+
+        private static byte[] readBytes(byte[] buf, int off, int len) {
+            return Arrays.copyOfRange(buf, off, off + len);
+        }
+    }
+
+    private enum BigIntegerChunkReader implements ChunkReader {
+        BIG_INTEGER_CHUNK_READER;
+
+        @Override
+        public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+                PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+                int totalRows) throws IOException {
+            return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                    is,
+                    fieldNodeIter,
+                    bufferInfoIter,
+                    BigInteger::new,
+                    outChunk,
+                    outOffset,
+                    totalRows);
+        }
+    }
+
+    private enum BigDecimalChunkReader implements ChunkReader {
+        BIG_DECIMAL_CHUNK_READER;
+
+        @Override
+        public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+                PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+                int totalRows) throws IOException {
+            return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                    is,
+                    fieldNodeIter,
+                    bufferInfoIter,
+                    BigDecimalChunkReader::readBigDecimal,
+                    outChunk,
+                    outOffset,
+                    totalRows);
+        }
+
+        private static BigDecimal readBigDecimal(byte[] buf, int offset, int length) {
+            // read the int scale value as little endian, arrow's endianness.
+            final byte b1 = buf[offset];
+            final byte b2 = buf[offset + 1];
+            final byte b3 = buf[offset + 2];
+            final byte b4 = buf[offset + 3];
+            final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
+            return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
+        }
+    }
+
+    private enum StringChunkReader implements ChunkReader {
+        STRING_CHUNK_READER;
+
+        @Override
+        public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+                PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+                int totalRows) throws IOException {
+            return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                    is,
+                    fieldNodeIter,
+                    bufferInfoIter,
+                    StringChunkReader::readString,
+                    outChunk,
+                    outOffset,
+                    totalRows);
+        }
+
+        private static String readString(byte[] buf, int off, int len) {
+            return new String(buf, off, len, Charsets.UTF_8);
+        }
+    }
+
+    private enum SchemaChunkReader implements ChunkReader {
+        SCHEMA_CHUNK_READER;
+
+        @Override
+        public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+                PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+                int totalRows) throws IOException {
+            return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                    is,
+                    fieldNodeIter,
+                    bufferInfoIter,
+                    ArrowIpcUtil::deserialize,
+                    outChunk,
+                    outOffset,
+                    totalRows);
+        }
+    }
 }
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowIpcUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowIpcUtil.java
new file mode 100644
index 00000000000..b85b672e8d4
--- /dev/null
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowIpcUtil.java
@@ -0,0 +1,31 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.barrage.util;
+
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+
+public class ArrowIpcUtil {
+    public static long serialize(OutputStream outputStream, Schema schema) throws IOException {
+        // not buffered. no flushing needed. not closing write channel
+        return MessageSerializer.serialize(new WriteChannel(Channels.newChannel(outputStream)), schema);
+    }
+
+    public static Schema deserialize(InputStream in) throws IOException {
+        // not buffered. not closing read channel
+        return MessageSerializer.deserializeSchema(new ReadChannel(Channels.newChannel(in)));
+    }
+
+    public static Schema deserialize(byte[] buf, int offset, int length) throws IOException {
+        return deserialize(new ByteArrayInputStream(buf, offset, length));
+    }
+}
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java
index b11cc5f2a08..8c5abd669ee 100755
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java
@@ -187,7 +187,8 @@ private static Optional extractFlatBufferVersion(String method) {
             Instant.class,
             Boolean.class,
             LocalDate.class,
-            LocalTime.class));
+            LocalTime.class,
+            Schema.class));
 
     public static ByteString schemaBytesFromTable(@NotNull final Table table) {
         return schemaBytesFromTableDefinition(table.getDefinition(), table.getAttributes(), table.isFlat());
@@ -745,7 +746,8 @@ private static ArrowType arrowTypeFor(Class type) {
             return Types.MinorType.TIMENANO.getType();
         }
         if (type == BigDecimal.class
-                || type == BigInteger.class) {
+                || type == BigInteger.class
+                || type == Schema.class) {
             return Types.MinorType.VARBINARY.getType();
         }
         if (type == Instant.class || type == ZonedDateTime.class) {
diff --git a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/util/ArrowIpcUtilTest.java b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/util/ArrowIpcUtilTest.java
new file mode 100644
index 00000000000..8c8781ab20a
--- /dev/null
+++ b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/util/ArrowIpcUtilTest.java
@@ -0,0 +1,44 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.barrage.util;
+
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ArrowIpcUtilTest {
+
+    public static final Field FOO = new Field("Foo", FieldType.nullable(Types.MinorType.INT.getType()), null);
+    public static final Field BAR = new Field("Bar", FieldType.notNullable(Types.MinorType.INT.getType()), null);
+    public static final Field BAZ = new Field("Baz",
+            new FieldType(true, Types.MinorType.VARCHAR.getType(), null, Map.of("k1", "v1", "k2", "v2")), null);
+
+    private static final Schema SCHEMA_1 = new Schema(List.of(FOO, BAR, BAZ));
+    private static final Schema SCHEMA_2 =
+            new Schema(List.of(FOO, BAR, BAZ), Map.of("key1", "value1", "key2", "value2"));
+
+    @Test
+    public void testSchemas() throws IOException {
+        verifySerDeser(SCHEMA_1);
+        verifySerDeser(SCHEMA_2);
+    }
+
+    // A bit circular, but better than nothing.
+    public static void verifySerDeser(Schema schema) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final long length = ArrowIpcUtil.serialize(baos, schema);
+        assertThat(length).isEqualTo(baos.size());
+        Schema deserialized = ArrowIpcUtil.deserialize(baos.toByteArray(), 0, (int) length);
+        assertThat(deserialized).isEqualTo(schema);
+    }
+}
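
For reference: a minimal sketch of the round trip this change enables, using only the ArrowIpcUtil methods added above. It mirrors what ArrowIpcUtilTest verifies; the variable names are illustrative, it assumes the same imports as the test, and it would live in a method that declares throws IOException.

    // Build a small Arrow Schema, shaped like the test fixtures above.
    Schema schema = new Schema(List.of(
            new Field("Foo", FieldType.nullable(Types.MinorType.INT.getType()), null)));

    // Serialize the Schema to Arrow IPC bytes; these are the bytes that back the
    // VARBINARY encoding of Schema-typed columns described in the commit message.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    long written = ArrowIpcUtil.serialize(baos, schema);

    // Read the Schema back from the same bytes; it should equal the original.
    Schema roundTripped = ArrowIpcUtil.deserialize(baos.toByteArray(), 0, (int) written);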