diff --git a/src/main/java/io/streams/constants/FlinkConstants.java b/src/main/java/io/streams/constants/FlinkConstants.java index c8bc27a..76f3eae 100644 --- a/src/main/java/io/streams/constants/FlinkConstants.java +++ b/src/main/java/io/streams/constants/FlinkConstants.java @@ -13,14 +13,14 @@ public interface FlinkConstants { "WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " + "'topic' = 'flink.click.streams', 'properties.bootstrap.servers' = " + "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'click-stream-group', " + - "'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = " + + "'value.format' = 'avro-confluent', 'value.avro-confluent.url' = " + "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " + "'latest-offset' ); CREATE TABLE SalesRecordTable ( invoice_id STRING, user_id STRING, product_id STRING, " + "quantity STRING, unit_cost STRING, `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', " + "WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " + "'topic' = 'flink.sales.records', 'properties.bootstrap.servers' = " + "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'sales-record-group', " + - "'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = " + + "'value.format' = 'avro-confluent', 'value.avro-confluent.url' = " + "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " + "'latest-offset' ); CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING, " + "`event_time` TIMESTAMP(3), PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', " + diff --git a/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java b/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java index 838e248..8ab69e1 100644 --- a/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java +++ b/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java @@ -46,6 +46,7 @@ public static FlinkDeploymentBuilder defaultFlinkDeployment(String namespace, St .addNewContainer() .withName("flink-main-container") .withImage("quay.io/streamshub/flink-sql-runner:latest") + .withImagePullPolicy("Always") .addNewVolumeMount() .withName("product-inventory-vol") .withMountPath("/opt/flink/data") @@ -94,7 +95,7 @@ public static FlinkDeploymentBuilder defaultFlinkDeployment(String namespace, St .endTaskmanagerResource() .endTaskManager() .withNewJob() - .withJarURI("local:///opt/flink/usrlib/flink-sql-runner.jar") + .withJarURI("local:///opt/streamshub/flink-sql-runner.jar") .withParallelism(1L) .withUpgradeMode(Job.UpgradeMode.stateless) .withArgs(args)