Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
"comment": "Modify this file in a trivial way to cause this test suite to run"
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test"
"comment": "Modify this file in a trivial way to cause this test suite to run"
}
2 changes: 2 additions & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ task validatesRunnerStreaming {
name: 'validatesRunnerLegacyWorkerTestStreaming',
pipelineOptions: legacyPipelineOptions + ['--streaming'],
excludedCategories: [
'org.apache.beam.sdk.testing.BatchOnly',
'org.apache.beam.sdk.testing.UsesCommittedMetrics',
'org.apache.beam.sdk.testing.UsesMapState',
'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
Expand Down Expand Up @@ -550,6 +551,7 @@ task validatesRunnerV2Streaming {
name: 'validatesRunnerV2TestStreaming',
pipelineOptions: runnerV2PipelineOptions + ['--streaming', '--experiments=enable_streaming_engine'],
excludedCategories: [
'org.apache.beam.sdk.testing.BatchOnly',
'org.apache.beam.sdk.testing.LargeKeys$Above10KB',
'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo',
'org.apache.beam.sdk.testing.UsesCommittedMetrics',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;

import org.apache.beam.sdk.annotations.Internal;

/**
* Category tag for validation tests run on batch mode only.
*
* <p>Used to dedup tests from streaming test suites as long as its batch counterpart running the
* same test is considered sufficient.
*/
@Internal
public interface BatchOnly {}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.BatchOnly;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -665,13 +666,13 @@ public void testSimpleCombine() {
}

@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
@Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testSimpleCombineEmpty() {
runTestSimpleCombine(EMPTY_TABLE, 0, Collections.emptyList());
}

@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
@Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since streaming and batch have really different execution models, I would not opt this in to being batch only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, and the name is subject to change. Currently just use if for manual testing

public void testBasicCombine() {
runTestBasicCombine(
Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)),
Expand All @@ -682,7 +683,7 @@ public void testBasicCombine() {
}

@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
@Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testBasicCombineEmpty() {
runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), Collections.emptyList());
}
Expand All @@ -702,7 +703,7 @@ public void testCombinerNames() {
}

@Test
@Category({ValidatesRunner.class})
@Category({ValidatesRunner.class, BatchOnly.class})
public void testHotKeyCombining() {
PCollection<KV<String, Integer>> input =
copy(
Expand Down Expand Up @@ -839,7 +840,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testCombinePerKeyPrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();

Expand Down Expand Up @@ -909,7 +910,7 @@ public void testCombinePerKeyLambda() {

/** Tests creation of a per-key binary {@link Combine} via a Java 8 lambda. */
@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testBinaryCombinePerKeyLambda() {

PCollection<KV<String, Integer>> output =
Expand All @@ -923,7 +924,7 @@ public void testBinaryCombinePerKeyLambda() {

/** Tests creation of a per-key {@link Combine} via a Java 8 method reference. */
@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testCombinePerKeyInstanceMethodReference() {

PCollection<KV<String, Integer>> output =
Expand All @@ -937,7 +938,7 @@ public void testCombinePerKeyInstanceMethodReference() {

/** Tests creation of a per-key binary {@link Combine} via a Java 8 method reference. */
@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testBinaryCombinePerKeyInstanceMethodReference() {

PCollection<KV<String, Integer>> output =
Expand Down Expand Up @@ -1001,7 +1002,7 @@ public void testSimpleCombineWithContext() {
}

@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
@Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testSimpleCombineWithContextEmpty() {
runTestSimpleCombineWithContext(EMPTY_TABLE, 0, Collections.emptyList(), new String[] {});
}
Expand Down Expand Up @@ -1031,7 +1032,7 @@ public void testWithFanoutPreservesSideInputs() {
}

@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
@Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testHotKeyCombineWithSideInputs() {
PCollection<KV<String, Integer>> input =
createInput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.testing.BatchOnly;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
Expand Down Expand Up @@ -101,7 +102,7 @@ public void testCreate() {
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Streaming and batch actually have somewhat different Create implementations I think

public void testCreateEmpty() {
PCollection<String> output = p.apply(Create.empty(StringUtf8Coder.of()));

Expand All @@ -125,7 +126,7 @@ public void testCreateEmptyIterableRequiresCoder() {
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, BatchOnly.class})
public void testCreateEmptyIterableWithCoder() {
PCollection<Void> output =
p.apply(Create.of(Collections.<Void>emptyList()).withCoder(VoidCoder.of()));
Expand Down Expand Up @@ -161,7 +162,7 @@ public void testPolymorphicType() throws Exception {
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testCreateWithNullsAndValues() throws Exception {
PCollection<String> output =
p.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.BatchOnly;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -405,7 +406,7 @@ public void testParDo() {
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testParDoEmpty() {

List<Integer> inputs = Arrays.asList();
Expand All @@ -421,7 +422,7 @@ public void testParDoEmpty() {
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testParDoEmptyOutputs() {

List<Integer> inputs = Arrays.asList();
Expand All @@ -442,7 +443,7 @@ public void testParDoTransformNameBasedDoFnWithTrimmedSuffix() {
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testParDoInCustomTransform() {

List<Integer> inputs = Arrays.asList(3, -42, 666);
Expand Down Expand Up @@ -629,7 +630,7 @@ public void onTimer(BoundedWindow w) {}
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testPipelineOptionsParameter() {
PCollection<String> results =
pipeline
Expand Down Expand Up @@ -728,7 +729,7 @@ public void testParDoWithTaggedOutput() {
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testParDoEmptyWithTaggedOutput() {
TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1") {};
Expand Down Expand Up @@ -770,7 +771,7 @@ public void testParDoEmptyWithTaggedOutput() {
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testParDoWithEmptyTaggedOutput() {
TupleTag<String> mainOutputTag = new TupleTag<String>("main") {};
TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1") {};
Expand All @@ -794,7 +795,7 @@ public void testParDoWithEmptyTaggedOutput() {
}

@Test
@Category(ValidatesRunner.class)
@Category({ValidatesRunner.class, BatchOnly.class})
public void testParDoWithOnlyTaggedOutput() {

List<Integer> inputs = Arrays.asList(3, -42, 666);
Expand Down Expand Up @@ -823,7 +824,7 @@ public void processElement(
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, BatchOnly.class})
public void testParDoWritingToUndeclaredTag() {
List<Integer> inputs = Arrays.asList(3, -42, 666);

Expand Down Expand Up @@ -906,7 +907,7 @@ public void testSameSideInputReadTwice() {
}

@Test
@Category({NeedsRunner.class, UsesSideInputs.class})
@Category({NeedsRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testSideInputAnnotationFailedValidationMissing() {
// SideInput tag id
final String sideInputTag1 = "tag1";
Expand All @@ -923,7 +924,7 @@ public void processElement(@SideInput(sideInputTag1) String tag1) {}
}

@Test
@Category({NeedsRunner.class, UsesSideInputs.class})
@Category({NeedsRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testSideInputAnnotationFailedValidationSingletonType() {

final PCollectionView<Integer> sideInput1 =
Expand All @@ -948,7 +949,7 @@ public void processElement(@SideInput(sideInputTag1) String tag1) {}
}

@Test
@Category({NeedsRunner.class, UsesSideInputs.class})
@Category({NeedsRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testSideInputAnnotationFailedValidationListType() {

final PCollectionView<List<Integer>> sideInput1 =
Expand All @@ -973,7 +974,7 @@ public void processElement(@SideInput(sideInputTag1) List<String> tag1) {}
}

@Test
@Category({NeedsRunner.class, UsesSideInputs.class})
@Category({NeedsRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testSideInputAnnotationFailedValidationIterableType() {

final PCollectionView<Iterable<Integer>> sideInput1 =
Expand All @@ -998,7 +999,7 @@ public void processElement(@SideInput(sideInputTag1) List<String> tag1) {}
}

@Test
@Category({NeedsRunner.class, UsesSideInputs.class})
@Category({NeedsRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testSideInputAnnotationFailedValidationMapType() {

final PCollectionView<Map<Integer, Integer>> sideInput1 =
Expand Down Expand Up @@ -1048,7 +1049,7 @@ public void processElement(@SideInput(sideInputTag1) Map<Integer, Integer> tag1)
}

@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
@Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testSideInputAnnotation() {

final PCollectionView<List<Integer>> sideInput1 =
Expand Down Expand Up @@ -1184,7 +1185,7 @@ public void processElement(
}

@Test
@Category({ValidatesRunner.class, UsesSideInputs.class})
@Category({ValidatesRunner.class, UsesSideInputs.class, BatchOnly.class})
public void testParDoWithSideInputsIsCumulative() {

List<Integer> inputs = Arrays.asList(3, -42, 666);
Expand Down Expand Up @@ -1294,7 +1295,7 @@ public void testMultiOutputParDoWithSideInputsIsCumulative() {
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, BatchOnly.class})
public void testParDoReadingFromUnknownSideInput() {

List<Integer> inputs = Arrays.asList(3, -42, 666);
Expand Down Expand Up @@ -1506,7 +1507,7 @@ public void testTaggedOutputUnregisteredExplicitCoder() throws Exception {
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, BatchOnly.class})
public void testMainOutputUnregisteredExplicitCoder() {

PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
Expand All @@ -1524,7 +1525,7 @@ public void testMainOutputUnregisteredExplicitCoder() {
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, BatchOnly.class})
public void testMainOutputApplyTaggedOutputNoCoder() {
// Regression test: applying a transform to the main output
// should not cause a crash based on lack of a coder for the
Expand Down
Loading