From 2e765cc04c5ecdaaffcd4f52f8e7c56ee04936d7 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 7 Nov 2024 04:34:30 +0800 Subject: [PATCH] GH-3040: DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k (#3041) * GH-3040: DictionaryFilter.canDrop may return false positive result when dict size exceeds 8k * style * check bytesRead * import --- .../org/apache/parquet/bytes/BytesInput.java | 12 ++++++- .../bytes/AvailableAgnosticInputStream.java | 35 +++++++++++++++++++ .../apache/parquet/bytes/TestBytesInput.java | 14 ++++++++ 3 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 parquet-common/src/test/java/org/apache/parquet/bytes/AvailableAgnosticInputStream.java diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java index 88bb1da7cf..25ec5dc866 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java @@ -20,12 +20,14 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.util.Arrays; import java.util.List; @@ -376,7 +378,15 @@ void writeInto(ByteBuffer buffer) { ByteBuffer workBuf = buffer.duplicate(); int pos = buffer.position(); workBuf.limit(pos + byteCount); - Channels.newChannel(in).read(workBuf); + ReadableByteChannel channel = Channels.newChannel(in); + int remaining = byteCount; + while (remaining > 0) { + int bytesRead = channel.read(workBuf); + if (bytesRead < 0) { + throw new EOFException("Reached the end of stream with " + remaining + " bytes left to read"); + } + remaining -= bytesRead; + } buffer.position(pos + byteCount); } catch (IOException e) { new RuntimeException("Exception occurred during reading input stream", e); diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/AvailableAgnosticInputStream.java b/parquet-common/src/test/java/org/apache/parquet/bytes/AvailableAgnosticInputStream.java new file mode 100644 index 0000000000..ca8cb38cfb --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/AvailableAgnosticInputStream.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.ByteArrayInputStream; + +public class AvailableAgnosticInputStream extends ByteArrayInputStream { + + public AvailableAgnosticInputStream(byte[] buf) { + super(buf); + } + + // In practice, there are some implementations always return 0 even if they has more data + @Override + public synchronized int available() { + return 0; + } +} diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java index d2c9e82353..6ffe3c650a 100644 --- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java @@ -140,6 +140,20 @@ public void testFromInputStream() throws IOException { validate(data, factory); } + @Test + public void testFromLargeAvailableAgnosticInputStream() throws IOException { + // allocate a bytes that large than + // java.nio.channel.Channels.ReadableByteChannelImpl.TRANSFER_SIZE = 8192 + byte[] data = new byte[9 * 1024]; + RANDOM.nextBytes(data); + byte[] input = new byte[data.length + 10]; + RANDOM.nextBytes(input); + System.arraycopy(data, 0, input, 0, data.length); + Supplier factory = () -> BytesInput.from(new AvailableAgnosticInputStream(input), 9 * 1024); + + validate(data, factory); + } + @Test public void testFromByteArrayOutputStream() throws IOException { byte[] data = new byte[1000];