Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mircea - module 1 #68

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,25 @@
package org.apache.flink.training.exercises.hourlytips;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

/**
* The Hourly Tips exercise from the Flink training.
Expand All @@ -37,13 +47,15 @@
*/
public class HourlyTipsExercise {

private final Time windowDuration = Time.hours(1);
private final SourceFunction<TaxiFare> source;
private final SinkFunction<Tuple3<Long, Long, Float>> sink;

/**
 * Builds the job around an externally supplied source and sink.
 *
 * @param source where the {@code TaxiFare} events are read from
 * @param sink where the {@code (Long, Long, Float)} result tuples are written to
 */
public HourlyTipsExercise(
        SourceFunction<TaxiFare> source,
        SinkFunction<Tuple3<Long, Long, Float>> sink) {
    this.sink = sink;
    this.source = source;
}
Expand Down Expand Up @@ -72,22 +84,46 @@ public JobExecutionResult execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start the data generator
DataStream<TaxiFare> fares = env.addSource(source);
WatermarkStrategy<TaxiFare> strategy
= WatermarkStrategy.<TaxiFare>forMonotonousTimestamps()
.withTimestampAssigner((fare, assigner) -> fare.getEventTimeMillis());

// replace this with your solution
if (true) {
throw new MissingSolutionException();
}
DataStream<TaxiFare> fares = env.addSource(source).assignTimestampsAndWatermarks(strategy);

// the results should be sent to the sink that was passed in
// (otherwise the tests won't work)
// you can end the pipeline with something like this:
// the pipeline:

// DataStream<Tuple3<Long, Long, Float>> hourlyMax = ...
// hourlyMax.addSink(sink);
fares .keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(windowDuration))
.process(new SumWindowTips())

// execute the pipeline and return the result
return env.execute("Hourly Tips");
.windowAll(TumblingEventTimeWindows.of(windowDuration)) // non-key grouped stream
.maxBy(2) // max by sumOfTips (f2 in Tuple3)

.addSink(this.sink)
;

return env.execute("Hourly Tips"); // execute the pipeline and return the result
}

/**
 * Window function that adds up the {@code tip} of every fare handed to it for one key and one
 * window, then emits a single {@code (window end, driverId, tip total)} tuple.
 */
