Skip to content

Commit

Permalink
fix(spanner): support transaction tags in partition DML
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul2393 committed Nov 14, 2024
1 parent 0a46070 commit 384a940
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;
import javax.annotation.Nullable;

/**
* Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link
Expand Down Expand Up @@ -56,4 +57,10 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
throw new UnsupportedOperationException();
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;
import javax.annotation.Nullable;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -601,4 +602,21 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* idempotent, such as deleting old rows from a very large table.
*/
long executePartitionedUpdate(Statement stmt, UpdateOption... options);

/**
* Executes a Partitioned DML statement with the specified transaction tag.
*
* <p>This method has the same behavior as {@link #executePartitionedUpdate(Statement,
* UpdateOption...)} but allows specifying a transaction tag that will be applied to all
* partitioned operations.
*
* @param stmt The Partitioned DML statement to execute
* @param transactionTag The transaction tag to apply to all partitioned operations. The tag must
* be a printable string (ASCII 32-126) with maximum length of 50 characters.
* @param options The options to use for the update operation
* @return The total number of rows modified by the statement
* @throws SpannerException if the operation failed
*/
long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,21 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti

@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
return executePartitionedUpdateWithOptions(stmt, null, options);
}

@Override
public long executePartitionedUpdate(
final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) {
return executePartitionedUpdateWithOptions(stmt, transactionTag, options);
}

private long executePartitionedUpdateWithOptions(
final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
return runWithSessionRetry(
session -> session.executePartitionedUpdate(stmt, transactionTag, options));
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;

Expand All @@ -54,13 +55,16 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction
private final SessionImpl session;
private final SpannerRpc rpc;
private final Ticker ticker;
private final @Nullable String transactionTag;
private final IsRetryableInternalError isRetryableInternalErrorPredicate;
private volatile boolean isValid = true;

PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) {
PartitionedDmlTransaction(
SessionImpl session, SpannerRpc rpc, Ticker ticker, @Nullable String transactionTag) {
this.session = session;
this.rpc = rpc;
this.ticker = ticker;
this.transactionTag = transactionTag;
this.isRetryableInternalErrorPredicate = new IsRetryableInternalError();
}

Expand Down Expand Up @@ -194,22 +198,27 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
if (options.hasTag()) {
requestOptionsBuilder.setRequestTag(options.tag());
}
if (transactionTag != null) {
requestOptionsBuilder.setTransactionTag(transactionTag);
}
builder.setRequestOptions(requestOptionsBuilder.build());
}
return builder.build();
}

private ByteString initTransaction(final Options options) {
final BeginTransactionRequest request =
BeginTransactionRequest.Builder builder =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()));

if (transactionTag != null) {
builder.setRequestOptions(
RequestOptions.newBuilder().setTransactionTag(transactionTag).build());
}
Transaction tx = rpc.beginTransaction(builder.build(), session.getOptions(), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,18 @@ public DatabaseId getDatabaseId() {
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
setActive(null);
PartitionedDmlTransaction txn =
new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker());
new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker(), null);
return txn.executeStreamingPartitionedUpdate(
stmt, spanner.getOptions().getPartitionedDmlTimeout(), options);
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
setActive(null);
PartitionedDmlTransaction txn =
new PartitionedDmlTransaction(
this, spanner.getRpc(), Ticker.systemTicker(), transactionTag);
return txn.executeStreamingPartitionedUpdate(
stmt, spanner.getOptions().getPartitionedDmlTimeout(), options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,11 @@ default AsyncTransactionManager transactionManagerAsync(TransactionOption... opt
default long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
return get().executePartitionedUpdate(stmt, options);
}

default long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
return get().executePartitionedUpdate(stmt, transactionTag, options);
}
}

class PooledSessionFutureWrapper implements SessionFutureWrapper<PooledSessionFuture> {
Expand Down Expand Up @@ -1494,6 +1499,16 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
}
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options) {
try {
return get(true).executePartitionedUpdate(stmt, transactionTag, options);
} finally {
close();
}
}

@Override
public String getName() {
return get().getName();
Expand Down Expand Up @@ -1709,6 +1724,18 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
}
}

@Override
public long executePartitionedUpdate(
Statement stmt, @Nullable String transactionTag, UpdateOption... options)
throws SpannerException {
try {
markUsed();
return delegate.executePartitionedUpdate(stmt, transactionTag, options);
} catch (SpannerException e) {
throw lastException = e;
}
}

@Override
public ReadContext singleUse() {
return delegate.singleUse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1944,6 +1944,30 @@ public void testPartitionedDMLWithTag() {
assertThat(request.getRequestOptions().getTransactionTag()).isEmpty();
}

@Test
public void testPartitionedDMLWithTransactionTag() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
client.executePartitionedUpdate(
UPDATE_STATEMENT, "testTransactionTag", Options.tag("app=spanner,env=test,action=dml"));

List<BeginTransactionRequest> beginTransactions =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
assertThat(beginTransactions).hasSize(1);
BeginTransactionRequest beginTransaction = beginTransactions.get(0);
assertNotNull(beginTransaction.getOptions());
assertTrue(beginTransaction.getOptions().hasPartitionedDml());
assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams());

List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
assertThat(requests).hasSize(1);
ExecuteSqlRequest request = requests.get(0);
assertNotNull(request.getRequestOptions());
assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("testTransactionTag");
assertThat(request.getRequestOptions().getRequestTag())
.isEqualTo("app=spanner,env=test,action=dml");
}

@Test
public void testCommitWithTag() {
DatabaseClient client =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void setup() {
when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)))
.thenReturn(Transaction.newBuilder().setId(txId).build());

tx = new PartitionedDmlTransaction(session, rpc, ticker);
tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
}

@Test
Expand Down Expand Up @@ -332,7 +332,7 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() {
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream2);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));

assertThat(count).isEqualTo(1000L);
Expand Down Expand Up @@ -371,7 +371,7 @@ public void testExecuteStreamingPartitionedUpdateRSTstream() {
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream2);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));

assertThat(count).isEqualTo(1000L);
Expand Down Expand Up @@ -400,7 +400,7 @@ public void testExecuteStreamingPartitionedUpdateGenericInternalException() {
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream1);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null);
SpannerException e =
assertThrows(
SpannerException.class,
Expand Down

0 comments on commit 384a940

Please sign in to comment.