From 2e1bfce5a6c6aa6942fc60d2632957bd4c814b45 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 26 Feb 2025 16:41:40 +0100 Subject: [PATCH 1/8] Remove unused method --- .../server/protocol/spooling/SpoolingQueryDataProducer.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingQueryDataProducer.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingQueryDataProducer.java index 7b90575c5905..bb6a8731877d 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingQueryDataProducer.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingQueryDataProducer.java @@ -120,9 +120,4 @@ private boolean hasSpoolingMetadata(Page page, int outputColumnsSize) { return page.getChannelCount() == outputColumnsSize + 1 && page.getPositionCount() == 1 && !page.getBlock(outputColumnsSize).isNull(0); } - - public static QueryDataProducer createSpooledQueryDataProducer(QueryDataEncoder.Factory encoder) - { - return new SpoolingQueryDataProducer(encoder); - } } From 65403c9af8530c6a4003cdf1e4c57cb0823c9718 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 26 Feb 2025 21:13:29 +0100 Subject: [PATCH 2/8] Switch JsonResponse back to InputStream --- .../java/io/trino/client/JsonResponse.java | 12 +++--- ...der.java => MaterializingInputStream.java} | 40 +++++++++++-------- ...java => TestMaterializingInputStream.java} | 17 ++++---- 3 files changed, 38 insertions(+), 31 deletions(-) rename client/trino-client/src/main/java/io/trino/client/{MaterializingReader.java => MaterializingInputStream.java} (58%) rename client/trino-client/src/test/java/io/trino/client/{TestMaterializingReader.java => TestMaterializingInputStream.java} (73%) diff --git a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java index f9577eb3e04f..6d530f04fcfb 100644 --- a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java +++ b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java @@ -23,7 +23,7 @@ import okhttp3.ResponseBody; import java.io.IOException; -import java.io.Reader; +import java.io.InputStream; import java.io.UncheckedIOException; import java.util.Optional; @@ -115,16 +115,16 @@ public static JsonResponse execute(TrinoJsonCodec codec, Call.Factory if (isJson(responseBody.contentType())) { T value = null; IllegalArgumentException exception = null; - MaterializingReader reader = new MaterializingReader(responseBody.charStream(), 128 * 1024); - try (Reader ignored = reader) { + MaterializingInputStream stream = new MaterializingInputStream(responseBody.byteStream(), 128 * 1024); + try (InputStream ignored = stream) { // Parse from input stream, response is either of unknown size or too large to materialize. Raw response body // will not be available if parsing fails - value = codec.fromJson(reader); + value = codec.fromJson(stream); } catch (JsonProcessingException e) { - exception = new IllegalArgumentException(format("Unable to create %s from JSON response:\n[%s]", codec.getType(), reader.getHeadString()), e); + exception = new IllegalArgumentException(format("Unable to create %s from JSON response:\n[%s]", codec.getType(), stream.getHeadString()), e); } - return new JsonResponse<>(response.code(), response.headers(), reader.getHeadString(), value, exception); + return new JsonResponse<>(response.code(), response.headers(), stream.getHeadString(), value, exception); } return new JsonResponse<>(response.code(), response.headers(), responseBody.string()); } diff --git a/client/trino-client/src/main/java/io/trino/client/MaterializingReader.java b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java similarity index 58% rename from client/trino-client/src/main/java/io/trino/client/MaterializingReader.java rename to client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java index aca4ceb02a7a..a6a9046d8050 100644 --- a/client/trino-client/src/main/java/io/trino/client/MaterializingReader.java +++ b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java @@ -13,25 +13,26 @@ */ package io.trino.client; -import java.io.FilterReader; +import java.io.FilterInputStream; import java.io.IOException; -import java.io.Reader; +import java.io.InputStream; import static com.google.common.base.Verify.verify; import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; -class MaterializingReader - extends FilterReader +class MaterializingInputStream + extends FilterInputStream { - private final char[] headChars; + private final byte[] head; private int remaining; private int currentOffset; - protected MaterializingReader(Reader reader, int maxHeadChars) + protected MaterializingInputStream(InputStream stream, int maxBytes) { - super(reader); - verify(maxHeadChars > 0 && maxHeadChars <= 128 * 1024, "maxHeadChars must be between 1 and 128 KB"); - this.headChars = new char[maxHeadChars]; + super(stream); + verify(maxBytes > 0 && maxBytes <= 128 * 1024, "maxBytes must be between 1 and 128 KB"); + this.head = new byte[maxBytes]; } @Override @@ -40,8 +41,8 @@ public int read() { int value = super.read(); if (value != -1) { - if (currentOffset < headChars.length) { - headChars[currentOffset++] = (char) value; + if (currentOffset < head.length) { + head[currentOffset++] = (byte) value; } else { remaining++; @@ -51,26 +52,33 @@ public int read() } @Override - public int read(char[] cbuf, int off, int len) + public int read(byte[] buffer, int off, int len) throws IOException { - int read = super.read(cbuf, off, len); + int read = super.read(buffer, off, len); if (read > 0) { - int copyLength = Math.min(read, headChars.length - currentOffset); + int copyLength = Math.min(read, head.length - currentOffset); if (read > copyLength) { remaining += read - copyLength; } if (copyLength > 0) { - System.arraycopy(cbuf, off, headChars, currentOffset, copyLength); + System.arraycopy(buffer, off, head, currentOffset, copyLength); currentOffset += copyLength; } } return read; } + @Override + public int read(byte[] buffer) + throws IOException + { + return read(buffer, 0, buffer.length); + } + public String getHeadString() { - return String.valueOf(headChars, 0, currentOffset) + (remaining > 0 ? format("... [" + bytesOmitted(remaining) + "]", remaining) : ""); + return new String(head, 0, currentOffset, UTF_8) + (remaining > 0 ? format("... [" + bytesOmitted(remaining) + "]", remaining) : ""); } private String bytesOmitted(long bytes) diff --git a/client/trino-client/src/test/java/io/trino/client/TestMaterializingReader.java b/client/trino-client/src/test/java/io/trino/client/TestMaterializingInputStream.java similarity index 73% rename from client/trino-client/src/test/java/io/trino/client/TestMaterializingReader.java rename to client/trino-client/src/test/java/io/trino/client/TestMaterializingInputStream.java index 777bd81ccd05..51e2ab97e9db 100644 --- a/client/trino-client/src/test/java/io/trino/client/TestMaterializingReader.java +++ b/client/trino-client/src/test/java/io/trino/client/TestMaterializingInputStream.java @@ -16,26 +16,25 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.StringWriter; import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; -class TestMaterializingReader +class TestMaterializingInputStream { @Test void testHeadBufferOverflow() throws IOException { InputStream stream = new ByteArrayInputStream("abcd".repeat(1337).getBytes(UTF_8)); - MaterializingReader reader = new MaterializingReader(new InputStreamReader(stream, UTF_8), 4); + MaterializingInputStream reader = new MaterializingInputStream(stream, 4); int remainingBytes = 4 * 1337 - 4; - reader.transferTo(new StringWriter()); // Trigger reading + reader.transferTo(new ByteArrayOutputStream()); // Trigger reading assertThat(reader.getHeadString()) .isEqualTo("abcd... [" + remainingBytes + " more bytes]"); } @@ -45,9 +44,9 @@ void testHeadBufferNotFullyUsed() throws IOException { InputStream stream = new ByteArrayInputStream("abcdabc".getBytes(UTF_8)); - MaterializingReader reader = new MaterializingReader(new InputStreamReader(stream, UTF_8), 8); + MaterializingInputStream reader = new MaterializingInputStream(stream, 8); - reader.transferTo(new StringWriter()); // Trigger reading + reader.transferTo(new ByteArrayOutputStream()); // Trigger reading assertThat(reader.getHeadString()).isEqualTo("abcdabc"); } @@ -56,9 +55,9 @@ void testHeadBufferFullyUsed() throws IOException { InputStream stream = new ByteArrayInputStream("a".repeat(8).getBytes(UTF_8)); - MaterializingReader reader = new MaterializingReader(new InputStreamReader(stream, UTF_8), 8); + MaterializingInputStream reader = new MaterializingInputStream(stream, 8); - reader.transferTo(new StringWriter()); // Trigger reading + reader.transferTo(new ByteArrayOutputStream()); // Trigger reading assertThat(reader.getHeadString()).isEqualTo("a".repeat(8)); } } From c363d8fe5fb4b98c01fd48f77cf33122c36800ab Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 26 Feb 2025 21:17:16 +0100 Subject: [PATCH 3/8] Decrease materialized JSON to 8K It's only used to capture error message when server fails to process the request. Thus it doesn't make sense to keep longer buffer as it won't improve the situation after all. --- .../src/main/java/io/trino/client/JsonResponse.java | 6 +++--- .../main/java/io/trino/client/MaterializingInputStream.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java index 6d530f04fcfb..cb8a061ed894 100644 --- a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java +++ b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java @@ -115,10 +115,10 @@ public static JsonResponse execute(TrinoJsonCodec codec, Call.Factory if (isJson(responseBody.contentType())) { T value = null; IllegalArgumentException exception = null; - MaterializingInputStream stream = new MaterializingInputStream(responseBody.byteStream(), 128 * 1024); + MaterializingInputStream stream = new MaterializingInputStream(responseBody.byteStream(), 8 * 1024); try (InputStream ignored = stream) { - // Parse from input stream, response is either of unknown size or too large to materialize. Raw response body - // will not be available if parsing fails + // Parse from input stream, response is either of unknown size or too large to materialize. + // 8K of the response body will be available if parsing fails. value = codec.fromJson(stream); } catch (JsonProcessingException e) { diff --git a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java index a6a9046d8050..c1b8ec13e551 100644 --- a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java +++ b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java @@ -31,7 +31,7 @@ class MaterializingInputStream protected MaterializingInputStream(InputStream stream, int maxBytes) { super(stream); - verify(maxBytes > 0 && maxBytes <= 128 * 1024, "maxBytes must be between 1 and 128 KB"); + verify(maxBytes > 0 && maxBytes <= 8 * 1024, "maxBytes must be between 1B and 8 KB"); this.head = new byte[maxBytes]; } From c2659284e1f0b07a47b986d471ca37e98eff11aa Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 26 Feb 2025 21:22:35 +0100 Subject: [PATCH 4/8] Clean up buffer when MaterializedInputStream is closed --- .../java/io/trino/client/JsonResponse.java | 19 ++++++++++++------- .../client/MaterializingInputStream.java | 13 ++++++++++++- .../java/io/trino/client/TrinoJsonCodec.java | 1 + 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java index cb8a061ed894..cb913b818504 100644 --- a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java +++ b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java @@ -23,7 +23,6 @@ import okhttp3.ResponseBody; import java.io.IOException; -import java.io.InputStream; import java.io.UncheckedIOException; import java.util.Optional; @@ -113,18 +112,24 @@ public static JsonResponse execute(TrinoJsonCodec codec, Call.Factory try (Response response = client.newCall(request).execute()) { ResponseBody responseBody = requireNonNull(response.body()); if (isJson(responseBody.contentType())) { - T value = null; - IllegalArgumentException exception = null; MaterializingInputStream stream = new MaterializingInputStream(responseBody.byteStream(), 8 * 1024); - try (InputStream ignored = stream) { + try { // Parse from input stream, response is either of unknown size or too large to materialize. // 8K of the response body will be available if parsing fails. - value = codec.fromJson(stream); + T value = codec.fromJson(stream); + return new JsonResponse<>(response.code(), response.headers(), stream.getHeadString(), value, null); } catch (JsonProcessingException e) { - exception = new IllegalArgumentException(format("Unable to create %s from JSON response:\n[%s]", codec.getType(), stream.getHeadString()), e); + return new JsonResponse<>( + response.code(), + response.headers(), + stream.getHeadString(), + null, + new IllegalArgumentException(format("Unable to create %s from JSON response:\n[%s]", codec.getType(), stream.getHeadString()), e)); + } + finally { + stream.close(); } - return new JsonResponse<>(response.code(), response.headers(), stream.getHeadString(), value, exception); } return new JsonResponse<>(response.code(), response.headers(), responseBody.string()); } diff --git a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java index c1b8ec13e551..98abcf1abbd0 100644 --- a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java +++ b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java @@ -24,7 +24,7 @@ class MaterializingInputStream extends FilterInputStream { - private final byte[] head; + private byte[] head; private int remaining; private int currentOffset; @@ -78,6 +78,9 @@ public int read(byte[] buffer) public String getHeadString() { + if (head == null) { + return ""; + } return new String(head, 0, currentOffset, UTF_8) + (remaining > 0 ? format("... [" + bytesOmitted(remaining) + "]", remaining) : ""); } @@ -88,4 +91,12 @@ private String bytesOmitted(long bytes) } return format("%d more bytes", bytes); } + + @Override + public void close() + throws IOException + { + super.close(); + head = null; + } } diff --git a/client/trino-client/src/main/java/io/trino/client/TrinoJsonCodec.java b/client/trino-client/src/main/java/io/trino/client/TrinoJsonCodec.java index a8982393a8f6..466f00beb71f 100644 --- a/client/trino-client/src/main/java/io/trino/client/TrinoJsonCodec.java +++ b/client/trino-client/src/main/java/io/trino/client/TrinoJsonCodec.java @@ -60,6 +60,7 @@ public class TrinoJsonCodec .disable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS) .disable(MapperFeature.INFER_PROPERTY_MUTATORS) .disable(MapperFeature.ALLOW_FINAL_FIELDS_AS_MUTATORS) + .disable(StreamReadFeature.AUTO_CLOSE_SOURCE) .addModule(new Jdk8Module()) .addModule(new QueryDataClientJacksonModule()) .build(); From 257fc76690c45b8f89731a24ab852aa9fd0bb892 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 27 Feb 2025 12:15:39 +0100 Subject: [PATCH 5/8] Ignore charset decoding errors --- .../java/io/trino/client/JsonResponse.java | 2 +- .../client/MaterializingInputStream.java | 22 ++++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java index cb913b818504..0f0ed2f0afc3 100644 --- a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java +++ b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java @@ -117,7 +117,7 @@ public static JsonResponse execute(TrinoJsonCodec codec, Call.Factory // Parse from input stream, response is either of unknown size or too large to materialize. // 8K of the response body will be available if parsing fails. T value = codec.fromJson(stream); - return new JsonResponse<>(response.code(), response.headers(), stream.getHeadString(), value, null); + return new JsonResponse<>(response.code(), response.headers(), stream.getHeadString(responseBody.contentType().charset()), value, null); } catch (JsonProcessingException e) { return new JsonResponse<>( diff --git a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java index 98abcf1abbd0..a4848415568b 100644 --- a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java +++ b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java @@ -16,9 +16,15 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Verify.verify; import static java.lang.String.format; +import static java.nio.charset.CodingErrorAction.IGNORE; import static java.nio.charset.StandardCharsets.UTF_8; class MaterializingInputStream @@ -77,11 +83,25 @@ public int read(byte[] buffer) } public String getHeadString() + { + return getHeadString(UTF_8); + } + + public String getHeadString(Charset charset) { if (head == null) { return ""; } - return new String(head, 0, currentOffset, UTF_8) + (remaining > 0 ? format("... [" + bytesOmitted(remaining) + "]", remaining) : ""); + + CharsetDecoder charsetDecoder = firstNonNull(charset, UTF_8).newDecoder() + .onMalformedInput(IGNORE) + .onUnmappableCharacter(IGNORE); + try { + return charsetDecoder.decode(ByteBuffer.wrap(head, 0, currentOffset)) + (remaining > 0 ? format("... [" + bytesOmitted(remaining) + "]", remaining) : ""); + } + catch (CharacterCodingException e) { + return format("", e.getMessage()); + } } private String bytesOmitted(long bytes) From b2357ce2d1f46e03672b088a809f623240f9d32c Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 27 Feb 2025 12:18:42 +0100 Subject: [PATCH 6/8] Increase minimum materialized buffer size to 1K --- .../src/main/java/io/trino/client/MaterializingInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java index a4848415568b..8ecdd37e7e31 100644 --- a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java +++ b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java @@ -37,7 +37,7 @@ class MaterializingInputStream protected MaterializingInputStream(InputStream stream, int maxBytes) { super(stream); - verify(maxBytes > 0 && maxBytes <= 8 * 1024, "maxBytes must be between 1B and 8 KB"); + verify(maxBytes > 1024 && maxBytes <= 8 * 1024, "maxBytes must be between 1 KB and 8 KB"); this.head = new byte[maxBytes]; } From f71659f07431c7973e9894437d9eebe3400a9003 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 27 Feb 2025 12:19:59 +0100 Subject: [PATCH 7/8] Static import min --- .../main/java/io/trino/client/MaterializingInputStream.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java index 8ecdd37e7e31..a21a83ccbe13 100644 --- a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java +++ b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java @@ -23,6 +23,7 @@ import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Verify.verify; +import static java.lang.Math.min; import static java.lang.String.format; import static java.nio.charset.CodingErrorAction.IGNORE; import static java.nio.charset.StandardCharsets.UTF_8; @@ -63,7 +64,7 @@ public int read(byte[] buffer, int off, int len) { int read = super.read(buffer, off, len); if (read > 0) { - int copyLength = Math.min(read, head.length - currentOffset); + int copyLength = min(read, head.length - currentOffset); if (read > copyLength) { remaining += read - copyLength; } From a924d59010554b28af567b132b99e9f178487c57 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 27 Feb 2025 12:25:28 +0100 Subject: [PATCH 8/8] Use DataSize for materialized buffer size --- .../src/main/java/io/trino/client/JsonResponse.java | 6 +++++- .../java/io/trino/client/MaterializingInputStream.java | 9 +++++---- .../io/trino/client/TestMaterializingInputStream.java | 7 ++++--- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java index 0f0ed2f0afc3..7ef5bd885b86 100644 --- a/client/trino-client/src/main/java/io/trino/client/JsonResponse.java +++ b/client/trino-client/src/main/java/io/trino/client/JsonResponse.java @@ -14,6 +14,7 @@ package io.trino.client; import com.fasterxml.jackson.core.JsonProcessingException; +import io.airlift.units.DataSize; import jakarta.annotation.Nullable; import okhttp3.Call; import okhttp3.Headers; @@ -27,11 +28,14 @@ import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.units.DataSize.Unit.KILOBYTE; import static java.lang.String.format; import static java.util.Objects.requireNonNull; public final class JsonResponse { + private static final DataSize MATERIALIZED_BUFFER_SIZE = DataSize.of(8, KILOBYTE); + private final int statusCode; private final Headers headers; @Nullable @@ -112,7 +116,7 @@ public static JsonResponse execute(TrinoJsonCodec codec, Call.Factory try (Response response = client.newCall(request).execute()) { ResponseBody responseBody = requireNonNull(response.body()); if (isJson(responseBody.contentType())) { - MaterializingInputStream stream = new MaterializingInputStream(responseBody.byteStream(), 8 * 1024); + MaterializingInputStream stream = new MaterializingInputStream(responseBody.byteStream(), MATERIALIZED_BUFFER_SIZE); try { // Parse from input stream, response is either of unknown size or too large to materialize. // 8K of the response body will be available if parsing fails. diff --git a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java index a21a83ccbe13..0310cea9e431 100644 --- a/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java +++ b/client/trino-client/src/main/java/io/trino/client/MaterializingInputStream.java @@ -13,6 +13,8 @@ */ package io.trino.client; +import io.airlift.units.DataSize; + import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; @@ -22,8 +24,8 @@ import java.nio.charset.CharsetDecoder; import static com.google.common.base.MoreObjects.firstNonNull; -import static com.google.common.base.Verify.verify; import static java.lang.Math.min; +import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.nio.charset.CodingErrorAction.IGNORE; import static java.nio.charset.StandardCharsets.UTF_8; @@ -35,11 +37,10 @@ class MaterializingInputStream private int remaining; private int currentOffset; - protected MaterializingInputStream(InputStream stream, int maxBytes) + protected MaterializingInputStream(InputStream stream, DataSize maxBytes) { super(stream); - verify(maxBytes > 1024 && maxBytes <= 8 * 1024, "maxBytes must be between 1 KB and 8 KB"); - this.head = new byte[maxBytes]; + this.head = new byte[toIntExact(maxBytes.toBytes())]; // caller is responsible for reasonable sizing } @Override diff --git a/client/trino-client/src/test/java/io/trino/client/TestMaterializingInputStream.java b/client/trino-client/src/test/java/io/trino/client/TestMaterializingInputStream.java index 51e2ab97e9db..74b9eef811d3 100644 --- a/client/trino-client/src/test/java/io/trino/client/TestMaterializingInputStream.java +++ b/client/trino-client/src/test/java/io/trino/client/TestMaterializingInputStream.java @@ -13,6 +13,7 @@ */ package io.trino.client; +import io.airlift.units.DataSize; import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; @@ -30,7 +31,7 @@ void testHeadBufferOverflow() throws IOException { InputStream stream = new ByteArrayInputStream("abcd".repeat(1337).getBytes(UTF_8)); - MaterializingInputStream reader = new MaterializingInputStream(stream, 4); + MaterializingInputStream reader = new MaterializingInputStream(stream, DataSize.ofBytes(4)); int remainingBytes = 4 * 1337 - 4; @@ -44,7 +45,7 @@ void testHeadBufferNotFullyUsed() throws IOException { InputStream stream = new ByteArrayInputStream("abcdabc".getBytes(UTF_8)); - MaterializingInputStream reader = new MaterializingInputStream(stream, 8); + MaterializingInputStream reader = new MaterializingInputStream(stream, DataSize.ofBytes(8)); reader.transferTo(new ByteArrayOutputStream()); // Trigger reading assertThat(reader.getHeadString()).isEqualTo("abcdabc"); @@ -55,7 +56,7 @@ void testHeadBufferFullyUsed() throws IOException { InputStream stream = new ByteArrayInputStream("a".repeat(8).getBytes(UTF_8)); - MaterializingInputStream reader = new MaterializingInputStream(stream, 8); + MaterializingInputStream reader = new MaterializingInputStream(stream, DataSize.ofBytes(8)); reader.transferTo(new ByteArrayOutputStream()); // Trigger reading assertThat(reader.getHeadString()).isEqualTo("a".repeat(8));