Skip to content

Commit

Permalink
bugfix: global transaction hook repeat execute (#5887)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsbxyyx authored Nov 11, 2023
1 parent 613959a commit 21af1bc
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 67 deletions.
4 changes: 3 additions & 1 deletion changes/en-us/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] A brief and accurate description of PR

### bugfix:
- [[#5887](https://github.com/seata/seata/pull/5887)] fix global transaction hook repeat execute
- [[#5991](https://github.com/seata/seata/pull/5991)] fix the issue that the Lua script is not synchronized when the redis sentinel master node is down

### optimize:
Expand All @@ -21,6 +22,7 @@ Thanks to these contributors for their code commits. Please report an unintended

<!-- Please make sure your Github ID is in the list below -->
- [slievrly](https://github.com/slievrly)
- [GitHub_ID](https://github.com/GitHub_ID)
- [jsbxyyx](https://github.com/jsbxyyx)


Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
4 changes: 2 additions & 2 deletions changes/zh-cn/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 准确简要的PR描述

### bugfix:
- [[#5887](https://github.com/seata/seata/pull/5887)] 修复全局事务钩子重复执行
- [[#5991](https://github.com/seata/seata/pull/5991)] 修复redis sentinel master node 宕机时,lua脚本未同步的问题

### optimize:
Expand All @@ -21,7 +22,6 @@

<!-- 请确保您的 GitHub ID 在以下列表中 -->
- [slievrly](https://github.com/slievrly)
- [GitHub_ID](https://github.com/GitHub_ID)

- [jsbxyyx](https://github.com/jsbxyyx)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ public void recordStateMachineFinished(StateMachineInstance machineInstance, Pro

protected void reportTransactionFinished(StateMachineInstance machineInstance, ProcessContext context) {
if (sagaTransactionalTemplate != null) {
GlobalTransaction globalTransaction = null;
try {
GlobalTransaction globalTransaction = getGlobalTransaction(machineInstance, context);
globalTransaction = getGlobalTransaction(machineInstance, context);
if (globalTransaction == null) {

throw new EngineExecutionException("Global transaction is not exists",
FrameworkErrorCode.ObjectNotExists);
}
Expand Down Expand Up @@ -234,7 +234,7 @@ protected void reportTransactionFinished(StateMachineInstance machineInstance, P
// clear
RootContext.unbind();
RootContext.unbindBranchType();
sagaTransactionalTemplate.triggerAfterCompletion();
sagaTransactionalTemplate.triggerAfterCompletion(globalTransaction);
sagaTransactionalTemplate.cleanUp();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.seata.tm.TMClient;
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;
import io.seata.tm.api.GlobalTransactionRole;
import io.seata.tm.api.TransactionalExecutor;
import io.seata.tm.api.TransactionalExecutor.ExecutionException;
import io.seata.tm.api.transaction.TransactionHook;
Expand Down Expand Up @@ -63,9 +64,9 @@ public class DefaultSagaTransactionalTemplate
@Override
public void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
triggerBeforeCommit(tx);
tx.commit();
triggerAfterCommit();
triggerAfterCommit(tx);
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
Expand All @@ -75,19 +76,19 @@ public void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor
@Override
public void rollbackTransaction(GlobalTransaction tx, Throwable ex)
throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
triggerBeforeRollback(tx);
tx.rollback();
triggerAfterRollback();
triggerAfterRollback(tx);
// Successfully rolled back
}

@Override
public GlobalTransaction beginTransaction(TransactionInfo txInfo) throws TransactionalExecutor.ExecutionException {
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
triggerBeforeBegin();
triggerBeforeBegin(tx);
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
triggerAfterBegin(tx);
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);

Expand All @@ -105,7 +106,7 @@ public void reportTransaction(GlobalTransaction tx, GlobalStatus globalStatus)
throws TransactionalExecutor.ExecutionException {
try {
tx.globalReport(globalStatus);
triggerAfterCompletion();
triggerAfterCompletion(tx);
} catch (TransactionException txe) {

throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.ReportFailure);
Expand All @@ -125,73 +126,87 @@ public void branchReport(String xid, long branchId, BranchStatus status, String
DefaultResourceManager.get().branchReport(BranchType.SAGA, xid, branchId, status, applicationData);
}

protected void triggerBeforeBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeBegin();
} catch (Exception e) {
LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);
protected void triggerBeforeBegin(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeBegin();
} catch (Exception e) {
LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);
}
}
}
}

protected void triggerAfterBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterBegin();
} catch (Exception e) {
LOGGER.error("Failed execute afterBegin in hook {} ", e.getMessage(), e);
protected void triggerAfterBegin(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterBegin();
} catch (Exception e) {
LOGGER.error("Failed execute afterBegin in hook {} ", e.getMessage(), e);
}
}
}
}

protected void triggerBeforeRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeRollback();
} catch (Exception e) {
LOGGER.error("Failed execute beforeRollback in hook {} ", e.getMessage(), e);
protected void triggerBeforeRollback(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeRollback();
} catch (Exception e) {
LOGGER.error("Failed execute beforeRollback in hook {} ", e.getMessage(), e);
}
}
}
}

protected void triggerAfterRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterRollback();
} catch (Exception e) {
LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e);
protected void triggerAfterRollback(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterRollback();
} catch (Exception e) {
LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e);
}
}
}
}

protected void triggerBeforeCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeCommit();
} catch (Exception e) {
LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e);
protected void triggerBeforeCommit(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeCommit();
} catch (Exception e) {
LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e);
}
}
}
}

