diff --git a/src/DtmMongoBarrier/MongoBranchBarrier.cs b/src/DtmMongoBarrier/MongoBranchBarrier.cs index 57f78c7..98bb03f 100644 --- a/src/DtmMongoBarrier/MongoBranchBarrier.cs +++ b/src/DtmMongoBarrier/MongoBranchBarrier.cs @@ -41,7 +41,7 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func< if (isNullCompensation || isDuplicateOrPend) { - bb?.Logger?.LogInformation("mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend); + bb?.Logger?.LogInformation("mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend); await session.CommitTransactionAsync(); return; } @@ -60,19 +60,71 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func< } } + public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func> busiCall) + { + bb.BarrierID = bb.BarrierID + 1; + var bid = bb.BarrierID.ToString().PadLeft(2, '0'); + + var session = await mc.StartSessionAsync(); + + session.StartTransaction(); + + try + { + var originOp = Constant.Barrier.OpDict.TryGetValue(bb.Op, out var ot) ? ot : string.Empty; + + var (originAffected, oEx) = await MongoInsertBarrier(bb, session, bb.BranchID, originOp, bid, bb.Op); + var (currentAffected, rEx) = await MongoInsertBarrier(bb, session, bb.BranchID, bb.Op, bid, bb.Op); + + bb?.Logger?.LogDebug("mongo originAffected: {originAffected} currentAffected: {currentAffected}", originAffected, currentAffected); + + if (bb.IsMsgRejected(rEx?.Message, bb.Op, currentAffected)) + throw new DtmDuplicatedException(); + + if (oEx != null || rEx != null) + { + throw oEx ?? rEx; + } + + var isNullCompensation = bb.IsNullCompensation(bb.Op, originAffected); + var isDuplicateOrPend = bb.IsDuplicateOrPend(currentAffected); + + if (isNullCompensation || isDuplicateOrPend) + { + bb?.Logger?.LogInformation("mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend); + await session.CommitTransactionAsync(); + return; + } + + var autoCommit = await busiCall.Invoke(session); + if (autoCommit) + { + await session.CommitTransactionAsync(); + } + } + catch (Exception ex) + { + bb?.Logger?.LogError(ex, "Mongo Call error, gid={gid}, trans_type={trans_type}", bb.Gid, bb.TransType); + + await session.AbortTransactionAsync(); + + throw; + } + } + public static async Task MongoQueryPrepared(this BranchBarrier bb, IMongoClient mc) { var session = await mc.StartSessionAsync(); try { - await MongoInsertBarrier( - bb, - session, - Constant.Barrier.MSG_BRANCHID, - Constant.TYPE_MSG, - Constant.Barrier.MSG_BARRIER_ID, - Constant.Barrier.MSG_BARRIER_REASON); + await MongoInsertBarrier( + bb, + session, + Constant.Barrier.MSG_BRANCHID, + Constant.TYPE_MSG, + Constant.Barrier.MSG_BARRIER_ID, + Constant.Barrier.MSG_BARRIER_REASON); } catch (Exception ex) { @@ -81,7 +133,7 @@ await MongoInsertBarrier( } var reason = string.Empty; - + try { var barrier = session.Client.GetDatabase(bb.DtmOptions.BarrierMongoDbName) @@ -155,7 +207,7 @@ await barrier.InsertOneAsync(new DtmBarrierDocument } private static FilterDefinition BuildFilters(string gid, string branchId, string op, string barrierId) - { + { return new FilterDefinitionBuilder().And( Builders.Filter.Eq(x => x.GId, gid), Builders.Filter.Eq(x => x.BranchId, branchId),