- Supports only Writes, in real-time ingestion mode.
- Under the hood, the druid-io/tranquility connector is used for Spark real-time ingestion.
- Reads are not implemented yet.
The following Hive table points to a Druid datasource:
```sql
CREATE EXTERNAL TABLE `pcatalog.druid_testing`(
  `id` string,
  `type` string,
  `time_updated` string
)
LOCATION
  'hdfs:///tmp/gimel/pcatalog.druid_testing'
TBLPROPERTIES (
  'gimel.storage.type'='druid',
  'gimel.druid.zookeeper.hosts' = 'druid_zk_host_1:2181,druid_zk_host_2:2181,druid_zk_host_3:2181',
  'gimel.druid.cluster.index.service' = 'druid/overlord',
  'gimel.druid.cluster.discovery.path' = '/druid/discovery',
  'gimel.druid.datasource.name' = 'default_datasource',
  'gimel.druid.datasource.dimensions' = '["type", "time_updated"]',
  'gimel.druid.datasource.metrics' = '[{"type": "hyperUnique", "field_name": "id", "name": "distinct_id"}]',
  'gimel.druid.timestamp.fieldname' = 'time_updated',
  'gimel.druid.timestamp.format' = 'seconds'
)
```
Refer to the [Tranquility documentation](https://github.com/druid-io/tranquility) for the under-the-hood configurations.
Property | Mandatory? | Description | Example | Default |
---|---|---|---|---|
gimel.druid.zookeeper.hosts | Y | Comma-separated list of ZooKeeper hosts for the Druid cluster | druid_zk_host_1:2181,druid_zk_host_2:2181 | |
gimel.druid.cluster.index.service | Y | Name of the Druid indexing service | druid/overlord | |
gimel.druid.cluster.discovery.path | Y | Discovery path in the Druid cluster | /druid/discovery | |
gimel.druid.datasource.name | Y | Name of the Druid datasource to write to | default_datasource | |
gimel.druid.datasource.dimensions | Y | List of dimensions, specified as a JSON array | ["type", "time_updated"] | |
gimel.druid.datasource.metrics | Y | List of metrics in the Druid index, specified as a JSON array | [{"type": "hyperUnique", "field_name": "id", "name": "distinct_id"}] | |
gimel.druid.timestamp.fieldname | Y | Name of the timestamp field | time_updated | |
gimel.druid.timestamp.format | Y | Format of the timestamp field | seconds | |
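These properties can also be passed at write time through the options map that the DataSet API accepts (the third argument of `write`, which carries `load_type` in the example that follows). A minimal sketch of such an override, assuming (this is an assumption, not something the example below confirms) that `gimel.druid.*` keys supplied at runtime take precedence over the Hive TBLPROPERTIES:

```scala
// Hypothetical runtime override: target a different Druid datasource
// without editing the Hive DDL.
// Assumption: options supplied here take precedence over TBLPROPERTIES.
val overrides: Map[String, Any] = Map(
  "load_type" -> "realtime",
  "gimel.druid.datasource.name" -> "another_datasource"
)
// Would be passed as: dataSet.write("pcatalog.druid_testing", rdd, overrides)
```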
The following example writes a record to Druid in real-time ingestion mode:

```scala
import com.paypal.gimel._
import org.apache.spark._
import org.apache.spark.sql._

val dataSet = DataSet(sparkSession)

// One Map per record; keys must match the dimensions and the
// timestamp field declared in the table properties above.
val records = List(Map("id" -> 1, "type" -> "U", "time_updated" -> "10000000"))
val rdd = sc.parallelize(records)

// "realtime" routes the write through the Tranquility connector.
val dataSetProps = Map("load_type" -> "realtime")
dataSet.write("pcatalog.druid_testing", rdd, dataSetProps)
```
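The writer above consumes an `RDD[Map[String, Any]]`, one Map per record. If the data already lives in a DataFrame, one way to feed it in is to convert each `Row` into that Map shape first. A minimal sketch (the sample DataFrame, its column names, and the Row-to-Map conversion are illustrative assumptions, not part of the Gimel API):

```scala
// Illustrative DataFrame with the same columns as the Hive DDL above.
val df = sparkSession.createDataFrame(Seq(
  (1, "U", "10000000") // id, type, time_updated
)).toDF("id", "type", "time_updated")

// Convert each Row into the Map[String, Any] shape the example above uses.
val rddOfMaps = df.rdd.map { row =>
  row.schema.fieldNames.map(name => name -> row.getAs[Any](name)).toMap
}

dataSet.write("pcatalog.druid_testing", rddOfMaps, Map("load_type" -> "realtime"))
```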