-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy patheQRawToParquet.py
34 lines (26 loc) · 872 Bytes
/
eQRawToParquet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import os
from pyspark.sql import SparkSession
# Maven coordinate of the Spark <-> Kafka connector that the session must load.
# NOTE(review): the Scala suffix (2.12) and version (3.4.0) presumably have to
# match the locally installed Spark build -- confirm before upgrading Spark.
KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0"

# Set the submit arguments *before* the session is created below, so that the
# connector package is requested when the session starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {KAFKA_CONNECTOR_PACKAGE} pyspark-shell'

# Create (or reuse an already running) SparkSession for this job.
spark = SparkSession.builder.appName("earthquakeRawToParquet").getOrCreate()
# Kafka connection settings: broker address and the topic to consume.
KAFKA_SERVER = "localhost:9092"
KAFKA_TOPIC = "earthquakeRaw"

# Open a streaming DataFrame over the Kafka topic. No projection or parsing is
# applied here; whatever columns the Kafka source yields are passed on as-is.
lines = (
    spark.readStream.format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": KAFKA_SERVER,
            "subscribe": KAFKA_TOPIC,
            # NOTE(review): presumably keeps the query running when offsets
            # have been lost instead of failing it -- confirm this is intended.
            "failOnDataLoss": "false",
        }
    )
    .load()
)
# Output locations: where the Parquet files land and where the streaming
# query keeps its checkpoint data.
EQ_DIR = "/home/ubuntu/parquet-files/eq/"
EQ_CKPT_DIR = "/home/ubuntu/ckpt-files/eq"

# Start a streaming query that appends the rows read from Kafka to Parquet
# files under EQ_DIR, checkpointing to EQ_CKPT_DIR.
query = (
    lines.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", EQ_DIR)
    .option("checkpointLocation", EQ_CKPT_DIR)
    .start()
)

# Block the main thread for as long as the streaming query is running.
query.awaitTermination()