Skip to content

Commit

Permalink
Add http xa sample (#71)
Browse files Browse the repository at this point in the history
* add http xa sample

* Add XA pattern comment in README

* Change the DBType of DtmOptions to SqlDbType in the configuration section of the README
  • Loading branch information
JackBOBO authored Jul 15, 2023
1 parent 37e4945 commit c86f875
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 13 deletions.
30 changes: 25 additions & 5 deletions README-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ services.AddDtmcli(x =>
x.BranchTimeout = 10000;

// 子事务屏障的数据库类型, mysql, postgres, sqlserver
x.DBType = "mysql";
x.SqlDbType = "mysql";

// 子事务屏障的数据表名
x.BarrierTableName = "dtm_barrier.barrier";
Expand All @@ -108,7 +108,7 @@ services.AddDtmcli(Configuration, "dtm");
"DtmUrl": "http://localhost:36789",
"DtmTimeout": 10000,
"BranchTimeout": 10000,
"DBType": "mysql",
"SqlDbType": "mysql",
"BarrierTableName": "dtm_barrier.barrier",
}
}
Expand Down Expand Up @@ -265,12 +265,32 @@ public class MyBusi
this._globalTransaction = globalTransaction;
}

public async Task DoBusAsync()
public async Task DoBusAsync(CancellationToken cancellationToken)
{
var svc = "http://localhost:5005";

await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
{
await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
// NOTE: XA 模式的限制
// 当前模式仅支持mysql、postgresDB,请修改相应的客户端配置,如SqlDbType等。
// 如使用Mysql并且版本低于8.0需关闭连接池使用
// 调用 XA 子事务
await xa.CallBranch(
// 参数
new TransRequest("1", -30),

// 操作的 URL
svc + "/XaTransOut",

// 取消令牌
cancellationToken);

await xa.CallBranch(
new TransRequest("2", 30),
svc + "/XaTransIn",
cancellationToken);

}, cancellationToken);
}
}
Expand Down
30 changes: 25 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ services.AddDtmcli(x =>
x.BranchTimeout = 10000;

// barrier database type, mysql, postgres, sqlserver
x.DBType = "mysql";
x.SqlDbType = "mysql";

// barrier table name
x.BarrierTableName = "dtm_barrier.barrier";
Expand All @@ -115,7 +115,7 @@ And the configuration file
"DtmUrl": "http://localhost:36789",
"DtmTimeout": 10000,
"BranchTimeout": 10000,
"DBType": "mysql",
"SqlDbType": "mysql",
"BarrierTableName": "dtm_barrier.barrier",
}
}
Expand Down Expand Up @@ -273,12 +273,32 @@ public class MyBusi
this._globalTransaction = globalTransaction;
}

public async Task DoBusAsync()
public async Task DoBusAsync(CancellationToken cancellationToken)
{
var svc = "http://localhost:5005";

await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
{
await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);
await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
// NOTE: Limitations of using Xa mode
// The current mode only supports mysql, postgresDB, please modify the corresponding client configuration, such as SqlDbType, etc.
// Connection pooling needs to be turned off for mysql versions below 8.0
// Create XA sub-transaction
await xa.CallBranch(
// Arguments of action
new TransRequest("1", -30),

// URL of action
svc + "/XaTransOut",

// Cancel token
cancellationToken);

await xa.CallBranch(
new TransRequest("2", 30),
svc + "/XaTransIn",
cancellationToken);

}, cancellationToken);
}
}
Expand Down
49 changes: 46 additions & 3 deletions samples/DtmSample/Controllers/TransController.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
using DtmSample.Dtos;
using Dapper;
using Dtmcli;
using DtmSample.Dtos;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using MySqlConnector;
using System.Threading;
using System.Threading.Tasks;

namespace DtmSample.Controllers
{
Expand All @@ -10,10 +16,12 @@ namespace DtmSample.Controllers
public class TransController : ControllerBase
{
private readonly ILogger<TransController> _logger;
private readonly XaLocalTransaction _xaTrans;

public TransController(ILogger<TransController> logger)
public TransController(ILogger<TransController> logger, XaLocalTransaction xaTrans)
{
_logger = logger;
_xaTrans = xaTrans;
}

#region TCC
Expand Down Expand Up @@ -119,7 +127,42 @@ public IActionResult TransInRevert([FromBody] TransRequest body)
_logger.LogInformation("TransInConfirm, QueryString={0}", Request.QueryString);
_logger.LogInformation("用户: {0},转入 {1} 元---回滚", body.UserId, body.Amount);
return Ok(TransResponse.BuildSucceedResponse());
}
}
#endregion