public static class SumWindowTips
        extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

    /**
     * Sums the tips of the fares in the window and emits one result record.
     *
     * @param driverId the key this window belongs to
     * @param context gives access to the window, whose end timestamp is emitted as field 0
     * @param faresInWindow all fares assigned to this key and window
     * @param out receives the {@code (window end, driverId, tip total)} tuple
     */
    @Override
    public void process(
            Long driverId,
            Context context,
            Iterable<TaxiFare> faresInWindow,
            Collector<Tuple3<Long, Long, Float>> out)
            throws Exception {
        final long windowEnd = context.window().getEnd();

        float tipTotal = 0F;
        for (TaxiFare fare : faresInWindow) {
            tipTotal += fare.tip;
        }

        out.collect(Tuple3.of(windowEnd, driverId, tipTotal));
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -30,7 +32,6 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;

import java.time.Duration;
Expand All @@ -44,9 +45,22 @@
* <p>You should eventually clear any state you create.
*/
public class LongRidesExercise {
private final Duration maxWaitForLateNotifications = Duration.ofSeconds(60);
private final SourceFunction<TaxiRide> source;
private final SinkFunction<Long> sink;

/**
 * Entry point: wires a {@code TaxiRideGenerator} source to a {@code PrintSinkFunction} sink
 * and runs the job.
 *
 * @param args command-line arguments (not read by this method)
 * @throws Exception which occurs during job execution.
 */
public static void main(String[] args) throws Exception {
    SourceFunction<TaxiRide> rideSource = new TaxiRideGenerator();
    SinkFunction<Long> printingSink = new PrintSinkFunction<>();

    new LongRidesExercise(rideSource, printingSink).execute();
}

/** Creates a job using the source and sink provided. */
public LongRidesExercise(SourceFunction<TaxiRide> source, SinkFunction<Long> sink) {
this.source = source;
Expand All @@ -64,51 +78,88 @@ public JobExecutionResult execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// the WatermarkStrategy specifies how to extract timestamps and generate watermarks
WatermarkStrategy<TaxiRide> watermarkStrategy
= WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(maxWaitForLateNotifications)
.withTimestampAssigner((ride, streamRecordTimestamp) -> ride.getEventTimeMillis());

// start the data generator
DataStream<TaxiRide> rides = env.addSource(source);
DataStream<TaxiRide> rides = env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy);

// the WatermarkStrategy specifies how to extract timestamps and generate watermarks
WatermarkStrategy<TaxiRide> watermarkStrategy =
WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
.withTimestampAssigner(
(ride, streamRecordTimestamp) -> ride.getEventTimeMillis());

// create the pipeline
rides.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(ride -> ride.rideId)
// the pipeline
rides .keyBy(ride -> ride.rideId)
.process(new AlertFunction())
.addSink(sink);

// execute the pipeline and return the result
return env.execute("Long Taxi Rides");
}

/**
* Main method.
*
* @throws Exception which occurs during job execution.
*/
public static void main(String[] args) throws Exception {
LongRidesExercise job =
new LongRidesExercise(new TaxiRideGenerator(), new PrintSinkFunction<>());

job.execute();
return env.execute("Long Taxi Rides"); // execute the pipeline and return the result
}

@VisibleForTesting
public static class AlertFunction extends KeyedProcessFunction<Long, TaxiRide, Long> {

private final Duration maxDuration = Duration.ofHours(2);
private ValueState<TaxiRide> rideState;

@Override
public void open(Configuration config) throws Exception {
throw new MissingSolutionException();
ValueStateDescriptor<TaxiRide> rideStateDescriptor =
new ValueStateDescriptor<>("ride event", TaxiRide.class);
this.rideState = getRuntimeContext().getState(rideStateDescriptor);
}

@Override
public void processElement(TaxiRide ride, Context context, Collector<Long> out)
throws Exception {}
throws Exception {
TaxiRide firstRideEvent = rideState.value();

if (firstRideEvent == null) { // first notification about this ride
rideState.update(ride);// whatever event comes first, remember it

if (ride.isStart) // set timer for ride that has no end (in due time or at all)
context.timerService().registerEventTimeTimer(getMaxEndTime(ride));
return;
}

// we now have both ends of this ride

if (ride.isStart) { // notification of the start after notification of end
if (rideTooLong(ride, firstRideEvent)) // just check if it was too long
out.collect(ride.rideId);
return;
}

// the first ride was a START event, so there is a timer unless it has fired
context.timerService().deleteEventTimeTimer(getMaxEndTime(firstRideEvent));

if (rideTooLong(firstRideEvent, ride)) // just in case the timer didn't fire yet
out.collect(ride.rideId);

// both events have now been seen, we can clear the state
// this solution can leak state if an event is missing
// see DISCUSSION.md for more information
rideState.clear();
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Long> out)
throws Exception {}
throws Exception {

out.collect(rideState.value().rideId);// the timer only fires if the ride was too long

rideState.clear();// clearing now prevents duplicate alerts, but will leak state if the END arrives
}

private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) {
return Duration.between(startEvent.eventTime, endEvent.eventTime)
.compareTo(maxDuration)
> 0;
}

private long getMaxEndTime(TaxiRide ride) throws RuntimeException {
if (!ride.isStart)
throw new RuntimeException("Can not get start time from END event.");
return ride.eventTime.plusSeconds(120 * 60).toEpochMilli();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.GeoUtils;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;

/**
Expand Down Expand Up @@ -80,7 +81,9 @@ public JobExecutionResult execute() throws Exception {
/**
 * Filter that keeps a ride only when both its start and its end coordinates are accepted by
 * {@code GeoUtils.isInNYC}.
 */
public static class NYCFilter implements FilterFunction<TaxiRide> {

    /**
     * @param taxiRide the ride to test
     * @return true if both the start and the end location pass {@code GeoUtils.isInNYC}
     */
    @Override
    public boolean filter(TaxiRide taxiRide) throws Exception {
        return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat)
                && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.training.exercises.ridesandfares;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -32,6 +34,7 @@
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.training.solutions.ridesandfares.RidesAndFaresSolution;
import org.apache.flink.util.Collector;

/**
Expand Down Expand Up @@ -74,7 +77,10 @@ public JobExecutionResult execute() throws Exception {
DataStream<TaxiFare> fares = env.addSource(fareSource).keyBy(fare -> fare.rideId);

// Create the pipeline.
rides.connect(fares).flatMap(new EnrichmentFunction()).addSink(sink);
rides .connect(fares)
.flatMap(new EnrichmentFunction())
.addSink(sink)
;

// Execute the pipeline and return the result.
return env.execute("Join Rides with Fares");
Expand All @@ -98,20 +104,37 @@ public static void main(String[] args) throws Exception {

/**
 * Joins each {@code TaxiRide} with its {@code TaxiFare} into a {@code RideAndFare}.
 *
 * <p>Whichever of the two arrives first is parked in state; when its counterpart arrives, the
 * parked event is removed from state and the pair is emitted. The state is obtained from the
 * runtime context, so it is presumably scoped to the key of the connected streams (the
 * visible pipeline keys fares by {@code rideId}) — confirm against the enclosing job.
 */
public static class EnrichmentFunction
        extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {

    /** A ride that arrived before its fare, waiting to be matched. */
    private ValueState<TaxiRide> rideState;

    /** A fare that arrived before its ride, waiting to be matched. */
    private ValueState<TaxiFare> fareState;

    /** Obtains the two state handles used to park the early event of each pair. */
    @Override
    public void open(Configuration config) throws Exception {
        rideState =
                getRuntimeContext()
                        .getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
        fareState =
                getRuntimeContext()
                        .getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
    }

    /**
     * Handles a ride: emits the pair if the fare is already parked, otherwise parks the ride.
     *
     * @param ride the incoming ride
     * @param out receives the joined {@code RideAndFare}
     */
    @Override
    public void flatMap1(TaxiRide ride, Collector<RideAndFare> out) throws Exception {
        TaxiFare fare = fareState.value();
        if (fare == null) {
            // The ride arrived first: keep it until its fare shows up.
            rideState.update(ride);
            return;
        }
        // The fare was waiting: release it and emit the pair.
        fareState.clear();
        out.collect(new RideAndFare(ride, fare));
    }

    /**
     * Handles a fare: emits the pair if the ride is already parked, otherwise parks the fare.
     *
     * @param fare the incoming fare
     * @param out receives the joined {@code RideAndFare}
     */
    @Override
    public void flatMap2(TaxiFare fare, Collector<RideAndFare> out) throws Exception {
        TaxiRide ride = rideState.value();
        if (ride == null) {
            // The fare arrived first: keep it until its ride shows up.
            fareState.update(fare);
            return;
        }
        // The ride was waiting: release it and emit the pair.
        rideState.clear();
        out.collect(new RideAndFare(ride, fare));
    }
}
}