A practical exercise introducing the TopologyTestDriver to drive development of a streaming compute application.
Use in conjunction with the TW AK3W Workshop; a solution to this exercise is available here.
This project is also available in Java.
Number Stations are shortwave radio stations that broadcast formatted numbers, which are believed to be addressed to intelligence officers operating in foreign countries.
We created a radio that captured ~3hrs of mysterious global Number Station broadcasts, taking the form of 1.5M messages from 541 Number Stations world-wide. The radio also captured some spurious messages of no interest.
Can we filter, branch, translate, group, window, and aggregate these messages to decode the hidden message?
- Visit https://www.oracle.com/technetwork/java/javase/downloads/index.html
- This project requires Java 11+
If you are having problems with Docker and see errors similar to the following, try restarting Docker:
driver failed programming external connectivity on endpoint
input/output error
Using troy-west/apache-kafka-cli-tools:
Start a 3-node Kafka Cluster and enter a shell with all kafka-tools scripts:
docker-compose down
docker-compose up -d
docker-compose -f docker-compose.tools.yml run kafka-tools
In a new terminal, view the running kafka logs:
docker-compose logs -f
Create a new Topic 'radio-logs' with 12 partitions and RF=3:
# ./bin/kafka-topics.sh --bootstrap-server kafka-1:19092 --create --topic radio-logs --partitions 12 --replication-factor 3
Confirm the new topic has been created:
# ./bin/kafka-topics.sh --bootstrap-server kafka-1:19092 --list
radio-logs
Describe the new topic:
# ./bin/kafka-topics.sh --bootstrap-server kafka-1:19092 --describe --topic radio-logs
Topic:radio-logs PartitionCount:12 ReplicationFactor:3 Configs:
Topic: radio-logs Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: radio-logs Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: radio-logs Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: radio-logs Partition: 3 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: radio-logs Partition: 4 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: radio-logs Partition: 5 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: radio-logs Partition: 6 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: radio-logs Partition: 7 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: radio-logs Partition: 8 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: radio-logs Partition: 9 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: radio-logs Partition: 10 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: radio-logs Partition: 11 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Take a look at the data in each partition (initially empty):
# ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-1:19092 --topic radio-logs --time -1
radio-logs:0:0
radio-logs:1:0
radio-logs:2:0
radio-logs:3:0
radio-logs:4:0
radio-logs:5:0
radio-logs:6:0
radio-logs:7:0
radio-logs:8:0
radio-logs:9:0
radio-logs:10:0
radio-logs:11:0
At any time, run all the project tests with lein test.
The NullPointerExceptions you initially see are expected: they simply mean you haven't implemented certain things yet.
Take a look at a sample of twenty intercepted messages:
(require '[numbers.radio :as radio])
=> nil
(radio/sample)
=>
({:time 1557125670764, :type "ENG", :name "315", :long 22, :lat 7, :content ["one"]}
{:time 1557125670765, :type "UXX", :name "X-RAY"}
{:time 1557125670776, :type "ENG", :name "150", :long -60, :lat -20, :content ["one"]}
{:time 1557125670777, :type "UXX", :name "X-RAY"}
{:time 1557125670781, :type "MOR", :name "371", :long 50, :lat 16, :content [".----"]}
{:time 1557125670787, :type "GER", :name "133", :long -68, :lat -22, :content ["eins"]}
{:time 1557125670787, :type "MOR", :name "392", :long 61, :lat 20, :content [".----"]}
{:time 1557125670789, :type "ENG", :name "315", :long 22, :lat 7, :content ["one"]}
{:time 1557125670798, :type "ENG", :name "417", :long 73, :lat 24, :content ["one"]}
{:time 1557125670799, :type "ENG", :name "NZ1", :long 166, :lat -78, :content ["one"]}
{:time 1557125670799, :type "GER", :name "073", :long -98, :lat -32, :content ["eins"]}
{:time 1557125670801, :type "ENG", :name "150", :long -60, :lat -20, :content ["one"]}
{:time 1557125670806, :type "MOR", :name "371", :long 50, :lat 16, :content [".----"]}
{:time 1557125670812, :type "GER", :name "133", :long -68, :lat -22, :content ["eins"]}
{:time 1557125670812, :type "MOR", :name "392", :long 61, :lat 20, :content [".----"]}
{:time 1557125670814, :type "ENG", :name "315", :long 22, :lat 7, :content ["one"]}
{:time 1557125670823, :type "ENG", :name "417", :long 73, :lat 24, :content ["one"]}
{:time 1557125670824, :type "ENG", :name "NZ1", :long 166, :lat -78, :content ["one"]}
{:time 1557125670824, :type "GER", :name "073", :long -98, :lat -32, :content ["eins"]}
{:time 1557125670824, :type "ENG", :name "429", :long 79, :lat 26, :content ["one"]})
We have messages of type English, German, and Morse Code. There are also some spurious messages of type 'UXX'.
To create a producer that sends Clojure data structures to Kafka, we first create the Serializer and Deserializer types.
- Get numbers.serde-test.* passing
Do we throw or swallow exceptions in the serialization classes, and why?
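However you answer that, here is a minimal sketch of an EDN Serializer and Deserializer using deftype. The namespace and type names are hypothetical, and this sketch simply lets read-string throw on malformed input; the solution project may handle that differently.

(ns numbers.sketch.serdes
  (:require [clojure.edn :as edn])
  (:import (org.apache.kafka.common.serialization Serializer Deserializer)))

;; Serialize Clojure message maps as EDN strings in UTF-8.
(deftype EdnSerializer []
  Serializer
  (configure [_ _configs _key?])
  (serialize [_ _topic data]
    (when data
      (.getBytes (pr-str data) "UTF-8")))
  (close [_]))

;; The inverse: read EDN strings back into Clojure maps.
(deftype EdnDeserializer []
  Deserializer
  (configure [_ _configs _key?])
  (deserialize [_ _topic data]
    (when data
      (edn/read-string (String. data "UTF-8"))))
  (close [_]))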
Create a new KafkaProducer and send each message returned by (radio/listen) to the radio-logs topic.
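As a rough sketch of that producer (not the project's implementation), the snippet below sends each message keyed by station name. To keep it self-contained it pr-strs values through a StringSerializer; in the exercise you would plug in your EDN Serializer instead, and the bootstrap address depends on your docker-compose port mapping.

(ns numbers.sketch.producer
  (:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord)
           (org.apache.kafka.common.serialization StringSerializer)))

(defn produce-broadcast
  "Send every intercepted message to the radio-logs topic, keyed by station name.
   Usage: (produce-broadcast (radio/listen))"
  [messages]
  (let [config   {"bootstrap.servers" "localhost:9092"} ;; adjust to your compose port mapping
        producer (KafkaProducer. ^java.util.Map config
                                 (StringSerializer.)
                                 (StringSerializer.))]  ;; swap in your EDN value serializer here
    (doseq [{:keys [name] :as message} messages]
      ;; pr-str stands in for the EDN serialization you build in the serde exercise
      (.send producer (ProducerRecord. "radio-logs" name (pr-str message))))
    (.close producer)))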
Once implemented, call the function to produce the full broadcast to the radio-logs topic:
(radio/produce)
=> nil
Once produced, take another look at the partitions and offsets of the radio-logs topic:
# ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-1:19092 --topic radio-logs --time -1
radio-logs:0:121272
radio-logs:1:70920
radio-logs:2:125806
radio-logs:3:114417
radio-logs:4:102927
radio-logs:5:102933
radio-logs:6:107544
radio-logs:7:114376
radio-logs:8:93776
radio-logs:9:123552
radio-logs:10:77787
radio-logs:11:82374
The distribution is a bit lumpy: there are more messages in partition 9 than in partition 8, for instance. Why is that?
We want our messages to be interpreted at the time they declare in the :time field, rather than at producer or log-append time. At this point we cover the different concepts of time in Kafka, and the big idea of deterministic recomputability, which is broadly similar to favouring pure functions without side effects in functional programming.
- Get numbers.compute-test.test-timestamp-extraction passing
Why should you never return a static number (like 0L) from the extractor? Does it impact compaction, deletion, etc?
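As a minimal sketch (the type name is illustrative), a timestamp extractor that reads the :time declared by the message itself might look like this; it would be registered via the default.timestamp.extractor streams config.

(ns numbers.sketch.extractor
  (:import (org.apache.kafka.streams.processor TimestampExtractor)))

;; Use the event time declared in the message, falling back to the partition
;; time (never a static constant) when :time is missing.
(deftype MessageTimeExtractor []
  TimestampExtractor
  (extract [_ record partition-time]
    (or (:time (.value record)) partition-time)))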
We provide a translate ns that can decode individual messages from German, English, and Morse Code into numbers.
Use streams.filter(...) and translate.known?(message) to filter out unknown messages.
- Get numbers.compute-test.test-filter-known passing
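A minimal sketch of the filter step, assuming the translate ns lives at numbers.translate and that the stream values are the deserialized message maps:

(ns numbers.sketch.filter
  (:require [numbers.translate :as translate])
  (:import (org.apache.kafka.streams.kstream KStream Predicate)))

(defn filter-known
  "Drop messages (e.g. type UXX) that the translate ns cannot decode."
  [^KStream stream]
  (.filter stream
           (reify Predicate
             (test [_ _key message]
               (boolean (translate/known? message))))))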
We are told that Scott Base is a special station that should be considered independently.
Use streams.branch(...) to split the filtered stream in two. Scott Base is the only station below -75 latitude.
- Get numbers.compute-test.test-branch-rest-of-world passing
- Get numbers.compute-test.test-branch-scott-base passing
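A minimal sketch of the branch step using the branch(...) API the exercise names, run after the filter above so every message carries a :lat; the names and branch ordering are illustrative:

(ns numbers.sketch.branch
  (:import (org.apache.kafka.streams.kstream KStream Predicate)))

(defn branch-scott-base
  "Split the stream into [rest-of-world scott-base]; Scott Base is below -75 latitude."
  [^KStream stream]
  (let [scott-base?    (reify Predicate
                         (test [_ _key message]
                           (< (:lat message) -75)))
        rest-of-world? (reify Predicate
                         (test [_ _key message]
                           (>= (:lat message) -75)))]
    (vec (.branch stream (into-array Predicate [rest-of-world? scott-base?])))))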
Use streams.map(...) or streams.mapValues(...) to translate the message stream with translate/translate(...).
- Get numbers.compute-test.test-translate passing
Why do we prefer streams.mapValues in this case? What is the consequence of using map?
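A minimal sketch of the translate step, again assuming the translate ns is numbers.translate and that translate/translate decodes a single message map:

(ns numbers.sketch.translate
  (:require [numbers.translate :as translate])
  (:import (org.apache.kafka.streams.kstream KStream ValueMapper)))

(defn translate-stream
  "Decode GER/ENG/MOR content into numbers, leaving the record key untouched."
  [^KStream stream]
  (.mapValues stream
              (reify ValueMapper
                (apply [_ message]
                  (translate/translate message)))))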
We are told that each station produces three messages in every 10s tumbling time window.
Group the translated stream by Station, window the resultant stream by 10s time windows, and aggregate that grouped, windowed stream such that the three messages in a time window are reduced into a single message with three numbers as the content.
In this case we use streams.groupByKey(), streams.windowedBy(...), and streams.aggregate(...).
Make sure the KTable that results from the aggregation is materialized as 'PT10S-Store'.
- Get numbers.compute-test.test-correlate passing
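A minimal sketch of the group/window/aggregate step, assuming translated messages carry their numbers under :content and that default serdes are configured for the changelog; the aggregation logic here is illustrative, not the solution project's:

(ns numbers.sketch.correlate
  (:import (java.time Duration)
           (org.apache.kafka.streams.kstream KStream TimeWindows Materialized
                                             Initializer Aggregator)))

(defn correlate
  "Group by station, window into 10s tumbling windows, and merge the three
   messages in each window into one message with three numbers as :content."
  [^KStream stream]
  (-> stream
      (.groupByKey)
      (.windowedBy (TimeWindows/of (Duration/ofSeconds 10)))
      (.aggregate (reify Initializer
                    (apply [_] nil))
                  (reify Aggregator
                    (apply [_ _key message aggregated]
                      (if aggregated
                        (update aggregated :content into (:content message))
                        message)))
                  ;; materialize the resulting KTable as 'PT10S-Store'
                  (Materialized/as "PT10S-Store"))))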
You have implemented the Kafka Streams Topology described in the streaming compute presentation slides.
It turns out that there are 540 number stations, and they each broadcast 960 tuples of three numbers.
Those three numbers are RGB values, and we can reconstitute a 540px by 960px png image from the stream.
You can run that topology against your local cluster:
(require '[numbers.system :as system])
=> nil
(system/start! 8080)
13:03:17.022 WARN [nRepl-session-fb0fc988-fd38-401f-96a1-3569310c8901] o.a.k.c.consumer.ConsumerConfig – The configuration 'admin.retries' was supplied but isn't a known config.
13:03:17.027 WARN [nRepl-session-fb0fc988-fd38-401f-96a1-3569310c8901] o.a.k.c.consumer.ConsumerConfig – The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
Serving on port 8080
Navigate to localhost:8080 to inspect the decoded message! It may take a minute to process, and you may want to reload the page to see progress.
While the logs are being computed, you can check progress by looking at the offsets of the consumer group:
./bin/kafka-consumer-groups.sh --bootstrap-server kafka-1:19092 --group compute-radio-logs --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
radio-logs 11 27541 103504 75963 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 4 27670 129498 101828 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 10 27447 97830 70383 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 6 18714 135357 116643 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 3 18729 144000 125271 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 8 27688 117879 90191 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 0 18735 152640 133905 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 7 21479 143891 122412 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 1 18390 89205 70815 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 2 27770 158125 130355 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 9 18719 155520 136801 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
radio-logs 5 27747 129372 101625 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer-c21b4661-18fb-422b-981c-38b0615950c1 /172.27.0.1 compute-radio-logs-5a520771-acfe-49d4-bf44-8976ff8d23cf-StreamThread-1-consumer
As you progress through this project, you can always reset the consumer offsets like so:
./bin/kafka-consumer-groups.sh --bootstrap-server kafka-1:19092 --group compute-radio-logs --reset-offsets --to-earliest --execute --topic radio-logs
What happens when you run more than one application (say on ports 8081, 8082, 8083), and why?
Build an uberjar and start two application instances on different ports:
lein uberjar
- java -jar target/apache-kafka-java-number-stations-1.0-SNAPSHOT-jar-with-dependencies.jar 8080 &
- java -jar target/apache-kafka-java-number-stations-1.0-SNAPSHOT-jar-with-dependencies.jar 8081 &
What image is displayed on each port, and why? What happens to local KTable state if you start more instances?
It's a case for Interactive Queries!
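As a rough sketch of what an interactive query against the windowed store might look like (assuming a running KafkaStreams instance and the pre-2.5 store API), note that each instance can only answer for the partitions it owns:

(ns numbers.sketch.query
  (:import (java.time Instant)
           (org.apache.kafka.streams KafkaStreams)
           (org.apache.kafka.streams.state QueryableStoreTypes ReadOnlyWindowStore)))

(defn fetch-station
  "Return [window-start-millis aggregate] pairs for one station between two instants."
  [^KafkaStreams streams station ^Instant from ^Instant to]
  (let [^ReadOnlyWindowStore store (.store streams "PT10S-Store" (QueryableStoreTypes/windowStore))]
    (with-open [iterator (.fetch store station from to)]
      (mapv (fn [kv] [(.key kv) (.value kv)]) (iterator-seq iterator)))))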
The decoded image doesn't quite match the source (src/main/resources/source.png).
That's because Scott Base is special. It broadcasts a rotation factor for each time window, e.g.
- 0 - don't rotate any of the three numbers.
- 1 - rotate the numbers left one ([111 222 333] becomes [222 333 111]).
- 2 - rotate each number left two ([111 222 333] becomes [333 111 222]).
If we treat Scott Base as a further cipher, we could aggregate that cipher into a separate KTable, then join the stream of correlated messages with it, rotating where appropriate. This is left to the adventurous (and may require dropping into the Processor API).
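For reference, a tiny sketch of the rotation itself (the windowed join that applies it is the actual exercise):

(defn rotate-left
  "Rotate a tuple of numbers left by n positions,
   e.g. (rotate-left 1 [111 222 333]) => [222 333 111]."
  [n numbers]
  (let [n (mod n (count numbers))]
    (vec (concat (drop n numbers) (take n numbers)))))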
If you do complete the extension problem, please do raise a PR to the solution project!
Copyright © 2019 Troy-West, Pty Ltd. MIT Licensed.