From bf869f9a5d79c2fe5ad58c89ea59c08126a96221 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 17 Jun 2024 13:05:02 +0000 Subject: [PATCH 1/2] PubsubMessageWithTopicCoder was never used --- .../beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java index d10b9a2f1066..768aebe54e65 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java @@ -45,8 +45,8 @@ public static Coder of(TypeDescriptor ignored) { return of(); } - public static PubsubMessageWithAttributesAndMessageIdCoder of() { - return new PubsubMessageWithAttributesAndMessageIdCoder(); + public static PubsubMessageWithTopicCoder of() { + return new PubsubMessageWithTopicCoder(); } @Override From 9f65fe727d89de81242fe0bba761017f9f432ca7 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Tue, 18 Jun 2024 08:01:05 +0000 Subject: [PATCH 2/2] refactor --- .../java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 2 +- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 6 +++--- .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 01848d92d928..6233cf669080 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -1488,7 +1488,7 @@ public PDone expand(PCollection input) { .get(BAD_RECORD_TAG) .setCoder(BadRecord.getCoder(input.getPipeline()))); PCollection pubsubMessages = - pubsubMessageTuple.get(pubsubMessageTupleTag).setCoder(new PubsubMessageWithTopicCoder()); + pubsubMessageTuple.get(pubsubMessageTupleTag).setCoder(PubsubMessageWithTopicCoder.of()); switch (input.isBounded()) { case BOUNDED: pubsubMessages.apply( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index fe6338a501c4..3027db6aee9d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -732,7 +732,7 @@ public void testWriteMalformedMessagesWithErrorHandler() throws Exception { PCollection messages = pipeline.apply( Create.timestamped(ImmutableList.of(pubsubMsg, failingPubsubMsg)) - .withCoder(new PubsubMessageWithTopicCoder())); + .withCoder(PubsubMessageWithTopicCoder.of())); messages.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); ErrorHandler> badRecordErrorHandler = pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform()); @@ -882,7 +882,7 @@ public void testDynamicTopics(boolean isBounded) throws IOException { PCollection messages = pipeline.apply( - Create.timestamped(pubsubMessages).withCoder(new PubsubMessageWithTopicCoder())); + Create.timestamped(pubsubMessages).withCoder(PubsubMessageWithTopicCoder.of())); if (!isBounded) { messages = messages.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); } @@ -919,7 +919,7 @@ public void testBigMessageBounded() throws IOException { PCollection messages = pipeline.apply( Create.timestamped(ImmutableList.of(pubsubMsg)) - .withCoder(new PubsubMessageWithTopicCoder())); + .withCoder(PubsubMessageWithTopicCoder.of())); messages.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); messages.apply(PubsubIO.writeMessagesDynamic().withClientFactory(factory)); pipeline.run(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index c9b6bae45b98..be68083bb28c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -223,7 +223,7 @@ public void testDynamicTopics() throws IOException { Instant.ofEpochMilli(o.getTimestampMsSinceEpoch()))) .collect(Collectors.toList()); - p.apply(Create.timestamped(pubsubMessages).withCoder(new PubsubMessageWithTopicCoder())) + p.apply(Create.timestamped(pubsubMessages).withCoder(PubsubMessageWithTopicCoder.of())) .apply(sink); p.run(); }