Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing.functions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.function.ThrowingConsumer;

/**
* A function that processes elements of two keyed streams and produces a single output stream.
*
* <p>The function will be called for every element in the input streams and can produce zero or
* more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also query the
* time (both event and processing) and set timers, through the provided {@link Context}. When
* reacting to the firing of timers the function can emit yet more elements.
*
* <p>An example use case for connected streams is the application of a set of rules that change
* over time ({@code stream A}) to the elements contained in another stream (stream {@code B}). The
* rules contained in {@code stream A} can be stored in the state and wait for new elements to
* arrive on {@code stream B}. Upon reception of a new element on {@code stream B}, the function can
* apply the previously stored rules to the element and emit a result, and/or register a timer that
* will trigger an action in the future.
*
* @param <K> Type of the key.
* @param <IN1> Type of the first input.
* @param <IN2> Type of the second input.
* @param <OUT> Output type.
*/
@Internal
public abstract class DeclaringAsyncKeyedCoProcessFunction<K, IN1, IN2, OUT>
extends KeyedCoProcessFunction<K, IN1, IN2, OUT> {

private static final long serialVersionUID = 1L;

/** Override and finalize this method. Please use {@link #declareProcess1} instead. */
@Override
public final void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception {
throw new IllegalAccessException("This method is replaced by declareProcess1.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this exception message mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is inherited from KeyedCoProcessFunction, but we want to use declareProcess1 to replace it.
This method should not be called as expected, so an exception thrown to prevent misuse.

}

/** Override and finalize this method. Please use {@link #declareProcess2} instead. */
@Override
public final void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception {
throw new IllegalAccessException("This method is replaced by declareProcess2.");
}

/** Override and finalize this method. Please use {@link #declareOnTimer} instead. */
public final void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out)
throws Exception {
throw new IllegalAccessException("This method is replaced by declareOnTimer.");
}

/**
* Declaring variables before {@link #declareProcess1} and {@link #declareProcess2} and {@link
* #declareOnTimer}.
*/
public void declareVariables(DeclarationContext context) {}

/**
* Declare a process for one element from the first of the connected streams.
*
* <p>This function can output zero or more elements using the {@link Collector} parameter and
* also update internal state or set timers using the {@link Context} parameter.
*
* @param context the context that provides useful methods to define named callbacks.
* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
* {@link TimerService} for registering timers and querying the time. The context is only
* valid during the invocation of this method, do not store it.
* @param out The collector for returning result values.
* @return the whole processing logic just like {@code processElement}.
*/
public abstract ThrowingConsumer<IN1, Exception> declareProcess1(
DeclarationContext context, Context ctx, Collector<OUT> out)
throws DeclarationException;

/**
* Declare a process for one element from the second of the connected streams.
*
* <p>This function can output zero or more elements using the {@link Collector} parameter and
* also update internal state or set timers using the {@link Context} parameter.
*
* @param context the context that provides useful methods to define named callbacks.
* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a
* {@link TimerService} for registering timers and querying the time. The context is only
* valid during the invocation of this method, do not store it.
* @param out The collector for returning result values.
* @return the whole processing logic just like {@code processElement}.
*/
public abstract ThrowingConsumer<IN2, Exception> declareProcess2(
DeclarationContext context, Context ctx, Collector<OUT> out)
throws DeclarationException;

/**
* Declare a procedure which is called when a timer set using {@link TimerService} fires.
Copy link
Contributor

@davidradl davidradl Mar 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the words say it is declaring a procedure but the output parameter says it is a processor. I would expect these words to be the same. The method name should align with what we are declaring. maybe declareStream2Processor

*
* @param context the context that provides useful methods to define named callbacks.
* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link
* TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for
* registering timers and querying the time. The context is only valid during the invocation
* of this method, do not store it.
* @param out The processor for processing timestamps.
*/
public ThrowingConsumer<Long, Exception> declareOnTimer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: declareOnTimer -> declareOnTimerProcessor

DeclarationContext context, OnTimerContext ctx, Collector<OUT> out)
throws DeclarationException {
return (t) -> {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,18 @@ public Watermark preProcessWatermark(Watermark watermark) throws Exception {
}

/**
* A hook that will be invoked after finishing advancing the watermark. It is not recommended to
* perform async state here. Only some synchronous logic is suggested.
* A hook that will be invoked after finishing advancing the watermark and right before the
* watermark being emitting downstream. Here is a chance for customization of the emitting
* watermark. It is not recommended to perform async state here. Only some synchronous logic is
* suggested.
*
* @param watermark the advanced watermark.
* @return the watermark that should be emitted to downstream. Null if there is no need for
* following emitting.
*/
public void postProcessWatermark(Watermark watermark) throws Exception {}
public Watermark postProcessWatermark(Watermark watermark) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about change the javadoc of this method to :

A hook that will be invoked after finishing advancing the watermark and right before the watermark being emitting downstream. Here is a chance for customization of the emitting watermark. ....(and following description)

return watermark;
}

/**
* Process a watermark when receiving it. Do not override this method since the async processing
Expand Down Expand Up @@ -425,8 +431,10 @@ public final void processWatermark(Watermark mark) throws Exception {
},
() -> {
if (watermarkRef.get() != null) {
output.emitWatermark(watermarkRef.get());
postProcessWatermark(watermarkRef.get());
Watermark postProcessWatermark = postProcessWatermark(watermarkRef.get());
if (postProcessWatermark != null) {
output.emitWatermark(postProcessWatermark);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing.operators;
package org.apache.flink.runtime.asyncprocessing.operators.co;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
Expand All @@ -28,6 +28,8 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
Expand Down Expand Up @@ -417,12 +419,12 @@ public <X> void output(OutputTag<X> outputTag, X value) {
}

@VisibleForTesting
MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() {
public MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unnecessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because AsyncIntervalJoinOperator is moved into asyncprocessing package, and AsyncIntervalJoinOperatorTest still needs these methods.

return leftBuffer;
}

@VisibleForTesting
MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>> getRightBuffer() {
public MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>> getRightBuffer() {
return rightBuffer;
}
}
Loading