Upgrade to Flink 1.20-based flink-sql image #42

Merged: 1 commit, Dec 11, 2024
14 changes: 7 additions & 7 deletions interactive-etl/README.md
@@ -1,6 +1,6 @@
# Interactive ETL using Flink SQL

- [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/) is a powerful tool for data exploration, manipulation and inter-connection.
+ [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/overview/) is a powerful tool for data exploration, manipulation and inter-connection.
It allows you to access the power of Flink's distributed stream processing abilities with a familiar interface.
In this tutorial we give a simple introduction to using Flink SQL: reading from a Kafka topic, performing basic queries, transforming and cleaning data, and then loading the results back into Kafka.

@@ -48,7 +48,7 @@ In order to run this example you will need:

## Interactive SQL client

- The Flink distribution comes with an interactive SQL command line tool ([`sql-client.sh`](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sqlclient/)) which allows you to submit SQL queries to a running Flink cluster.
+ The Flink distribution comes with an interactive SQL command line tool ([`sql-client.sh`](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sqlclient/)) which allows you to submit SQL queries to a running Flink cluster.
The `setup.sh` script you ran above creates a long-running session cluster inside minikube, which we can use for this purpose.

To access the cluster, we need to allow access from your local machine to the job manager container running inside the minikube cluster:
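
For example, a minimal sketch of the port-forward, assuming the session cluster defined below was deployed as `session-cluster` in a `flink` namespace (adjust the name and namespace to your environment):

```shell
# Forward the Flink job manager's REST port (8081) to localhost.
# The Flink Kubernetes Operator exposes it via the <name>-rest service.
kubectl port-forward -n flink svc/session-cluster-rest 8081:8081
```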
@@ -68,15 +68,15 @@ The interactive SQL client also needs access to these plugin libraries, you could
However, you can run the Flink SQL Runner container locally using the command below (make sure to add the `--net=host` flag so the container can see the forwarded job-manager port):

```shell
- podman run -it --rm --net=host quay.io/streamshub/flink-sql-runner:v0.0.1 /opt/flink/bin/sql-client.sh embedded
+ podman run -it --rm --net=host quay.io/streamshub/flink-sql-runner:0.1.0 /opt/flink/bin/sql-client.sh embedded
```

If you use Docker, you should be able to replace `podman` with `docker` in the command above.

## Tutorial

This tutorial will walk through some basic data exploration and ETL (Extract Transform Load) queries, using Flink SQL.
- The [Flink SQL documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/overview/) contains detailed breakdowns of the various SQL commands and query functions available.
+ The [Flink SQL documentation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/queries/overview/) contains detailed breakdowns of the various SQL commands and query functions available.

### Source Data Table

@@ -120,9 +120,9 @@ CREATE TABLE SalesRecordTable (
);
```

- The [`CREATE`](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/create/) statement will set up a table with the defined fields within the Flink SQL client.
+ The [`CREATE`](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/create/) statement will set up a table with the defined fields within the Flink SQL client.
Keep in mind that nothing has actually been sent to the Flink cluster at this point; we have just set up where data will go once we run a query.
- The [`WITH`](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/with/) clause allows us to pass required configuration to Flink to allow it to connect to external sources.
+ The [`WITH`](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/queries/with/) clause allows us to pass the configuration Flink needs to connect to external sources.
In the query above, we tell Flink to use the Kafka connector to pull data from the specified Kafka cluster and topic, and to ask Apicurio for the Apache Avro schema of the values in that topic's messages.
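
As an illustrative sketch only, a table wired up this way might look as follows; the topic name, bootstrap address, registry URL, and columns here are assumptions, not the tutorial's exact values:

```sql
CREATE TABLE SalesRecordSketch (
    invoice_id STRING,       -- hypothetical columns for illustration
    original_cost STRING
) WITH (
    'connector' = 'kafka',                           -- read from Kafka
    'topic' = 'flink.sales.records',                 -- hypothetical topic name
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'value.format' = 'avro-confluent',               -- Avro resolved via a schema registry
    'value.avro-confluent.url' = 'http://apicurio-registry:8080/apis/ccompat/v6',
    'scan.startup.mode' = 'earliest-offset'
);
```

Apicurio Registry exposes a Confluent-compatible API (the `/apis/ccompat/...` path above), which is how the `avro-confluent` format can resolve schemas from it.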

### Querying Sales Data
@@ -187,7 +187,7 @@ But that would require a redeployment of the producing application and maybe a d

All is not lost, though: we can work around this formatting issue using Flink SQL.

- There are a large number of [built-in functions](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/functions/systemfunctions/) that you can call as part of your queries, including [string manipulation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/functions/systemfunctions/#string-functions) and [type conversion](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/functions/systemfunctions/#type-conversion-functions).
+ There are a large number of [built-in functions](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/) that you can call as part of your queries, including [string manipulation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/#string-functions) and [type conversion](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/systemfunctions/#type-conversion-functions).
We can use those to convert the currency string as part of the query.

In the query below, we add a sub-query that converts the original values, and then select from its output:
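A minimal sketch of that shape, using Flink's built-in `TRIM` and `CAST` (the column names are hypothetical stand-ins, not the tutorial's exact schema):

```sql
-- Assumes original_cost is a string such as '£1.50'.
SELECT
    invoice_id,
    cost_gbp * quantity AS total_gbp
FROM (
    SELECT
        invoice_id,
        quantity,
        -- Strip the currency symbol, then convert to a numeric type.
        CAST(TRIM(LEADING '£' FROM original_cost) AS DECIMAL(10, 2)) AS cost_gbp
    FROM SalesRecordTable
);
```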
2 changes: 1 addition & 1 deletion interactive-etl/flink-session.yaml
@@ -3,7 +3,7 @@ kind: FlinkDeployment
metadata:
name: session-cluster
spec:
- image: quay.io/streamshub/flink-sql-runner:v0.0.1
+ image: quay.io/streamshub/flink-sql-runner:0.1.0
flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
2 changes: 1 addition & 1 deletion interactive-etl/standalone-etl-deployment.yaml
@@ -3,7 +3,7 @@ kind: FlinkDeployment
metadata:
name: standalone-etl
spec:
- image: quay.io/streamshub/flink-sql-runner:v0.0.1
+ image: quay.io/streamshub/flink-sql-runner:0.1.0
flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
2 changes: 1 addition & 1 deletion recommendation-app/flink-deployment.yaml
@@ -3,7 +3,7 @@ kind: FlinkDeployment
metadata:
name: recommendation-app
spec:
- image: quay.io/streamshub/flink-sql-runner:v0.0.1
+ image: quay.io/streamshub/flink-sql-runner:0.1.0
flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"