diff --git a/Agoda.Frameworks.DB.Tests/DbRepositoryTest.cs b/Agoda.Frameworks.DB.Tests/DbRepositoryTest.cs index 0d3cd97..2ba7f52 100644 --- a/Agoda.Frameworks.DB.Tests/DbRepositoryTest.cs +++ b/Agoda.Frameworks.DB.Tests/DbRepositoryTest.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; using System.Data; +using System.Data.SqlClient; +using System.Threading; using System.Threading.Tasks; using Dapper; using Moq; @@ -664,6 +666,27 @@ public void QueryMultipleAsync_Retry_Failure() Assert.IsNotNull(_onQueryCompleteEvents[0].Error); Assert.IsNotNull(_onQueryCompleteEvents[1].Error); } + + [Test] + public async Task ExecuteReaderAsync_CancellationToken_Cancelled() + { + var cancellationToken = new CancellationTokenSource(TimeSpan.FromDays(1)); + cancellationToken.Cancel(); + var maxAttemptCount = 2; + Assert.ThrowsAsync(async () => await _db.ExecuteReaderAsync("mobile_ro", "db.v1.sp_foo", 1, + maxAttemptCount, + cancellationToken.Token, + new IDbDataParameter[] + { + new SqlParameter("@param1", "value1"), + new SqlParameter("@param2", "value2") + + }, reader => { return Task.FromResult("");}) + ); + + _dbResources.Verify(x => x.ChooseDb("mobile_ro").UpdateWeight(It.IsAny(), false), Times.Exactly(maxAttemptCount)); + } + protected class FakeStoredProc : IStoredProc { diff --git a/Agoda.Frameworks.DB/DbRepository.cs b/Agoda.Frameworks.DB/DbRepository.cs index 06eddd4..487b5fa 100644 --- a/Agoda.Frameworks.DB/DbRepository.cs +++ b/Agoda.Frameworks.DB/DbRepository.cs @@ -3,6 +3,7 @@ using System.Data; using System.Data.SqlClient; using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using Agoda.Frameworks.LoadBalancing; using Dapper; @@ -296,6 +297,69 @@ public Task ExecuteReaderAsync( } }, ShouldRetry(maxAttemptCount), RaiseOnError); } + + public Task ExecuteReaderAsync( + string database, + string storedProc, + int timeoutSecs, + int maxAttemptCount, + CancellationToken token, + IDbDataParameter[] parameters, + Func> callback) + { + return _dbResources.ChooseDb(database).ExecuteAsync(async (connectionStr, _) => + { + var stopwatch = Stopwatch.StartNew(); + Exception error = null; + try + { + using (var connection = _generateConnection(connectionStr)) + { + if (connection is SqlConnection sqlConn) + { + await sqlConn.OpenAsync(); + } + else + { + connection.Open(); + } + SqlCommand sqlCommand = null; + try + { + sqlCommand = new SqlCommand(storedProc, connection as SqlConnection) + { + CommandType = CommandType.StoredProcedure, + CommandTimeout = timeoutSecs + }; + sqlCommand.Parameters.AddRange(parameters); + using (var reader = await sqlCommand.ExecuteReaderAsync(token)) + { + return await callback(reader); + } + } + finally + { + if (sqlCommand != null) + { + sqlCommand.Parameters.Clear(); + sqlCommand.Dispose(); + } + } + } + } + catch (Exception e) + { + error = e; + throw; + } + finally + { + stopwatch.Stop(); + RaiseOnExecuteReaderComplete( + database, storedProc, stopwatch.ElapsedMilliseconds, error); + } + }, ShouldRetry(maxAttemptCount), RaiseOnError); + } private IEnumerable QueryImpl( IStoredProc sp, diff --git a/Agoda.Frameworks.DB/IDbRepository.cs b/Agoda.Frameworks.DB/IDbRepository.cs index 888b400..9729254 100644 --- a/Agoda.Frameworks.DB/IDbRepository.cs +++ b/Agoda.Frameworks.DB/IDbRepository.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Data; using System.Data.SqlClient; +using System.Threading; using System.Threading.Tasks; namespace Agoda.Frameworks.DB @@ -26,6 +27,7 @@ Task ExecuteReaderAsync( Func> callback, TimeSpan? timeSpan, string cacheKey = ""); + Task ExecuteScalarAsync( string dbName, string sqlCommandString, @@ -134,6 +136,15 @@ Task ExecuteReaderAsync( IDbDataParameter[] parameters, Func> callback); + Task ExecuteReaderAsync( + string database, + string storedProc, + int timeoutSecs, + int maxAttemptCount, + CancellationToken token, + IDbDataParameter[] parameters, + Func> callback); + event EventHandler OnError; event EventHandler OnQueryComplete; event EventHandler OnExecuteReaderComplete;