Removed usage of SQLContext
AleksandrPavlenko authored and srgg committed Feb 8, 2017
1 parent 126faf9 commit 50b5938
Showing 17 changed files with 268 additions and 215 deletions.

This file was deleted.

@@ -103,7 +103,7 @@ case class QueryFullBucket(bucket: BucketDef,
response.getEntries.toList match {
case Nil if coverageEntriesIt.get.hasNext => nextChunk(None)
case entries: List[FullBucketRead.Response.Entry] =>
val data = entries.map(e => e.getLocation -> e.getFetchedValue.getValue(classOf[RiakObject]))
val data = entries.map(e => e.getFetchedValue.getLocation -> e.getFetchedValue.getValue(classOf[RiakObject]))
response.hasContinuation match {
case true => Some(Left(response.getContinuation.toStringUtf8)) -> data
case _ if coverageEntriesIt.get.hasNext => Some(Right(coverageEntriesIt.get.next)) -> data
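
The only change in this hunk is where the key location comes from: it is now taken from the fetched value (e.getFetchedValue.getLocation) rather than from the entry itself. A minimal sketch, not part of the commit, of the value one nextChunk call produces, with entries, response and coverageEntriesIt assumed in scope as above:

// Sketch only: the terminal case is an assumption, it is not shown in this hunk.
val data = entries.map(e =>
  e.getFetchedValue.getLocation -> e.getFetchedValue.getValue(classOf[RiakObject]))
// data: List[(Location, RiakObject)] — the location now comes from the fetched value.
val chunk =
  if (response.hasContinuation) Some(Left(response.getContinuation.toStringUtf8)) -> data
  else if (coverageEntriesIt.get.hasNext) Some(Right(coverageEntriesIt.get.next)) -> data
  else None -> data // assumed terminal case
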
28 changes: 7 additions & 21 deletions connector/src/main/scala/com/basho/riak/spark/rdd/ReadConf.scala
@@ -20,8 +20,6 @@ package com.basho.riak.spark.rdd
import java.io.InputStream
import java.util.Properties
import org.apache.spark.SparkConf
import java.time.Duration
import org.joda.time.Duration
import org.apache.spark.network.util.JavaUtils

/** RDD read settings
@@ -33,16 +31,6 @@ case class ReadConf (
val fetchSize: Int = ReadConf.DefaultFetchSize,
val splitCount: Int = ReadConf.DefaultSplitCount,
val tsTimestampBinding: TsTimestampBindingType = ReadConf.DefaultTsTimestampBinding,
/**
* Used only in ranged partitioner to identify quantized field.
* Usage example:
* sparkSession.read
* .option("spark.riak.partitioning.ts-range-field-name", "time")
* Providing this property automatically turns on RangedRiakTSPartitioner
*/
val tsRangeFieldName: String = null,
val quantum: Option[Long] = None,

/**
* Turns on streaming values support for PEx.
*
@@ -54,17 +42,17 @@
* DO NOT CHANGE THIS VALUES MANUALLY IF YOU DON'T KNOW WHAT YOU ARE DOING
* IT MAY CAUSE EITHER PERFORMANCE DEGRADATION or INTRODUCE FBR ERRORS
*/
useStreamingValuesForFBRead: Boolean = ReadConf.DefaultUseStreamingValues4FBRead
useStreamingValuesForFBRead: Boolean = ReadConf.DefaultUseStreamingValues4FBRead,
val tsPartitioner: Option[String] = None
) {

def overrideProperties(options: Map[String, String]): ReadConf = {
val newFetchSize = options.getOrElse(ReadConf.fetchSizePropName, fetchSize.toString).toInt
val newSplitCount = options.getOrElse(ReadConf.splitCountPropName, splitCount.toString).toInt
val newUseStreamingValuesForFBRead = options.getOrElse(ReadConf.useStreamingValuesPropName, useStreamingValuesForFBRead.toString).toBoolean
val newTsTimestampBinding = TsTimestampBindingType(options.getOrElse(ReadConf.tsBindingsTimestamp, tsTimestampBinding.value))
val newTsRangeFieldName = options.getOrElse(ReadConf.tsRangeFieldPropName, tsRangeFieldName)
val newQuantum = options.get(ReadConf.tsQuantumPropName).map(JavaUtils.timeStringAsMs)
ReadConf(newFetchSize, newSplitCount, newTsTimestampBinding, newTsRangeFieldName, newQuantum, newUseStreamingValuesForFBRead)
val newTSPartitioner = options.get(ReadConf.tsPartitionerName)
ReadConf(newFetchSize, newSplitCount, newTsTimestampBinding, newUseStreamingValuesForFBRead, newTSPartitioner)
}
}

@@ -74,8 +62,7 @@ object ReadConf {
final val useStreamingValuesPropName = "spark.riak.fullbucket.use-streaming-values"
final val fetchSizePropName = "spark.riak.input.fetch-size"
final val tsBindingsTimestamp = "spark.riakts.bindings.timestamp"
final val tsRangeFieldPropName = "spark.riak.partitioning.ts-range-field-name"
final val tsQuantumPropName = "spark.riak.partitioning.ts-quantum"
final val tsPartitionerName = "spark.riak.ts.partitioner"

private val defaultProperties: Properties =
getClass.getResourceAsStream("/ee-default.properties") match {
@@ -108,9 +95,8 @@ object ReadConf {
fetchSize = conf.getInt(fetchSizePropName, DefaultFetchSize),
splitCount = conf.getInt(splitCountPropName, DefaultSplitCount),
tsTimestampBinding = TsTimestampBindingType(conf.get(tsBindingsTimestamp, DefaultTsTimestampBinding.value)),
tsRangeFieldName = conf.get(tsRangeFieldPropName, null),
quantum = conf.getOption(tsQuantumPropName).map(JavaUtils.timeStringAsMs),
useStreamingValuesForFBRead = conf.getBoolean(useStreamingValuesPropName, DefaultUseStreamingValues4FBRead)
useStreamingValuesForFBRead = conf.getBoolean(useStreamingValuesPropName, DefaultUseStreamingValues4FBRead),
tsPartitioner = conf.getOption(tsPartitionerName)
)
}

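The net effect on configuration: the spark.riak.partitioning.ts-range-field-name and spark.riak.partitioning.ts-quantum options are gone, and partitioning is now selected through the single spark.riak.ts.partitioner option. A minimal sketch, not from this commit, of how the new key flows through overrideProperties; the value "RangedRiakTSPartitioner" is illustrative only, since valid values are not defined in this diff:

import com.basho.riak.spark.rdd.ReadConf

// Start from the defaults and override only the new TS partitioner property.
val readConf = ReadConf().overrideProperties(
  Map(ReadConf.tsPartitionerName -> "RangedRiakTSPartitioner")) // illustrative value

// readConf.tsPartitioner is now Some("RangedRiakTSPartitioner"); all other settings keep their defaults.
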
@@ -73,8 +73,9 @@ class RiakTSRDD[R] private[spark](
val q = new QueryTS(connector, queryData)
val iterator: TSDataQueryingIterator = TSDataQueryingIterator(q)
val columns = iterator.columnDefs
if (this.schema.isDefined && !columns.isEmpty)
if (this.schema.isDefined && columns.nonEmpty) {
validateSchema(schema.get, columns)
}
val convertingIterator = DataConvertingIterator.createTSConverting((columns, iterator), TSConversionUtil.from[R])
val countingIterator = CountingIterator[R](convertingIterator)
context.addTaskCompletionListener { (context) =>

This file was deleted.

@@ -18,7 +18,10 @@
package com.basho.riak.spark.rdd.connector

import com.basho.riak.client.api.{RiakClient, RiakCommand}
import com.basho.riak.client.core.{RiakFuture, FutureOperation}
import com.basho.riak.client.core.query.timeseries.TableDefinition
import com.basho.riak.client.core.{FutureOperation, RiakFuture}
import com.basho.riak.spark.rdd.TsTimestampBindingType
import org.apache.spark.sql.types.StructType

/**
* @author Sergey Galkin <srggal at gmail dot com>
@@ -29,6 +32,8 @@ trait RiakSession extends AutoCloseable {

def execute[V, S](operation: FutureOperation[V, _, S]): RiakFuture[V, S]

def getTableDefinition(name: String, bindingType: TsTimestampBindingType): StructType

def isClosed: Boolean


@@ -28,14 +28,20 @@ import com.google.common.cache._

import scala.collection.JavaConverters._
import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import scala.collection.JavaConversions._

import com.basho.riak.client.core.operations.ts.DescribeTableOperation
import com.basho.riak.spark.rdd.TsTimestampBindingType
import io.netty.bootstrap.Bootstrap
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.util.concurrent.DefaultThreadFactory
import org.apache.spark.riak.Logging
import org.apache.spark.riak.types.RiakStructType
import org.apache.spark.sql.types.StructType

import scala.collection.concurrent.TrieMap
import scala.util.Try

/**
* Simple [[RiakSession]] Cache/Pool.
@@ -267,6 +273,15 @@ class CachedSession(val conf: RiakConnectorConf, riakClient: RiakClient, afterCl
}
}

override def getTableDefinition(name: String, bindingType: TsTimestampBindingType): StructType = {
val describeOp = new DescribeTableOperation.Builder(name).build()
val tableDef = Try(execute(describeOp).get()) getOrElse {
throw new IllegalStateException(s"No table $name was found")
}

RiakStructType(tableDef.getFullColumnDescriptions.toSeq, bindingType)
}

override def maxConnectionsPerNode: Int = {
checkThatSessionIsActive()
riakClient.getRiakCluster.getNodes.asScala match {
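
Together with the new trait method above, CachedSession can now resolve a TS table's schema on demand: a DescribeTableOperation is executed and the returned column descriptions are converted to a Spark StructType via RiakStructType. A rough usage sketch, not part of the commit; connector.withSessionDo and the table name are assumptions made for illustration:

import com.basho.riak.spark.rdd.UseTimestamp
import org.apache.spark.sql.types.StructType

// Assumed: `connector` is a RiakConnector whose withSessionDo hands out a RiakSession.
val schema: StructType = connector.withSessionDo { session =>
  session.getTableDefinition("ts_weather_demo", UseTimestamp) // hypothetical table name
}
// Throws IllegalStateException("No table ts_weather_demo was found") if the describe call fails,
// as implemented in CachedSession.getTableDefinition above.
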
@@ -17,21 +17,27 @@
*/
package com.basho.riak.spark.util

import com.basho.riak.client.core.query.timeseries.{ Cell, Row => RiakRow, ColumnDescription }
import com.basho.riak.spark.rdd.{ UseTimestamp, UseLong, TsTimestampBindingType }
import com.basho.riak.client.core.query.timeseries.{Cell, ColumnDescription, Row => RiakRow}
import com.basho.riak.spark.rdd.{TsTimestampBindingType, UseLong, UseTimestamp}
import com.fasterxml.jackson.core.`type`.TypeReference
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

import scala.collection.convert.decorateAll._
import java.sql.Timestamp
import org.apache.spark.sql.{ Row => SparkRow }

import org.apache.spark.sql.{Row => SparkRow}

import scala.reflect.ClassTag
import com.basho.riak.client.core.query.timeseries.ColumnDescription.ColumnType
import java.util.Calendar
import java.sql.Date

import scala.util.Try
import com.basho.riak.client.core.query.timeseries.FullColumnDescription
import com.basho.riak.client.core.query.timeseries.TableDefinition
import org.apache.spark.riak.types.RiakStructType

import scala.collection.JavaConversions._
import scala.util.Success
import scala.util.Failure
@@ -43,7 +49,8 @@ import scala.util.Failure
object TSConversionUtil {
val partitionKeyOrdinalProp = "riak.partitionKeyOrdinal"
val localKeyOrdinalProp = "riak.localKeyOrdinal"
val isTSFieldProp = "riak.isTSField"
val quantum = "riakTS.quantum"
val quantizedField = "riakTS.quantizedField"

private val STRING_TYPE_REFERENCE = new TypeReference[String] {}

@@ -74,18 +81,19 @@ object TSConversionUtil {
} else null
}

def asDataType(columnType: ColumnType, tsTimestampBinding: TsTimestampBindingType): DataType =
def asDataType(columnType: ColumnType, tsTimestampBinding: TsTimestampBindingType): DataType = {
columnType match {
case ColumnType.BOOLEAN => BooleanType
case ColumnType.DOUBLE => DoubleType
case ColumnType.SINT64 => LongType
case ColumnType.DOUBLE => DoubleType
case ColumnType.SINT64 => LongType
case ColumnType.TIMESTAMP => tsTimestampBinding match {
case UseLong => LongType
case UseLong => LongType
case UseTimestamp => TimestampType
}
case ColumnType.VARCHAR => StringType
case _ => throw new IllegalStateException(s"Unsupported column type $columnType")
case _ => throw new IllegalStateException(s"Unsupported column type $columnType")
}
}

def asColumnType(dataType: DataType) = {
dataType match {
@@ -98,27 +106,6 @@ object TSConversionUtil {
}
}

private def asStructField(columnDescription: ColumnDescription, tsTimestampBinding: TsTimestampBindingType): StructField = {
val ft = asDataType(columnDescription.getType, tsTimestampBinding)
if (columnDescription.isInstanceOf[FullColumnDescription]) {
val fullColumnDescription = columnDescription.asInstanceOf[FullColumnDescription]
val isNullable = fullColumnDescription.isNullable()
val partitionKeyOrdinal = fullColumnDescription.getPartitionKeyOrdinal
val localKeyOrdinal = fullColumnDescription.getLocalKeyOrdinal
// val isTSField = fullColumnDescription.getType == ColumnType.TIMESTAMP && fullColumnDescription.isPartitionKeyMember
val metadataBuilder = new MetadataBuilder()
if(localKeyOrdinal != null)
metadataBuilder.putLong(localKeyOrdinalProp, localKeyOrdinal.toLong)
if(partitionKeyOrdinal != null)
metadataBuilder.putLong(partitionKeyOrdinalProp, partitionKeyOrdinal.toLong)
// if(isTSField)
// metadataBuilder.putBoolean(isTSFieldProp, isTSField)
val metadata = metadataBuilder.build()
StructField(columnDescription.getName, ft, isNullable, metadata)
} else {
StructField(columnDescription.getName, ft)
}
}

def asSparkRow(schema: StructType, row: RiakRow, columns: Option[Seq[ColumnDescription]] = None): SparkRow = {
val cells = row.getCellsCopy.asScala
@@ -130,17 +117,15 @@
new GenericRowWithSchema(values.toArray, schema)
}

def asSparkSchema(columns: Seq[ColumnDescription], tsTimestampBinding: TsTimestampBindingType): StructType =
StructType(columns.map(c => asStructField(c, tsTimestampBinding)))

private val classOfSparkRow = classOf[SparkRow]

def from[T: ClassTag](columns: Seq[ColumnDescription], row: RiakRow)(implicit schema: Option[StructType] = None, tsTimestampBinding: TsTimestampBindingType): T = {
def from[T: ClassTag](columns: Seq[ColumnDescription], row: RiakRow)
(implicit schema: Option[StructType] = None, tsTimestampBinding: TsTimestampBindingType): T = {
val ct = implicitly[ClassTag[T]]

val (st, cs) = schema match {
case Some(structType) => (structType, Some(columns))
case None => (asSparkSchema(columns, tsTimestampBinding), None)
case Some(structType) => (new RiakStructType(structType.fields), Some(columns))
case None => (RiakStructType(columns, tsTimestampBinding), None)
}
asSparkRow(st, row, cs).asInstanceOf[T]
}
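
from[T] now goes through RiakStructType on both paths: a user-supplied StructType is wrapped, and when no schema is given one is derived from the TS column descriptions and the timestamp binding. A minimal sketch, not part of the commit, of the two paths; the column names and types are illustrative only:

// Assumed: the two RiakStructType constructors used in the match above are public.
val cols: Seq[ColumnDescription] = Seq(
  new ColumnDescription("time", ColumnType.TIMESTAMP), // illustrative columns
  new ColumnDescription("temperature", ColumnType.DOUBLE))

// No user schema: derive one from the column descriptions and the timestamp binding.
val derived = RiakStructType(cols, UseTimestamp)

// User-supplied schema: wrap its fields; the original columns are kept for cell lookup in asSparkRow.
val wrapped = new RiakStructType(derived.fields)
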
Expand Down Expand Up @@ -228,24 +213,4 @@ object TSConversionUtil {
}
new RiakRow(cells: _*)
}

def asTableDef(name: String, schema: StructType): TableDefinition = {
val columns = schema.map(asColumnDef)
new TableDefinition(name, columns)
}

private def getIntegerFromMetadata(metadata: Metadata, name: String): Integer = {
Try(metadata.getLong(name)) match{
case Success(long) => long.toInt
case Failure(_) => null
}
}

def asColumnDef(field: StructField): FullColumnDescription = {
val dataType = asColumnType(field.dataType)
val metadata = field.metadata
val partitionKeyOrdinal = getIntegerFromMetadata(metadata, partitionKeyOrdinalProp)
val localKeyOrdinal = getIntegerFromMetadata(metadata, localKeyOrdinalProp)
new FullColumnDescription(field.name, dataType, field.nullable, partitionKeyOrdinal, localKeyOrdinal)
}
}