Skip to content

Commit

Permalink
[Dataflow Streaming] Optimize WindmillStateUtil.encodeKey by appendin…
Browse files Browse the repository at this point in the history
…g directly to ByteStringOutputStream (#33735)

OutputStreamWriter is thread-safe and therefore incurs synchronization overhead.
  • Loading branch information
arunpandianp authored Jan 29, 2025
1 parent b5884e8 commit e688295
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,27 @@
package org.apache.beam.runners.dataflow.worker.windmill.state;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;

class WindmillStateUtil {

/** Encodes the given namespace and address as {@code <namespace>+<address>}. */
@VisibleForTesting
static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
try {
// Use ByteStringOutputStream rather than concatenation and String.format. We build these keys
// a lot, and this leads to better performance results. See associated benchmarks.
ByteStringOutputStream stream = new ByteStringOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8);

// stringKey starts and ends with a slash. We separate it from the
// StateTag ID by a '+' (which is guaranteed not to be in the stringKey) because the
// ID comes from the user.
namespace.appendTo(writer);
writer.write('+');
address.appendTo(writer);
writer.flush();
namespace.appendTo(stream);
stream.append('+');
address.appendTo(stream);
return stream.toByteString();
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.util;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.UnsafeByteOperations;
Expand All @@ -32,7 +35,7 @@
* manner. This differs from {@link ByteString.Output} which synchronizes its writes.
*/
@NotThreadSafe
public final class ByteStringOutputStream extends OutputStream {
public final class ByteStringOutputStream extends OutputStream implements Appendable {

// This constant was chosen based upon Protobufs ByteString#CONCATENATE_BY_COPY which
// isn't public to prevent copying the bytes again when concatenating ByteStrings instead
Expand Down Expand Up @@ -203,6 +206,28 @@ public int size() {
return result.size() + bufferPos;
}

/**
 * Appends the UTF-8 encoding of {@code csq} to this stream.
 *
 * <p>NOTE(review): {@link Appendable} specifies that a null {@code csq} appends the four
 * characters "null"; this method instead routes the argument through
 * {@code Preconditions.checkNotNull} - confirm that rejecting null is intended.
 *
 * @param csq the character sequence to encode and write
 * @return this stream, to allow chained appends
 * @throws IOException as declared by {@link Appendable}; propagated from {@code write}
 */
@Override
public Appendable append(@Nullable CharSequence csq) throws IOException {
  String text = Preconditions.checkNotNull(csq).toString();
  byte[] utf8 = text.getBytes(StandardCharsets.UTF_8);
  write(utf8);
  return this;
}

/**
 * Appends the UTF-8 encoding of the subsequence {@code csq[start, end)} to this stream.
 *
 * <p>Bounds checking is delegated to {@link CharSequence#subSequence(int, int)}.
 *
 * <p>NOTE(review): {@link Appendable} specifies that a null {@code csq} is treated as the
 * literal "null"; this method instead routes the argument through
 * {@code Preconditions.checkNotNull} - confirm that rejecting null is intended.
 *
 * @param csq the character sequence supplying the characters
 * @param start index of the first character to append
 * @param end index one past the last character to append
 * @return this stream, to allow chained appends
 * @throws IOException as declared by {@link Appendable}; propagated from {@code write}
 */
@Override
public Appendable append(@Nullable CharSequence csq, int start, int end) throws IOException {
  CharSequence slice = Preconditions.checkNotNull(csq).subSequence(start, end);
  byte[] utf8 = slice.toString().getBytes(StandardCharsets.UTF_8);
  write(utf8);
  return this;
}

/**
 * Appends the UTF-8 encoding of a single UTF-16 code unit to this stream.
 *
 * <p>ASCII characters (below {@code 0x80}) encode to exactly one byte equal to their code
 * point, so they are written directly without allocating an intermediate {@code String} and
 * {@code byte[]}. All other characters go through {@link String#getBytes(java.nio.charset.Charset)}.
 *
 * <p>Each call encodes its character in isolation. A surrogate passed on its own is malformed
 * input for the encoder and is replaced by the charset's replacement bytes, so a supplementary
 * code point must be appended as a {@code CharSequence} rather than as two separate chars.
 *
 * @param c the character to encode and write
 * @return this stream, to allow chained appends
 * @throws IOException as declared by {@link Appendable}; propagated from {@code write}
 */
@Override
public Appendable append(char c) throws IOException {
  if (c < 0x80) {
    // Fast path: single-byte UTF-8, identical to the byte produced by String.getBytes.
    write((int) c);
  } else {
    write(String.valueOf(c).getBytes(StandardCharsets.UTF_8));
  }
  return this;
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.UnsafeByteOperations;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -158,6 +162,67 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception {
}
}

/**
 * Checks that {@code append(CharSequence)} writes the same bytes as a UTF-8
 * {@link OutputStreamWriter} wrapped around a {@code ByteStringOutputStream}.
 */
@Test
public void appendEquivalentToOutputStreamWriter() throws IOException {
  String input = "⣏⓫⦎⊺ⱄ\u243B♼⢓␜\u2065✝⡳oⶤⱲ⨻1⅒ↀ◅⡪⋲";

  // Reference encoding: go through the JDK writer.
  ByteStringOutputStream viaWriter = new ByteStringOutputStream();
  OutputStreamWriter writer = new OutputStreamWriter(viaWriter, StandardCharsets.UTF_8);
  writer.append(input);
  writer.flush();
  ByteString expected = viaWriter.toByteString();

  // Encoding under test: append straight to the stream.
  ByteStringOutputStream direct = new ByteStringOutputStream();
  direct.append(input);
  ByteString actual = direct.toByteString();

  assertEquals(expected, actual);
}

/**
 * Checks that {@code append(CharSequence, int, int)} writes the same bytes as a UTF-8
 * {@link OutputStreamWriter} given the same sequence and the same {@code [3, 10)} range.
 */
@Test
public void appendEquivalentToOutputStreamWriterSubstr() throws IOException {
  String input = "⣏⓫⦎⊺ⱄ\u243B♼⢓␜\u2065✝⡳oⶤⱲ⨻1⅒ↀ◅⡪⋲";

  // Reference encoding: go through the JDK writer.
  ByteStringOutputStream viaWriter = new ByteStringOutputStream();
  OutputStreamWriter writer = new OutputStreamWriter(viaWriter, StandardCharsets.UTF_8);
  writer.append(input, 3, 10);
  writer.flush();
  ByteString expected = viaWriter.toByteString();

  // Encoding under test: append the same range straight to the stream.
  ByteStringOutputStream direct = new ByteStringOutputStream();
  direct.append(input, 3, 10);
  ByteString actual = direct.toByteString();

  assertEquals(expected, actual);
}

/**
 * Checks, for every char value from 0 through 255, that {@code append(char)} writes the same
 * bytes as a UTF-8 {@link OutputStreamWriter} given that char.
 */
@Test
public void appendEquivalentToOutputStreamWriterChar() throws IOException {
  for (int codeUnit = 0; codeUnit < 256; codeUnit++) {
    char c = (char) codeUnit;

    // Reference encoding: go through the JDK writer.
    ByteStringOutputStream viaWriter = new ByteStringOutputStream();
    OutputStreamWriter writer = new OutputStreamWriter(viaWriter, StandardCharsets.UTF_8);
    writer.append(c);
    writer.flush();
    ByteString expected = viaWriter.toByteString();

    // Encoding under test: append the char straight to the stream.
    ByteStringOutputStream direct = new ByteStringOutputStream();
    direct.append(c);
    ByteString actual = direct.toByteString();

    assertEquals(expected, actual);
  }
}

// Grow the elements based upon an approximation of the fibonacci sequence.
private static int next(int current) {
double a = Math.max(1, current * (1 + Math.sqrt(5)) / 2.0);
Expand Down

0 comments on commit e688295

Please sign in to comment.