Skip to content

Commit

Permalink
bugfix: fix workflow return null handling
Browse files Browse the repository at this point in the history
- Fix null value handling for execute again
- Add unit test and sample for different workflow return
  • Loading branch information
wooln committed Dec 27, 2024
1 parent 2150dcc commit a121932
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 83 deletions.
81 changes: 2 additions & 79 deletions samples/DtmSample/Controllers/WfTestController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
using Microsoft.Extensions.Options;
using System;
using System.IO;
using System.Diagnostics;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Unicode;
using System.Threading;
using System.Threading.Tasks;
using Exception = System.Exception;

namespace DtmSample.Controllers
{
Expand Down Expand Up @@ -255,84 +257,5 @@ public async Task<IActionResult> TccRollBack(CancellationToken cancellationToken
return Ok(TransResponse.BuildFailureResponse());
}
}


private static readonly string wfNameForResume = "wfNameForResume";

/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
[HttpPost("wf-crash")]
public async Task<IActionResult> Crash(CancellationToken cancellationToken)
{
if (!_globalTransaction.Exists(wfNameForResume))
{
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
{
var content = new ByteArrayContent(data);
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");

var outClient = wf.NewBranch().NewRequest();
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);

// the first branch succeed, then crashed, the dtm server will call back the flowing wf-call-back
// manual stop application
Environment.Exit(0);

var inClient = wf.NewBranch().NewRequest();
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);

return null;
});
}

var req = JsonSerializer.Serialize(new TransRequest("1", -30));
await _globalTransaction.Execute(wfNameForResume, Guid.NewGuid().ToString("N"), Encoding.UTF8.GetBytes(req), true);

return Ok(TransResponse.BuildSucceedResponse());
}

[HttpPost("wf-resume")]
public async Task<IActionResult> WfResume(CancellationToken cancellationToken)
{
try
{
if (!_globalTransaction.Exists(wfNameForResume))
{
// register again after manual crash by Environment.Exit(0);
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
{
var content = new ByteArrayContent(data);
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");

var outClient = wf.NewBranch().NewRequest();
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);

var inClient = wf.NewBranch().NewRequest();
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);

return null;
});
}

// prepared call ExecuteByQS
using var bodyMemoryStream = new MemoryStream();
await Request.Body.CopyToAsync(bodyMemoryStream, cancellationToken);
byte[] bytes = bodyMemoryStream.ToArray();
string body = Encoding.UTF8.GetString(bytes);
_logger.LogDebug($"body: {body}");

await _globalTransaction.ExecuteByQS(Request.Query, bodyMemoryStream.ToArray());

return Ok(TransResponse.BuildSucceedResponse());
}
catch (Exception ex)
{
_logger.LogError(ex, "Workflow Error");
return Ok(TransResponse.BuildFailureResponse());
}
}
}
}
4 changes: 3 additions & 1 deletion src/Dtmworkflow/Workflow.Imp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ internal async Task<byte[]> Process(WfFunc2 handler, byte[] data)
var status = reply.Transaction.Status;
if (status == DtmCommon.Constant.StatusSucceed)
{
var sRes = Convert.FromBase64String(reply.Transaction.Result);
var sRes = reply.Transaction.Result != null
? Convert.FromBase64String(reply.Transaction.Result)
: null;
return sRes;
}
else if (status == DtmCommon.Constant.StatusFailed)
Expand Down
112 changes: 109 additions & 3 deletions tests/Dtmworkflow.Tests/WorkflowHttpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,114 @@ public async void Commit_Should_Be_Executed()
rollBackFunc.Verify(x => x.Invoke(It.IsAny<BranchBarrier>()), Times.Never);
commitFunc.Verify(x => x.Invoke(It.IsAny<BranchBarrier>()), Times.Once);
}

[Fact]
public async Task Execute_Result_Should_Be_WfFunc2()
{
var factory = new Mock<IWorkflowFactory>();
var httpClient = new Mock<IDtmClient>();
var grpcClient = new Mock<IDtmgRPCClient>();
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();

SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusPrepared, null);
var wf = SetupWorkFlow(httpClient, grpcClient, httpBb);

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Execute_Result_Should_Be_WfFunc2);
var gid = Guid.NewGuid().ToString("N");

wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));

var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);

Assert.Equal("return value from WfFunc2", Encoding.UTF8.GetString(res));
}

[Fact]
public async Task Execute_Result_Should_Be_Previous()
{
var factory = new Mock<IWorkflowFactory>();
var httpClient = new Mock<IDtmClient>();
var grpcClient = new Mock<IDtmgRPCClient>();
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();

SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusSucceed, "return value from previous");
var wf = SetupWorkFlow(httpClient, grpcClient, httpBb);

factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);

var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);

var wfName = nameof(Execute_Result_Should_Be_Previous);
var gid = Guid.NewGuid().ToString("N");

wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));

var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);

Assert.Equal("return value from previous", Encoding.UTF8.GetString(res));
}

[Fact]
public async Task Execute_Again_Result_Should_Be_Previous()
{
var factory = new Mock<IWorkflowFactory>();
var httpClient1 = new Mock<IDtmClient>();
var httpClient2 = new Mock<IDtmClient>();
var grpcClient = new Mock<IDtmgRPCClient>();
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();

// first
SetupPrepareWorkflow(httpClient1, DtmCommon.Constant.StatusPrepared, null);
var wf = SetupWorkFlow(httpClient1, grpcClient, httpBb);
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfName = nameof(Execute_Again_Result_Should_Be_Previous);
var gid = Guid.NewGuid().ToString("N");
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
Assert.Equal("return value from WfFunc2", Encoding.UTF8.GetString(res));

// again
SetupPrepareWorkflow(httpClient2, DtmCommon.Constant.StatusSucceed, "return value from previous");
wf = SetupWorkFlow(httpClient2, grpcClient, httpBb);
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
gid = Guid.NewGuid().ToString("N");
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
Assert.Equal("return value from previous", Encoding.UTF8.GetString(res));
}

[Fact]
public async Task Execute_Again_Result_StringEmpty()
{
var factory = new Mock<IWorkflowFactory>();
var httpClient = new Mock<IDtmClient>();
var grpcClient = new Mock<IDtmgRPCClient>();
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();

// again
SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusSucceed, null);
var wf = SetupWorkFlow(httpClient, grpcClient, httpBb);
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
var wfName = nameof(Execute_Again_Result_StringEmpty);
var gid = Guid.NewGuid().ToString("N");
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
Assert.Null(res);
}

private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, string result, List<DtmProgressDto> progressDtos = null)
private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, string? result, List<DtmProgressDto> progressDtos = null)

Check warning on line 365 in tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

View workflow job for this annotation

GitHub Actions / build on windows-latest

Cannot convert null literal to non-nullable reference type.

Check warning on line 365 in tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

View workflow job for this annotation

GitHub Actions / build on windows-latest

Cannot convert null literal to non-nullable reference type.

Check warning on line 365 in tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

View workflow job for this annotation

GitHub Actions / build on ubuntu-latest

Cannot convert null literal to non-nullable reference type.

Check warning on line 365 in tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

View workflow job for this annotation

GitHub Actions / build on ubuntu-latest

Cannot convert null literal to non-nullable reference type.
{
var httpResp = new HttpResponseMessage(HttpStatusCode.OK);
httpResp.Content = new StringContent(JsonSerializer.Serialize(
Expand All @@ -265,9 +371,9 @@ private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, st
Transaction = new DtmTransactionDto
{
Status = status,
Result = Convert.ToBase64String(Encoding.UTF8.GetBytes(result))
Result = result == null ? null : Convert.ToBase64String(Encoding.UTF8.GetBytes(result))
},
Progresses = progressDtos
Progresses = progressDtos ?? []
}));
httpClient.Setup(x => x.PrepareWorkflow(It.IsAny<DtmCommon.TransBase>(), It.IsAny<CancellationToken>())).Returns(Task.FromResult(httpResp));
}
Expand Down

0 comments on commit a121932

Please sign in to comment.