Robin Moffatt <[email protected]> 04 Jun 2018
- Custom KSQL build: https://github.com/rmoff/ksql-fork-with-deep-learning-function
- Confluent Platform 4.1: https://www.confluent.io/download/
- kafkacat
Run a screen session with Elasticsearch & Kibana:
cd ~/git/ksql-fork-with-deep-learning-function
screen -d -m -S CP_demo -t bash watch date
screen -S CP_demo -p 0 -X screen -t Elasticsearch elasticsearch
screen -S CP_demo -p 0 -X screen -t Kibana kibana
Start Confluent Platform 4.1 → DO NOT RUN KSQL FROM HERE
confluent start connect
(This will also start Zookeeper, Kafka, Schema Registry, and REST proxy)
Create Kafka Connect sinks to stream the three topics (source/scored/alerts) to Elasticsearch:
./create_es_sinks.sh
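The script's contents aren't reproduced here. Purely as an illustrative sketch (the connector name matches the status output further down, but the topic and settings are assumptions, not necessarily what create_es_sinks.sh actually submits), each sink is registered via the Kafka Connect REST API along these lines:
# Sketch only: the topic and settings below are assumptions, not the script's actual contents
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "es_sink_raw_events",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "topics": "SENSOR_RAW",
    "key.ignore": "true"
  }
}'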
Create the source topic (a KSQL stream cannot be registered against a topic that does not exist):
kafka-topics \
--zookeeper localhost:2181 \
--create \
--topic HealthSensorInputTopic \
--partitions 1 \
--replication-factor 1
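Optionally, confirm the topic was created as expected (an extra check, not part of the original demo script):
kafka-topics --zookeeper localhost:2181 --describe --topic HealthSensorInputTopic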
Run the custom KSQL server build:
screen -S CP_demo -p 0 -X screen -t KSQL-server ./bin/ksql-server-start config/ksql-server.properties
Connect to the screen session (screen -x). The KSQL-server screen should say:
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017 Confluent Inc.
Server 5.0.0-SNAPSHOT listening on http://localhost:8088
To access the KSQL CLI, run:
ksql http://localhost:8088
To access the UI, point your browser at:
http://localhost:8088/index.html
Start a consumer on the alerts topic, and run the KSQL CLI:
screen -S CP_demo -p 0 -X screen -t alert-topic kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic ANOMALYDETECTIONBREACH --from-beginning | jq '.'
screen -S CP_demo -p 0 -X screen -t KSQL-CLI ./bin/ksql http://localhost:8088
The KSQL-CLI screen should say:
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017 Confluent Inc.
CLI v5.0.0-SNAPSHOT, Server v5.0.0-SNAPSHOT located at http://localhost:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
All components should be running, except control-center (not started by 'confluent start connect') and ksql-server (the custom build runs outside the Confluent CLI, so it reports as DOWN here):
Robin@asgard02 ~/g/ksql-fork-with-deep-learning-function> confluent status
Using CONFLUENT_CURRENT: /var/folders/q9/2tg_lt9j6nx29rvr5r5jn_bw0000gp/T/confluent.zCpaKO3W
control-center is [DOWN]
ksql-server is [DOWN]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
Check connectors are running:
$ curl -s "http://localhost:8083/connectors" | \
    jq '.[]' | \
    xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status" | \
    jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")' | \
    column -s : -t | \
    sed 's/\"//g' | \
    sort
es_sink_anomaly | RUNNING | RUNNING
es_sink_anomaly_alerts | RUNNING | RUNNING
es_sink_raw_events | RUNNING | RUNNING
Check that the source topic exists:
kafka-topics --zookeeper localhost:2181 --list|grep HealthSensorInputTopic
Check that the Kibana dashboard works and has some historical data to fall back on if the demo fails.
In KSQL, register the source topic, and create a derived stream that converts it to Avro:
CREATE STREAM healthsensor (eventid integer, sensorinput varchar) WITH (kafka_topic='HealthSensorInputTopic', value_format='DELIMITED');
CREATE STREAM SENSOR_RAW WITH (PARTITIONS=1,VALUE_FORMAT='AVRO') AS SELECT * FROM HEALTHSENSOR;
Show that there are streams defined and columns created, then set a continuous SELECT query running, using the ANOMALY function to apply the ML model to the input data stream:
SHOW STREAMS;
DESCRIBE healthsensor;
SELECT eventid, anomaly(SENSORINPUT) from healthsensor;
No input yet, so no output.
Write some data to the source topic, using kafkacat:
echo -e "99999,2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08" | kafkacat -b localhost:9092 -P -t HealthSensorInputTopic
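As an optional sanity check (not part of the original demo flow), the message can be read straight back with kafkacat:
# Optional check: -C consume mode, -o beginning reads from the earliest offset, -e exits at end of topic
kafkacat -b localhost:9092 -C -t HealthSensorInputTopic -o beginning -e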
Note that the SELECT output now shows the scored input. Persist this scored data:
CREATE STREAM AnomalyDetection WITH (PARTITIONS=1, VALUE_FORMAT='AVRO') AS \
SELECT eventid, sensorinput, \
CAST (anomaly(sensorinput) AS DOUBLE) as Anomaly \
FROM healthsensor;
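Optionally (not a step in the original script), confirm the new stream's schema in the same way as before:
DESCRIBE AnomalyDetection;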
Query the derived stream:
SELECT EVENTID, ANOMALY FROM AnomalyDetection;
Send some more data to the topic:
echo -e "33333, 6.90#6.89#6.86#6.82#6.78#6.73#6.64#6.57#6.50#6.41#6.31#6.22#6.13#6.04#5.93#5.85#5.77#5.72#5.65#5.57#5.53#5.48#5.42#5.38#5.35#5.34#5.30#5.27#5.25#5.26#5.24#5.21#5.22#5.22#5.22#5.20#5.19#5.20#5.20#5.18#5.19#5.19#5.18#5.15#5.13#5.10#5.07#5.03#4.99#5.00#5.01#5.06#5.14#5.31#5.52#5.72#5.88#6.09#6.36#6.63#6.86#7.10#7.34#7.53#7.63#7.64#7.60#7.38#6.87#6.06#5.34#5.03#4.95#4.84#4.69#4.65#4.54#4.49#4.46#4.43#4.38#4.33#4.31#4.28#4.26#4.21#4.19#4.18#4.15#4.12#4.09#4.08#4.07#4.03#4.01#4.00#3.97#3.94#3.90#3.90#3.89#3.85#3.81#3.81#3.79#3.77#3.74#3.72#3.71#3.70#3.67#3.66#3.68#3.67#3.66#3.67#3.69#3.71#3.72#3.75#3.80#3.85#3.89#3.95#4.03#4.06#4.18#4.25#4.36#4.45#4.54#4.60#4.68#4.76#4.83#4.86#4.91#4.95#4.97#4.98#5.00#5.04#5.04#5.05#5.03#5.06#5.07#5.06#5.05#5.06#5.07#5.07#5.06#5.06#5.07#5.07#5.06#5.07#5.07#5.08#5.06#5.06#5.08#5.09#5.09#5.10#5.11#5.11#5.10#5.10#5.11#5.12#5.10#5.06#5.07#5.06#5.05#5.02#5.02#5.02#5.01#4.99#4.98#5.00#5.00#5.00#5.02#5.03#5.03#5.01#5.01#5.03#5.04#5.02#5.01#5.02#5.04#5.02#5.02#5.03#5.04#5.03#5.03#5.02#5.04#5.04#5.03#5.03#5.05#5.04" | kafkacat -b localhost:9092 -P -t HealthSensorInputTopic
Note the newly scored output from the SELECT query. In a moment we'll go back in time and query all of the data from the source topic; first, generate some random data:
./bin/ksql-datagen schema=EcdSensorData.avro format=delimited topic=HealthSensorInputTopic key=eventid maxInterval=1000
View the Kibana dashboard to see the raw data plus the scored data.
Set KSQL to query all data from the beginning of the topic, and use a predicate to filter for scored values above a given threshold:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT * FROM AnomalyDetection \
WHERE Anomaly >4.3;
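To return to reading only newly arriving data afterwards (an optional step, not in the original script), set the property back:
ksql> SET 'auto.offset.reset' = 'latest';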
Persist a derived stream that will drive alerts where the anomaly is over a given threshold:
CREATE STREAM AnomalyDetectionBreach WITH (PARTITIONS=1) AS \
SELECT * FROM AnomalyDetection \
WHERE Anomaly >4.3;
Consume the alerts topic to check for breach events:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic ANOMALYDETECTIONBREACH --from-beginning | jq '.'
View the Kibana dashboard to see the raw data, scored data, and alerts.