diff --git a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java index 79c89aee..f3d4586f 100644 --- a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java +++ b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java @@ -19,15 +19,25 @@ package org.apache.flink.training.exercises.hourlytips; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.training.exercises.common.datatypes.TaxiFare; import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator; import org.apache.flink.training.exercises.common.utils.MissingSolutionException; +import org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution; +import org.apache.flink.util.Collector; + +import java.util.HashMap; +import java.util.Map; /** * The Hourly Tips exercise from the Flink training. @@ -37,13 +47,15 @@ */ public class HourlyTipsExercise { + private final Time windowDuration = Time.hours(1); private final SourceFunction source; private final SinkFunction> sink; - /** Creates a job using the source and sink provided. */ + /** + * Creates a job using the source and sink provided. + */ public HourlyTipsExercise( SourceFunction source, SinkFunction> sink) { - this.source = source; this.sink = sink; } @@ -72,22 +84,46 @@ public JobExecutionResult execute() throws Exception { // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // start the data generator - DataStream fares = env.addSource(source); + WatermarkStrategy strategy + = WatermarkStrategy.forMonotonousTimestamps() + .withTimestampAssigner((fare, assigner) -> fare.getEventTimeMillis()); - // replace this with your solution - if (true) { - throw new MissingSolutionException(); - } + DataStream fares = env.addSource(source).assignTimestampsAndWatermarks(strategy); - // the results should be sent to the sink that was passed in - // (otherwise the tests won't work) - // you can end the pipeline with something like this: + // the pipeline: - // DataStream> hourlyMax = ... - // hourlyMax.addSink(sink); + fares .keyBy((TaxiFare fare) -> fare.driverId) + .window(TumblingEventTimeWindows.of(windowDuration)) + .process(new SumWindowTips()) - // execute the pipeline and return the result - return env.execute("Hourly Tips"); + .windowAll(TumblingEventTimeWindows.of(windowDuration)) // non-key grouped stream + .maxBy(2) // max by sumOfTips (f2 in Tuple3) + + .addSink(this.sink) + ; + + return env.execute("Hourly Tips"); // execute the pipeline and return the result + } + + public static class SumWindowTips + extends ProcessWindowFunction, Long, TimeWindow> { + @Override + public void process + ( Long driverId + , Context context + , Iterable faresInWindow + , Collector> out + ) throws Exception + { + float sumOfTips = 0F; + for (TaxiFare f : faresInWindow) + sumOfTips += f.tip; + + Tuple3 rec = Tuple3.of ( context.window().getEnd() + , driverId + , sumOfTips + ); + out.collect(rec); + } } -} +} \ No newline at end of file diff --git a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java index f555e5da..65aaa5eb 100644 --- a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java +++ b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java @@ -21,6 +21,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -30,7 +32,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.training.exercises.common.datatypes.TaxiRide; import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; -import org.apache.flink.training.exercises.common.utils.MissingSolutionException; import org.apache.flink.util.Collector; import java.time.Duration; @@ -44,9 +45,22 @@ *

You should eventually clear any state you create. */ public class LongRidesExercise { + private final Duration maxWaitForLateNotifications = Duration.ofSeconds(60); private final SourceFunction source; private final SinkFunction sink; + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { + LongRidesExercise job = + new LongRidesExercise(new TaxiRideGenerator(), new PrintSinkFunction<>()); + + job.execute(); + } + /** Creates a job using the source and sink provided. */ public LongRidesExercise(SourceFunction source, SinkFunction sink) { this.source = source; @@ -64,51 +78,88 @@ public JobExecutionResult execute() throws Exception { // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // the WatermarkStrategy specifies how to extract timestamps and generate watermarks + WatermarkStrategy watermarkStrategy + = WatermarkStrategy.forBoundedOutOfOrderness(maxWaitForLateNotifications) + .withTimestampAssigner((ride, streamRecordTimestamp) -> ride.getEventTimeMillis()); + // start the data generator - DataStream rides = env.addSource(source); + DataStream rides = env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy); - // the WatermarkStrategy specifies how to extract timestamps and generate watermarks - WatermarkStrategy watermarkStrategy = - WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60)) - .withTimestampAssigner( - (ride, streamRecordTimestamp) -> ride.getEventTimeMillis()); - - // create the pipeline - rides.assignTimestampsAndWatermarks(watermarkStrategy) - .keyBy(ride -> ride.rideId) + // the pipeline + rides .keyBy(ride -> ride.rideId) .process(new AlertFunction()) .addSink(sink); - // execute the pipeline and return the result - return env.execute("Long Taxi Rides"); - } - - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { - LongRidesExercise job = - new LongRidesExercise(new TaxiRideGenerator(), new PrintSinkFunction<>()); - - job.execute(); + return env.execute("Long Taxi Rides"); // execute the pipeline and return the result } @VisibleForTesting public static class AlertFunction extends KeyedProcessFunction { + private final Duration maxDuration = Duration.ofHours(2); + private ValueState rideState; + @Override public void open(Configuration config) throws Exception { - throw new MissingSolutionException(); + ValueStateDescriptor rideStateDescriptor = + new ValueStateDescriptor<>("ride event", TaxiRide.class); + this.rideState = getRuntimeContext().getState(rideStateDescriptor); } @Override public void processElement(TaxiRide ride, Context context, Collector out) - throws Exception {} + throws Exception { + TaxiRide firstRideEvent = rideState.value(); + + if (firstRideEvent == null) { // first notification about this ride + rideState.update(ride);// whatever event comes first, remember it + + if (ride.isStart) // set timer for ride that has no end (in due time or at all) + context.timerService().registerEventTimeTimer(getMaxEndTime(ride)); + return; + } + + // we now have both ends of this ride + + if (ride.isStart) { // notification of the start after notification of end + if (rideTooLong(ride, firstRideEvent)) // just check if it was too long + out.collect(ride.rideId); + return; + } + + // the first ride was a START event, so there is a timer unless it has fired + context.timerService().deleteEventTimeTimer(getMaxEndTime(firstRideEvent)); + + if (rideTooLong(firstRideEvent, ride)) // just in case the timer didn't fire yet + out.collect(ride.rideId); + + // both events have now been seen, we can clear the state + // this solution can leak state if an event is missing + // see DISCUSSION.md for more information + rideState.clear(); + } @Override public void onTimer(long timestamp, OnTimerContext context, Collector out) - throws Exception {} + throws Exception { + + out.collect(rideState.value().rideId);// the timer only fires if the ride was too long + + rideState.clear();// clearing now prevents duplicate alerts, but will leak state if the END arrives + } + + private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) { + return Duration.between(startEvent.eventTime, endEvent.eventTime) + .compareTo(maxDuration) + > 0; + } + + private long getMaxEndTime(TaxiRide ride) throws RuntimeException { + if (!ride.isStart) + throw new RuntimeException("Can not get start time from END event."); + return ride.eventTime.plusSeconds(120 * 60).toEpochMilli(); + } + } } diff --git a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java index 1f07312f..4acb4cd7 100644 --- a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java +++ b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.training.exercises.common.datatypes.TaxiRide; import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; +import org.apache.flink.training.exercises.common.utils.GeoUtils; import org.apache.flink.training.exercises.common.utils.MissingSolutionException; /** @@ -80,7 +81,9 @@ public JobExecutionResult execute() throws Exception { public static class NYCFilter implements FilterFunction { @Override public boolean filter(TaxiRide taxiRide) throws Exception { - throw new MissingSolutionException(); + return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) + && GeoUtils.isInNYC(taxiRide.endLon , taxiRide.endLat ) + ; } } } diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java index 0662dfc0..40e1bb57 100644 --- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java +++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java @@ -19,6 +19,8 @@ package org.apache.flink.training.exercises.ridesandfares; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -32,6 +34,7 @@ import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator; import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; import org.apache.flink.training.exercises.common.utils.MissingSolutionException; +import org.apache.flink.training.solutions.ridesandfares.RidesAndFaresSolution; import org.apache.flink.util.Collector; /** @@ -74,7 +77,10 @@ public JobExecutionResult execute() throws Exception { DataStream fares = env.addSource(fareSource).keyBy(fare -> fare.rideId); // Create the pipeline. - rides.connect(fares).flatMap(new EnrichmentFunction()).addSink(sink); + rides .connect(fares) + .flatMap(new EnrichmentFunction()) + .addSink(sink) + ; // Execute the pipeline and return the result. return env.execute("Join Rides with Fares"); @@ -98,20 +104,37 @@ public static void main(String[] args) throws Exception { public static class EnrichmentFunction extends RichCoFlatMapFunction { + private ValueState rideState; + private ValueState fareState; @Override public void open(Configuration config) throws Exception { - throw new MissingSolutionException(); + rideState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class)); + fareState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class)); } @Override public void flatMap1(TaxiRide ride, Collector out) throws Exception { - throw new MissingSolutionException(); + TaxiFare fare = fareState.value(); + if (fare == null) { // 1st fare on stream + rideState.update(ride); + return; + } + fareState.clear(); + out.collect(new RideAndFare(ride, fare)); } @Override public void flatMap2(TaxiFare fare, Collector out) throws Exception { - throw new MissingSolutionException(); + TaxiRide ride = rideState.value(); + if (ride == null) { // 1st ride on stream + fareState.update(fare); + return; + } + rideState.clear(); + out.collect(new RideAndFare(ride, fare)); } } }