diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java
new file mode 100644
index 0000000000000..ab27c0095d34a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * A function that processes elements of two keyed streams and produces a single output stream.
+ *
+ *
The function will be called for every element in the input streams and can produce zero or
+ * more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also query the
+ * time (both event and processing) and set timers, through the provided {@link Context}. When
+ * reacting to the firing of timers the function can emit yet more elements.
+ *
+ *
An example use case for connected streams is the application of a set of rules that change
+ * over time ({@code stream A}) to the elements contained in another stream (stream {@code B}). The
+ * rules contained in {@code stream A} can be stored in the state and wait for new elements to
+ * arrive on {@code stream B}. Upon reception of a new element on {@code stream B}, the function can
+ * apply the previously stored rules to the element and emit a result, and/or register a timer that
+ * will trigger an action in the future.
+ *
+ * @param Type of the key.
+ * @param Type of the first input.
+ * @param Type of the second input.
+ * @param Output type.
+ */
+@Internal
+public abstract class DeclaringAsyncKeyedCoProcessFunction
+ extends KeyedCoProcessFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Override and finalize this method. Please use {@link #declareProcess1} instead. */
+ @Override
+ public final void processElement1(IN1 value, Context ctx, Collector out) throws Exception {
+ throw new IllegalAccessException("This method is replaced by declareProcess1.");
+ }
+
+ /** Override and finalize this method. Please use {@link #declareProcess2} instead. */
+ @Override
+ public final void processElement2(IN2 value, Context ctx, Collector out) throws Exception {
+ throw new IllegalAccessException("This method is replaced by declareProcess2.");
+ }
+
+ /** Override and finalize this method. Please use {@link #declareOnTimer} instead. */
+ public final void onTimer(long timestamp, OnTimerContext ctx, Collector out)
+ throws Exception {
+ throw new IllegalAccessException("This method is replaced by declareOnTimer.");
+ }
+
+ /**
+ * Declaring variables before {@link #declareProcess1} and {@link #declareProcess2} and {@link
+ * #declareOnTimer}.
+ */
+ public void declareVariables(DeclarationContext context) {}
+
+ /**
+ * Declare a process for one element from the first of the connected streams.
+ *
+ *
This function can output zero or more elements using the {@link Collector} parameter and
+ * also update internal state or set timers using the {@link Context} parameter.
+ *
+ * @param context the context that provides useful methods to define named callbacks.
+ * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
+ * {@link TimerService} for registering timers and querying the time. The context is only
+ * valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ * @return the whole processing logic just like {@code processElement}.
+ */
+ public abstract ThrowingConsumer declareProcess1(
+ DeclarationContext context, Context ctx, Collector out)
+ throws DeclarationException;
+
+ /**
+ * Declare a process for one element from the second of the connected streams.
+ *
+ *
This function can output zero or more elements using the {@link Collector} parameter and
+ * also update internal state or set timers using the {@link Context} parameter.
+ *
+ * @param context the context that provides useful methods to define named callbacks.
+ * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
+ * {@link TimerService} for registering timers and querying the time. The context is only
+ * valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ * @return the whole processing logic just like {@code processElement}.
+ */
+ public abstract ThrowingConsumer declareProcess2(
+ DeclarationContext context, Context ctx, Collector out)
+ throws DeclarationException;
+
+ /**
+ * Declare a procedure which is called when a timer set using {@link TimerService} fires.
+ *
+ * @param context the context that provides useful methods to define named callbacks.
+ * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link
+ * TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for
+ * registering timers and querying the time. The context is only valid during the invocation
+ * of this method, do not store it.
+ * @param out The processor for processing timestamps.
+ */
+ public ThrowingConsumer declareOnTimer(
+ DeclarationContext context, OnTimerContext ctx, Collector out)
+ throws DeclarationException {
+ return (t) -> {};
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index df79c512a1d96..0c702ed483e10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -383,12 +383,18 @@ public Watermark preProcessWatermark(Watermark watermark) throws Exception {
}
/**
- * A hook that will be invoked after finishing advancing the watermark. It is not recommended to
- * perform async state here. Only some synchronous logic is suggested.
+ * A hook that will be invoked after finishing advancing the watermark and right before the
+ * watermark being emitting downstream. Here is a chance for customization of the emitting
+ * watermark. It is not recommended to perform async state here. Only some synchronous logic is
+ * suggested.
*
* @param watermark the advanced watermark.
+ * @return the watermark that should be emitted to downstream. Null if there is no need for
+ * following emitting.
*/
- public void postProcessWatermark(Watermark watermark) throws Exception {}
+ public Watermark postProcessWatermark(Watermark watermark) throws Exception {
+ return watermark;
+ }
/**
* Process a watermark when receiving it. Do not override this method since the async processing
@@ -425,8 +431,10 @@ public final void processWatermark(Watermark mark) throws Exception {
},
() -> {
if (watermarkRef.get() != null) {
- output.emitWatermark(watermarkRef.get());
- postProcessWatermark(watermarkRef.get());
+ Watermark postProcessWatermark = postProcessWatermark(watermarkRef.get());
+ if (postProcessWatermark != null) {
+ output.emitWatermark(postProcessWatermark);
+ }
}
});
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java
similarity index 97%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java
index 1480fc0e4cddb..9eb54a23a5a9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.asyncprocessing.operators;
+package org.apache.flink.runtime.asyncprocessing.operators.co;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
@@ -28,6 +28,8 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -417,12 +419,12 @@ public void output(OutputTag outputTag, X value) {
}
@VisibleForTesting
- MapState>> getLeftBuffer() {
+ public MapState>> getLeftBuffer() {
return leftBuffer;
}
@VisibleForTesting
- MapState>> getRightBuffer() {
+ public MapState>> getRightBuffer() {
return rightBuffer;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java
new file mode 100644
index 0000000000000..03454e2270961
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction;
+import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing keyed {@link
+ * KeyedCoProcessFunction KeyedCoProcessFunction}.
+ */
+@Internal
+public class AsyncKeyedCoProcessOperator
+ extends AbstractAsyncStateUdfStreamOperator>
+ implements TwoInputStreamOperator, Triggerable {
+
+ private static final long serialVersionUID = 1L;
+
+ // Shared timestamp variable for collector, context and onTimerContext.
+ private transient DeclaredVariable sharedTimestamp;
+
+ private transient TimestampedCollectorWithDeclaredVariable collector;
+
+ private transient ContextImpl context;
+
+ private transient OnTimerContextImpl onTimerContext;
+
+ private transient ThrowingConsumer processor1;
+ private transient ThrowingConsumer processor2;
+ private transient ThrowingConsumer timerProcessor;
+
+ public AsyncKeyedCoProcessOperator(
+ KeyedCoProcessFunction keyedCoProcessFunction) {
+ super(keyedCoProcessFunction);
+ }
+
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void open() throws Exception {
+ super.open();
+ sharedTimestamp =
+ declarationContext.declareVariable(
+ LongSerializer.INSTANCE,
+ "_AsyncCoKeyedProcessOperator$sharedTimestamp",
+ null);
+
+ collector = new TimestampedCollectorWithDeclaredVariable<>(output, sharedTimestamp);
+
+ InternalTimerService internalTimerService =
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+ TimerService timerService = new SimpleTimerService(internalTimerService);
+
+ context = new ContextImpl<>(userFunction, timerService, sharedTimestamp);
+ onTimerContext = new OnTimerContextImpl<>(userFunction, timerService, declarationContext);
+ if (userFunction instanceof DeclaringAsyncKeyedCoProcessFunction) {
+ DeclaringAsyncKeyedCoProcessFunction declaringFunction =
+ (DeclaringAsyncKeyedCoProcessFunction) userFunction;
+ declaringFunction.declareVariables(declarationContext);
+ processor1 = declaringFunction.declareProcess1(declarationContext, context, collector);
+ processor2 = declaringFunction.declareProcess2(declarationContext, context, collector);
+ timerProcessor =
+ declaringFunction.declareOnTimer(declarationContext, onTimerContext, collector);
+ } else {
+ processor1 = (in) -> userFunction.processElement1(in, context, collector);
+ processor2 = (in) -> userFunction.processElement2(in, context, collector);
+ timerProcessor = (in) -> userFunction.onTimer(in, onTimerContext, collector);
+ }
+ }
+
+ @Override
+ public void processElement1(StreamRecord element) throws Exception {
+ collector.setTimestamp(element);
+ processor1.accept(element.getValue());
+ }
+
+ @Override
+ public void processElement2(StreamRecord element) throws Exception {
+ collector.setTimestamp(element);
+ processor2.accept(element.getValue());
+ }
+
+ @Override
+ public void onEventTime(InternalTimer timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ invokeUserFunction(TimeDomain.EVENT_TIME, timer);
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer timer) throws Exception {
+ collector.eraseTimestamp();
+ invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
+ }
+
+ private void invokeUserFunction(TimeDomain timeDomain, InternalTimer timer)
+ throws Exception {
+ onTimerContext.setTime(timer.getTimestamp(), timeDomain);
+ timerProcessor.accept(timer.getTimestamp());
+ }
+
+ public class ContextImpl
+ extends KeyedCoProcessFunction.Context {
+
+ private final TimerService timerService;
+
+ private final DeclaredVariable timestamp;
+
+ ContextImpl(
+ KeyedCoProcessFunction function,
+ TimerService timerService,
+ DeclaredVariable timestamp) {
+ function.super();
+ this.timerService = checkNotNull(timerService);
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public Long timestamp() {
+ return timestamp.get();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+
+ @Override
+ public void output(OutputTag outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be null.");
+ }
+
+ output.collect(outputTag, new StreamRecord<>(value, timestamp.get()));
+ }
+
+ @Override
+ public K getCurrentKey() {
+ return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey();
+ }
+ }
+
+ private class OnTimerContextImpl
+ extends KeyedCoProcessFunction.OnTimerContext {
+
+ private final TimerService timerService;
+
+ private final DeclaredVariable timeDomain;
+
+ private final DeclaredVariable timestamp;
+
+ OnTimerContextImpl(
+ KeyedCoProcessFunction function,
+ TimerService timerService,
+ DeclarationContext declarationContext) {
+ function.super();
+ this.timerService = checkNotNull(timerService);
+ this.timeDomain =
+ declarationContext.declareVariable(
+ StringSerializer.INSTANCE, "_OnTimerContextImpl$timeDomain", null);
+ this.timestamp =
+ declarationContext.declareVariable(
+ LongSerializer.INSTANCE, "_OnTimerContextImpl$timestamp", null);
+ }
+
+ public void setTime(long time, TimeDomain one) {
+ timestamp.set(time);
+ timeDomain.set(one.name());
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(timestamp.get() != null);
+ return timestamp.get();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+
+ @Override
+ public void output(OutputTag outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be null.");
+ }
+
+ output.collect(outputTag, new StreamRecord<>(value, timestamp()));
+ }
+
+ @Override
+ public TimeDomain timeDomain() {
+ checkState(timeDomain.get() != null);
+ return TimeDomain.valueOf(timeDomain.get());
+ }
+
+ @Override
+ public K getCurrentKey() {
+ return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey();
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java
new file mode 100644
index 0000000000000..ace1297a0a9e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.operators.co;
+
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.Preconditions;
+
+/** A {@link KeyedCoProcessOperator} that supports holding back watermarks with a static delay. */
+public class AsyncKeyedCoProcessOperatorWithWatermarkDelay
+ extends AsyncKeyedCoProcessOperator {
+ private static final long serialVersionUID = 1L;
+
+ private final long watermarkDelay;
+
+ public AsyncKeyedCoProcessOperatorWithWatermarkDelay(
+ KeyedCoProcessFunction keyedCoProcessFunction, long watermarkDelay) {
+ super(keyedCoProcessFunction);
+ Preconditions.checkArgument(
+ watermarkDelay >= 0, "The watermark delay should be non-negative.");
+ this.watermarkDelay = watermarkDelay;
+ }
+
+ @Override
+ public Watermark postProcessWatermark(Watermark watermark) throws Exception {
+ if (watermarkDelay == 0) {
+ return watermark;
+ } else {
+ return new Watermark(watermark.getTimestamp() - watermarkDelay);
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 3c9347c9a2582..45296ddfad80e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
@@ -26,6 +27,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -69,12 +71,20 @@ public class ConnectedStreams {
protected final StreamExecutionEnvironment environment;
protected final DataStream inputStream1;
protected final DataStream inputStream2;
+ protected boolean isEnableAsyncState;
protected ConnectedStreams(
StreamExecutionEnvironment env, DataStream input1, DataStream input2) {
this.environment = requireNonNull(env);
this.inputStream1 = requireNonNull(input1);
this.inputStream2 = requireNonNull(input2);
+ if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
+ this.isEnableAsyncState =
+ ((KeyedStream) inputStream1).isEnableAsyncState()
+ && ((KeyedStream) inputStream2).isEnableAsyncState();
+ } else {
+ this.isEnableAsyncState = false;
+ }
}
public StreamExecutionEnvironment getExecutionEnvironment() {
@@ -439,7 +449,12 @@ public SingleOutputStreamOperator process(
TwoInputStreamOperator operator;
if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
- operator = new KeyedCoProcessOperator<>(inputStream1.clean(keyedCoProcessFunction));
+ operator =
+ isEnableAsyncState
+ ? new AsyncKeyedCoProcessOperator<>(
+ inputStream1.clean(keyedCoProcessFunction))
+ : new KeyedCoProcessOperator<>(
+ inputStream1.clean(keyedCoProcessFunction));
} else {
throw new UnsupportedOperationException(
"KeyedCoProcessFunction can only be used "
@@ -523,4 +538,25 @@ private SingleOutputStreamOperator doTransform(
return returnStream;
}
+
+ /**
+ * Enable the async state processing for following keyed processing function on connected
+ * streams. This also requires only State V2 APIs are used in the function.
+ *
+ * @return the configured ConnectedStreams itself.
+ */
+ @Experimental
+ public ConnectedStreams enableAsyncState() {
+ if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
+ ((KeyedStream, ?>) inputStream1).enableAsyncState();
+ ((KeyedStream, ?>) inputStream2).enableAsyncState();
+ this.isEnableAsyncState = true;
+ } else {
+ throw new UnsupportedOperationException(
+ "The connected streams do not support async state, "
+ + "please ensure that two input streams of your connected streams are "
+ + "keyed stream(not behind a keyBy()).");
+ }
+ return this;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index dd4818fe30fcb..f082f11b8e607 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -35,8 +35,8 @@
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.asyncprocessing.operators.AsyncIntervalJoinOperator;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap;
+import org.apache.flink.runtime.asyncprocessing.operators.co.AsyncIntervalJoinOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
index 007f908c80987..43566988b1e6e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
@@ -27,6 +27,7 @@
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import java.util.ArrayList;
import java.util.List;
@@ -239,4 +240,14 @@ public boolean isOutputOnlyAfterEndOfStream() {
public boolean isInternalSorterSupported() {
return operatorFactory.getOperatorAttributes().isInternalSorterSupported();
}
+
+ @Override
+ public void enableAsyncState() {
+ TwoInputStreamOperator operator =
+ (TwoInputStreamOperator)
+ ((SimpleOperatorFactory) operatorFactory).getOperator();
+ if (!(operator instanceof AsyncStateProcessingOperator)) {
+ super.enableAsyncState();
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
index bb234af0eefb0..e5452f5165694 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
@@ -371,8 +371,8 @@ void testWatermarkHooks() throws Exception {
expectedOutput.add(new StreamRecord<>(1002L));
expectedOutput.add(new StreamRecord<>(1L));
expectedOutput.add(new StreamRecord<>(3L));
- expectedOutput.add(new Watermark(3L));
expectedOutput.add(new StreamRecord<>(103L));
+ expectedOutput.add(new Watermark(3L));
testHarness.processWatermark1(new Watermark(4L));
testHarness.processWatermark2(new Watermark(4L));
expectedOutput.add(new StreamRecord<>(1004L));
@@ -380,8 +380,8 @@ void testWatermarkHooks() throws Exception {
testHarness.processWatermark2(new Watermark(5L));
expectedOutput.add(new StreamRecord<>(1005L));
expectedOutput.add(new StreamRecord<>(4L));
- expectedOutput.add(new Watermark(6L));
expectedOutput.add(new StreamRecord<>(106L));
+ expectedOutput.add(new Watermark(6L));
TestHarnessUtil.assertOutputEquals(
"Output was not correct", expectedOutput, testHarness.getOutput());
@@ -690,10 +690,11 @@ public Watermark preProcessWatermark(Watermark watermark) throws Exception {
}
@Override
- public void postProcessWatermark(Watermark watermark) throws Exception {
+ public Watermark postProcessWatermark(Watermark watermark) throws Exception {
if (postProcessFunction != null) {
postProcessFunction.accept(watermark);
}
+ return watermark;
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
index dc08ed7f76c9c..2c250380969e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
@@ -25,6 +25,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.asyncprocessing.operators.co.AsyncIntervalJoinOperator;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java
new file mode 100644
index 0000000000000..d7113471c35a9
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java
@@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.asyncprocessing.operators;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.asyncprocessing.declare.ContextVariable;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
+import org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction;
+import org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AsyncKeyedCoProcessOperator}. */
+class AsyncKeyedCoProcessOperatorTest {
+
+ @Test
+ void testDeclareProcessor() throws Exception {
+ TestChainDeclarationFunction function = new TestChainDeclarationFunction();
+ AsyncKeyedCoProcessOperator operator =
+ new AsyncKeyedCoProcessOperator<>(function);
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ ArrayList> expectedOutput = new ArrayList<>();
+
+ testHarness.open();
+ testHarness.processElement1(new StreamRecord<>(5));
+ expectedOutput.add(new StreamRecord<>("11"));
+ assertThat(function.value.get()).isEqualTo(11);
+ testHarness.processElement2(new StreamRecord<>("6"));
+ expectedOutput.add(new StreamRecord<>("6"));
+ assertThat(function.value.get()).isEqualTo(17);
+ assertThat(testHarness.getOutput()).containsExactly(expectedOutput.toArray());
+ }
+
+ @Test
+ void testTimestampAndWatermarkQuerying() throws Exception {
+
+ AsyncKeyedCoProcessOperator operator =
+ new AsyncKeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction());
+
+ AsyncKeyedTwoInputStreamOperatorTestHarness testHarness =
+ AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark1(new Watermark(17));
+ testHarness.processWatermark2(new Watermark(17));
+ testHarness.processElement1(new StreamRecord<>(5, 12L));
+
+ testHarness.processWatermark1(new Watermark(42));
+ testHarness.processWatermark2(new Watermark(42));
+ testHarness.processElement2(new StreamRecord<>("6", 13L));
+
+ ConcurrentLinkedQueue