[FLINK-37268][example] Use async state in flink-examples-streaming (#…
fredia authored Feb 11, 2025
1 parent 064bfe4 commit 759635d
Showing 9 changed files with 158 additions and 95 deletions.
@@ -33,6 +33,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -95,6 +96,8 @@ public static void main(String[] args) throws Exception {
text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
}

final boolean asyncState = params.has("async-state");

SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized =
textWithTimestampAndWatermark.process(new Tokenizer());

@@ -103,9 +106,15 @@ public static void main(String[] args) throws Exception {
.getSideOutput(rejectedWordsTag)
.map(value -> "rejected: " + value, Types.STRING);

KeyedStream<Tuple2<String, Integer>, String> keyedTokenized =
tokenized.keyBy(value -> value.f0);

if (asyncState) {
keyedTokenized.enableAsyncState();
}

DataStream<Tuple2<String, Integer>> counts =
- tokenized
- .keyBy(value -> value.f0)
+ keyedTokenized
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
// group by the tuple field "0" and sum up tuple field "1"
.sum(1);
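The same opt-in pattern recurs in every example this commit touches: build the KeyedStream first, switch it to async state when the flag is set, then window as before. Below is a minimal sketch of that pattern, assuming a tokenized Tuple2<String, Integer> stream named `source` and a parsed `asyncState` flag; the variable names and the five-second window are illustrative stand-ins, not lines from this diff.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import java.time.Duration;

// inside main(), after parsing the CLI flags:
KeyedStream<Tuple2<String, Integer>, String> keyed = source.keyBy(value -> value.f0);
if (asyncState) {
    // switch this keyed operator to the asynchronous state API;
    // enableAsyncState() also returns the stream, so the reassignment style
    // `keyed = keyed.enableAsyncState();` used elsewhere in this commit works too
    keyed.enableAsyncState();
}
DataStream<Tuple2<String, Integer>> counts =
        keyed.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
                .sum(1);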
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.ParameterTool;
@@ -46,14 +47,16 @@ public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String hostname;
final int port;
final boolean asyncState;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
asyncState = params.has("async-state");
} catch (Exception e) {
System.err.println(
"No port specified. Please run 'SocketWindowWordCount "
+ "--hostname <hostname> --port <port>', where hostname (localhost by default) "
+ "--hostname <hostname> --port <port> [--asyncState]', where hostname (localhost by default) "
+ "and port is the address of the text server");
System.err.println(
"To start a simple text server, run 'netcat -l <port>' and "
@@ -68,7 +71,7 @@ public static void main(String[] args) throws Exception {
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

// parse the data, group it, window it, and aggregate the counts
- DataStream<WordWithCount> windowCounts =
+ KeyedStream<WordWithCount, String> keyedStream =
text.flatMap(
(FlatMapFunction<String, WordWithCount>)
(value, out) -> {
@@ -77,7 +80,12 @@
}
},
Types.POJO(WordWithCount.class))
- .keyBy(value -> value.word)
+ .keyBy(value -> value.word);
if (asyncState) {
keyedStream = keyedStream.enableAsyncState();
}
DataStream<WordWithCount> windowCounts =
keyedStream
.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
.reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
.returns(WordWithCount.class);
@@ -22,10 +22,11 @@
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
- import org.apache.flink.api.common.state.ValueState;
- import org.apache.flink.api.common.state.ValueStateDescriptor;
+ import org.apache.flink.api.common.state.v2.ValueState;
+ import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
@@ -71,7 +72,7 @@ public static void main(String[] args) throws Exception {
System.out.println(
"Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]");
System.out.println("Options for both the above setups: ");
- System.out.println("\t[--backend <hashmap|rocks>]");
+ System.out.println("\t[--backend <hashmap|rocksdb|forst>]");
System.out.println("\t[--checkpoint-dir <filepath>]");
System.out.println("\t[--incremental-checkpoints <true|false>]");
System.out.println("\t[--output <filepath> OR null for stdout]");
@@ -91,12 +92,10 @@ public static void main(String[] args) throws Exception {
configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
- } else if ("rocks".equals(stateBackend)) {
+ } else if ("rocksdb".equals(stateBackend) || "forst".equals(stateBackend)) {
final String checkpointDir = params.get("checkpoint-dir");
boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
- configuration.set(
- StateBackendOptions.STATE_BACKEND,
- "org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory");
+ configuration.set(StateBackendOptions.STATE_BACKEND, stateBackend);
configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incrementalCheckpoints);
configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
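Note the backend-selection change in this hunk: instead of naming the RocksDB factory class, the example now passes the CLI value straight to StateBackendOptions.STATE_BACKEND, so the shortcut names "rocksdb" and "forst" both resolve to their backends from one code path. A minimal sketch of the resulting configuration, assuming the Flink 2.x option keys used above; the checkpoint path is a made-up placeholder.

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;

Configuration configuration = new Configuration();
// "hashmap", "rocksdb", or "forst" -- resolved by shortcut name, no factory class
configuration.set(StateBackendOptions.STATE_BACKEND, "forst");
configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///tmp/checkpoints");

The example presumably hands this Configuration to StreamExecutionEnvironment.getExecutionEnvironment(configuration) in the lines elided from this diff.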
@@ -164,7 +163,7 @@ public static void main(String[] args) throws Exception {
// partition on the address to make sure equal addresses
// end up in the same state machine flatMap function
.keyBy(Event::sourceAddress)
-
+ .enableAsyncState()
// the function that evaluates the state machine over the sequence of events
.flatMap(new StateMachineMapper());

@@ -206,32 +205,41 @@ static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> {
public void open(OpenContext openContext) {
// get access to the state object
currentState =
- getRuntimeContext().getState(new ValueStateDescriptor<>("state", State.class));
+ getRuntimeContext()
+ .getState(
+ new ValueStateDescriptor<>(
+ "state", TypeExtractor.createTypeInfo(State.class)));
}

@Override
public void flatMap(Event evt, Collector<Alert> out) throws Exception {
// get the current state for the key (source address)
- // if no state exists, yet, the state must be the state machine's initial state
- State state = currentState.value();
- if (state == null) {
- state = State.Initial;
- }
-
- // ask the state machine what state we should go to based on the given event
- State nextState = state.transition(evt.type());
-
- if (nextState == State.InvalidTransition) {
- // the current event resulted in an invalid transition
- // raise an alert!
- out.collect(new Alert(evt.sourceAddress(), state, evt.type()));
- } else if (nextState.isTerminal()) {
- // we reached a terminal state, clean up the current state
- currentState.clear();
- } else {
- // remember the new state
- currentState.update(nextState);
- }
+ currentState
+ .asyncValue()
+ .thenAccept(
+ state -> {
+ // if no state exists, yet, the state must be the state machine's
+ // initial state
+ if (state == null) {
+ state = State.Initial;
+ }
+
+ // ask the state machine what state we should go to based on the
+ // given event
+ State nextState = state.transition(evt.type());
+ if (nextState == State.InvalidTransition) {
+ // the current event resulted in an invalid transition
+ // raise an alert!
+ out.collect(new Alert(evt.sourceAddress(), state, evt.type()));
+ } else if (nextState.isTerminal()) {
+ // we reached a terminal state, clean up the current state
+ currentState.asyncClear();
+ } else {
+ // remember the new state
+ currentState.asyncUpdate(nextState);
+ }
+ });
}
}

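This file shows the substantive API shift of the commit: with the state v2 API, reads no longer block. ValueState#asyncValue() returns a StateFuture whose callback receives the value, and writes go through asyncUpdate/asyncClear. A condensed sketch of that access pattern follows, using a hypothetical per-key counter; CountingMapper and the "count" descriptor name are illustrative, not identifiers from this diff.

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.util.Collector;

class CountingMapper extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(OpenContext openContext) {
        // same descriptor style as StateMachineMapper above
        count =
                getRuntimeContext()
                        .getState(
                                new ValueStateDescriptor<>(
                                        "count", TypeExtractor.createTypeInfo(Long.class)));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        // asyncValue() returns a StateFuture; the callback runs once the read completes
        count.asyncValue()
                .thenAccept(
                        current -> {
                            long next = (current == null ? 0L : current) + 1;
                            out.collect(next);
                            count.asyncUpdate(next); // asyncClear() would drop the entry
                        });
    }
}

As in this commit's pipeline change above, the upstream keyBy(...) needs enableAsyncState() switched on for such a mapper to run on the async state path.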
@@ -27,19 +27,23 @@
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ParameterTool;

import java.time.Duration;

/** An example of grouped stream windowing into sliding time windows. */
public class GroupedProcessingTimeWindowExample {

public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final boolean asyncState = params.has("async-state");

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

@@ -57,8 +61,12 @@ public static void main(String[] args) throws Exception {

DataStream<Tuple2<Long, Long>> stream =
env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
KeyedStream<Tuple2<Long, Long>, Long> keyedStream = stream.keyBy(value -> value.f0);
if (asyncState) {
keyedStream = keyedStream.enableAsyncState();
}

- stream.keyBy(value -> value.f0)
+ keyedStream
.window(
SlidingProcessingTimeWindows.of(
Duration.ofMillis(2500), Duration.ofMillis(500)))
@@ -28,6 +28,7 @@
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
@@ -52,6 +53,7 @@ public static void main(String[] args) throws Exception {
env.setParallelism(1);

final boolean fileOutput = params.has("output");
final boolean asyncState = params.has("async-state");

final List<Tuple3<String, Long, Integer>> input = new ArrayList<>();

@@ -81,11 +83,15 @@ public static void main(String[] args) throws Exception {
.withTimestampAssigner((event, timestamp) -> event.f1),
"Generated data source");

KeyedStream<Tuple3<String, Long, Integer>, String> keyedStream =
source.keyBy(value -> value.f0);
if (asyncState) {
keyedStream = keyedStream.enableAsyncState();
}

// We create sessions for each id with max timeout of 3 time units
DataStream<Tuple3<String, Long, Integer>> aggregated =
- source.keyBy(value -> value.f0)
- .window(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)))
- .sum(2);
+ keyedStream.window(EventTimeSessionWindows.withGap(Duration.ofMillis(3L))).sum(2);

if (fileOutput) {
aggregated
@@ -30,6 +30,7 @@
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -120,13 +121,18 @@ public static void main(String[] args) throws Exception {

int evictionSec = 10;
double triggerMeters = 50;
- DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds =
+ KeyedStream<Tuple4<Integer, Integer, Double, Long>, Integer> keyedStream =
carData.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple4<Integer, Integer, Double, Long>>
forMonotonousTimestamps()
.withTimestampAssigner((car, ts) -> car.f3))
- .keyBy(value -> value.f0)
+ .keyBy(value -> value.f0);
if (params.isAsyncState()) {
keyedStream = keyedStream.enableAsyncState();
}
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds =
keyedStream
.window(GlobalWindows.create())
.evictor(TimeEvictor.of(Duration.ofSeconds(evictionSec)))
.trigger(
@@ -58,6 +58,7 @@
* </code>.
* <li><code>--execution-mode &lt;mode&gt;</code>The execution mode (BATCH, STREAMING, or
* AUTOMATIC) of this pipeline.
* <li><code>--async-state</code> Whether to enable async state.
* </ul>
*
* <p>This example shows how to: