Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added ability to schema sql server operations #136

Merged
merged 3 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ internal class SqlServerRepositorySettings
public string ConnectionString { get; set; }

public string DatabaseName { get; set; }
public string Schema { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

internal class SqlServerRepository : IRepository
{
private const string schema = "dbo";
private const int TimeoutSec = 60;
private readonly ConnectionProvider connectionProvider;

private readonly IRetryQueueItemMessageHeaderRepository retryQueueItemMessageHeaderRepository;
private readonly IRetryQueueItemMessageRepository retryQueueItemMessageRepository;
private readonly IRetryQueueItemRepository retryQueueItemRepository;
Expand All @@ -31,7 +31,7 @@ public SqlServerRepository(
string connectionString,
string dbName)
{
this.sqlServerDbSettings = new SqlServerDbSettings(connectionString, dbName);
this.sqlServerDbSettings = new SqlServerDbSettings(connectionString, dbName, schema);

this.RetryQueueDataProvider = new SqlServerDbDataProviderFactory().Create(this.sqlServerDbSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,32 @@ public CreateSchemaCreatorTests(BootstrapperRepositoryFixture bootstrapperReposi
}

[Fact]
public async Task SqlServerDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully()
public async Task PostgresDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully()
{
var sqlDataProviderFactory = new SqlServerDbDataProviderFactory();
var postgresDataProviderFactory = new PostgresDbDataProviderFactory();

var connectionString = this.bootstrapperRepositoryFixture.SqlServerSettings.ConnectionString;
var databaseName = this.bootstrapperRepositoryFixture.SqlServerSettings.DatabaseName;
var connectionString = this.bootstrapperRepositoryFixture.PostgresSettings.ConnectionString;
var databaseName = this.bootstrapperRepositoryFixture.PostgresSettings.DatabaseName;

var sqlSettings = new SqlServerDbSettings(connectionString, databaseName);
var postgresSettings = new PostgresDbSettings(connectionString, databaseName);

var retrySchemaCreator = sqlDataProviderFactory.CreateSchemaCreator(sqlSettings);
var retrySchemaCreator = postgresDataProviderFactory.CreateSchemaCreator(postgresSettings);

await retrySchemaCreator.CreateOrUpdateSchemaAsync(databaseName);
}

[Fact]
public async Task PostgresDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully()
public async Task SqlServerDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully()
{
var postgresDataProviderFactory = new PostgresDbDataProviderFactory();
var sqlDataProviderFactory = new SqlServerDbDataProviderFactory();

var connectionString = this.bootstrapperRepositoryFixture.PostgresSettings.ConnectionString;
var databaseName = this.bootstrapperRepositoryFixture.PostgresSettings.DatabaseName;
var connectionString = this.bootstrapperRepositoryFixture.SqlServerSettings.ConnectionString;
var databaseName = this.bootstrapperRepositoryFixture.SqlServerSettings.DatabaseName;
var schema = this.bootstrapperRepositoryFixture.SqlServerSettings.Schema;

var postgresSettings = new PostgresDbSettings(connectionString, databaseName);
var sqlSettings = new SqlServerDbSettings(connectionString, databaseName, schema);

var retrySchemaCreator = postgresDataProviderFactory.CreateSchemaCreator(postgresSettings);
var retrySchemaCreator = sqlDataProviderFactory.CreateSchemaCreator(sqlSettings);

await retrySchemaCreator.CreateOrUpdateSchemaAsync(databaseName);
}
Expand Down
3 changes: 2 additions & 1 deletion src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
},
"SqlServerRepository": {
"ConnectionString": "Server=localhost; User ID=SA; Password=SqlSever123123; Pooling=true; Trusted_Connection=true; Integrated Security=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Tests;",
"DatabaseName": "kafka_flow_retry_durable_test"
"DatabaseName": "kafka_flow_retry_durable_test",
"Schema": "dbo"
},
"PostgresRepository": {
"ConnectionString": "Server=localhost;Database=postgres;User Id=postgres;Password=Postgres123123;Port=5432;Application Name=KafkaFlow Retry Tests;",
Expand Down
2 changes: 2 additions & 0 deletions src/KafkaFlow.Retry.SqlServer/DbConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public DbConnectionContext(SqlServerDbSettings sqlServerDbSettings, bool withinT
this.withinTransaction = withinTransaction;
}

public string Schema => this.sqlServerDbSettings.Schema;

public void Commit()
{
if (this.sqlTransaction is object)
Expand Down
2 changes: 2 additions & 0 deletions src/KafkaFlow.Retry.SqlServer/IDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

internal interface IDbConnection : IDisposable
{
string Schema { get; }

SqlCommand CreateCommand();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Threading.Tasks;
Expand All @@ -10,4 +10,4 @@ internal interface IRetryQueueItemMessageHeaderRepository

Task<IList<RetryQueueItemMessageHeaderDbo>> GetOrderedAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageDbo> retryQueueItemMessagesDbo);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Threading.Tasks;
Expand All @@ -10,4 +10,4 @@ internal interface IRetryQueueItemMessageRepository

Task<IList<RetryQueueItemMessageDbo>> GetMessagesOrderedAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemDbo> retryQueueItemsDbo);
}
carlosgoias marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System;
using System.Collections.Generic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System;
using System.Collections.Generic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Data.SqlClient;
Expand Down Expand Up @@ -29,8 +29,8 @@ public async Task<IList<RetryQueueItemMessageHeaderDbo>> GetOrderedAsync(IDbConn
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = $@"SELECT *
FROM [RetryItemMessageHeaders] h
INNER JOIN [RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage
FROM [{dbConnection.Schema}].[RetryItemMessageHeaders] h
INNER JOIN [{dbConnection.Schema}].[RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage
WHERE h.IdItemMessage IN ({string.Join(",", retryQueueItemMessagesDbo.Select(x => $"'{x.IdRetryQueueItem}'"))})
ORDER BY rqi.IdRetryQueue, h.IdItemMessage";

Expand All @@ -46,7 +46,7 @@ private async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageHea
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"INSERT INTO [RetryItemMessageHeaders]
command.CommandText = $@"INSERT INTO [{dbConnection.Schema}].[RetryItemMessageHeaders]
(IdItemMessage, [Key], Value)
VALUES
(@IdItemMessage, @Key, @Value)";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
carlosgoias marked this conversation as resolved.
Show resolved Hide resolved
using System.Collections.Generic;
using System.Data.SqlClient;
Expand All @@ -16,7 +16,7 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"INSERT INTO [ItemMessages]
command.CommandText = $@"INSERT INTO [{dbConnection.Schema}].[ItemMessages]
(IdRetryQueueItem, [Key], Value, TopicName, Partition, Offset, UtcTimeStamp)
VALUES
(@idRetryQueueItem, @key, @value, @topicName, @partition, @offSet, @utcTimeStamp)";
Expand Down Expand Up @@ -50,11 +50,11 @@ public async Task<IList<RetryQueueItemMessageDbo>> GetMessagesOrderedAsync(IDbCo
}
var parameter = new SqlParameter("@RetryQueueItemsIds", entriesToLoad);
parameter.Direction = System.Data.ParameterDirection.Input;
parameter.TypeName = "dbo.TY_RetryQueueItemsIds";
parameter.TypeName = $"{dbConnection.Schema}.TY_RetryQueueItemsIds";

command.Parameters.Add(parameter);
command.CommandType = System.Data.CommandType.Text;
command.CommandText = $@"EXEC P_LoadItemMessages @RetryQueueItemsIds";
command.CommandText = $@"EXEC {dbConnection.Schema}.P_LoadItemMessages @RetryQueueItemsIds";

return await this.ExecuteReaderAsync(command).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System;
using System.Collections.Generic;
Expand All @@ -21,11 +21,11 @@ public async Task<long> AddAsync(IDbConnection dbConnection, RetryQueueItemDbo r
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"INSERT INTO [RetryQueueItems]
command.CommandText = $@"INSERT INTO [{dbConnection.Schema}].[RetryQueueItems]
(IdDomain, IdRetryQueue, IdDomainRetryQueue, IdItemStatus, IdSeverityLevel, AttemptsCount, Sort, CreationDate, LastExecution, ModifiedStatusDate, Description)
VALUES
(@idDomain, @idRetryQueue, @idDomainRetryQueue, @idItemStatus, @idSeverityLevel, @attemptsCount,
(SELECT COUNT(1) FROM [RetryQueueItems] WHERE IdDomainRetryQueue = @idDomainRetryQueue),
(SELECT COUNT(1) FROM [{dbConnection.Schema}].[RetryQueueItems] WHERE IdDomainRetryQueue = @idDomainRetryQueue),
@creationDate, @lastExecution, @modifiedStatusDate, @description);

SELECT SCOPE_IDENTITY()";
Expand Down Expand Up @@ -53,8 +53,8 @@ public async Task<bool> AnyItemStillActiveAsync(IDbConnection dbConnection, Guid
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"SELECT 1 WHERE EXISTS(
SELECT TOP 1 * FROM [RetryQueueItems]
command.CommandText = $@"SELECT 1 WHERE EXISTS(
SELECT TOP 1 * FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue = @IdDomainRetryQueue
AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry))";
Expand All @@ -77,8 +77,8 @@ public async Task<RetryQueueItemDbo> GetItemAsync(IDbConnection dbConnection, Gu
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"SELECT *
FROM [RetryQueueItems]
command.CommandText = $@"SELECT *
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomain = @IdDomain";

Expand All @@ -96,8 +96,8 @@ public async Task<IList<RetryQueueItemDbo>> GetItemsByQueueOrderedAsync(IDbConne
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"SELECT *
FROM [RetryQueueItems]
command.CommandText = $@"SELECT *
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue = @IdDomainRetryQueue
ORDER BY Sort ASC";
Expand Down Expand Up @@ -133,20 +133,20 @@ public async Task<IList<RetryQueueItemDbo>> GetItemsOrderedAsync(
}

query = string.Concat(query, $@" *
FROM [RetryQueueItems]
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue IN ({string.Join(",", retryQueueIds.Select(x => $"'{x}'"))})");

if (stuckStatusFilter is null)
{
query = string.Concat(query, $" AND IdItemStatus IN({ string.Join(",", statuses.Select(x => (byte)x))})");
query = string.Concat(query, $" AND IdItemStatus IN({string.Join(",", statuses.Select(x => (byte)x))})");
}
else
{
query = string.Concat(query, $@" AND(
IdItemStatus IN({ string.Join(",", statuses.Select(x => (byte)x))})
IdItemStatus IN({string.Join(",", statuses.Select(x => (byte)x))})
OR(
IdItemStatus = { (byte)stuckStatusFilter.ItemStatus}
IdItemStatus = {(byte)stuckStatusFilter.ItemStatus}
AND DATEADD(SECOND, {Math.Floor(stuckStatusFilter.ExpirationInterval.TotalSeconds)}, ModifiedStatusDate) < @DateTimeUtcNow
)
)");
Expand All @@ -173,7 +173,7 @@ public async Task<IList<RetryQueueItemDbo>> GetNewestItemsAsync(IDbConnection db
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = $@"SELECT *
FROM [RetryQueueItems]
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue = @IdDomainRetryQueue
AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry)
Expand All @@ -198,7 +198,7 @@ public async Task<IList<RetryQueueItemDbo>> GetPendingItemsAsync(IDbConnection d
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = $@"SELECT *
FROM [RetryQueueItems]
FROM [{dbConnection.Schema}].[RetryQueueItems]
WITH (NOLOCK)
WHERE IdDomainRetryQueue = @IdDomainRetryQueue
AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry)
Expand Down Expand Up @@ -237,7 +237,7 @@ public async Task<int> UpdateAsync(IDbConnection dbConnection, Guid idDomain, Re
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"UPDATE [RetryQueueItems]
command.CommandText = $@"UPDATE [{dbConnection.Schema}].[RetryQueueItems]
SET IdItemStatus = @IdItemStatus,
AttemptsCount = @AttemptsCount,
LastExecution = @LastExecution,
Expand Down Expand Up @@ -265,7 +265,7 @@ public async Task<int> UpdateStatusAsync(IDbConnection dbConnection, Guid idDoma
using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"UPDATE [RetryQueueItems]
command.CommandText = $@"UPDATE [{dbConnection.Schema}].[RetryQueueItems]
SET IdItemStatus = @IdItemStatus,
ModifiedStatusDate = @DateTimeUtcNow
WHERE IdDomain = @IdDomain";
Expand Down
Loading
Loading