Feature/schema quantum #224

Open
wants to merge 1 commit into spark-2.0-support
This file was deleted.

@@ -103,7 +103,7 @@ case class QueryFullBucket(bucket: BucketDef,
response.getEntries.toList match {
case Nil if coverageEntriesIt.get.hasNext => nextChunk(None)
case entries: List[FullBucketRead.Response.Entry] =>
val data = entries.map(e => e.getLocation -> e.getFetchedValue.getValue(classOf[RiakObject]))
val data = entries.map(e => e.getFetchedValue.getLocation -> e.getFetchedValue.getValue(classOf[RiakObject]))
response.hasContinuation match {
case true => Some(Left(response.getContinuation.toStringUtf8)) -> data
case _ if coverageEntriesIt.get.hasNext => Some(Right(coverageEntriesIt.get.next)) -> data
28 changes: 7 additions & 21 deletions connector/src/main/scala/com/basho/riak/spark/rdd/ReadConf.scala
@@ -20,8 +20,6 @@ package com.basho.riak.spark.rdd
import java.io.InputStream
import java.util.Properties
import org.apache.spark.SparkConf
import java.time.Duration
import org.joda.time.Duration
import org.apache.spark.network.util.JavaUtils

/** RDD read settings
@@ -33,16 +31,6 @@ case class ReadConf (
val fetchSize: Int = ReadConf.DefaultFetchSize,
val splitCount: Int = ReadConf.DefaultSplitCount,
val tsTimestampBinding: TsTimestampBindingType = ReadConf.DefaultTsTimestampBinding,
/**
* Used only in ranged partitioner to identify quantized field.
* Usage example:
* sparkSession.read
* .option("spark.riak.partitioning.ts-range-field-name", "time")
* Providing this property automatically turns on RangedRiakTSPartitioner
*/
val tsRangeFieldName: String = null,
val quantum: Option[Long] = None,

/**
* Turns on streaming values support for PEx.
*
@@ -54,17 +42,17 @@
* DO NOT CHANGE THESE VALUES MANUALLY IF YOU DON'T KNOW WHAT YOU ARE DOING
* IT MAY CAUSE EITHER PERFORMANCE DEGRADATION OR INTRODUCE FBR ERRORS
*/
useStreamingValuesForFBRead: Boolean = ReadConf.DefaultUseStreamingValues4FBRead
useStreamingValuesForFBRead: Boolean = ReadConf.DefaultUseStreamingValues4FBRead,
val tsPartitioner: Option[String] = None
) {

def overrideProperties(options: Map[String, String]): ReadConf = {
val newFetchSize = options.getOrElse(ReadConf.fetchSizePropName, fetchSize.toString).toInt
val newSplitCount = options.getOrElse(ReadConf.splitCountPropName, splitCount.toString).toInt
val newUseStreamingValuesForFBRead = options.getOrElse(ReadConf.useStreamingValuesPropName, useStreamingValuesForFBRead.toString).toBoolean
val newTsTimestampBinding = TsTimestampBindingType(options.getOrElse(ReadConf.tsBindingsTimestamp, tsTimestampBinding.value))
val newTsRangeFieldName = options.getOrElse(ReadConf.tsRangeFieldPropName, tsRangeFieldName)
val newQuantum = options.get(ReadConf.tsQuantumPropName).map(JavaUtils.timeStringAsMs)
ReadConf(newFetchSize, newSplitCount, newTsTimestampBinding, newTsRangeFieldName, newQuantum, newUseStreamingValuesForFBRead)
val newTSPartitioner = options.get(ReadConf.tsPartitionerName)
ReadConf(newFetchSize, newSplitCount, newTsTimestampBinding, newUseStreamingValuesForFBRead, newTSPartitioner)
}
}
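For orientation, a minimal sketch of feeding overrideProperties the new key (both keys are declared in the companion object below; the partitioner value itself is a placeholder, since accepted values are not part of this change):

import com.basho.riak.spark.rdd.ReadConf

// Hypothetical tuning call: every key shown is defined in ReadConf's companion object,
// only the partitioner value is made up for illustration.
val tuned = ReadConf().overrideProperties(Map(
  "spark.riak.input.fetch-size" -> "5000",
  "spark.riak.ts.partitioner" -> "some.partitioner.Name" // placeholder value
))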

@@ -74,8 +62,7 @@ object ReadConf {
final val useStreamingValuesPropName = "spark.riak.fullbucket.use-streaming-values"
final val fetchSizePropName = "spark.riak.input.fetch-size"
final val tsBindingsTimestamp = "spark.riakts.bindings.timestamp"
final val tsRangeFieldPropName = "spark.riak.partitioning.ts-range-field-name"
final val tsQuantumPropName = "spark.riak.partitioning.ts-quantum"
final val tsPartitionerName = "spark.riak.ts.partitioner"

private val defaultProperties: Properties =
getClass.getResourceAsStream("/ee-default.properties") match {
@@ -108,9 +95,8 @@ object ReadConf {
fetchSize = conf.getInt(fetchSizePropName, DefaultFetchSize),
splitCount = conf.getInt(splitCountPropName, DefaultSplitCount),
tsTimestampBinding = TsTimestampBindingType(conf.get(tsBindingsTimestamp, DefaultTsTimestampBinding.value)),
tsRangeFieldName = conf.get(tsRangeFieldPropName, null),
quantum = conf.getOption(tsQuantumPropName).map(JavaUtils.timeStringAsMs),
useStreamingValuesForFBRead = conf.getBoolean(useStreamingValuesPropName, DefaultUseStreamingValues4FBRead)
useStreamingValuesForFBRead = conf.getBoolean(useStreamingValuesPropName, DefaultUseStreamingValues4FBRead),
tsPartitioner = conf.getOption(tsPartitionerName)
)
}
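Since this change replaces the spark.riak.partitioning.ts-range-field-name / ts-quantum pair with a single partitioner option, here is a hedged sketch of the reader-side wiring (only the option keys come from ReadConf above; the format name, table name, and partitioner value are assumptions for illustration):

// Sketch only: sparkSession is assumed to be an existing SparkSession.
val df = sparkSession.read
  .format("org.apache.spark.sql.riak")                      // assumed data source name
  .option("spark.riak.input.fetch-size", "10000")           // key defined in ReadConf
  .option("spark.riak.ts.partitioner", "rangedPartitioner") // new key; value is a placeholder
  .load("ts_table_name")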

@@ -73,8 +73,9 @@ class RiakTSRDD[R] private[spark](
val q = new QueryTS(connector, queryData)
val iterator: TSDataQueryingIterator = TSDataQueryingIterator(q)
val columns = iterator.columnDefs
if (this.schema.isDefined && !columns.isEmpty)
if (this.schema.isDefined && columns.nonEmpty) {
validateSchema(schema.get, columns)
}
val convertingIterator = DataConvertingIterator.createTSConverting((columns, iterator), TSConversionUtil.from[R])
val countingIterator = CountingIterator[R](convertingIterator)
context.addTaskCompletionListener { (context) =>

This file was deleted.

@@ -18,7 +18,10 @@
package com.basho.riak.spark.rdd.connector

import com.basho.riak.client.api.{RiakClient, RiakCommand}
import com.basho.riak.client.core.{RiakFuture, FutureOperation}
import com.basho.riak.client.core.query.timeseries.TableDefinition
import com.basho.riak.client.core.{FutureOperation, RiakFuture}
import com.basho.riak.spark.rdd.TsTimestampBindingType
import org.apache.spark.sql.types.StructType

/**
* @author Sergey Galkin <srggal at gmail dot com>
@@ -29,6 +32,8 @@ trait RiakSession extends AutoCloseable {

def execute[V, S](operation: FutureOperation[V, _, S]): RiakFuture[V, S]

def getTableDefinition(name: String, bindingType: TsTimestampBindingType): StructType

def isClosed: Boolean


@@ -28,14 +28,20 @@ import com.google.common.cache._

import scala.collection.JavaConverters._
import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import scala.collection.JavaConversions._

import com.basho.riak.client.core.operations.ts.DescribeTableOperation
import com.basho.riak.spark.rdd.TsTimestampBindingType
import io.netty.bootstrap.Bootstrap
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.util.concurrent.DefaultThreadFactory
import org.apache.spark.riak.Logging
import org.apache.spark.riak.types.RiakStructType
import org.apache.spark.sql.types.StructType

import scala.collection.concurrent.TrieMap
import scala.util.Try

/**
* Simple [[RiakSession]] Cache/Pool.
@@ -267,6 +273,15 @@ class CachedSession(val conf: RiakConnectorConf, riakClient: RiakClient, afterClose
}
}

override def getTableDefinition(name: String, bindingType: TsTimestampBindingType): StructType = {
val describeOp = new DescribeTableOperation.Builder(name).build()
val tableDef = Try(execute(describeOp).get()) getOrElse {
throw new IllegalStateException(s"No table $name was found")
}

RiakStructType(tableDef.getFullColumnDescriptions.toSeq, bindingType)
}

override def maxConnectionsPerNode: Int = {
checkThatSessionIsActive()
riakClient.getRiakCluster.getNodes.asScala match {
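A hedged sketch of the schema-resolution path added above (only getTableDefinition is introduced by this diff; the connector value, the withSessionDo helper, and the table name are assumptions used for illustration):

import com.basho.riak.spark.rdd.UseTimestamp
import org.apache.spark.sql.types.StructType

// Resolve a Spark schema for a TS table from Riak's DESCRIBE metadata.
// "connector" stands for an already configured RiakConnector instance.
val tableSchema: StructType = connector.withSessionDo { session =>
  session.getTableDefinition("ts_weather_demo", UseTimestamp)
}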
@@ -17,21 +17,27 @@
*/
package com.basho.riak.spark.util

import com.basho.riak.client.core.query.timeseries.{ Cell, Row => RiakRow, ColumnDescription }
import com.basho.riak.spark.rdd.{ UseTimestamp, UseLong, TsTimestampBindingType }
import com.basho.riak.client.core.query.timeseries.{Cell, ColumnDescription, Row => RiakRow}
import com.basho.riak.spark.rdd.{TsTimestampBindingType, UseLong, UseTimestamp}
import com.fasterxml.jackson.core.`type`.TypeReference
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

import scala.collection.convert.decorateAll._
import java.sql.Timestamp
import org.apache.spark.sql.{ Row => SparkRow }

import org.apache.spark.sql.{Row => SparkRow}

import scala.reflect.ClassTag
import com.basho.riak.client.core.query.timeseries.ColumnDescription.ColumnType
import java.util.Calendar
import java.sql.Date

import scala.util.Try
import com.basho.riak.client.core.query.timeseries.FullColumnDescription
import com.basho.riak.client.core.query.timeseries.TableDefinition
import org.apache.spark.riak.types.RiakStructType

import scala.collection.JavaConversions._
import scala.util.Success
import scala.util.Failure
@@ -43,7 +49,8 @@
object TSConversionUtil {
val partitionKeyOrdinalProp = "riak.partitionKeyOrdinal"
val localKeyOrdinalProp = "riak.localKeyOrdinal"
val isTSFieldProp = "riak.isTSField"
val quantum = "riakTS.quantum"
val quantizedField = "riakTS.quantizedField"

private val STRING_TYPE_REFERENCE = new TypeReference[String] {}

@@ -74,18 +81,19 @@ object TSConversionUtil {
} else null
}

def asDataType(columnType: ColumnType, tsTimestampBinding: TsTimestampBindingType): DataType =
def asDataType(columnType: ColumnType, tsTimestampBinding: TsTimestampBindingType): DataType = {
columnType match {
case ColumnType.BOOLEAN => BooleanType
case ColumnType.DOUBLE => DoubleType
case ColumnType.SINT64 => LongType
case ColumnType.DOUBLE => DoubleType
case ColumnType.SINT64 => LongType
case ColumnType.TIMESTAMP => tsTimestampBinding match {
case UseLong => LongType
case UseLong => LongType
case UseTimestamp => TimestampType
}
case ColumnType.VARCHAR => StringType
case _ => throw new IllegalStateException(s"Unsupported column type $columnType")
case _ => throw new IllegalStateException(s"Unsupported column type $columnType")
}
}
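// Not part of this change, just an illustration of the mapping above: the binding
// configured via spark.riakts.bindings.timestamp decides how TIMESTAMP columns surface,
//   asDataType(ColumnType.TIMESTAMP, UseLong)      // => LongType
//   asDataType(ColumnType.TIMESTAMP, UseTimestamp) // => TimestampType
// while the other column types map one-to-one (e.g. ColumnType.VARCHAR => StringType).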

def asColumnType(dataType: DataType) = {
dataType match {
@@ -98,27 +106,6 @@ }
}
}

private def asStructField(columnDescription: ColumnDescription, tsTimestampBinding: TsTimestampBindingType): StructField = {
val ft = asDataType(columnDescription.getType, tsTimestampBinding)
if (columnDescription.isInstanceOf[FullColumnDescription]) {
val fullColumnDescription = columnDescription.asInstanceOf[FullColumnDescription]
val isNullable = fullColumnDescription.isNullable()
val partitionKeyOrdinal = fullColumnDescription.getPartitionKeyOrdinal
val localKeyOrdinal = fullColumnDescription.getLocalKeyOrdinal
// val isTSField = fullColumnDescription.getType == ColumnType.TIMESTAMP && fullColumnDescription.isPartitionKeyMember
val metadataBuilder = new MetadataBuilder()
if(localKeyOrdinal != null)
metadataBuilder.putLong(localKeyOrdinalProp, localKeyOrdinal.toLong)
if(partitionKeyOrdinal != null)
metadataBuilder.putLong(partitionKeyOrdinalProp, partitionKeyOrdinal.toLong)
// if(isTSField)
// metadataBuilder.putBoolean(isTSFieldProp, isTSField)
val metadata = metadataBuilder.build()
StructField(columnDescription.getName, ft, isNullable, metadata)
} else {
StructField(columnDescription.getName, ft)
}
}

def asSparkRow(schema: StructType, row: RiakRow, columns: Option[Seq[ColumnDescription]] = None): SparkRow = {
val cells = row.getCellsCopy.asScala
@@ -130,17 +117,15 @@
new GenericRowWithSchema(values.toArray, schema)
}

def asSparkSchema(columns: Seq[ColumnDescription], tsTimestampBinding: TsTimestampBindingType): StructType =
StructType(columns.map(c => asStructField(c, tsTimestampBinding)))

private val classOfSparkRow = classOf[SparkRow]

def from[T: ClassTag](columns: Seq[ColumnDescription], row: RiakRow)(implicit schema: Option[StructType] = None, tsTimestampBinding: TsTimestampBindingType): T = {
def from[T: ClassTag](columns: Seq[ColumnDescription], row: RiakRow)
(implicit schema: Option[StructType] = None, tsTimestampBinding: TsTimestampBindingType): T = {
val ct = implicitly[ClassTag[T]]

val (st, cs) = schema match {
case Some(structType) => (structType, Some(columns))
case None => (asSparkSchema(columns, tsTimestampBinding), None)
case Some(structType) => (new RiakStructType(structType.fields), Some(columns))
case None => (RiakStructType(columns, tsTimestampBinding), None)
}
asSparkRow(st, row, cs).asInstanceOf[T]
}
@@ -228,24 +213,4 @@
}
new RiakRow(cells: _*)
}

def asTableDef(name: String, schema: StructType): TableDefinition = {
val columns = schema.map(asColumnDef)
new TableDefinition(name, columns)
}

private def getIntegerFromMetadata(metadata: Metadata, name: String): Integer = {
Try(metadata.getLong(name)) match{
case Success(long) => long.toInt
case Failure(_) => null
}
}

def asColumnDef(field: StructField): FullColumnDescription = {
val dataType = asColumnType(field.dataType)
val metadata = field.metadata
val partitionKeyOrdinal = getIntegerFromMetadata(metadata, partitionKeyOrdinalProp)
val localKeyOrdinal = getIntegerFromMetadata(metadata, localKeyOrdinalProp)
new FullColumnDescription(field.name, dataType, field.nullable, partitionKeyOrdinal, localKeyOrdinal)
}
}
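The deleted asSparkSchema/asStructField helpers are superseded by RiakStructType; below is a hedged sketch of building a schema with it (the apply signature is inferred from its uses in this diff, and the two-argument ColumnDescription constructor is an assumption):

import com.basho.riak.client.core.query.timeseries.ColumnDescription
import com.basho.riak.client.core.query.timeseries.ColumnDescription.ColumnType
import com.basho.riak.spark.rdd.UseTimestamp
import org.apache.spark.riak.types.RiakStructType

// Illustrative columns for a hypothetical TS table; with UseTimestamp the "time"
// column surfaces as TimestampType, with UseLong it would be LongType.
val columns = Seq(
  new ColumnDescription("time", ColumnType.TIMESTAMP),
  new ColumnDescription("temperature", ColumnType.DOUBLE))

val schema = RiakStructType(columns, UseTimestamp)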