Skip to content

Commit

Permalink
Improve parquet column adapter merge
Browse files Browse the repository at this point in the history
Avoids allocating intermediate nested arrays and implements primitive
buffer concatenation directly. Also ensures that ByteColumnAdapter
checks for overflow on merged buffers, since Guava's Bytes.concat did
not previously check for that.
  • Loading branch information
pettyjamesm committed Sep 11, 2024
1 parent ef11615 commit e21984d
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
*/
package io.trino.parquet.reader.flat;

import com.google.common.primitives.Bytes;
import io.trino.spi.block.Block;
import io.trino.spi.block.ByteArrayBlock;

import java.util.List;
import java.util.Optional;

import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.Math.toIntExact;

public class ByteColumnAdapter
implements ColumnAdapter<byte[]>
Expand Down Expand Up @@ -68,6 +68,16 @@ public long getSizeInBytes(byte[] values)
/**
 * Concatenates the given byte buffers, in list order, into a single array.
 *
 * NOTE(review): this hunk has lost its +/- diff markers. The {@code Bytes.concat}
 * return directly below appears to be the line removed by the commit, and the
 * statements after it its replacement - confirm against the real diff.
 *
 * @param buffers the arrays to join
 * @return an array holding the contents of every buffer, in list order
 */
@Override
public byte[] merge(List<byte[]> buffers)
{
return Bytes.concat(buffers.toArray(byte[][]::new));
// Sum the lengths in a long so that toIntExact can reject a total that does not fit in an int
long resultSize = 0;
for (byte[] buffer : buffers) {
resultSize += buffer.length;
}
byte[] result = new byte[toIntExact(resultSize)];
// offset is the next free position in result
int offset = 0;
for (byte[] buffer : buffers) {
System.arraycopy(buffer, 0, result, offset, buffer.length);
offset += buffer.length;
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.parquet.reader.flat;

import com.google.common.primitives.Ints;
import io.trino.spi.block.Block;
import io.trino.spi.block.Fixed12Block;

Expand Down Expand Up @@ -74,6 +73,6 @@ public long getSizeInBytes(int[] values)
/**
 * Concatenates the given int buffers into a single array.
 *
 * NOTE(review): this hunk has lost its +/- diff markers. The {@code Ints.concat}
 * return appears to be the removed line and the delegation to
 * {@code IntColumnAdapter.concatIntArrays} its replacement - confirm against the real diff.
 */
@Override
public int[] merge(List<int[]> buffers)
{
return Ints.concat(buffers.toArray(int[][]::new));
return IntColumnAdapter.concatIntArrays(buffers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.parquet.reader.flat;

import com.google.common.primitives.Longs;
import io.trino.spi.block.Block;
import io.trino.spi.block.Int128ArrayBlock;

Expand Down Expand Up @@ -72,6 +71,6 @@ public long getSizeInBytes(long[] values)
/**
 * Concatenates the given long buffers into a single array.
 *
 * NOTE(review): this hunk has lost its +/- diff markers. The {@code Longs.concat}
 * return appears to be the removed line and the delegation to
 * {@code LongColumnAdapter.concatLongArrays} its replacement - confirm against the real diff.
 */
@Override
public long[] merge(List<long[]> buffers)
{
return Longs.concat(buffers.toArray(long[][]::new));
return LongColumnAdapter.concatLongArrays(buffers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
*/
package io.trino.parquet.reader.flat;

import com.google.common.primitives.Ints;
import io.trino.spi.block.Block;
import io.trino.spi.block.IntArrayBlock;

import java.util.List;
import java.util.Optional;

import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.Math.toIntExact;

public class IntColumnAdapter
implements ColumnAdapter<int[]>
Expand Down Expand Up @@ -68,6 +68,21 @@ public long getSizeInBytes(int[] values)
/**
 * Concatenates the given int buffers into a single array.
 *
 * NOTE(review): this hunk has lost its +/- diff markers. The {@code Ints.concat}
 * return appears to be the removed line and the call to {@code concatIntArrays}
 * its replacement - confirm against the real diff.
 */
@Override
public int[] merge(List<int[]> buffers)
{
return Ints.concat(buffers.toArray(int[][]::new));
return concatIntArrays(buffers);
}

/**
 * Joins the given int arrays, in list order, into one newly allocated array.
 *
 * @param buffers the arrays to join
 * @return a new array containing every element of every buffer, in list order
 * @throws ArithmeticException if the combined length does not fit in an int
 */
static int[] concatIntArrays(List<int[]> buffers)
{
    // Accumulate in a long so an oversized total is reported by toIntExact instead of wrapping
    long totalLength = 0;
    for (int[] chunk : buffers) {
        totalLength += chunk.length;
    }

    int[] merged = new int[toIntExact(totalLength)];
    int position = 0;
    for (int[] chunk : buffers) {
        int chunkLength = chunk.length;
        System.arraycopy(chunk, 0, merged, position, chunkLength);
        position += chunkLength;
    }
    return merged;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
*/
package io.trino.parquet.reader.flat;

import com.google.common.primitives.Longs;
import io.trino.spi.block.Block;
import io.trino.spi.block.LongArrayBlock;

import java.util.List;
import java.util.Optional;

import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.Math.toIntExact;

public class LongColumnAdapter
implements ColumnAdapter<long[]>
Expand Down Expand Up @@ -68,6 +68,21 @@ public long getSizeInBytes(long[] values)
/**
 * Concatenates the given long buffers into a single array.
 *
 * NOTE(review): this hunk has lost its +/- diff markers. The {@code Longs.concat}
 * return appears to be the removed line and the call to {@code concatLongArrays}
 * its replacement - confirm against the real diff.
 */
@Override
public long[] merge(List<long[]> buffers)
{
return Longs.concat(buffers.toArray(long[][]::new));
return concatLongArrays(buffers);
}

/**
 * Joins the given long arrays, in list order, into one newly allocated array.
 *
 * @param buffers the arrays to join
 * @return a new array containing every element of every buffer, in list order
 * @throws ArithmeticException if the combined length does not fit in an int
 */
static long[] concatLongArrays(List<long[]> buffers)
{
    // Accumulate in a long so an oversized total is reported by toIntExact instead of wrapping
    long totalLength = 0;
    for (long[] chunk : buffers) {
        totalLength += chunk.length;
    }

    long[] merged = new long[toIntExact(totalLength)];
    int position = 0;
    for (long[] chunk : buffers) {
        int chunkLength = chunk.length;
        System.arraycopy(chunk, 0, merged, position, chunkLength);
        position += chunkLength;
    }
    return merged;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
*/
package io.trino.parquet.reader.flat;

import com.google.common.primitives.Shorts;
import io.trino.spi.block.Block;
import io.trino.spi.block.ShortArrayBlock;

import java.util.List;
import java.util.Optional;

import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.Math.toIntExact;

public class ShortColumnAdapter
implements ColumnAdapter<short[]>
Expand Down Expand Up @@ -68,6 +68,16 @@ public long getSizeInBytes(short[] values)
/**
 * Concatenates the given short buffers, in list order, into a single array.
 *
 * NOTE(review): this hunk has lost its +/- diff markers. The {@code Shorts.concat}
 * return directly below appears to be the line removed by the commit, and the
 * statements after it its replacement - confirm against the real diff.
 *
 * @param buffers the arrays to join
 * @return an array holding the contents of every buffer, in list order
 */
@Override
public short[] merge(List<short[]> buffers)
{
return Shorts.concat(buffers.toArray(short[][]::new));
// Sum the lengths in a long so that toIntExact can reject a total that does not fit in an int
long resultSize = 0;
for (short[] buffer : buffers) {
resultSize += buffer.length;
}
short[] result = new short[toIntExact(resultSize)];
// offset is the next free position in result
int offset = 0;
for (short[] buffer : buffers) {
System.arraycopy(buffer, 0, result, offset, buffer.length);
offset += buffer.length;
}
return result;
}
}

0 comments on commit e21984d

Please sign in to comment.