diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java index 7bbbf42ec8c1d..033e6c7d7a921 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java @@ -165,6 +165,13 @@ public void testOnNext_writeValidation() { .setSinkParam(fileSinkParam)) .build(); + // Encoded StreamChunk: 1 'test' + byte[] data1 = + new byte[] { + 8, 1, 18, 1, 1, 26, 20, 8, 2, 18, 6, 8, 1, 18, 2, 1, 1, 26, 8, 8, 1, 18, 4, 0, + 0, 0, 1, 26, 42, 8, 6, 18, 6, 8, 1, 18, 2, 1, 1, 26, 20, 8, 1, 18, 16, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 26, 8, 8, 1, 18, 4, 116, 101, 115, 116 + }; ConnectorServiceProto.SinkWriterStreamRequest firstWrite = ConnectorServiceProto.SinkWriterStreamRequest.newBuilder() .setWriteBatch( @@ -175,21 +182,7 @@ public void testOnNext_writeValidation() { .setStreamChunkPayload( ConnectorServiceProto.SinkWriterStreamRequest .WriteBatch.StreamChunkPayload.newBuilder() - .setBinaryData( - ByteString.copyFrom( - new byte[] { - 8, 1, 18, 1, 1, 26, 20, - 8, 2, 18, 6, 8, 1, 18, - 2, 1, 1, 26, 8, 8, 1, - 18, 4, 0, 0, 0, 1, 26, - 42, 8, 6, 18, 6, 8, 1, - 18, 2, 1, 1, 26, 20, 8, - 1, 18, 16, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 4, 26, 8, 8, 1, - 18, 4, 116, 101, 115, - 116 - })) + .setBinaryData(ByteString.copyFrom(data1)) .build())) .build(); @@ -202,6 +195,13 @@ public void testOnNext_writeValidation() { .build()) .build(); + // Encoded StreamChunk: 2 'test' + byte[] data2 = + new byte[] { + 8, 1, 18, 1, 1, 26, 20, 8, 2, 18, 6, 8, 1, 18, 2, 1, 1, 26, 8, 8, 1, 18, 4, 0, + 0, 0, 2, 26, 42, 8, 6, 18, 6, 8, 1, 18, 2, 1, 1, 26, 20, 8, 1, 18, 16, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 26, 8, 8, 1, 18, 4, 116, 101, 115, 116 + }; ConnectorServiceProto.SinkWriterStreamRequest secondWrite = ConnectorServiceProto.SinkWriterStreamRequest.newBuilder() .setWriteBatch( @@ -212,21 +212,7 @@ public void testOnNext_writeValidation() { .setStreamChunkPayload( ConnectorServiceProto.SinkWriterStreamRequest .WriteBatch.StreamChunkPayload.newBuilder() - .setBinaryData( - ByteString.copyFrom( - new byte[] { - 8, 1, 18, 1, 1, 26, 20, - 8, 2, 18, 6, 8, 1, 18, - 2, 1, 1, 26, 8, 8, 1, - 18, 4, 0, 0, 0, 2, 26, - 42, 8, 6, 18, 6, 8, 1, - 18, 2, 1, 1, 26, 20, 8, - 1, 18, 16, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 4, 26, 8, 8, 1, - 18, 4, 116, 101, 115, - 116 - })) + .setBinaryData(ByteString.copyFrom(data2)) .build())) .build(); @@ -240,21 +226,7 @@ public void testOnNext_writeValidation() { .setStreamChunkPayload( ConnectorServiceProto.SinkWriterStreamRequest .WriteBatch.StreamChunkPayload.newBuilder() - .setBinaryData( - ByteString.copyFrom( - new byte[] { - 8, 1, 18, 1, 1, 26, 20, - 8, 2, 18, 6, 8, 1, 18, - 2, 1, 1, 26, 8, 8, 1, - 18, 4, 0, 0, 0, 2, 26, - 42, 8, 6, 18, 6, 8, 1, - 18, 2, 1, 1, 26, 20, 8, - 1, 18, 16, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 4, 26, 8, 8, 1, - 18, 4, 116, 101, 115, - 116 - })) + .setBinaryData(ByteString.copyFrom(data2)) .build())) .build(); @@ -282,7 +254,10 @@ public void testOnNext_writeValidation() { sinkWriterStreamObserver.onNext(secondWriteWrongEpoch); } catch (RuntimeException e) { exceptionThrown = true; - Assert.assertTrue(e.getMessage().toLowerCase().contains("invalid epoch")); + if (!e.getMessage().toLowerCase().contains("invalid epoch")) { + e.printStackTrace(); + Assert.fail("Expected `invalid epoch`, but got " + e.getMessage()); + } } if (!exceptionThrown) { Assert.fail(