import com.databricks.spark.avro.SchemaConverters
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.commons.configuration.Configuration
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.kafka010._
class consume {
  def run(conf: Configuration) = {
    val spark = SparkSession.builder()
      .appName("kafka.avro.consumer")
      .getOrCreate()

    // Look up the latest Avro schema for the subject from the Schema Registry
    val schemaRegistry = new CachedSchemaRegistryClient(conf.getString("schemaRegistry.url"), 1000)
    val m = schemaRegistry.getLatestSchemaMetadata(conf.getString("schemaRegistry.subject"))
    val schemaId = m.getId
    val schema = schemaRegistry.getById(schemaId)

    // Kafka configuration
    // The Kafka topic(s) to read from
    val topics = Array(conf.getString("kafka.topics"))
    // Batch interval, in seconds, for the streaming reads
    val batchInterval = 2

    // A function that creates a streaming context
    def createStreamingContext(): StreamingContext = {
      // Create a new StreamingContext from the default context.
      val ssc = new StreamingContext(spark.sparkContext, Seconds(batchInterval))

      // Kafka parameters when reading
      // auto.offset.reset = 'earliest' reads from the beginning of the topic.
      // Set it to 'latest' to only receive new messages as they arrive.
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> conf.getString("kafka.brokers"),
        "key.deserializer" -> classOf[KafkaAvroDeserializer],
        "value.deserializer" -> classOf[KafkaAvroDeserializer],
        "group.id" -> "test1",
        "auto.offset.reset" -> "earliest",
        "enable.auto.commit" -> (false: java.lang.Boolean),
        "schema.registry.url" -> conf.getString("schemaRegistry.url")
      )

      // Create the direct stream from Kafka
      val messageStream = KafkaUtils.createDirectStream(
        ssc,
        PreferConsistent,
        Subscribe[String, GenericRecord](topics, kafkaParams)
      )

      // Keep only the tweet values (already deserialized from Avro)
      val tweetsAvro = messageStream.map(record => record.value)

      // Convert the records to DataFrames so we can select the interesting values
      tweetsAvro.foreachRDD { rdd =>
        // Skip empty micro-batches
        if (rdd.count() >= 1) {
          val tweetObj = rdd.map(v =>
            Row.fromSeq(List[Any](
              v.get("id"),
              v.get("createdAt"),
              v.get("lang").toString, // Avro strings arrive as org.apache.avro.util.Utf8
              v.get("retweetCount"),
              v.get("text").toString,
              v.get("location").toString
            ))
          )
          // Derive the Spark SQL schema from the Avro schema
          val schemaStructType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
          val tweetRaw = spark.createDataFrame(tweetObj, schemaStructType)
          val tweetInfo = tweetRaw
            .withColumn("createdAt", from_unixtime(col("createdAt").divide(1000)))
            .withColumn("year", year(col("createdAt")))
            .withColumn("month", month(col("createdAt")))
            .withColumn("day", dayofmonth(col("createdAt")))

          // Show 5 rows in the console
          tweetInfo.show(5)

          // Append to Parquet (Spark's default output format), partitioned by date
          tweetInfo
            .write
            .partitionBy("year", "month", "day")
            .mode(SaveMode.Append)
            .save(conf.getString("spark.output"))
        }
      }

      // Keep the data around for a minute, so it's there when we query later
      ssc.remember(Minutes(1))
      // Checkpoint for fault-tolerance
      // ssc.checkpoint("/tweetcheckpoint")
      // Return the StreamingContext
      ssc
    }

    // Stop any existing StreamingContext
    val stopActiveContext = true
    if (stopActiveContext) {
      StreamingContext.getActive.foreach {
        _.stop(stopSparkContext = false)
      }
    }

    // Get or create a StreamingContext
    val ssc = StreamingContext.getActiveOrCreate(createStreamingContext)
    // This starts the StreamingContext in the background.
    ssc.start()
    // Run with a timeout of batchInterval minutes (awaitTerminationOrTimeout takes milliseconds)
    ssc.awaitTerminationOrTimeout(batchInterval * 60 * 1000)
  }
}
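
// ---------------------------------------------------------------------------
// Hypothetical entry point (not part of the original file): a minimal sketch of
// how this class might be launched. It assumes the configuration keys used
// above (schemaRegistry.url, schemaRegistry.subject, kafka.brokers,
// kafka.topics, spark.output) are provided in a properties file whose path is
// passed as the first program argument.
// ---------------------------------------------------------------------------
object ConsumeApp {
  def main(args: Array[String]): Unit = {
    // PropertiesConfiguration implements the commons-configuration
    // Configuration interface that consume.run expects
    val conf = new org.apache.commons.configuration.PropertiesConfiguration(args(0))
    new consume().run(conf)
  }
}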