protected void triggerAfterCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCommit();
} catch (Exception e) {
LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e);
protected void triggerAfterCommit(GlobalTransaction tx) {
if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCommit();
} catch (Exception e) {
LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e);
}
}
}
}

@Override
public void triggerAfterCompletion() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
public void triggerAfterCompletion(GlobalTransaction tx) {
if (tx == null || tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ long branchRegister(String resourceId, String clientId, String xid, String appli
void branchReport(String xid, long branchId, BranchStatus status, String applicationData)
throws TransactionException;

void triggerAfterCompletion();
void triggerAfterCompletion(GlobalTransaction tx);

void cleanUp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void branchReport(String xid, long branchId, BranchStatus status, String
}

@Override
public void triggerAfterCompletion() {
public void triggerAfterCompletion(GlobalTransaction tx) {

}

Expand Down
40 changes: 30 additions & 10 deletions tm/src/main/java/io/seata/tm/api/TransactionalTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public Object execute(TransactionalExecutor business) throws Throwable {
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
triggerAfterCompletion(tx);
cleanUp();
}
} finally {
Expand Down Expand Up @@ -200,6 +200,12 @@ private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTran

private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
throws TransactionalExecutor.ExecutionException, TransactionException {
if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore commit: just involved in global transaction [{}]", tx.getXid());
}
return;
}
if (isTimeout(tx.getCreateTime(), txInfo)) {
// business execution timeout
Exception exx = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
Expand Down Expand Up @@ -236,7 +242,7 @@ private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
if (null != statusException) {
throw new TransactionalExecutor.ExecutionException(tx, statusException, code);
}
triggerAfterCommit();
triggerAfterCommit(tx);
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
Expand All @@ -245,7 +251,12 @@ private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
}

private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {

if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore rollback: just involved in global transaction [{}]", tx.getXid());
}
return;
}
try {
triggerBeforeRollback();
tx.rollback();
Expand Down Expand Up @@ -285,6 +296,12 @@ private void rollbackTransaction(GlobalTransaction tx, Throwable originalExcepti
}

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore begin: just involved in global transaction [{}]", tx.getXid());
}
return;
}
try {
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
Expand Down Expand Up @@ -346,7 +363,7 @@ private void triggerBeforeCommit() {
}
}

private void triggerAfterCommit() {
private void triggerAfterCommit(GlobalTransaction tx) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCommit();
Expand All @@ -356,14 +373,17 @@ private void triggerAfterCommit() {
}
}

private void triggerAfterCompletion() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
private void triggerAfterCompletion(GlobalTransaction tx) {
if (tx == null || tx.getGlobalTransactionRole() == GlobalTransactionRole.Launcher) {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
}
}
}

}

private void cleanUp() {
Expand Down

0 comments on commit 21af1bc

Please sign in to comment.