The task of the "Hourly Tips" exercise is to identify, for each hour, the driver earning the most tips. It's easiest to approach this in two steps: first use hour-long windows that compute the total tips for each driver during the hour, and then from that stream of window results, find the driver with the maximum tip total for each hour.
Please note that the program should operate in event time.
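The two steps can be modeled outside Flink to make their semantics concrete. The following is a minimal plain-Java sketch, not Flink code and not the reference solution. It assumes a hypothetical `Fare` record (driver ID, event-time timestamp in epoch milliseconds, tip) standing in for `TaxiFare`, and it emulates hour-long tumbling event-time windows by bucketing non-negative timestamps by hour. Step 1 sums tips per driver within each hour. Step 2 keeps the driver with the largest total, producing one `(endOfHour, driverId, totalTips)` record per hour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HourlyTipsModel {
    static final long HOUR_MS = 60 * 60 * 1000L;

    // Hypothetical stand-in for TaxiFare: only the fields this sketch needs.
    record Fare(long driverId, long eventTime, float tip) {}

    // Mirrors the Tuple3<Long, Long, Float> result: (endOfHour, driverId, totalTips).
    record HourlyMax(long endOfHour, long driverId, float totalTips) {}

    static List<HourlyMax> hourlyMaxTips(List<Fare> fares) {
        // Step 1: total tips per driver per hour, grouped by the hour's end timestamp.
        // Assumes non-negative event timestamps, so % yields the offset into the hour.
        Map<Long, Map<Long, Float>> totals = new TreeMap<>();
        for (Fare f : fares) {
            long windowStart = f.eventTime() - (f.eventTime() % HOUR_MS);
            long windowEnd = windowStart + HOUR_MS;
            totals.computeIfAbsent(windowEnd, k -> new TreeMap<>())
                  .merge(f.driverId(), f.tip(), Float::sum);
        }
        // Step 2: for each hour, keep the driver with the largest total.
        // Ties go to the lower driverId in this sketch (TreeMap order, strict >).
        List<HourlyMax> result = new ArrayList<>();
        for (Map.Entry<Long, Map<Long, Float>> hour : totals.entrySet()) {
            long bestDriver = -1;
            float bestTotal = Float.NEGATIVE_INFINITY;
            for (Map.Entry<Long, Float> d : hour.getValue().entrySet()) {
                if (d.getValue() > bestTotal) {
                    bestDriver = d.getKey();
                    bestTotal = d.getValue();
                }
            }
            result.add(new HourlyMax(hour.getKey(), bestDriver, bestTotal));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Fare> fares = List.of(
            new Fare(1L, 0L, 2.0f),
            new Fare(2L, 1_000L, 5.0f),
            new Fare(1L, 2_000L, 4.0f),          // driver 1 reaches 6.0 in the first hour
            new Fare(2L, HOUR_MS + 10L, 1.0f));  // the only fare in the second hour
        hourlyMaxTips(fares).forEach(System.out::println);
    }
}
```

In the Flink job, step 1 corresponds to a keyed, hour-long event-time window and step 2 to a second, non-keyed hour-long window. Unlike this batch model, Flink emits each hour's result only once the watermark reaches the end of that hour.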
The input data of this exercise is a stream of TaxiFare events generated by the Taxi Fare Stream Generator. The TaxiFareGenerator annotates the generated DataStream<TaxiFare> with timestamps and watermarks. Hence, there is no need to provide a custom timestamp and watermark assigner in order to correctly use event time.
The result of this exercise is a data stream of Tuple3<Long, Long, Float> records, one for each hour. Each hourly record should contain the timestamp at the end of the hour, the driverId of the driver earning the most in tips during that hour, and the actual total of their tips.
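The "timestamp at the end of the hour" comes from window-assignment arithmetic. The plain-Java sketch below shows that arithmetic for tumbling windows aligned to the epoch (an offset of 0, which is what Flink's tumbling windows use unless an offset is given). The class and method names are illustrative only. A window covers the half-open interval [start, end), so `end` is the first millisecond of the next hour. In a Flink `ProcessWindowFunction`, the same value is available as `context.window().getEnd()`.

```java
class HourWindowMath {
    static final long HOUR_MS = 60 * 60 * 1000L;

    // Start of the tumbling window of `size` ms (aligned to the epoch plus `offset`)
    // that contains `timestamp`. Math.floorMod keeps this correct for negative values.
    static long windowStart(long timestamp, long size, long offset) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Exclusive end of that window: the "end of the hour" reported in each result.
    static long windowEnd(long timestamp, long size, long offset) {
        return windowStart(timestamp, size, offset) + size;
    }

    public static void main(String[] args) {
        long t = 5_400_000L; // 01:30:00 UTC on 1970-01-01
        System.out.println(windowStart(t, HOUR_MS, 0)); // 3600000, i.e. 01:00
        System.out.println(windowEnd(t, HOUR_MS, 0));   // 7200000, i.e. 02:00
    }
}
```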
The resulting stream should be printed to standard out.
ℹ️ Rather than looking these classes up in the source repository, you might prefer to open them in your IDE.
Exercise classes:
- Table/SQL: org.apache.flink.training.exercises.hourlytips.HourlyTipsTableExercise
- Java: org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise
- Scala: org.apache.flink.training.exercises.hourlytips.scala.HourlyTipsExercise

Tests:
- Table/SQL: org.apache.flink.training.exercises.hourlytips.HourlyTipsTableTest
- Java: org.apache.flink.training.exercises.hourlytips.HourlyTipsTest
- Scala: org.apache.flink.training.exercises.hourlytips.scala.HourlyTipsTest
Program Structure
Note that it is possible to cascade one set of time windows after another, so long as the timeframes are compatible (the second set of windows needs to have a duration that is a multiple of the first set). So you can have an initial set of hour-long windows, keyed by the driverId, and use this to create a stream of (endOfHourTimestamp, driverId, totalTips), and then follow this with another hour-long window that finds the record from the first window with the maximum totalTips. This second window is not keyed, so in the DataStream API it is applied with windowAll.
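Why must the second window's duration be a multiple of the first? In Flink, the result of a time window is stamped with the window's `maxTimestamp()`, which is its end minus one millisecond, so each hourly result falls into whichever downstream window contains that instant. The plain-Java sketch below checks whether that downstream window covers the whole upstream hour. It assumes epoch-aligned windows with no offset, and its names are illustrative only.

```java
class CascadedWindows {
    static final long MINUTE_MS = 60 * 1000L;
    static final long HOUR_MS = 60 * MINUTE_MS;

    // Start of the epoch-aligned tumbling window of `size` ms containing `timestamp`.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // A first-stage window [firstStart, firstStart + size1) emits its results stamped
    // with firstStart + size1 - 1. Returns true if the second-stage window of size2
    // that receives those results spans the entire first-stage window.
    static boolean nestsCleanly(long firstStart, long size1, long size2) {
        long resultTimestamp = firstStart + size1 - 1;
        long secondStart = windowStart(resultTimestamp, size2);
        long secondEnd = secondStart + size2;
        return secondStart <= firstStart && firstStart + size1 <= secondEnd;
    }

    public static void main(String[] args) {
        for (long hour = 0; hour < 3; hour++) {
            long start = hour * HOUR_MS;
            System.out.printf("hour %d: 1h->1h %b, 1h->2h %b, 1h->90min %b%n",
                hour,
                nestsCleanly(start, HOUR_MS, HOUR_MS),
                nestsCleanly(start, HOUR_MS, 2 * HOUR_MS),
                nestsCleanly(start, HOUR_MS, 90 * MINUTE_MS));
        }
    }
}
```

With one-hour or two-hour downstream windows every hour nests cleanly. With a 90-minute window, the hour from 01:00 to 02:00 does not: its results land in the 01:30 to 03:00 window, which only partly covers that hour.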
Reference solutions are available on GitHub:
- Table/SQL: org.apache.flink.training.solutions.hourlytips.HourlyTipsTableSolution
- Java: org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution
- Scala: org.apache.flink.training.solutions.hourlytips.scala.HourlyTipsSolution