Skip to content

Commit

Permalink
Merge pull request #11 from streamshub/ocp-patch
Browse files Browse the repository at this point in the history
Patch flink deployment to run on openshift
  • Loading branch information
kornys authored Aug 21, 2024
2 parents 683dd5e + 52480e5 commit 63ec319
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion recommendation-app/flink-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@ spec:
volumeMounts:
- name: product-inventory-vol
mountPath: /opt/flink/data
- mountPath: /opt/flink/log
name: flink-logs
- mountPath: /opt/flink/artifacts
name: flink-artifacts
volumes:
- name: product-inventory-vol
configMap:
name: product-inventory
items:
- key: productInventory.csv
path: productInventory.csv
- emptyDir: {}
name: flink-logs
- emptyDir: {}
name: flink-artifacts
jobManager:
resource:
memory: "2048m"
Expand All @@ -38,4 +46,4 @@ spec:
jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar
args: ["CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING ) WITH ( 'connector' = 'filesystem', 'path' = '/opt/flink/data/productInventory.csv', 'format' = 'csv', 'csv.ignore-parse-errors' = 'true' ); CREATE TABLE ClickStreamTable ( user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'flink.click.streams', 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'click-stream-group', 'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = 'http://apicurio-registry-api.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = 'latest-offset' ); CREATE TABLE SalesRecordTable ( invoice_id STRING, user_id STRING, product_id STRING, quantity STRING, unit_cost STRING, `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'flink.sales.records', 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'sales-record-group', 'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = 'http://apicurio-registry-api.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = 'latest-offset' ); CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING, `event_time` TIMESTAMP(3), PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'flink.recommended.products', 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.client.id' = 'recommended-products-producer-client', 'properties.transaction.timeout.ms' = '800000', 'key.format' = 'csv', 'value.format' = 'csv', 'value.fields-include' = 'ALL' ); CREATE TEMPORARY VIEW clicked_products AS SELECT DISTINCT c.user_id, c.event_time, p.product_id, p.category FROM ClickStreamTable AS c JOIN ProductInventoryTable AS p ON c.product_id = p.product_id; CREATE TEMPORARY VIEW category_products AS SELECT cp.user_id, cp.event_time, p.product_id, p.category, p.stock, p.rating, sr.user_id as purchased FROM clicked_products cp JOIN ProductInventoryTable AS p ON cp.category = p.category LEFT JOIN SalesRecordTable sr ON cp.user_id = sr.user_id AND p.product_id = sr.product_id WHERE p.stock > 0 GROUP BY p.product_id, p.category, p.stock, cp.user_id, cp.event_time, sr.user_id, p.rating; CREATE TEMPORARY VIEW top_products AS SELECT cp.user_id, cp.event_time, cp.product_id, cp.category, cp.stock, cp.rating, cp.purchased, ROW_NUMBER() OVER (PARTITION BY cp.user_id ORDER BY cp.purchased DESC, cp.rating DESC) AS rn FROM category_products cp; INSERT INTO CsvSinkTable SELECT user_id, LISTAGG(product_id, ',') AS top_product_ids, TUMBLE_END(event_time, INTERVAL '5' SECOND) FROM top_products WHERE rn <= 6 GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' SECOND);"]
parallelism: 1
upgradeMode: stateless
upgradeMode: stateless

0 comments on commit 63ec319

Please sign in to comment.