Commit 04b7364: Apply suggestions from code review

tomncooper and katheris authored Jan 27, 2025
Co-authored-by: Kate Stanley <[email protected]>
1 parent: 3ddd5f7

Showing 1 changed file with 11 additions and 11 deletions: recommendation-app/README.md
In Flink SQL, _Tables_ represent external data and allow us to query and manipulate it.

### Product Inventory Data

In order to recommend products in the same category and know if they are in stock, we need a table linking a product id to its category and stock level.
We also need to know the average rating users give these products to know if they are worth recommending.
We could use a connection to a relational database to do this.
However, to keep things simple, we can use a CSV file containing the data.
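
To give a concrete idea of what reading that CSV file into a table can look like, here is a minimal sketch using Flink's filesystem connector with the `csv` format. The file path, column names, and types are assumptions for illustration, not the project's actual definition.

```sql
-- Hypothetical sketch: the path, column names, and types are assumptions.
CREATE TABLE ProductInventoryTable (
    product_id STRING,
    category STRING,
    stock INT,
    rating INT
) WITH (
    'connector' = 'filesystem',
    'path' = '/opt/flink/data/productInventory.csv',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'
);
```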


#### Kafka Connectors

In order to perform the recommendation we need to know what customers are clicking on (the clickstream) and what they have bought (sales records).
Both of these are housed in Kafka topics; therefore, to access the data they hold we need to create Flink SQL tables.
The StreamsHub [Flink SQL Runner](https://github.com/streamshub/flink-sql) project provides an image that already contains the Flink Kafka connector and its corresponding Flink SQL Table connector.
Using that image (see the [Running the Application](#running-the-application) section for more details) means we can create tables to read from Kafka topics directly into Flink SQL.

#### Kafka Topic Schemas

Data in Kafka topics are just bytes; Kafka cannot tell us (or Flink SQL) what those bytes mean.
For that we need to know the schema of the messages within the topic.
Thankfully, the thoughtful people who designed the data generation application that publishes the click stream and sales records have produced [Avro](https://avro.apache.org/) schema files ([click stream schema](https://github.com/streamshub/flink-sql-examples/blob/main/data-generator/src/main/resources/clickStream.avsc), [sales records schema](https://github.com/streamshub/flink-sql-examples/blob/main/data-generator/src/main/resources/sales.avsc)), which tell us what each message on those topics contains.
These schemas have then been registered with an instance of the [Apicurio Registry](https://www.apicur.io/registry/).
Flink's Kafka connector can read Avro schemas from the Confluent Schema Registry (by including the `flink-sql-avro-confluent-registry` library, which the StreamsHub Flink SQL Runner image does), and the Apicurio Registry has a Confluent-compatible API.
Therefore, using the StreamsHub Flink SQL Runner image, we can use `avro-confluent` as the value format and supply the address of the Apicurio Registry in our Flink Kafka connector configuration; Flink will then pull the correct schema for each topic and deserialize the messages automatically.
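
As a rough sketch of what such a source table definition might look like (the topic name, bootstrap address, registry URL, and columns below are assumptions for illustration, not the project's actual configuration):

```sql
-- Hypothetical sketch: topic, addresses, and columns are assumptions.
CREATE TABLE ClickStreamTable (
    user_id STRING,
    product_id STRING,
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink.click.streams',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'properties.group.id' = 'recommendation-app',
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://apicurio-registry-service:8080/apis/ccompat/v6',
    'value.fields-include' = 'ALL'
);
```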

Let's remind ourselves of what the pipeline needs to recommend:
- Products that the customer has bought before should be favoured

We start with what the customer is currently viewing: the clickstream.
But that stream only contains the user ID and the product ID; we also need to know the category the product belongs to.
To do that, we can join the click stream table with the product inventory table to pull out the category.

```sql
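-- A sketch of the kind of join statement described above; apart from the
-- `clicked_products` view name referenced in the surrounding text, the table
-- and column names here are assumptions.
CREATE TEMPORARY VIEW clicked_products AS
    SELECT
        c.user_id,
        c.product_id,
        p.category
    FROM ClickStreamTable AS c
    JOIN ProductInventoryTable AS p ON c.product_id = p.product_id;
```
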
Views provide a way to reference the results of a query.
In the statement above we are assigning the results of the join between the clickstream and product inventory to the `clicked_products` view, so we can reference it in later queries.
The `TEMPORARY` clause in the `CREATE` statement refers to how Flink will persist the metadata about this view and is linked to Flink Catalogs that we won't cover here (you can read more about Catalogs in the [docs](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/catalogs/)).

Now that we have the product category information in the clickstream, we need to get all the products in that category that are in stock and mark whether the user has purchased them or not.
To do that we can `JOIN` the enriched clickstream on the product inventory table, where the categories match, to get all products in that category.
This allows us to also pull in the stock level and ratings.
We can then take this set of products from the same category and `LEFT JOIN` on the sales records table.
This will pass through all entries from the category, whether they have been purchased or not, and will give us a new `purchased` column for any products the user has bought in the past.
We can then filter out any items that are not in stock.

We do the above operations using the statement below and create a `VIEW` for the results:
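
As a sketch of the shape such a statement could take (the view, table, and column names below are assumptions, apart from the `purchased` column and the `clicked_products` view mentioned above):

```sql
-- Hypothetical sketch: names are assumptions; the logic follows the description above.
CREATE TEMPORARY VIEW category_products AS
    SELECT
        cp.user_id,
        p.product_id,
        p.category,
        p.stock,
        p.rating,
        CASE WHEN s.product_id IS NOT NULL THEN TRUE ELSE FALSE END AS purchased
    FROM clicked_products AS cp
    JOIN ProductInventoryTable AS p ON cp.category = p.category
    LEFT JOIN SalesRecordTable AS s
        ON s.user_id = cp.user_id AND s.product_id = p.product_id
    WHERE p.stock > 0;
```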

### Saving the results

Once we have generated the product recommendations, we need to save that data somewhere.
The easiest option is to output the results back to Kafka so that some other service can pick them up and present them to the user.
To do this we need to create an output Kafka table.
The Kafka connector included in the Flink SQL Runner image allows us to do this by defining the `connector` as `upsert-kafka` in the `WITH` clause of the `CREATE TABLE` statement.
For simplicity's sake, we will just output the recommendations as CSV data (you could also define an Avro schema and upload that to the Apicurio registry).
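
A minimal sketch of what such an output table could look like follows; the topic name, bootstrap address, and columns are assumptions for illustration (`upsert-kafka` does require a `PRIMARY KEY ... NOT ENFORCED` and both a key and a value format):

```sql
-- Hypothetical sketch: topic, address, and columns are assumptions.
CREATE TABLE CsvSinkTable (
    user_id STRING,
    recommended_product_ids STRING,
    `window_end` TIMESTAMP(3),
    PRIMARY KEY (user_id, `window_end`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'flink.recommended.products',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'key.format' = 'csv',
    'value.format' = 'csv',
    'value.fields-include' = 'ALL'
);
```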
