This Structured Streaming application shows how to read a Kafka stream and calculate word frequencies over one-minute window intervals.
The word count problem is a classic example used in the Structured Streaming Programming Guide.
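At its core the job is a handful of DataFrame operations. Below is a minimal sketch of the windowed word count logic, assuming placeholder topic and bootstrap-server values; the actual StructuredKafkaWordCount.py additionally wires authentication, checkpointing, and output options from its command line (see the usage text below).

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window

spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()

# Read raw lines from Kafka; value arrives as bytes, so cast it to a string.
lines = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<kafka bootstrap server>:9092")
         .option("subscribe", "<kafka topic>")
         .load()
         .selectExpr("CAST(value AS STRING) AS value", "timestamp"))

# Split each line into words and count occurrences per one-minute window.
words = lines.select(explode(split(lines.value, " ")).alias("word"), "timestamp")
counts = words.groupBy(window(words.timestamp, "1 minute"), words.word).count()

query = (counts.writeStream
         .outputMode("complete")
         .format("console")
         .start())
query.awaitTermination()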
- Refer to the Setup Spark Locally section for the prerequisites to satisfy before you deploy the application locally.
- Ensure your tenancy is configured according to the Data Flow onboarding instructions. See Getting Started with Data Flow.
- Ensure that the Data Flow Spark Streaming configuration is also in place. See Getting Started with Spark Streaming.
- Set up an OSS Kafka instance. See Getting Started with Spark Streaming.
- Prepare /producer/oss-producer-from-file.py for use: download the source text and update its constants with the relevant information.
- Prepare the command line for local and Data Flow based runs:
usage: streaming_aggregate.py [-h] [--auth-type AUTH_TYPE]
[--bootstrap-port BOOTSTRAP_PORT]
[--bootstrap-server BOOTSTRAP_SERVER]
[--checkpoint-location CHECKPOINT_LOCATION]
[--encryption ENCRYPTION] [--ocid OCID]
[--output-location OUTPUT_LOCATION]
[--output-mode OUTPUT_MODE]
[--stream-password STREAM_PASSWORD]
[--raw-stream RAW_STREAM]
[--stream-username STREAM_USERNAME]
optional arguments:
-h, --help show this help message and exit
--auth-type AUTH_TYPE
--bootstrap-port BOOTSTRAP_PORT
--bootstrap-server BOOTSTRAP_SERVER
--checkpoint-location CHECKPOINT_LOCATION
--encryption ENCRYPTION
--ocid OCID
--output-location OUTPUT_LOCATION
--output-mode OUTPUT_MODE
--stream-password STREAM_PASSWORD
--raw-stream RAW_STREAM
--stream-username STREAM_USERNAME
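As a rough guide, the options above can be reproduced with an argparse parser like the sketch below; the defaults shown are illustrative assumptions, not necessarily those of the actual script.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--auth-type", default="PLAIN")      # assumed default
parser.add_argument("--bootstrap-port", default="9092")  # assumed default
parser.add_argument("--bootstrap-server")
parser.add_argument("--checkpoint-location")
parser.add_argument("--encryption", default="SASL_SSL")  # assumed default
parser.add_argument("--ocid")  # stream pool OCID, for resource principal auth
parser.add_argument("--output-location")
parser.add_argument("--output-mode", default="console")
parser.add_argument("--stream-password")
parser.add_argument("--raw-stream")
parser.add_argument("--stream-username")
args = parser.parse_args()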
Provide your dependencies using any of the suitable options below (a configuration sketch follows this list):
- Use the --packages option or the spark.jars.packages Spark configuration. An application running in a private endpoint must allow traffic from the private subnet to the internet in order to download packages.
- Provide the object storage jar locations in --jars or spark.jars as a comma-separated list.
- Use python/structured_streaming_java_dependencies_for_python to create archive.zip.
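For illustration, the spark.jars.packages route from the first bullet can also be set programmatically; the coordinates below match the spark-submit example later in this README.

from pyspark.sql import SparkSession

# Pull the Kafka connector at startup instead of passing --packages on the
# command line; requires outbound internet access as noted above.
spark = (SparkSession.builder
         .appName("StructuredKafkaWordCount")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2")
         .getOrCreate())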
To run the example:
- First, start the Structured Streaming app (StructuredKafkaWordCount) locally or in the cloud.
- Second, start the data producer oss-producer-from-file.py (python3 oss-producer-from-file.py) locally or in the cloud. A sketch of the producer follows the spark-submit notes below.
To start the app locally:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 ./StructuredKafkaWordCount.py --raw-stream <kafka topic> --bootstrap-server <kafka bootstrap server> --checkpoint-location /tmp/checkpoint --output-location /tmp/output --stream-username <tenancy name>/<user name>/<stream pool id> --stream-password <user security token> --output-mode console
For more information on spark-submit, see Submitting Applications; for what Data Flow supports, see Spark-Submit Functionality in Data Flow.
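For the second step, the producer reads a source text file and publishes its lines to the Kafka topic. A minimal sketch of what oss-producer-from-file.py might look like is shown below, assuming the kafka-python client and plain SASL authentication; the constant values are placeholders to update, as noted in the prerequisites.

from kafka import KafkaProducer

# Placeholder constants; update with your own values.
BOOTSTRAP_SERVER = "<kafka bootstrap server>:9092"
TOPIC = "<kafka topic>"
USERNAME = "<tenancy name>/<user name>/<stream pool id>"
PASSWORD = "<user security token>"
SOURCE_FILE = "source.txt"  # hypothetical path to the downloaded source text

producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVER,
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=USERNAME,
    sasl_plain_password=PASSWORD,
)

# Publish the source text line by line to feed the streaming word count.
with open(SOURCE_FILE) as f:
    for line in f:
        producer.send(TOPIC, line.strip().encode("utf-8"))
producer.flush()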
To create a Data Flow application:
oci data-flow application create \
    --compartment-id <compartment_ocid> \
    --display-name "StructuredKafkaWordCount" \
    --driver-shape VM.Standard2.1 \
    --executor-shape VM.Standard2.1 \
    --num-executors 1 \
    --spark-version 3.0.2 \
    --type streaming \
    --file-uri "oci://<bucket>@<namespace>/StructuredKafkaWordCount.py" \
    --archive-uri "oci://<bucket>@<namespace>/archive.zip" \
    --language Python \
    --arguments "--raw-stream <kafka topic> --bootstrap-server <kafka bootstrap server> --checkpoint-location oci://<bucket>@<namespace>/checkpoint --output-location oci://<bucket>@<namespace>/output --stream-username <tenancy name>/<user name>/<stream pool id> --stream-password <user security token> --output-mode csv"
Make note of the Application ID produced.
oci data-flow run create \
    --compartment-id <compartment_ocid> \
    --application-id <application_ocid> \
    --display-name "StructuredKafkaWordCount"
The arguments can be updated to switch from plain password authentication with Kafka to Data Flow Resource Principal authentication, which is more suitable for production scenarios. For example:
--arguments "--raw-stream <kafka topic> --bootstrap-server <kafka bootstrap server> --checkpoint-location oci://<bucket>@<namespace>/checkpoint --output-location oci://<bucket>@<namespace>/output --ocid <stream pool id> --output-mode csv"
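Inside the application, the switch can amount to choosing a different set of Kafka options when --ocid is supplied. The sketch below shows the idea: the plain-password JAAS configuration is standard Kafka, while the resource principal mechanism and login module names come from the OCI SDK SASL add-on used by the Data Flow streaming samples and should be verified against the sample source.

def kafka_auth_options(args):
    # Settings shared by both authentication modes.
    options = {"kafka.security.protocol": "SASL_SSL"}
    if args.ocid:
        # Data Flow Resource Principal authentication (mechanism and module
        # names assumed from the OCI SDK SASL add-on; verify before use).
        options.update({
            "kafka.sasl.mechanism": "OCI-RSA-SHA256",
            "kafka.sasl.jaas.config":
                'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule '
                'required intent="streamPoolId:{}";'.format(args.ocid),
        })
    else:
        # Plain username/password (auth token) authentication.
        options.update({
            "kafka.sasl.mechanism": "PLAIN",
            "kafka.sasl.jaas.config":
                'org.apache.kafka.common.security.plain.PlainLoginModule '
                'required username="{}" password="{}";'.format(
                    args.stream_username, args.stream_password),
        })
    return options

The returned options can then be applied when building the reader, e.g. spark.readStream.format("kafka").options(**kafka_auth_options(args)).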
For more details on OCI CLI configuration options, see the OCI CLI Command Reference.