Merge pull request #92 from basho/develop
Release 1.3.0-beta1
srgg committed Feb 24, 2016
2 parents 01093b1 + 39e4cca commit 8975efc
Showing 49 changed files with 1,963 additions and 550 deletions.
29 changes: 21 additions & 8 deletions README.md
@@ -6,7 +6,7 @@ Spark Riak connector allows you to expose data stored in Riak buckets as Spark RDDs

## Compatibility
* Compatible with Riak KV, bundled inside BDP 1.x
* Compatible with Apache Spark 1.4 or later
* Compatible with Apache Spark 1.5.2 or later
* Compatible with Scala 2.10 or later
* Compatible with Java 8

@@ -20,6 +20,8 @@ Spark Riak connector allows you to expose data stored in Riak buckets as Spark RDDs
* Provides mapping and data conversion for JSON formatted values
* Provides ability to construct an RDD by using a 2i string index or a set of indexes
* Provides ability to construct an RDD by using a 2i range query or a set of ranges
* Provides ability to perform range queries over Riak TS buckets
* Provides ability to construct DataFrames from Riak TS buckets


## Building
@@ -94,13 +96,16 @@ from the spark-shell and set within the $SPARK_HOME/conf/spark-default.conf.

The following options are available on the `SparkConf` object:

Property name | Description | Default value
-----------------------------------------------|---------------------------------------------------|--------------------
spark.riak.connection.host | IP:port of a Riak node protobuf interface | 127.0.0.1:8087
spark.riak.connections.min | Minimum number of parallel connections to Riak | 10
spark.riak.connections.max | Maximum number of parallel connections to Riak | 30
spark.riak.input.fetch-size | Number of keys to fetch in a single round-trip to Riak | 1000

Property name | Description | Default value | Riak Type
-----------------------------------------------|---------------------------------------------------|--------------------|-------------
spark.riak.connection.host | IP:port of a Riak node protobuf interface | 127.0.0.1:8087 | KV/TS
spark.riak.connections.min | Minimum number of parallel connections to Riak | 10 | KV/TS
spark.riak.connections.max | Maximum number of parallel connections to Riak | 30 | KV/TS
spark.riak.input.fetch-size | Number of keys to fetch in a single round-trip to Riak | 1000 | KV
spark.riak.input.split.count | Desired minimum number of Spark partitions to divide the data into | 10 | KV
spark.riak.output.wquorum | Quorum value on write | 1 | KV
spark.riak.connections.inactivity.timeout | Time to keep connection to Riak alive in milliseconds | 1000 | KV/TS
spark.riakts.bindings.timestamp | Whether to treat Riak TS timestamp columns as Long (UNIX milliseconds) or as Timestamps during automatic schema discovery. Valid values are: <ul><li>useLong</li><li>useTimestamp</li></ul> | useTimestamp | TS

Example:
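These properties can be set programmatically when the Spark context is created. The sketch below is an illustration with placeholder values (application name, host address, and the chosen settings are assumptions), not the connector's documented example:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder host and tuning values -- adjust to your Riak cluster.
val conf = new SparkConf()
  .setAppName("riak-spark-sketch")
  .set("spark.riak.connection.host", "127.0.0.1:8087")
  .set("spark.riak.connections.min", "20")
  .set("spark.riak.connections.max", "50")
  .set("spark.riakts.bindings.timestamp", "useLong")

val sc = new SparkContext(conf)
```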

@@ -193,6 +198,14 @@ Riak TS buckets can be queried using sql() function:
val rdd = sc.riakTSBucket(tableName).sql(s"SELECT * FROM $tableName WHERE time >= $from AND time <= $to")
```

### Saving an RDD to a Riak TS bucket

An existing RDD of `org.apache.spark.sql.Row` can be saved to Riak TS as follows:

```scala
rdd.saveToRiakTS(TABLE_NAME)
```
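As a quick illustration, here is a hedged sketch of building such an RDD and writing it; the table name `ts_weather` and the row values are assumptions, and the only real requirement is that each `Row`'s fields line up with the TS table's column definitions:

```scala
import com.basho.riak.spark._
import org.apache.spark.sql.Row

// Hypothetical TS table; the field order in each Row must match the table's columns.
val tableName = "ts_weather"
val rows = sc.parallelize(Seq(
  Row("site-1", 1443647460000L, 15.5),
  Row("site-1", 1443647470000L, 15.7)
))

rows.saveToRiakTS(tableName)
```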

## Examples

The Riak Spark connector comes with several sample programs and demos that can be found in the [**examples** folder](./examples).
24 changes: 21 additions & 3 deletions connector/pom.xml
@@ -5,14 +5,22 @@
<parent>
<groupId>com.basho.riak</groupId>
<artifactId>spark-riak-connector-parent</artifactId>
<version>1.2.0-beta1</version>
<version>1.3.0-beta1</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>spark-riak-connector</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.major.version}</artifactId>
@@ -40,6 +48,16 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
@@ -90,15 +108,15 @@
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<shadedClassifierName>uber</shadedClassifierName>
<artifactSet>
<includes>
<include>com.google.guava:guava</include>
<include>com.fasterxml.jackson.datatype:jackson-datatype-joda</include>
<include>joda-time:joda-time</include>
<include>com.fasterxml.jackson.module:jackson-module-scala_${scala.major.version}</include>
<!--<include>org.jfree:jcommon</include>-->
<include>org.scala-lang.modules:scala-java8-compat_${scala.major.version}</include>
<include>com.basho.riak:riak-client</include>
<include>com.basho.riak:dataplatform-riak-client</include>
</includes>
</artifactSet>
</configuration>
@@ -24,6 +24,7 @@ import com.basho.riak.spark.rdd.connector.RiakConnector
import com.basho.riak.spark.rdd.{RiakTSRDD, RiakRDD, ReadConf}
import com.basho.riak.spark.util.RiakObjectConversionUtil
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.StructType

import scala.reflect.ClassTag
import scala.runtime.AbstractFunction2
@@ -39,9 +40,9 @@ class SparkContextFunctions(@transient val sc: SparkContext) extends Serializable
}
}

def riakTSBucket[T](bucketName: String)
(implicit ct: ClassTag[T]): RiakTSRDD[T] =
RiakTSRDD[T](sc, bucketName)
def riakTSBucket[T](bucketName: String, readConf: ReadConf = ReadConf(sc.getConf), schema: Option[StructType] = None)
(implicit ct: ClassTag[T], connector: RiakConnector = RiakConnector(sc.getConf)): RiakTSRDD[T] =
RiakTSRDD[T](sc, bucketName, readConf = readConf, schema = schema)

def riakBucket[T](bucketName: String)
(implicit ct: ClassTag[T]): RiakRDD[T] =
@@ -50,7 +51,7 @@ class SparkContextFunctions(@transient val sc: SparkContext) extends Serializable
def riakBucket[T](bucketName: String, bucketType: String, convert: (Location, RiakObject) => T)
(implicit connector: RiakConnector = RiakConnector(sc.getConf),
ct: ClassTag[T]): RiakRDD[T] =
new RiakRDD[T](sc, connector, bucketType, bucketName, convert, readConf = ReadConf.fromSparkConf(sc.getConf))
new RiakRDD[T](sc, connector, bucketType, bucketName, convert, readConf = ReadConf(sc.getConf))

def riakBucket[T](ns: Namespace)
(implicit ct: ClassTag[T]): RiakRDD[T] =
@@ -53,7 +53,7 @@ trait LocationQuery[T] extends Query[T] {
r match {
case (_, Nil) =>
(None, Nil)
case (nextToken: T, locations: Iterable[Location]) =>
case (nextToken, locations) =>
/**
* To be 100% sure that massive fetch doesn't lead to the connection pool starvation,
* fetch will be performed by the smaller chunks of keys.
@@ -36,7 +36,7 @@ class RDDFunctions[T](rdd: RDD[T]) extends Serializable {
*/
def saveToRiak(bucketName: String,
bucketType: String = "default",
writeConf: WriteConf = WriteConf.fromSparkConf(sparkContext.getConf))
writeConf: WriteConf = WriteConf(sparkContext.getConf))
(implicit connector: RiakConnector = RiakConnector(sparkContext.getConf),
vwf: WriteDataMapperFactory[T, KeyValue]): Unit = {
val writer = RiakWriter[T](connector, bucketType, bucketName, writeConf )
@@ -48,7 +48,7 @@ class RDDFunctions[T](rdd: RDD[T]) extends Serializable {
saveToRiak(ns.getBucketNameAsString, ns.getBucketTypeAsString)
}

def saveAsRiakBucket(bucketDef: BucketDef, writeConf: WriteConf = WriteConf.fromSparkConf(sparkContext.getConf))
def saveAsRiakBucket(bucketDef: BucketDef, writeConf: WriteConf = WriteConf(sparkContext.getConf))
(implicit connector: RiakConnector = RiakConnector(sparkContext.getConf),
vwf: WriteDataMapperFactory[T, KeyValue]): Unit = {

@@ -58,7 +58,7 @@

def saveToRiakTS(bucketName: String,
bucketType: String = "default",
writeConf: WriteConf = WriteConf.fromSparkConf(sparkContext.getConf))
writeConf: WriteConf = WriteConf(sparkContext.getConf))
(implicit evidence: T <:< SparkRow,
connector: RiakConnector = RiakConnector(sparkContext.getConf),
vwf: WriteDataMapperFactory[T, RowDef]): Unit = {
56 changes: 45 additions & 11 deletions connector/src/main/scala/com/basho/riak/spark/rdd/ReadConf.scala
@@ -23,12 +23,14 @@ import java.util.Properties
import org.apache.spark.SparkConf

/** RDD read settings
*
* @param fetchSize number of keys to fetch in a single round-trip to Riak
* @param splitCount desired minimum number of Spark partitions to divide the data into
*/
case class ReadConf (
fetchSize: Int = ReadConf.DefaultFetchSize,
splitCount: Int = ReadConf.DefaultSplitCount,
val fetchSize: Int = ReadConf.DefaultFetchSize,
val splitCount: Int = ReadConf.DefaultSplitCount,
val tsTimestampBinding: TsTimestampBindingType = ReadConf.DefaultTsTimestampBinding,

/**
* Turns on streaming values support for PEx.
@@ -42,10 +44,24 @@
* IT MAY EITHER CAUSE PERFORMANCE DEGRADATION OR INTRODUCE FBR ERRORS
*/
useStreamingValuesForFBRead: Boolean = ReadConf.DefaultUseStreamingValues4FBRead
)
) {

def overrideProperties(options: Map[String, String]): ReadConf = {
val newFetchSize = options.getOrElse(ReadConf.fetchSizePropName, fetchSize.toString).toInt
val newSplitCount = options.getOrElse(ReadConf.splitCountPropName, splitCount.toString).toInt
val newUseStreamingValuesForFBRead = options.getOrElse(ReadConf.useStreamingValuesPropName, useStreamingValuesForFBRead.toString).toBoolean
val newTsTimestampBinding = TsTimestampBindingType(options.getOrElse(ReadConf.tsBindingsTimestamp, tsTimestampBinding.value))
ReadConf(newFetchSize, newSplitCount, newTsTimestampBinding, newUseStreamingValuesForFBRead)
}
}

object ReadConf {

final val splitCountPropName = "spark.riak.input.split.count"
final val useStreamingValuesPropName = "spark.riak.fullbucket.use-streaming-values"
final val fetchSizePropName = "spark.riak.input.fetch-size"
final val tsBindingsTimestamp = "spark.riakts.bindings.timestamp"

private val defaultProperties: Properties =
getClass.getResourceAsStream("/ee-default.properties") match {
case s: InputStream =>
@@ -57,20 +73,38 @@
new Properties()
}

val DefaultFetchSize = 1000
final val DefaultTsTimestampBinding = UseTimestamp

final val DefaultFetchSize = 1000

// TODO: Need to think about the proper default value
val DefaultSplitCount = 10
final val DefaultSplitCount = 10

val DefaultUseStreamingValues4FBRead: Boolean =
defaultProperties.getProperty("spark.riak.fullbucket.use-streaming-values", "false")
final val DefaultUseStreamingValues4FBRead: Boolean =
defaultProperties.getProperty(useStreamingValuesPropName, "false")
.toBoolean

def fromSparkConf(conf: SparkConf): ReadConf = {
/** Creates ReadConf based on properties provided in SparkConf
*
* @param conf SparkConf of Spark context with Riak-related properties
*/
def apply(conf: SparkConf): ReadConf = {
ReadConf(
fetchSize = conf.getInt("spark.riak.input.fetch-size", DefaultFetchSize),
splitCount = conf.getInt("spark.riak.input.split.count", DefaultSplitCount),
useStreamingValuesForFBRead = conf.getBoolean("spark.riak.fullbucket.use-streaming-values", DefaultUseStreamingValues4FBRead)
fetchSize = conf.getInt(fetchSizePropName, DefaultFetchSize),
splitCount = conf.getInt(splitCountPropName, DefaultSplitCount),
tsTimestampBinding = TsTimestampBindingType(conf.get(tsBindingsTimestamp, DefaultTsTimestampBinding.value)),
useStreamingValuesForFBRead = conf.getBoolean(useStreamingValuesPropName, DefaultUseStreamingValues4FBRead)
)
}

/** Creates ReadConf based on an externally provided map of properties
* to override those of SparkConf
*
* @param conf SparkConf of Spark context to be taken as defaults
* @param options externally provided map of properties
*/
def apply(conf: SparkConf, options: Map[String, String]): ReadConf = {
val readConf = ReadConf(conf)
readConf.overrideProperties(options)
}
}
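A brief usage sketch of the two `apply` overloads above; the property values are illustrative only:

```scala
import com.basho.riak.spark.rdd.ReadConf
import org.apache.spark.SparkConf

// Defaults are taken from the Spark configuration...
val sparkConf = new SparkConf()
  .set("spark.riak.input.fetch-size", "500")
  .set("spark.riakts.bindings.timestamp", "useTimestamp")

val readConf = ReadConf(sparkConf)

// ...and can be selectively overridden by an externally provided options map.
val overridden = ReadConf(sparkConf, Map(
  "spark.riak.input.split.count" -> "24",
  "spark.riakts.bindings.timestamp" -> "useLong"
))
```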
58 changes: 47 additions & 11 deletions connector/src/main/scala/com/basho/riak/spark/rdd/RiakTSRDD.scala
@@ -22,6 +22,7 @@ import com.basho.riak.spark.query.{TSQueryData, QueryTS}
import com.basho.riak.spark.rdd.connector.RiakConnector
import com.basho.riak.spark.rdd.partitioner.{RiakTSPartition, RiakTSPartitioner}
import com.basho.riak.spark.util.{TSConversionUtil, CountingIterator, DataConvertingIterator}
import org.apache.spark.sql.types.StructType
import org.apache.spark.{TaskContext, Partition, Logging, SparkContext}
import org.apache.spark.rdd.RDD

@@ -35,6 +36,7 @@ class RiakTSRDD[R] private[spark](
@transient sc: SparkContext,
val connector: RiakConnector,
val bucketName: String,
val schema: Option[StructType] = None,
columnNames: Option[Seq[String]] = None,
whereConstraints: Option[(String, Seq[Any])] = None,
val query: Option[String] = None,
@@ -49,32 +51,58 @@
"SELECT " +
(
columnNames match {
case None => "*"
case None => schema match {
case Some(s) => s.fieldNames.mkString(", ")
case None => "*"
}
case Some(c: Seq[String]) => c.mkString(", ")
}
) +
s" FROM $bucketName " +
(
whereConstraints match {
case None => ""
case Some(x: (String, Seq[(String, Any)])) =>
values = x._2.map(k => k._2)
s" WHERE ${x._1}"
case Some(x) => x match {
case (n, vs) =>
values = vs.map { case (_, v) => v }
s" WHERE $n"
}
}
)
)

RiakTSPartitioner.partitions(connector.hosts, TSQueryData(sql, values))
}

private def validateSchema(schema: StructType, columns: Seq[ColumnDescription]): Unit = {
val columnNames = columns.map(_.getName)

schema.fieldNames.diff(columnNames).toList match {
case Nil => columnNames.diff(schema.fieldNames) match {
case Nil =>
case diff =>
throw new IllegalArgumentException(s"Provided schema has nothing about the following fields returned by query: ${diff.mkString(", ")}")
}
case diff =>
throw new IllegalArgumentException(s"Provided schema contains fields that are not returned by query: ${diff.mkString(", ")}")
}
}

private def computeTS(partitionIdx: Int, context: TaskContext, queryData: TSQueryData) = {
// these implicit values are used to pass parameters to 'com.basho.riak.spark.util.TSConversionUtil$.from'
implicit val schema = this.schema
implicit val tsTimestampBinding = readConf.tsTimestampBinding

val startTime = System.currentTimeMillis()
val q = new QueryTS(BucketDef(bucketName, bucketName), queryData, readConf)

val session = connector.openSession()
val result: (Seq[ColumnDescription], Seq[Row]) = q.nextChunk(session)
val convertingIterator: Iterator[R] = q.nextChunk(session) match {
case (cds, rows) =>
if (this.schema.isDefined) validateSchema(schema.get, cds)
DataConvertingIterator.createTSConverting[R]((cds, rows), TSConversionUtil.from[R])
}

val convertingIterator = DataConvertingIterator.createTSConverting[R](result, TSConversionUtil.from[R])
val countingIterator = CountingIterator[R](convertingIterator)
context.addTaskCompletionListener { (context) =>
val endTime = System.currentTimeMillis()
@@ -99,15 +127,20 @@

private def copy(
query: Option[String] = this.query,
schema: Option[StructType] = schema,
columnNames: Option[Seq[String]] = columnNames,
where: Option[(String, Seq[Any])] = whereConstraints,
readConf: ReadConf = readConf, connector: RiakConnector = connector): RiakTSRDD[R] =
new RiakTSRDD(sc, connector, bucketName, columnNames, where, query, readConf)
new RiakTSRDD(sc, connector, bucketName, schema, columnNames, where, query, readConf)

def select(columns: String*): RiakTSRDD[R] = {
copy(columnNames = Some(columns))
}

def schema(structType: StructType): RiakTSRDD[R] = {
copy(schema = Option(structType))
}

/**
* Adds a SQL `WHERE` predicate(s) to the query.
*/
@@ -125,10 +158,13 @@
* @since 1.1.0
*/
object RiakTSRDD {
def apply[T](sc: SparkContext, bucketName: String)
(implicit ct: ClassTag[T]): RiakTSRDD[T] =
new RiakTSRDD[T](
sc, RiakConnector(sc.getConf), bucketName)
def apply[T](sc: SparkContext, bucketName: String, readConf: ReadConf)
(implicit ct: ClassTag[T], connector: RiakConnector): RiakTSRDD[T] =
new RiakTSRDD[T](sc, connector, bucketName, readConf = readConf)

def apply[T](sc: SparkContext, bucketName: String, readConf: ReadConf, schema: Option[StructType])
(implicit ct: ClassTag[T], connector: RiakConnector): RiakTSRDD[T] =
new RiakTSRDD[T](sc, connector, bucketName, schema, readConf = readConf)
}
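To show how the new pieces fit together, here is a hedged sketch that supplies an explicit schema through the `riakTSBucket` entry point shown earlier; the table name, column names, and time bounds are assumptions, and a schema whose field names do not match the query's columns would cause `validateSchema` to throw an `IllegalArgumentException`:

```scala
import com.basho.riak.spark._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Assumed TS table and schema; sc is an existing SparkContext.
val tableName = "ts_weather"
val weatherSchema = StructType(Seq(
  StructField("site", StringType),
  StructField("time", TimestampType),
  StructField("temperature", DoubleType)
))

val from = 1443647460000L
val to = 1443647470000L

val rdd = sc.riakTSBucket[Row](tableName)
  .schema(weatherSchema)
  .sql(s"SELECT site, time, temperature FROM $tableName WHERE time >= $from AND time <= $to")
```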

