diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java index c0bd1a1cbe1c..9ce2d687b3fe 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.dataflow.worker.windmill.state; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.charset.StandardCharsets; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.sdk.util.ByteStringOutputStream; @@ -27,6 +25,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; class WindmillStateUtil { + /** Encodes the given namespace and address as {@code <namespace>+<address>}. */ @VisibleForTesting static ByteString encodeKey(StateNamespace namespace, StateTag address) { @@ -34,15 +33,12 @@ static ByteString encodeKey(StateNamespace namespace, StateTag address) { // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys // a lot, and this leads to better performance results. See associated benchmarks. ByteStringOutputStream stream = new ByteStringOutputStream(); - OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8); - // stringKey starts and ends with a slash. We separate it from the // StateTag ID by a '+' (which is guaranteed not to be in the stringKey) because the // ID comes from the user. - namespace.appendTo(writer); - writer.write('+'); - address.appendTo(writer); - writer.flush(); + namespace.appendTo(stream); + stream.append('+'); + address.appendTo(stream); return stream.toByteString(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java index a1f8c4f1a8a7..76a6b18890ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java @@ -17,7 +17,10 @@ */ package org.apache.beam.sdk.util; +import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.UnsafeByteOperations; @@ -32,7 +35,7 @@ * manner. This differs from {@link ByteString.Output} which synchronizes its writes. */ @NotThreadSafe -public final class ByteStringOutputStream extends OutputStream { +public final class ByteStringOutputStream extends OutputStream implements Appendable { // This constant was chosen based upon Protobufs ByteString#CONCATENATE_BY_COPY which // isn't public to prevent copying the bytes again when concatenating ByteStrings instead @@ -203,6 +206,28 @@ public int size() { return result.size() + bufferPos; } + @Override + public Appendable append(@Nullable CharSequence csq) throws IOException { + write(Preconditions.checkNotNull(csq).toString().getBytes(StandardCharsets.UTF_8)); + return this; + } + + @Override + public Appendable append(@Nullable CharSequence csq, int start, int end) throws IOException { + write( + Preconditions.checkNotNull(csq) + .subSequence(start, end) + .toString() + .getBytes(StandardCharsets.UTF_8)); + return this; + } + + @Override + public Appendable append(char c) throws IOException { + write(String.valueOf(c).getBytes(StandardCharsets.UTF_8)); + return this; + } + @Override public String toString() { return String.format( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java index a2b8feec5d40..37ce6a385abb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java @@ -22,6 +22,10 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.UnsafeByteOperations; import org.junit.Test; import org.junit.runner.RunWith; @@ -158,6 +162,67 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception { } } + @Test + public void appendEquivalentToOutputStreamWriter() throws IOException { + String randomString = "⣏⓫⦎⊺ⱄ\u243B♼⢓␜\u2065✝⡳oⶤⱲ⨻1⅒ↀ◅⡪⋲"; + ByteString byteString1, byteString2; + { + ByteStringOutputStream stream = new ByteStringOutputStream(); + OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8); + writer.append(randomString); + writer.flush(); + byteString1 = stream.toByteString(); + } + + { + ByteStringOutputStream stream = new ByteStringOutputStream(); + stream.append(randomString); + byteString2 = stream.toByteString(); + } + assertEquals(byteString1, byteString2); + } + + @Test + public void appendEquivalentToOutputStreamWriterSubstr() throws IOException { + String randomString = "⣏⓫⦎⊺ⱄ\u243B♼⢓␜\u2065✝⡳oⶤⱲ⨻1⅒ↀ◅⡪⋲"; + ByteString byteString1, byteString2; + { + ByteStringOutputStream stream = new ByteStringOutputStream(); + OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8); + writer.append(randomString, 3, 10); + writer.flush(); + byteString1 = stream.toByteString(); + } + + { + ByteStringOutputStream stream = new ByteStringOutputStream(); + stream.append(randomString, 3, 10); + byteString2 = stream.toByteString(); + } + assertEquals(byteString1, byteString2); + } + + @Test + public void appendEquivalentToOutputStreamWriterChar() throws IOException { + for (char c = 0; c <= 255; ++c) { + ByteString byteString1, byteString2; + { + ByteStringOutputStream stream = new ByteStringOutputStream(); + OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8); + writer.append(c); + writer.flush(); + byteString1 = stream.toByteString(); + } + + { + ByteStringOutputStream stream = new ByteStringOutputStream(); + stream.append(c); + byteString2 = stream.toByteString(); + } + assertEquals(byteString1, byteString2); + } + } + // Grow the elements based upon an approximation of the fibonacci sequence. private static int next(int current) { double a = Math.max(1, current * (1 + Math.sqrt(5)) / 2.0);