Commit 99bc634
add dynamodb commit owner
dhruvarya-db committed May 16, 2024
1 parent b1b84d5 commit 99bc634
Showing 9 changed files with 1,153 additions and 39 deletions.
2 changes: 2 additions & 0 deletions build.sbt
@@ -196,6 +196,8 @@ lazy val spark = (project in file("spark"))
    "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
    "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
+   // For DynamoDBCommitStore
+   "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",

    // Test deps
    "org.scalatest" %% "scalatest" % scalaTestVersion % "test",

Large diffs are not rendered by default.

DynamoDBCommitOwnerClientBuilder.java (new file)
@@ -0,0 +1,72 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.delta.dynamodbcommitstore;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import org.apache.spark.sql.delta.managedcommit.CommitOwnerBuilder;
import org.apache.spark.sql.delta.managedcommit.CommitOwnerClient;
import scala.collection.immutable.Map;

import java.lang.reflect.InvocationTargetException;

public class DynamoDBCommitOwnerClientBuilder implements CommitOwnerBuilder {

    private final long BACKFILL_BATCH_SIZE = 1L;

    @Override
    public String getName() {
        return "dynamodb";
    }

    private static final String MANAGED_COMMITS_TABLE_NAME_KEY = "managedCommitsTableName";
    private static final String DYNAMO_DB_ENDPOINT_KEY = "dynamoDBEndpoint";
    private static final String AWS_CREDENTIALS_PROVIDER_KEY = "awsCredentialsProvider";

    // TODO: update this interface so that it can take a sparkSession.
    @Override
    public CommitOwnerClient build(Map<String, String> conf) {
        String managedCommitsTableName = conf.get(MANAGED_COMMITS_TABLE_NAME_KEY).getOrElse(() -> {
            throw new RuntimeException(MANAGED_COMMITS_TABLE_NAME_KEY + " not found");
        });
        String dynamoDBEndpoint = conf.get(DYNAMO_DB_ENDPOINT_KEY).getOrElse(() -> {
            throw new RuntimeException(DYNAMO_DB_ENDPOINT_KEY + " not found");
        });
        String awsCredentialsProviderName = conf.get(AWS_CREDENTIALS_PROVIDER_KEY).getOrElse(() ->
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
        try {
            AmazonDynamoDBClient client =
                createAmazonDDBClient(dynamoDBEndpoint, awsCredentialsProviderName);
            return new DynamoDBCommitOwnerClient(
                managedCommitsTableName, dynamoDBEndpoint, client, BACKFILL_BATCH_SIZE);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create DynamoDB client", e);
        }
    }

    private AmazonDynamoDBClient createAmazonDDBClient(
            String endpoint,
            String credentialProviderName) throws NoSuchMethodException, ClassNotFoundException,
                InvocationTargetException, InstantiationException, IllegalAccessException {
        Class<?> awsCredentialsProviderClass = Class.forName(credentialProviderName);
        AWSCredentialsProvider awsCredentialsProvider =
            (AWSCredentialsProvider) awsCredentialsProviderClass.getConstructor().newInstance();
        AmazonDynamoDBClient client = new AmazonDynamoDBClient(awsCredentialsProvider);
        client.setEndpoint(endpoint);
        return client;
    }
}
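The builder loads the credentials provider reflectively, so any provider class with a no-arg constructor can be plugged in through configuration. A minimal standalone sketch of that pattern, using the same SDK default the builder falls back to (the class and method names in the sketch are illustrative, not part of the commit):

import com.amazonaws.auth.AWSCredentialsProvider;

public class ProviderReflectionSketch {
    // Instantiate any AWSCredentialsProvider implementation that exposes a no-arg constructor.
    static AWSCredentialsProvider load(String className) throws ReflectiveOperationException {
        Class<?> cls = Class.forName(className);
        return (AWSCredentialsProvider) cls.getConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        AWSCredentialsProvider provider =
            load("com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
        System.out.println(provider.getClass().getName());
    }
}

ReflectiveOperationException is the common superclass of the five exceptions that createAmazonDDBClient declares individually.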
DynamoDBTableEntryConstants.java (new file)
@@ -0,0 +1,49 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.delta.dynamodbcommitstore;

/**
 * Defines the field names used in the DynamoDB table entry.
 */
final class DynamoDBTableEntryConstants {
    private DynamoDBTableEntryConstants() {}

    /** The primary key of the DynamoDB table. */
    public static final String TABLE_ID = "tableId";
    /** The version of the latest commit in the corresponding Delta table. */
    public static final String TABLE_LATEST_VERSION = "tableVersion";
    /** The inCommitTimestamp of the latest commit in the corresponding Delta table. */
    public static final String TABLE_LATEST_TIMESTAMP = "tableTimestamp";
    /** Whether this commit owner is accepting more commits for the corresponding Delta table. */
    public static final String ACCEPTING_COMMITS = "acceptingCommits";
    /** The path of the corresponding Delta table. */
    public static final String TABLE_PATH = "path";
    /** The schema version of this DynamoDB table entry. */
    public static final String SCHEMA_VERSION = "schemaVersion";
    /** The name of the field used to store unbackfilled commits. */
    public static final String COMMITS = "commits";
    /** The unbackfilled commit version. */
    public static final String COMMIT_VERSION = "version";
    /** The inCommitTimestamp of the unbackfilled commit. */
    public static final String COMMIT_TIMESTAMP = "timestamp";
    /** The name of the unbackfilled file, e.g. 00001.uuid.json. */
    public static final String COMMIT_FILE_NAME = "fsName";
    /** The length of the unbackfilled file as per the file status. */
    public static final String COMMIT_FILE_LENGTH = "fsLength";
    /** The modification timestamp of the unbackfilled file as per the file status. */
    public static final String COMMIT_FILE_MODIFICATION_TIMESTAMP = "fsTimestamp";
}
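For a concrete picture, here is a hypothetical entry assembled with these field names using the SDK v1 AttributeValue type. The choice of string vs. number vs. boolean attribute types, and every value shown, are assumptions for illustration rather than details taken from this commit (the full client diff is not rendered above):

import com.amazonaws.services.dynamodbv2.model.AttributeValue;

import java.util.HashMap;
import java.util.Map;

public class TableEntrySketch {
    // Hypothetical item for a Delta table whose latest tracked commit is version 5.
    // Assumes this class sits in the same package as DynamoDBTableEntryConstants.
    static Map<String, AttributeValue> exampleEntry() {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(DynamoDBTableEntryConstants.TABLE_ID, new AttributeValue().withS("3f2a9c0e"));
        item.put(DynamoDBTableEntryConstants.TABLE_PATH,
            new AttributeValue().withS("s3://bucket/table"));
        item.put(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION,
            new AttributeValue().withN("5"));
        item.put(DynamoDBTableEntryConstants.TABLE_LATEST_TIMESTAMP,
            new AttributeValue().withN("1715800000000"));
        item.put(DynamoDBTableEntryConstants.ACCEPTING_COMMITS,
            new AttributeValue().withBOOL(true));
        item.put(DynamoDBTableEntryConstants.SCHEMA_VERSION, new AttributeValue().withN("1"));
        return item;
    }
}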
ManagedCommitUtils.java (new file)
@@ -0,0 +1,77 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.delta.dynamodbcommitstore;

import org.apache.spark.sql.delta.managedcommit.AbstractMetadata;
import org.apache.spark.sql.delta.managedcommit.UpdatedActions;
import org.apache.hadoop.fs.Path;

import java.util.UUID;

public class ManagedCommitUtils {

    private ManagedCommitUtils() {}

    /** The subdirectory in which to store the unbackfilled commit files. */
    final static String COMMIT_SUBDIR = "_commits";

    /** The configuration key for the managed commit owner. */
    private static final String MANAGED_COMMIT_OWNER_CONF_KEY =
        "delta.managedCommits.commitOwner-dev";

    /**
     * Creates a new unbackfilled delta file path for the given commit version.
     * The path is of the form `tablePath/_delta_log/_commits/00000000000000000001.uuid.json`.
     */
    public static Path generateUnbackfilledDeltaFilePath(
            Path logPath,
            long version) {
        String uuid = UUID.randomUUID().toString();
        Path basePath = new Path(logPath, COMMIT_SUBDIR);
        return new Path(basePath, String.format("%020d.%s.json", version, uuid));
    }

    /**
     * Returns the path to the backfilled delta file for the given commit version.
     * The path is of the form `tablePath/_delta_log/00000000000000000001.json`.
     */
    public static Path getBackfilledDeltaFilePath(
            Path logPath,
            Long version) {
        return new Path(logPath, String.format("%020d.json", version));
    }

    private static String getManagedCommitOwner(AbstractMetadata metadata) {
        return metadata
            .getConfiguration()
            .get(MANAGED_COMMIT_OWNER_CONF_KEY)
            .getOrElse(() -> "");
    }

    /**
     * Returns true if the commit is a managed commit to filesystem conversion.
     */
    public static boolean isManagedCommitToFSConversion(
            Long commitVersion,
            UpdatedActions updatedActions) {
        // A table has managed commits iff the owner conf is set, i.e. non-empty.
        boolean oldMetadataHasManagedCommits =
            !getManagedCommitOwner(updatedActions.getOldMetadata()).isEmpty();
        boolean newMetadataHasManagedCommits =
            !getManagedCommitOwner(updatedActions.getNewMetadata()).isEmpty();
        return oldMetadataHasManagedCommits && !newMetadataHasManagedCommits && commitVersion > 0;
    }
}
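The zero-padded file names come straight from String.format("%020d", ...). A quick sketch of what the two path helpers above produce (the bucket path is made up, and the sketch assumes it sits in a package that can see ManagedCommitUtils):

import org.apache.hadoop.fs.Path;

public class DeltaFilePathSketch {
    public static void main(String[] args) {
        Path logPath = new Path("s3://bucket/table/_delta_log");  // hypothetical table location
        // Prints s3://bucket/table/_delta_log/00000000000000000012.json
        System.out.println(ManagedCommitUtils.getBackfilledDeltaFilePath(logPath, 12L));
        // Prints s3://bucket/table/_delta_log/_commits/00000000000000000012.<random uuid>.json
        System.out.println(ManagedCommitUtils.generateUnbackfilledDeltaFilePath(logPath, 12L));
    }
}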
@@ -43,7 +43,7 @@ case class Commit(
case class CommitFailedException(
    private val retryable: Boolean,
    private val conflict: Boolean,
-   private val message: String) extends Exception(message) {
+   private val message: String) extends RuntimeException(message) {
  def getRetryable: Boolean = retryable
  def getConflict: Boolean = conflict
}
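The one-line change above moves the parent class from Exception to RuntimeException, making CommitFailedException unchecked, which presumably matters for the new Java client: it can now throw the exception through interfaces that declare no throws clause. A hypothetical Java caller (the helper name is made up; a Scala case-class constructor is callable from Java):

public class CommitFailureSketch {
    // No "throws CommitFailedException" declaration is needed now that it is unchecked.
    static void commitOrFail(boolean accepted) {
        if (!accepted) {
            throw new CommitFailedException(
                true /* retryable */, true /* conflict */, "commit rejected by owner");
        }
    }
}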
@@ -20,17 +20,21 @@ import java.io.File
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

+import scala.concurrent.duration._
+
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}
import org.apache.spark.sql.delta.storage.{LogStore, LogStoreProvider}
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.threads.DeltaThreadPool
+import io.delta.dynamodbcommitstore.DynamoDBCommitOwnerClient
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

trait CommitOwnerClientImplSuiteBase extends QueryTest
  with SharedSparkSession
@@ -50,7 +54,7 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest
   * of backfill registration.
   */
  protected def registerBackfillOp(
-     commitOwnerClient: CommitOwnerClient,
+     tableCommitOwnerClient: TableCommitOwnerClient,
      deltaLog: DeltaLog,
      version: Long): Unit

@@ -90,11 +94,7 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest
      tableCommitOwnerClient: TableCommitOwnerClient,
      commitTimestampsOpt: Option[Array[Long]] = None): Unit = {
    val maxUntrackedVersion: Int = {
-     val commitResponse =
-       tableCommitOwnerClient.commitOwnerClient.getCommits(
-         logPath,
-         tableCommitOwnerClient.tableConf
-       )
+     val commitResponse = tableCommitOwnerClient.getCommits()
      if (commitResponse.getCommits.isEmpty) {
        commitResponse.getLatestTableVersion.toInt
      } else {
@@ -198,15 +198,19 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest
      val log = DeltaLog.forTable(spark, tempDir.toString)
      val logPath = log.logPath
      val tableCommitOwnerClient = createTableCommitOwnerClient(log)
-     tableCommitOwnerClient.commitOwnerClient.registerTable(
-       logPath, currentVersion = -1L, initMetadata, Protocol(1, 1))

      val e = intercept[CommitFailedException] {
        commit(version = 0, timestamp = 0, tableCommitOwnerClient)
      }
      assert(e.getMessage === "Commit version 0 must go via filesystem.")
      writeCommitZero(logPath)
-     assert(tableCommitOwnerClient.getCommits() == GetCommitsResponse(Seq.empty, -1))
+     var expectedVersion = -1
+     if (tableCommitOwnerClient.commitOwnerClient.isInstanceOf[DynamoDBCommitOwnerClient]) {
+       // DynamoDBCommitOwnerClient stores attemptVersion as the current table version when
+       // registerTable is called.
+       expectedVersion = 0
+     }
+     assert(tableCommitOwnerClient.getCommits() == GetCommitsResponse(Seq.empty, expectedVersion))
      assertBackfilled(version = 0, logPath, Some(0L))

      // Test backfilling functionality for commits 1 - 8
Expand All @@ -218,7 +222,7 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest

// Test that out-of-order backfill is rejected
intercept[IllegalArgumentException] {
registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 10)
registerBackfillOp(tableCommitOwnerClient, log, 10)
}
assertInvariants(logPath, tableCommitOwnerClient)
}
@@ -239,9 +243,6 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest
      val log = DeltaLog.forTable(spark, tempDir.toString)
      val logPath = log.logPath
      val tableCommitOwnerClient = createTableCommitOwnerClient(log)
-     tableCommitOwnerClient.commitOwnerClient.registerTable(
-       logPath, currentVersion = -1L, initMetadata, Protocol(1, 1)
-     )
      writeCommitZero(logPath)
      val maxVersion = 15
      (1 to maxVersion).foreach { version =>
Expand All @@ -261,22 +262,17 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest
val log = DeltaLog.forTable(spark, tempDir.getPath)
val logPath = log.logPath
val tableCommitOwnerClient = createTableCommitOwnerClient(log)
intercept[IllegalArgumentException] {
registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 0)
}
tableCommitOwnerClient.commitOwnerClient.registerTable(
logPath, currentVersion = -1L, initMetadata, Protocol(1, 1))
// commit-0 must be file system based
writeCommitZero(logPath)
(1 to 3).foreach(i => commit(i, i, tableCommitOwnerClient))

// Test that backfilling is idempotent for already-backfilled commits.
registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 2)
registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 2)
registerBackfillOp(tableCommitOwnerClient, log, 2)
registerBackfillOp(tableCommitOwnerClient, log, 2)

// Test that backfilling uncommited commits fail.
intercept[IllegalArgumentException] {
registerBackfillOp(tableCommitOwnerClient.commitOwnerClient, log, 4)
registerBackfillOp(tableCommitOwnerClient, log, 4)
}
}
}
@@ -286,8 +282,6 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest
      val log = DeltaLog.forTable(spark, tempDir.toString)
      val logPath = log.logPath
      val tableCommitOwnerClient = createTableCommitOwnerClient(log)
-     tableCommitOwnerClient.commitOwnerClient.registerTable(
-       logPath, currentVersion = -1L, initMetadata, Protocol(1, 1))

      // commit-0 must be file system based
      writeCommitZero(logPath)
@@ -316,20 +310,21 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest
      val numberOfWriters = 11
      val numberOfCommitsPerWriter = 11
      // scalastyle:off sparkThreadPools
-     val executor = Executors.newFixedThreadPool(numberOfWriters)
+     val executor = DeltaThreadPool("commitOwnerSuite", numberOfWriters)
      // scalastyle:on sparkThreadPools
      val runningTimestamp = new AtomicInteger(0)
      val commitFailedExceptions = new AtomicInteger(0)
      val totalCommits = numberOfWriters * numberOfCommitsPerWriter
-     val commitTimestamp: Array[Long] = new Array[Long](totalCommits)
+     val commitTimestamp: Array[Long] = new Array[Long](totalCommits + 1)
+     // commit-0 must be file system based
+     writeCommitZero(logPath)

      try {
-       (0 until numberOfWriters).foreach { i =>
-         executor.submit(new Runnable {
-           override def run(): Unit = {
+       val tasks = (0 until numberOfWriters).map { i =>
+         executor.submit(spark) {
            var currentWriterCommits = 0
            while (currentWriterCommits < numberOfCommitsPerWriter) {
-             val nextVersion = tcs.getCommits().getLatestTableVersion + 1
+             val nextVersion = math.max(tcs.getCommits().getLatestTableVersion + 1, 1)
              try {
                val currentTimestamp = runningTimestamp.getAndIncrement()
                val commitResponse = commit(nextVersion, currentTimestamp, tcs)
@@ -348,11 +343,8 @@ trait CommitOwnerClientImplSuiteBase extends QueryTest
            }
          }
        }
-       })
-     }

-     executor.shutdown()
-     executor.awaitTermination(15, TimeUnit.SECONDS)
+     tasks.foreach(ThreadUtils.awaitResult(_, 15.seconds))
    } catch {
      case e: InterruptedException =>
        fail("Test interrupted: " + e.getMessage)