docker-compose up -d
docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
CREATE SOURCE CONNECTOR DOGS WITH (
'connector.class' = 'io.mdrogalis.voluble.VolubleSourceConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable'='false',
'genkp.dogs.with' = '#{Internet.uuid}',
'genv.dogs.name.with' = '#{Dog.name}',
'genv.dogs.dogsize.with' = '#{Dog.size}',
'genv.dogs.age.with' = '#{Dog.age}',
'topic.dogs.throttle.ms' = 1000
);
CREATE STREAM DOGS (ID STRING KEY, NAME STRING, DOGSIZE STRING, AGE STRING)
WITH (KAFKA_TOPIC='dogs', VALUE_FORMAT='JSON');
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE DOGS_BY_SIZE AS
SELECT DOGSIZE AS DOG_SIZE, COUNT(*) AS DOGS_CT
FROM DOGS WINDOW TUMBLING (SIZE 15 MINUTE)
GROUP BY DOGSIZE;
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END,
DOG_SIZE, DOGS_CT
FROM DOGS_BY_SIZE
WHERE DOG_SIZE='medium';