#region Xa
[HttpPost("XaTransOut")]
public async Task<IActionResult> XaTransOut(CancellationToken token)
{
//todo: Connection pooling needs to be turned off for mysql versions below 8.0
using var conn = new MySqlConnection("Server=en.dtm.pub; Port=3306; User ID=dtm; Password=passwd123dtm; Database=dtm_busi;Pooling=False");
await this._xaTrans.ExcecuteAsync(this.Request.Query, conn, "mysql", async (dbConn, xa) =>
{
var body = await this.Request.ReadFromJsonAsync<TransRequest>();
await dbConn.ExecuteAsync($"UPDATE dtm_busi.user_account SET balance = balance + {body.Amount} where user_id = '{body.UserId}'");
_logger.LogInformation("XaTransOut, QueryString={0}", Request.QueryString);
_logger.LogInformation("用户: {0},转出 {1} 元", body.UserId, body.Amount);
}, token);

return Ok(TransResponse.BuildSucceedResponse());
}

[HttpPost("XaTransIn")]
public async Task<IActionResult> XaTransIn(CancellationToken token)
{
//todo: Connection pooling needs to be turned off for mysql versions below 8.0
using var conn = new MySqlConnection("Server=en.dtm.pub; Port=3306; User ID=dtm; Password=passwd123dtm; Database=dtm_busi;Pooling=False");
await this._xaTrans.ExcecuteAsync(this.Request.Query, conn, "mysql", async (dbConn, xa) =>
{
var body = await this.Request.ReadFromJsonAsync<TransRequest>();
await dbConn.ExecuteAsync($"UPDATE dtm_busi.user_account SET balance = balance + {body.Amount} where user_id = '{body.UserId}'");
_logger.LogInformation("XaTransIn, QueryString={0}", Request.QueryString);
_logger.LogInformation("用户: {0},转入 {1} 元", body.UserId, body.Amount);
}, token);

return Ok(TransResponse.BuildSucceedResponse());
}
#endregion
}
}

92 changes: 92 additions & 0 deletions samples/DtmSample/Controllers/XaTestController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using Dtmcli;
using DtmSample.Dtos;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace DtmSample.Controllers
{
/// <summary>
/// XA 示例
/// </summary>
[ApiController]
[Route("/api")]
public class XaTestController : ControllerBase
{

private readonly ILogger<TccTestController> _logger;
private readonly XaGlobalTransaction _globalTransaction;
private readonly AppSettings _settings;

public XaTestController(ILogger<TccTestController> logger, IOptions<AppSettings> optionsAccs, XaGlobalTransaction transaction)
{
_logger = logger;
_settings = optionsAccs.Value;
_globalTransaction = transaction;
}

/// <summary>
/// Xa 成功提交
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
[HttpPost("commit")]
public async Task<IActionResult> Commit(CancellationToken cancellationToken)
{
//todo: Currently only supported by mysql, please modify the appsettings.json
try
{
await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
{
//// 用户1 转出30元
var res1 = await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);

//// 用户2 转入30元
var res2 = await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);
}, cancellationToken);

return Ok(TransResponse.BuildSucceedResponse());
}
catch (Exception ex)
{
_logger.LogError(ex, "Xa Error");
return Ok(TransResponse.BuildFailureResponse());
}
}


/// <summary>
/// Xa 失败回滚
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
[HttpPost("rollbcak")]
public async Task<IActionResult> Rollbcak(CancellationToken cancellationToken)
{
//todo: Currently only supported by mysql, please modify the appsettings.json
try
{
await _globalTransaction.ExcecuteAsync(async (Xa xa) =>
{
//// 用户1 转出30元
var res1 = await xa.CallBranch(new TransRequest("1", -30), _settings.BusiUrl + "/XaTransOut", cancellationToken);

//// 用户2 转入30元
var res2 = await xa.CallBranch(new TransRequest("2", 30), _settings.BusiUrl + "/XaTransIn", cancellationToken);

throw new Exception("rollbcak");
}, cancellationToken);

return Ok(TransResponse.BuildSucceedResponse());
}
catch (Exception ex)
{
_logger.LogError(ex, "Xa Error");
return Ok(TransResponse.BuildFailureResponse());
}
}
}
}

0 comments on commit c86f875

Please sign in to comment.