Skip to content

Commit

Permalink
Log transaction state for CONCURRENT_TRANSACTIONS debugging (streamna…
Browse files Browse the repository at this point in the history
…tive#39)

* Log transaction state for CONCURRENT_TRANSACTIONS debugging

* Make checkstyle pass
  • Loading branch information
michaeljmarshall authored Jan 5, 2023
1 parent 27e26ee commit 55994d8
Showing 1 changed file with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,9 @@ private void completeInitProducer(String transactionalId,
if (errors != Errors.NONE) {
responseCallback.accept(initTransactionError(errors));
} else {
log.info("CONCURRENT_TRANSACTIONS: completeInitProducer callback with Errors.NONE");
log.info("CONCURRENT_TRANSACTIONS: txnId: {} producerId: {} producerEpoch: {} "
+ "completeInitProducer callback with Errors.NONE",
transactionalId, newMetadata.getProducerId(), newMetadata.getProducerEpoch());
responseCallback.accept(initTransactionError(Errors.CONCURRENT_TRANSACTIONS));
}
});
Expand Down Expand Up @@ -405,7 +407,9 @@ private CompletableFuture<Either<Errors, EpochAndTxnTransitMetadata>> prepareIni
Optional<ProducerIdAndEpoch> expectedProducerIdAndEpoch) {
CompletableFuture<Either<Errors, EpochAndTxnTransitMetadata>> resultFuture = new CompletableFuture<>();
if (txnMetadata.pendingTransitionInProgress()) {
log.info("CONCURRENT_TRANSACTIONS: prepareInitProducerIdTransit failed because pendingState == {}",
log.info("CONCURRENT_TRANSACTIONS: txnId: {} producerId: {} producerEpoch: {} "
+ "prepareInitProducerIdTransit failed. pendingState == {}",
txnMetadata.getTransactionalId(), txnMetadata.getProducerId(), txnMetadata.getProducerEpoch(),
txnMetadata.getPendingState());
// return a retriable exception to let the client backoff and retry
resultFuture.complete(Either.left(Errors.CONCURRENT_TRANSACTIONS));
Expand All @@ -421,8 +425,10 @@ private CompletableFuture<Either<Errors, EpochAndTxnTransitMetadata>> prepareIni
switch (txnMetadata.getState()) {
case PREPARE_ABORT:
case PREPARE_COMMIT:
log.info("CONCURRENT_TRANSACTIONS: prepareInitProducerIdTransit failed because state " +
"== PREPARE_COMMIT");
log.info("CONCURRENT_TRANSACTIONS: txnId: {} producerId: {} producerEpoch: {} "
+ "prepareInitProducerIdTransit failed. state == PREPARE_COMMIT",
txnMetadata.getTransactionalId(), txnMetadata.getProducerId(),
txnMetadata.getProducerEpoch());
// reply to client and let it backoff and retry
resultFuture.complete(Either.left(Errors.CONCURRENT_TRANSACTIONS));
break;
Expand Down Expand Up @@ -511,12 +517,16 @@ public void handleAddPartitionsToTransaction(String transactionalId,
} else if (txnMetadata.getProducerEpoch() != producerEpoch) {
return Either.left(producerEpochFenceErrors());
} else if (txnMetadata.getPendingState().isPresent()) {
log.info("CONCURRENT_TRANSACTIONS: handleAddPartitionsToTransaction failed because pendingState == {}",
log.info("CONCURRENT_TRANSACTIONS: txnId: {} producerId: {} producerEpoch: {} "
+ "handleAddPartitionsToTransaction failed. pendingState == {}",
txnMetadata.getTransactionalId(), txnMetadata.getProducerId(), txnMetadata.getProducerEpoch(),
txnMetadata.getPendingState());
// return a retriable exception to let the client backoff and retry
return Either.left(Errors.CONCURRENT_TRANSACTIONS);
} else if (txnMetadata.getState() == PREPARE_COMMIT || txnMetadata.getState() == PREPARE_ABORT) {
log.info("CONCURRENT_TRANSACTIONS: handleAddPartitionsToTransaction failed because state == {}",
log.info("CONCURRENT_TRANSACTIONS: txnId: {} producerId: {} producerEpoch: {}"
+ "handleAddPartitionsToTransaction failed. state == {}",
txnMetadata.getTransactionalId(), txnMetadata.getProducerId(), txnMetadata.getProducerEpoch(),
txnMetadata.getState());
return Either.left(Errors.CONCURRENT_TRANSACTIONS);
} else if (txnMetadata.getState() == ONGOING
Expand Down Expand Up @@ -670,7 +680,7 @@ private Either<Errors, TxnTransitMetadata> endTxnPreAppend(
}
if (txnMetadata.getPendingState().isPresent()
&& txnMetadata.getPendingState().get() != PREPARE_EPOCH_FENCE) {
log.info("CONCURRENT_TRANSACTIONS: endTxnPreAppend failed because pendingState == {}",
log.info("CONCURRENT_TRANSACTIONS: {} {} endTxnPreAppend failed. pendingState == {}",
txnMetadata.getPendingState());
return Either.left(Errors.CONCURRENT_TRANSACTIONS);
}
Expand All @@ -694,11 +704,15 @@ private Either<Errors, TxnTransitMetadata> endTxnByStatus(String transactionalId
return Either.left(getPreEndTxnErrors(txnMarkerResult, TransactionResult.ABORT, Errors.NONE,
transactionalId, txnMetadata));
case PREPARE_COMMIT:
log.info("CONCURRENT_TRANSACTIONS: endTxnByStatus failed because state == PREPARE_COMMIT");
log.info("CONCURRENT_TRANSACTIONS: txnId: {} producerId: {} producerEpoch: {} endTxnByStatus failed. "
+ "state == PREPARE_COMMIT",
txnMetadata.getTransactionalId(), txnMetadata.getProducerId(), txnMetadata.getProducerEpoch());
return Either.left(getPreEndTxnErrors(txnMarkerResult, TransactionResult.COMMIT,
Errors.CONCURRENT_TRANSACTIONS, transactionalId, txnMetadata));
case PREPARE_ABORT:
log.info("CONCURRENT_TRANSACTIONS: endTxnByStatus failed because state == PREPARE_ABORT");
log.info("CONCURRENT_TRANSACTIONS: txnId: {} producerId: {} producerEpoch: {} endTxnByStatus failed. "
+ "state == PREPARE_ABORT",
txnMetadata.getTransactionalId(), txnMetadata.getProducerId(), txnMetadata.getProducerEpoch());
return Either.left(getPreEndTxnErrors(txnMarkerResult, TransactionResult.ABORT,
Errors.CONCURRENT_TRANSACTIONS, transactionalId, txnMetadata));
case EMPTY:
Expand Down Expand Up @@ -777,8 +791,10 @@ private void completeEndTxn(String transactionalId,
} else if (txnMetadata.getProducerEpoch() != producerEpoch) {
return Either.left(producerEpochFenceErrors());
} else if (txnMetadata.getPendingState().isPresent()) {
log.info("CONCURRENT_TRANSACTIONS: completeEndTxn failed because pendingState == {}",
txnMetadata.getPendingState());
log.info("CONCURRENT_TRANSACTIONS: txnId: {} producerId: {} producerEpoch: {} completeEndTxn "
+ "failed. pendingState == {}",
txnMetadata.getTransactionalId(), txnMetadata.getProducerId(),
txnMetadata.getProducerEpoch(), txnMetadata.getPendingState());
return Either.left(Errors.CONCURRENT_TRANSACTIONS);
} else {
switch (txnMetadata.getState()) {
Expand Down Expand Up @@ -874,8 +890,10 @@ protected void abortTimedOutTransactions(
+ "is a pending state transition",
txnIdAndPidEpoch.getTransactionalId());
}
log.info("CONCURRENT_TRANSACTIONS: abortTimedOutTransactions failed because " +
"pendingState == {}", txnMetadata.getPendingState());
log.info("CONCURRENT_TRANSACTIONS: txnId: {} producerId: {} producerEpoch: {} "
+ "abortTimedOutTransactions failed. pendingState == {}",
txnMetadata.getTransactionalId(), txnMetadata.getProducerId(),
txnMetadata.getProducerEpoch(), txnMetadata.getPendingState());
return Either.left(Errors.CONCURRENT_TRANSACTIONS);
} else {
return Either.right(txnMetadata.prepareFenceProducerEpoch());
Expand Down

0 comments on commit 55994d8

Please sign in to comment.