diff --git a/samples/DtmSample/Controllers/WfTestController.cs b/samples/DtmSample/Controllers/WfTestController.cs index 752f0ac..614e969 100644 --- a/samples/DtmSample/Controllers/WfTestController.cs +++ b/samples/DtmSample/Controllers/WfTestController.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Options; using System; using System.IO; +using System.Diagnostics; using System.Net.Http; using System.Net.Http.Headers; using System.Text; @@ -14,6 +15,7 @@ using System.Text.Unicode; using System.Threading; using System.Threading.Tasks; +using Exception = System.Exception; namespace DtmSample.Controllers { @@ -255,84 +257,5 @@ public async Task TccRollBack(CancellationToken cancellationToken return Ok(TransResponse.BuildFailureResponse()); } } - - - private static readonly string wfNameForResume = "wfNameForResume"; - - /// - /// - /// - /// - /// - [HttpPost("wf-crash")] - public async Task Crash(CancellationToken cancellationToken) - { - if (!_globalTransaction.Exists(wfNameForResume)) - { - _globalTransaction.Register(wfNameForResume, async (wf, data) => - { - var content = new ByteArrayContent(data); - content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); - - var outClient = wf.NewBranch().NewRequest(); - await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content); - - // the first branch succeed, then crashed, the dtm server will call back the flowing wf-call-back - // manual stop application - Environment.Exit(0); - - var inClient = wf.NewBranch().NewRequest(); - await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content); - - return null; - }); - } - - var req = JsonSerializer.Serialize(new TransRequest("1", -30)); - await _globalTransaction.Execute(wfNameForResume, Guid.NewGuid().ToString("N"), Encoding.UTF8.GetBytes(req), true); - - return Ok(TransResponse.BuildSucceedResponse()); - } - - [HttpPost("wf-resume")] - public async Task WfResume(CancellationToken cancellationToken) - { - try - { - if (!_globalTransaction.Exists(wfNameForResume)) - { - // register again after manual crash by Environment.Exit(0); - _globalTransaction.Register(wfNameForResume, async (wf, data) => - { - var content = new ByteArrayContent(data); - content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); - - var outClient = wf.NewBranch().NewRequest(); - await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content); - - var inClient = wf.NewBranch().NewRequest(); - await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content); - - return null; - }); - } - - // prepared call ExecuteByQS - using var bodyMemoryStream = new MemoryStream(); - await Request.Body.CopyToAsync(bodyMemoryStream, cancellationToken); - byte[] bytes = bodyMemoryStream.ToArray(); - string body = Encoding.UTF8.GetString(bytes); - _logger.LogDebug($"body: {body}"); - - await _globalTransaction.ExecuteByQS(Request.Query, bodyMemoryStream.ToArray()); - - return Ok(TransResponse.BuildSucceedResponse()); - } - catch (Exception ex) - { - _logger.LogError(ex, "Workflow Error"); - return Ok(TransResponse.BuildFailureResponse()); - } - } } } diff --git a/src/Dtmworkflow/Workflow.Imp.cs b/src/Dtmworkflow/Workflow.Imp.cs index 669e24f..1febed0 100644 --- a/src/Dtmworkflow/Workflow.Imp.cs +++ b/src/Dtmworkflow/Workflow.Imp.cs @@ -39,7 +39,9 @@ internal async Task Process(WfFunc2 handler, byte[] data) var status = reply.Transaction.Status; if (status == DtmCommon.Constant.StatusSucceed) { - var sRes = Convert.FromBase64String(reply.Transaction.Result); + var sRes = reply.Transaction.Result != null + ? Convert.FromBase64String(reply.Transaction.Result) + : null; return sRes; } else if (status == DtmCommon.Constant.StatusFailed) diff --git a/tests/Dtmworkflow.Tests/WorkflowHttpTests.cs b/tests/Dtmworkflow.Tests/WorkflowHttpTests.cs index 4f77140..83adf18 100644 --- a/tests/Dtmworkflow.Tests/WorkflowHttpTests.cs +++ b/tests/Dtmworkflow.Tests/WorkflowHttpTests.cs @@ -255,8 +255,114 @@ public async void Commit_Should_Be_Executed() rollBackFunc.Verify(x => x.Invoke(It.IsAny()), Times.Never); commitFunc.Verify(x => x.Invoke(It.IsAny()), Times.Once); } + + [Fact] + public async Task Execute_Result_Should_Be_WfFunc2() + { + var factory = new Mock(); + var httpClient = new Mock(); + var grpcClient = new Mock(); + var httpBb = new Mock(); + + SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusPrepared, null); + var wf = SetupWorkFlow(httpClient, grpcClient, httpBb); + + factory.Setup(x => x.NewWorkflow(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Returns(wf.Object); + + var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance); + + var wfName = nameof(Execute_Result_Should_Be_WfFunc2); + var gid = Guid.NewGuid().ToString("N"); + + wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2"))); + + var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 }); + var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true); + + Assert.Equal("return value from WfFunc2", Encoding.UTF8.GetString(res)); + } + + [Fact] + public async Task Execute_Result_Should_Be_Previous() + { + var factory = new Mock(); + var httpClient = new Mock(); + var grpcClient = new Mock(); + var httpBb = new Mock(); + + SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusSucceed, "return value from previous"); + var wf = SetupWorkFlow(httpClient, grpcClient, httpBb); + + factory.Setup(x => x.NewWorkflow(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Returns(wf.Object); + + var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance); + + var wfName = nameof(Execute_Result_Should_Be_Previous); + var gid = Guid.NewGuid().ToString("N"); + + wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2"))); + + var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 }); + var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true); + + Assert.Equal("return value from previous", Encoding.UTF8.GetString(res)); + } + + [Fact] + public async Task Execute_Again_Result_Should_Be_Previous() + { + var factory = new Mock(); + var httpClient1 = new Mock(); + var httpClient2 = new Mock(); + var grpcClient = new Mock(); + var httpBb = new Mock(); + + // first + SetupPrepareWorkflow(httpClient1, DtmCommon.Constant.StatusPrepared, null); + var wf = SetupWorkFlow(httpClient1, grpcClient, httpBb); + factory.Setup(x => x.NewWorkflow(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Returns(wf.Object); + var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance); + var wfName = nameof(Execute_Again_Result_Should_Be_Previous); + var gid = Guid.NewGuid().ToString("N"); + wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2"))); + var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 }); + var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true); + Assert.Equal("return value from WfFunc2", Encoding.UTF8.GetString(res)); + + // again + SetupPrepareWorkflow(httpClient2, DtmCommon.Constant.StatusSucceed, "return value from previous"); + wf = SetupWorkFlow(httpClient2, grpcClient, httpBb); + factory.Setup(x => x.NewWorkflow(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Returns(wf.Object); + wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance); + gid = Guid.NewGuid().ToString("N"); + wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2"))); + req = JsonSerializer.Serialize(new { userId = "1", amount = 30 }); + res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true); + Assert.Equal("return value from previous", Encoding.UTF8.GetString(res)); + } + + [Fact] + public async Task Execute_Again_Result_StringEmpty() + { + var factory = new Mock(); + var httpClient = new Mock(); + var grpcClient = new Mock(); + var httpBb = new Mock(); + + // again + SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusSucceed, null); + var wf = SetupWorkFlow(httpClient, grpcClient, httpBb); + factory.Setup(x => x.NewWorkflow(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Returns(wf.Object); + var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance); + var wfName = nameof(Execute_Again_Result_StringEmpty); + var gid = Guid.NewGuid().ToString("N"); + wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2"))); + var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 }); + var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true); + Assert.Null(res); + } - private void SetupPrepareWorkflow(Mock httpClient, string status, string result, List progressDtos = null) + private void SetupPrepareWorkflow(Mock httpClient, string status, string? result, List progressDtos = null) { var httpResp = new HttpResponseMessage(HttpStatusCode.OK); httpResp.Content = new StringContent(JsonSerializer.Serialize( @@ -265,9 +371,9 @@ private void SetupPrepareWorkflow(Mock httpClient, string status, st Transaction = new DtmTransactionDto { Status = status, - Result = Convert.ToBase64String(Encoding.UTF8.GetBytes(result)) + Result = result == null ? null : Convert.ToBase64String(Encoding.UTF8.GetBytes(result)) }, - Progresses = progressDtos + Progresses = progressDtos ?? [] })); httpClient.Setup(x => x.PrepareWorkflow(It.IsAny(), It.IsAny())).Returns(Task.FromResult(httpResp)); }