From 4325e2b1e01623ffe23a60bbdee421354b490317 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 24 Oct 2025 10:54:20 -0400 Subject: [PATCH] Create a BatchOnly category to exclude some batch tests from Dataflow Validates Runner Streaming --- ...va_ValidatesRunner_Dataflow_Streaming.json | 7 +--- ...ValidatesRunner_Dataflow_V2_Streaming.json | 6 +-- .../google-cloud-dataflow-java/build.gradle | 2 + .../apache/beam/sdk/testing/BatchOnly.java | 29 +++++++++++++++ .../beam/sdk/transforms/CombineTest.java | 21 ++++++----- .../beam/sdk/transforms/CreateTest.java | 7 ++-- .../apache/beam/sdk/transforms/ParDoTest.java | 37 ++++++++++--------- 7 files changed, 67 insertions(+), 42 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BatchOnly.java diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index 24fc17d4c74a..c4edaa85a89d 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,8 +1,3 @@ { - "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", - "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", - "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", - "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test", - "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" + "comment": "Modify this file in a trivial way to cause this test suite to run" } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json 
index 7dab8be7160a..c4edaa85a89d 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.json @@ -1,7 +1,3 @@ { - "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", - "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", - "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", - "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test" + "comment": "Modify this file in a trivial way to cause this test suite to run" } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 8729bc2032ca..4b6ecbf7e06e 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -440,6 +440,7 @@ task validatesRunnerStreaming { name: 'validatesRunnerLegacyWorkerTestStreaming', pipelineOptions: legacyPipelineOptions + ['--streaming'], excludedCategories: [ + 'org.apache.beam.sdk.testing.BatchOnly', 'org.apache.beam.sdk.testing.UsesCommittedMetrics', 'org.apache.beam.sdk.testing.UsesMapState', 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', @@ -550,6 +551,7 @@ task validatesRunnerV2Streaming { name: 'validatesRunnerV2TestStreaming', pipelineOptions: runnerV2PipelineOptions + ['--streaming', '--experiments=enable_streaming_engine'], excludedCategories: [ + 'org.apache.beam.sdk.testing.BatchOnly', 'org.apache.beam.sdk.testing.LargeKeys$Above10KB', 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo', 'org.apache.beam.sdk.testing.UsesCommittedMetrics', diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BatchOnly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BatchOnly.java new file mode 
100644 index 000000000000..80f85722e123 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BatchOnly.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Category tag for validation tests run in batch mode only. + * + * <p>
Used to dedup tests from streaming test suites as long as its batch counterpart running the + * same test is considered sufficient. + */ +@Internal +public interface BatchOnly {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 993b84a528d7..8ee23947c31d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.testing.BatchOnly; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -665,13 +666,13 @@ public void testSimpleCombine() { } @Test - @Category({ValidatesRunner.class, UsesSideInputs.class}) + @Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testSimpleCombineEmpty() { runTestSimpleCombine(EMPTY_TABLE, 0, Collections.emptyList()); } @Test - @Category({ValidatesRunner.class, UsesSideInputs.class}) + @Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testBasicCombine() { runTestBasicCombine( Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)), @@ -682,7 +683,7 @@ public void testBasicCombine() { } @Test - @Category({ValidatesRunner.class, UsesSideInputs.class}) + @Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testBasicCombineEmpty() { runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), Collections.emptyList()); } @@ -702,7 +703,7 @@ public void testCombinerNames() { } @Test - @Category({ValidatesRunner.class}) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testHotKeyCombining() { 
PCollection> input = copy( @@ -839,7 +840,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testCombinePerKeyPrimitiveDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); @@ -909,7 +910,7 @@ public void testCombinePerKeyLambda() { /** Tests creation of a per-key binary {@link Combine} via a Java 8 lambda. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testBinaryCombinePerKeyLambda() { PCollection> output = @@ -923,7 +924,7 @@ public void testBinaryCombinePerKeyLambda() { /** Tests creation of a per-key {@link Combine} via a Java 8 method reference. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testCombinePerKeyInstanceMethodReference() { PCollection> output = @@ -937,7 +938,7 @@ public void testCombinePerKeyInstanceMethodReference() { /** Tests creation of a per-key binary {@link Combine} via a Java 8 method reference. 
*/ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testBinaryCombinePerKeyInstanceMethodReference() { PCollection> output = @@ -1001,7 +1002,7 @@ public void testSimpleCombineWithContext() { } @Test - @Category({ValidatesRunner.class, UsesSideInputs.class}) + @Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testSimpleCombineWithContextEmpty() { runTestSimpleCombineWithContext(EMPTY_TABLE, 0, Collections.emptyList(), new String[] {}); } @@ -1031,7 +1032,7 @@ public void testWithFanoutPreservesSideInputs() { } @Test - @Category({ValidatesRunner.class, UsesSideInputs.class}) + @Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testHotKeyCombineWithSideInputs() { PCollection> input = createInput( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 03f4f1c67c67..90ea453ab4e3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.testing.BatchOnly; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; @@ -101,7 +102,7 @@ public void testCreate() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testCreateEmpty() { PCollection output = p.apply(Create.empty(StringUtf8Coder.of())); @@ -125,7 +126,7 @@ public void testCreateEmptyIterableRequiresCoder() { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, BatchOnly.class}) public void 
testCreateEmptyIterableWithCoder() { PCollection output = p.apply(Create.of(Collections.emptyList()).withCoder(VoidCoder.of())); @@ -161,7 +162,7 @@ public void testPolymorphicType() throws Exception { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testCreateWithNullsAndValues() throws Exception { PCollection output = p.apply( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 8409133772eb..71428949a7fd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -98,6 +98,7 @@ import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.testing.BatchOnly; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -405,7 +406,7 @@ public void testParDo() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testParDoEmpty() { List inputs = Arrays.asList(); @@ -421,7 +422,7 @@ public void testParDoEmpty() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testParDoEmptyOutputs() { List inputs = Arrays.asList(); @@ -442,7 +443,7 @@ public void testParDoTransformNameBasedDoFnWithTrimmedSuffix() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testParDoInCustomTransform() { List inputs = Arrays.asList(3, -42, 666); @@ -629,7 +630,7 @@ public void onTimer(BoundedWindow w) {} } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testPipelineOptionsParameter() { 
PCollection results = pipeline @@ -728,7 +729,7 @@ public void testParDoWithTaggedOutput() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testParDoEmptyWithTaggedOutput() { TupleTag mainOutputTag = new TupleTag("main") {}; TupleTag additionalOutputTag1 = new TupleTag("additional1") {}; @@ -770,7 +771,7 @@ public void testParDoEmptyWithTaggedOutput() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testParDoWithEmptyTaggedOutput() { TupleTag mainOutputTag = new TupleTag("main") {}; TupleTag additionalOutputTag1 = new TupleTag("additional1") {}; @@ -794,7 +795,7 @@ public void testParDoWithEmptyTaggedOutput() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, BatchOnly.class}) public void testParDoWithOnlyTaggedOutput() { List inputs = Arrays.asList(3, -42, 666); @@ -823,7 +824,7 @@ public void processElement( } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, BatchOnly.class}) public void testParDoWritingToUndeclaredTag() { List inputs = Arrays.asList(3, -42, 666); @@ -906,7 +907,7 @@ public void testSameSideInputReadTwice() { } @Test - @Category({NeedsRunner.class, UsesSideInputs.class}) + @Category({NeedsRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testSideInputAnnotationFailedValidationMissing() { // SideInput tag id final String sideInputTag1 = "tag1"; @@ -923,7 +924,7 @@ public void processElement(@SideInput(sideInputTag1) String tag1) {} } @Test - @Category({NeedsRunner.class, UsesSideInputs.class}) + @Category({NeedsRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testSideInputAnnotationFailedValidationSingletonType() { final PCollectionView sideInput1 = @@ -948,7 +949,7 @@ public void processElement(@SideInput(sideInputTag1) String tag1) {} } @Test - @Category({NeedsRunner.class, UsesSideInputs.class}) + @Category({NeedsRunner.class, 
UsesSideInputs.class, BatchOnly.class}) public void testSideInputAnnotationFailedValidationListType() { final PCollectionView> sideInput1 = @@ -973,7 +974,7 @@ public void processElement(@SideInput(sideInputTag1) List tag1) {} } @Test - @Category({NeedsRunner.class, UsesSideInputs.class}) + @Category({NeedsRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testSideInputAnnotationFailedValidationIterableType() { final PCollectionView> sideInput1 = @@ -998,7 +999,7 @@ public void processElement(@SideInput(sideInputTag1) List tag1) {} } @Test - @Category({NeedsRunner.class, UsesSideInputs.class}) + @Category({NeedsRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testSideInputAnnotationFailedValidationMapType() { final PCollectionView> sideInput1 = @@ -1048,7 +1049,7 @@ public void processElement(@SideInput(sideInputTag1) Map tag1) } @Test - @Category({ValidatesRunner.class, UsesSideInputs.class}) + @Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testSideInputAnnotation() { final PCollectionView> sideInput1 = @@ -1184,7 +1185,7 @@ public void processElement( } @Test - @Category({ValidatesRunner.class, UsesSideInputs.class}) + @Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class}) public void testParDoWithSideInputsIsCumulative() { List inputs = Arrays.asList(3, -42, 666); @@ -1294,7 +1295,7 @@ public void testMultiOutputParDoWithSideInputsIsCumulative() { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, BatchOnly.class}) public void testParDoReadingFromUnknownSideInput() { List inputs = Arrays.asList(3, -42, 666); @@ -1506,7 +1507,7 @@ public void testTaggedOutputUnregisteredExplicitCoder() throws Exception { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, BatchOnly.class}) public void testMainOutputUnregisteredExplicitCoder() { PCollection input = pipeline.apply(Create.of(Arrays.asList(1, 2, 3))); @@ -1524,7 +1525,7 @@ public void 
testMainOutputUnregisteredExplicitCoder() { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, BatchOnly.class}) public void testMainOutputApplyTaggedOutputNoCoder() { // Regression test: applying a transform to the main output // should not cause a crash based on lack of a coder for the