Merge pull request #194 from tknandu/master
Latest enhancements to 2.1 Spark support
tknandu authored Jun 22, 2018
2 parents 29b952f + dcf75c1 commit 8ea13fe
Showing 5 changed files with 76 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -72,7 +72,7 @@ limitations under the License.
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>documentdb-bulkexecutor</artifactId>
<version>1.0.6</version>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
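
The only build change is the bump of the BulkExecutor library from 1.0.6 to 2.0.0, which the flattened com.microsoft.azure.documentdb.bulkexecutor imports and the new importAll/updateAll/mergeAll call shapes in the Scala changes below rely on. For reference, a rough sbt equivalent of the Maven dependency above might look like this (a sketch; the connector itself builds with the pom.xml shown above):

// build.sbt equivalent (hypothetical; Maven coordinates taken from the pom change above)
libraryDependencies += "com.microsoft.azure" % "documentdb-bulkexecutor" % "2.0.0"
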
@@ -33,6 +33,8 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions

import java.lang.management.ManagementFactory

object CosmosDBConnection {
// For verification purpose
var lastConnectionPolicy: ConnectionPolicy = _
@@ -90,6 +92,10 @@ private[spark] case class CosmosDBConnection(config: Config) extends LoggingTrait

def getDocumentBulkImporter(collectionThroughput: Int, partitionKeyDefinition: Option[String]): DocumentBulkExecutor = {
if (bulkImporter == null) {
val initializationRetryOptions = new RetryOptions()
initializationRetryOptions.setMaxRetryAttemptsOnThrottledRequests(1000)
initializationRetryOptions.setMaxRetryWaitTimeInSeconds(1000)

if (partitionKeyDefinition.isDefined) {
val pkDefinition = new PartitionKeyDefinition()
val paths: ListBuffer[String] = new ListBuffer[String]()
@@ -101,26 +107,40 @@ private[spark] case class CosmosDBConnection(config: Config) extends LoggingTrait
collectionName,
pkDefinition,
collectionThroughput
).build()
).withInitializationRetryOptions(initializationRetryOptions).build()
}
else {
bulkImporter = DocumentBulkExecutor.builder.from(documentClient,
databaseName,
collectionName,
getCollection.getPartitionKey,
collectionThroughput
).build()
).withInitializationRetryOptions(initializationRetryOptions).build()
}
}

bulkImporter
}

def setDefaultClientRetryPolicy: Unit = {
if (documentClient != null) {
documentClient.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(9);
documentClient.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(30);
}
}

def setZeroClientRetryPolicy: Unit = {
if (documentClient != null) {
documentClient.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
documentClient.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
}
}

private def getClientConfiguration(config: Config): ClientConfiguration = {
val connectionPolicy = new ConnectionPolicy()
connectionPolicy.setConnectionMode(connectionMode)
connectionPolicy.setUserAgentSuffix(Constants.userAgentSuffix)
connectionPolicy.setUserAgentSuffix(Constants.userAgentSuffix + " " + ManagementFactory.getRuntimeMXBean().getName())

config.get[String](CosmosDBConfig.ConnectionMaxPoolSize) match {
case Some(maxPoolSizeStr) => connectionPolicy.setMaxPoolSize(maxPoolSizeStr.toInt)
case None => // skip
@@ -148,6 +168,7 @@ private[spark] case class CosmosDBConnection(config: Config) extends LoggingTrait
connectionPolicy.setPreferredLocations(preferredLocations)
}

/*
val bulkimport = config.get[String](CosmosDBConfig.BulkImport).
getOrElse(CosmosDBConfig.DefaultBulkImport.toString).
toBoolean
@@ -158,6 +179,7 @@ private[spark] case class CosmosDBConnection(config: Config) extends LoggingTrait
connectionPolicy.getRetryOptions.setMaxRetryAttemptsOnThrottledRequests(0)
connectionPolicy.setConnectionMode(ConnectionMode.Gateway)
}
*/

ClientConfiguration(
config.get[String](CosmosDBConfig.Endpoint).get,
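
Three things change in CosmosDBConnection: the user agent suffix now embeds ManagementFactory.getRuntimeMXBean().getName() (typically pid@hostname) so requests can be traced back to the executor JVM; the DocumentBulkExecutor is built with generous initialization retry options (1000 throttled-request retries, 1000-second max wait) so that metadata reads during executor setup survive throttling; and two new helpers, setDefaultClientRetryPolicy and setZeroClientRetryPolicy, let the write path restore the client's default throttling retries (9 attempts / 30 seconds) before initialization and then zero them out so throttling is handled by the BulkExecutor alone. A minimal standalone sketch of that handoff, assuming a raw DocumentClient and the 2.0.0 builder API used above (database, collection, partition key definition, and throughput values are placeholders):

import com.microsoft.azure.documentdb.{DocumentClient, PartitionKeyDefinition, RetryOptions}
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor

def buildBulkExecutor(client: DocumentClient,
                      databaseName: String,
                      collectionName: String,
                      pkDefinition: PartitionKeyDefinition,
                      collectionThroughput: Int): DocumentBulkExecutor = {
  // Retry aggressively while the executor reads collection metadata during initialization.
  val initRetry = new RetryOptions()
  initRetry.setMaxRetryAttemptsOnThrottledRequests(1000)
  initRetry.setMaxRetryWaitTimeInSeconds(1000)

  val executor = DocumentBulkExecutor.builder
    .from(client, databaseName, collectionName, pkDefinition, collectionThroughput)
    .withInitializationRetryOptions(initRetry)
    .build()

  // After initialization, hand throttling control to the BulkExecutor by
  // disabling client-side retries (mirrors setZeroClientRetryPolicy above).
  client.getConnectionPolicy.getRetryOptions.setMaxRetryAttemptsOnThrottledRequests(0)
  client.getConnectionPolicy.getRetryOptions.setMaxRetryWaitTimeInSeconds(0)

  executor
}
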
@@ -31,9 +31,7 @@ import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import rx.Observable
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.bulkexecutor.bulkupdate.{BulkUpdateResponse, UpdateItem}
import com.microsoft.azure.documentdb.bulkexecutor.{DocumentBulkExecutor}
import com.microsoft.azure.documentdb.bulkexecutor.bulkimport.{BulkImportResponse}
import com.microsoft.azure.documentdb.bulkexecutor.{DocumentBulkExecutor, BulkImportResponse, BulkUpdateResponse, UpdateItem}
import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
@@ -199,10 +197,20 @@ object CosmosDBSpark extends LoggingTrait {
collectionThroughput: Int,
writingBatchSize: Int,
partitionKeyDefinition: Option[String])(implicit ev: ClassTag[D]): Unit = {
val importer: DocumentBulkExecutor = connection.getDocumentBulkImporter(collectionThroughput, partitionKeyDefinition)

// Set retry options high for initialization (default values)
connection.setDefaultClientRetryPolicy

// Initialize BulkExecutor
val updater: DocumentBulkExecutor = connection.getDocumentBulkImporter(collectionThroughput, partitionKeyDefinition)

// Set retry options to 0 to pass control to BulkExecutor
// connection.setZeroClientRetryPolicy

val updateItems = new java.util.ArrayList[UpdateItem](writingBatchSize)
val updatePatchItems = new java.util.ArrayList[Document](writingBatchSize)
var bulkImportResponse: BulkUpdateResponse = null

var bulkUpdateResponse: BulkUpdateResponse = null
iter.foreach(item => {
item match {
case updateItem: UpdateItem =>
@@ -214,19 +222,31 @@ object CosmosDBSpark extends LoggingTrait {
case _ => throw new Exception("Unsupported update item types")
}
if (updateItems.size() >= writingBatchSize) {
bulkImportResponse = importer.updateAll(updateItems)
bulkUpdateResponse = updater.updateAll(updateItems, null)
if (bulkUpdateResponse.getNumberOfDocumentsUpdated != updateItems.size) {
throw new Exception("Error encountered in bulk update API execution. Exceptions observed:\n" + bulkUpdateResponse.getErrors.toString)
}
updateItems.clear()
}
if (updatePatchItems.size() >= writingBatchSize) {
bulkImportResponse = importer.updateAllWithPatch(updatePatchItems)
bulkUpdateResponse = updater.mergeAll(updatePatchItems, null)
if (bulkUpdateResponse.getNumberOfDocumentsUpdated != updatePatchItems.size) {
throw new Exception("Error encountered in bulk update API execution. Exceptions observed:\n" + bulkUpdateResponse.getErrors.toString)
}
updatePatchItems.clear()
}
})
if (updateItems.size() > 0) {
bulkImportResponse = importer.updateAll(updateItems)
bulkUpdateResponse = updater.updateAll(updateItems, null)
if (bulkUpdateResponse.getNumberOfDocumentsUpdated != updateItems.size) {
throw new Exception("Error encountered in bulk update API execution. Exceptions observed:\n" + bulkUpdateResponse.getErrors.toString)
}
}
if (updatePatchItems.size() > 0) {
bulkImportResponse = importer.updateAllWithPatch(updatePatchItems)
bulkUpdateResponse = updater.mergeAll(updatePatchItems, null)
if (bulkUpdateResponse.getNumberOfDocumentsUpdated != updatePatchItems.size) {
throw new Exception("Error encountered in bulk update API execution. Exceptions observed:\n" + bulkUpdateResponse.getErrors.toString)
}
}
}

@@ -237,8 +257,18 @@ object CosmosDBSpark extends LoggingTrait {
rootPropertyToSave: Option[String],
partitionKeyDefinition: Option[String],
upsert: Boolean): Unit = {

// Set retry options high for initialization (default values)
connection.setDefaultClientRetryPolicy

// Initialize BulkExecutor
val importer: DocumentBulkExecutor = connection.getDocumentBulkImporter(collectionThroughput, partitionKeyDefinition)

// Set retry options to 0 to pass control to BulkExecutor
// connection.setZeroClientRetryPolicy

val documents = new java.util.ArrayList[String](writingBatchSize)

var bulkImportResponse: BulkImportResponse = null
iter.foreach(item => {
val document: Document = item match {
Expand All @@ -256,12 +286,18 @@ object CosmosDBSpark extends LoggingTrait {
}
documents.add(document.toJson())
if (documents.size() >= writingBatchSize) {
bulkImportResponse = importer.importAll(documents, upsert)
bulkImportResponse = importer.importAll(documents, upsert, false, null)
if (bulkImportResponse.getNumberOfDocumentsImported != documents.size) {
throw new Exception("Error encountered in bulk import API execution. Exceptions observed:\n" + bulkImportResponse.getErrors.toString)
}
documents.clear()
}
})
if (documents.size() > 0) {
bulkImportResponse = importer.importAll(documents, upsert)
bulkImportResponse = importer.importAll(documents, upsert, false, null)
if (bulkImportResponse.getNumberOfDocumentsImported != documents.size) {
throw new Exception("Error encountered in bulk import API execution. Exceptions observed:\n" + bulkImportResponse.getErrors.toString)
}
}
}

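
The write path in CosmosDBSpark now restores the default client retry policy before building the executor, switches to the 2.0.0 call shapes (updateAll(items, null), mergeAll(patchItems, null), and importAll(documents, upsert, false, null)), and treats a batch as failed whenever the response reports fewer documents updated or imported than were submitted, surfacing getErrors in the exception. The update path also distinguishes plain UpdateItem batches (updateAll) from patch documents (mergeAll). A self-contained sketch of the batch-then-verify loop for imports, assuming an already built DocumentBulkExecutor and keeping the same argument order as the connector's importAll call above:

import com.microsoft.azure.documentdb.bulkexecutor.{BulkImportResponse, DocumentBulkExecutor}

def importInBatches(importer: DocumentBulkExecutor,
                    jsonDocs: Iterator[String],
                    writingBatchSize: Int,
                    upsert: Boolean): Unit = {
  val batch = new java.util.ArrayList[String](writingBatchSize)

  def flush(): Unit = {
    if (batch.size() > 0) {
      // Same argument order as the connector's call above: importAll(documents, upsert, false, null)
      val response: BulkImportResponse = importer.importAll(batch, upsert, false, null)
      if (response.getNumberOfDocumentsImported != batch.size()) {
        throw new Exception(
          "Error encountered in bulk import API execution. Exceptions observed:\n" +
            response.getErrors.toString)
      }
      batch.clear()
    }
  }

  jsonDocs.foreach { json =>
    batch.add(json)
    if (batch.size() >= writingBatchSize) flush()
  }
  flush() // import any trailing partial batch
}
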
@@ -25,6 +25,7 @@ package com.microsoft.azure.cosmosdb.spark.schema
import java.sql.{Date, Timestamp}

import org.apache.spark.sql.types._
import org.json.JSONObject

/**
* Json - Scala object transformation support.
@@ -46,6 +47,7 @@ trait JsonSupport {
protected def enforceCorrectType(value: Any, desiredType: DataType): Any =
Option(value).map { _ =>
desiredType match {
case _ if value == JSONObject.NULL => null // guard when null value was inserted in document
case StringType => toString(value)
case _ if value == "" => null // guard the non string type
case ByteType => toByte(value)
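
JsonSupport previously assumed a value read from a document was either a usable scalar or an empty string; a document written with an explicit JSON null carries org.json's JSONObject.NULL sentinel, which is non-null and would fall through to the type-specific coercions. The new first case maps that sentinel to null before any coercion runs. A stripped-down sketch of the guard, assuming the rest of enforceCorrectType stays as in the trait above:

import org.apache.spark.sql.types._
import org.json.JSONObject

// Minimal illustration of the null guard; the remaining coercion branches are elided.
def coerce(value: Any, desiredType: DataType): Any =
  Option(value).map { _ =>
    desiredType match {
      case _ if value == JSONObject.NULL => null // explicit JSON null stored in the document
      case StringType => value.toString
      case _ if value == "" => null              // existing guard for empty non-string values
      case _ => value
    }
  }.orNull
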
@@ -32,7 +32,7 @@ import com.microsoft.azure.cosmosdb.spark.rdd.{CosmosDBRDD, CosmosDBRDDIterator}
import com.microsoft.azure.cosmosdb.spark.streaming.{CosmosDBSinkProvider, CosmosDBSourceProvider}
import com.microsoft.azure.cosmosdb.spark.{RequiresCosmosDB, _}
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.bulkexecutor.bulkupdate.{IncUpdateOperation, UpdateItem, UpdateOperationBase}
import com.microsoft.azure.documentdb.bulkexecutor.{IncUpdateOperation, UpdateItem, UpdateOperationBase}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.datasources.FilePartition
