Skip to content

Commit

Permalink
Trendyol#14 Postgresql adapter added
Browse files Browse the repository at this point in the history
  • Loading branch information
CelalCanKaya committed May 22, 2024
1 parent 564d9ae commit ede143b
Show file tree
Hide file tree
Showing 36 changed files with 1,285 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Transporter.Core.Utils;
using Transporter.CouchbaseAdapter;
using Transporter.MSSQLAdapter;
using Transporter.PostgreSqlAdapter;
using TransporterService.Daemon;
using TransporterService.Jobs;

Expand All @@ -41,6 +42,7 @@ public static IHostBuilder CreateHostBuilder(string[] args)

services.TransporterMsSqlAdapterRegister();
services.TransporterCouchbaseAdapterRegister();
services.TransporterPostgreSqlAdapterRegister();

services.Configure<QuartzOptions>(hostContext.Configuration.GetSection("Quartz"));
services.AddSingleton<IAdapterFactory, AdapterFactory>();
Expand Down
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,31 @@ In database processes, data safety is our first concern. Since we delete data on

- Interoperability: The project can transfer data mutually between different database types

| | Couchbase | MSSQL |
| :--- | :----: | :----: |
| **Couchbase**| **** | **** |
| **MSSQL** | **** | **** |
| | Couchbase | MSSQL | PostgreSQL |
|:---------------|:---------:|:-----:|:----------:|
| **Couchbase** | **** | **** | **** |
| **MSSQL** | **** | **** | **** |
| **PostgreSQL** | **** | **** | **** |

- Schedulability: Can work at the given interval.



### Configs

Explanation of some main properties of config properties. Please note that the config properties are case sensitive. Example can be found in “examples/configs” folder.

- Name: Must be unique.
- Type: Specifies database type. You can give "Couchbase" or "mssql" for now.
- Type: Specifies database type. You can give "Couchbase", "mssql" or "postgresql" for now.
- Cron: Specifies the interval that transporter will work on. For example: "0/20 * * ? * *"
- Condition: Condition that specifies which data is to be transferred
- KeyProperty: When transferring data from Mssql to Couchbase you can select the key of Couchbase that corresponds to any MSSQL column, not just Id property. If you are transferring data from Couchbase to Couchbase you must use "id" like in the examples.
- KeyProperty: When transferring data from MSSQL/PostgreSQL to Couchbase you can select the key of Couchbase that corresponds to any MSSQL/PostgreSQL column, not just Id property. If you are transferring data from Couchbase to Couchbase you must use "id" like in the examples.


### Noteworthy Mentions

For Couchbase: When generating a key we use id and name of data source. Example: {id}_{dataSourceName}
For mssql: Unique Index must be created for id and data source name on interim table.
For mssql: Unique Index must be created for id and data source name on interim table.
For postgresql: Unique Index must be created for id and data source name on interim table.


<!-- GETTING STARTED -->
Expand Down
90 changes: 90 additions & 0 deletions Transporter.PostgreSQLAdapter/Adapters/PostgreSqlInterimAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Transporter.Core.Adapters.Base.Interfaces;
using Transporter.Core.Adapters.Interim.Interfaces;
using Transporter.Core.Configs.Base.Interfaces;
using Transporter.Core.Utils;
using Transporter.PostgreSQLAdapter.Configs.Interim.Interfaces;
using Transporter.PostgreSQLAdapter.Services.Interim.Interfaces;
using Transporter.PostgreSQLAdapter.Utils;

namespace Transporter.PostgreSQLAdapter.Adapters
{
public class PostgreSqlInterimAdapter : IInterimAdapter
{
private readonly IConfiguration _configuration;
private readonly IInterimService _interimService;
private IPostgreSqlInterimSettings _settings;
public PostgreSqlInterimAdapter(IConfiguration configuration, IInterimService interimService)
{
_configuration = configuration;
_interimService = interimService;
}

public object Clone()
{
var result = MemberwiseClone() as IAdapter;
return result;
}

public bool CanHandle(ITransferJobSettings transferJobSettings)
{
var options = GetOptions(transferJobSettings);
return string.Equals(options.Type, PostgreSqlAdapterConstants.OptionsType,
StringComparison.InvariantCultureIgnoreCase);
}

public bool CanHandle(IPollingJobSettings jobSetting)
{
var type = GetTypeBySettings(jobSetting);
return string.Equals(type, PostgreSqlAdapterConstants.OptionsType, StringComparison.InvariantCultureIgnoreCase);
}

public void SetOptions(ITransferJobSettings transferJobSettings)
{
_settings = GetOptions(transferJobSettings);
}

public void SetOptions(IPollingJobSettings jobSettings)
{
_settings = GetOptions(jobSettings);
}

public async Task<IEnumerable<dynamic>> GetAsync()
{
return await _interimService.GetInterimDataAsync(_settings);
}

public async Task DeleteAsync(IEnumerable<dynamic> ids)
{
await _interimService.DeleteAsync(_settings, ids);
}

private IPostgreSqlInterimSettings GetOptions(ITransferJobSettings transferJobSettings)
{
var jobOptionsList = _configuration
.GetSection(Constants.TransferJobSettings).Get<List<PostgreSqlTransferJobSettings>>();
var options = jobOptionsList.First(x => x.Name == transferJobSettings.Name);
return (IPostgreSqlInterimSettings)options.Interim;
}

private IPostgreSqlInterimSettings GetOptions(IPollingJobSettings jobSettings)
{
var jobOptionsList = _configuration
.GetSection(Constants.PollingJobSettings).Get<List<PostgreSqlTransferJobSettings>>();
var options = jobOptionsList.First(x => x.Name == jobSettings.Name);
return (IPostgreSqlInterimSettings)options.Interim;
}

private string GetTypeBySettings(IPollingJobSettings jobSettings)
{
var jobOptionsList = _configuration
.GetSection(Constants.PollingJobSettings).Get<List<PostgreSqlTransferJobSettings>>();
var options = jobOptionsList.First(x => x.Name == jobSettings.Name);
return options.Interim?.Type;
}
}
}
102 changes: 102 additions & 0 deletions Transporter.PostgreSQLAdapter/Adapters/PostgreSqlSourceAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Transporter.Core.Adapters.Base.Interfaces;
using Transporter.Core.Adapters.Source.Interfaces;
using Transporter.Core.Configs.Base.Interfaces;
using Transporter.Core.Utils;
using Transporter.PostgreSQLAdapter;
using Transporter.PostgreSQLAdapter.Configs.Source.Interfaces;
using Transporter.PostgreSQLAdapter.Services.Source.Interfaces;
using Transporter.PostgreSQLAdapter.Utils;

namespace Transporter.PostgreSqlAdapter.Adapters
{
public class PostgreSqlSourceAdapter : ISourceAdapter
{
private readonly IConfiguration _configuration;
private readonly ISourceService _sourceService;
private IPostgreSqlSourceSettings _settings;

public PostgreSqlSourceAdapter(ISourceService sourceService, IConfiguration configuration)
{
_sourceService = sourceService;
_configuration = configuration;
}

public bool CanHandle(ITransferJobSettings transferJobSettings)
{
var options = GetOptions(transferJobSettings);
return string.Equals(options.Type, PostgreSqlAdapterConstants.OptionsType,
StringComparison.InvariantCultureIgnoreCase);
}

public bool CanHandle(IPollingJobSettings jobSetting)
{
var type = GetTypeBySettings(jobSetting);
return string.Equals(type, PostgreSqlAdapterConstants.OptionsType, StringComparison.InvariantCultureIgnoreCase);
}

public void SetOptions(ITransferJobSettings transferJobSettings)
{
_settings = GetOptions(transferJobSettings);
}

public void SetOptions(IPollingJobSettings jobSettings)
{
_settings = GetOptions(jobSettings);
}

public async Task<IEnumerable<dynamic>> GetAsync(IEnumerable<dynamic> ids)
{
return await _sourceService.GetSourceDataAsync(_settings, ids);
}

public async Task DeleteAsync(IEnumerable<dynamic> ids)
{
await _sourceService.DeleteDataByListOfIdsAsync(_settings, ids);
}

public string GetDataSourceName()
{
return $"{_settings.Options.Schema}.{_settings.Options.Table}";
}

public async Task<IEnumerable<dynamic>> GetIdsAsync()
{
return await _sourceService.GetIdDataAsync(_settings);
}

public virtual object Clone()
{
var result = MemberwiseClone() as IAdapter;
return result;
}

private IPostgreSqlSourceSettings GetOptions(ITransferJobSettings transferJobSettings)
{
var jobOptionsList = _configuration
.GetSection(Constants.TransferJobSettings).Get<List<PostgreSqlTransferJobSettings>>();
var options = jobOptionsList.First(x => x.Name == transferJobSettings.Name);
return (IPostgreSqlSourceSettings)options.Source;
}

private IPostgreSqlSourceSettings GetOptions(IPollingJobSettings jobSettings)
{
var jobOptionsList = _configuration
.GetSection(Constants.PollingJobSettings).Get<List<PostgreSqlTransferJobSettings>>();
var options = jobOptionsList.First(x => x.Name == jobSettings.Name);
return (IPostgreSqlSourceSettings)options.Source;
}

private string GetTypeBySettings(IPollingJobSettings jobSettings)
{
var jobOptionsList = _configuration
.GetSection(Constants.PollingJobSettings).Get<List<PostgreSqlTransferJobSettings>>();
var options = jobOptionsList.First(x => x.Name == jobSettings.Name);
return options.Source?.Type;
}
}
}
92 changes: 92 additions & 0 deletions Transporter.PostgreSQLAdapter/Adapters/PostgreSqlTargetAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Transporter.Core.Adapters.Base.Interfaces;
using Transporter.Core.Adapters.Target.Interfaces;
using Transporter.Core.Configs.Base.Interfaces;
using Transporter.Core.Utils;
using Transporter.PostgreSQLAdapter;
using Transporter.PostgreSQLAdapter.Configs.Target.Interfaces;
using Transporter.PostgreSQLAdapter.Services.Target.Interfaces;
using Transporter.PostgreSQLAdapter.Utils;

namespace Transporter.PostgreSqlAdapter.Adapters
{
public class PostgreSqlTargetAdapter : ITargetAdapter
{
private readonly IConfiguration _configuration;
private readonly ITargetService _targetService;
private IPostgreSqlTargetSettings _settings;

public PostgreSqlTargetAdapter(ITargetService targetService, IConfiguration configuration)
{
_targetService = targetService;
_configuration = configuration;
}

public bool CanHandle(ITransferJobSettings transferJobSettings)
{
var options = GetOptions(transferJobSettings);
return string.Equals(options.Type, PostgreSqlAdapterConstants.OptionsType,
StringComparison.InvariantCultureIgnoreCase);
}

public bool CanHandle(IPollingJobSettings jobSetting)
{
var type = GetTypeBySettings(jobSetting);
return string.Equals(type, PostgreSqlAdapterConstants.OptionsType, StringComparison.InvariantCultureIgnoreCase);
}

public void SetOptions(ITransferJobSettings transferJobSettings)
{
_settings = GetOptions(transferJobSettings);
}

public void SetOptions(IPollingJobSettings jobSettings)
{
_settings = GetOptions(jobSettings);
}

public async Task SetAsync(string data)
{
await _targetService.SetTargetDataAsync(_settings, data);
}

public async Task SetInterimTableAsync(string data, string dataSourceName)
{
await _targetService.SetTargetTemporaryDataAsync(_settings, data, dataSourceName);
}

public virtual object Clone()
{
var result = MemberwiseClone() as IAdapter;
return result;
}

private IPostgreSqlTargetSettings GetOptions(IPollingJobSettings jobSettings)
{
var jobOptionsList = _configuration
.GetSection(Constants.PollingJobSettings).Get<List<PostgreSqlTransferJobSettings>>();
var options = jobOptionsList.First(x => x.Name == jobSettings.Name);
return (IPostgreSqlTargetSettings)options.Target;
}

private IPostgreSqlTargetSettings GetOptions(ITransferJobSettings transferJobSettings)
{
var jobOptionsList = _configuration
.GetSection(Constants.TransferJobSettings).Get<List<PostgreSqlTransferJobSettings>>();
var options = jobOptionsList.First(x => x.Name == transferJobSettings.Name);
return (IPostgreSqlTargetSettings)options.Target;
}

private string GetTypeBySettings(IPollingJobSettings jobSettings)
{
var jobOptionsList = _configuration
.GetSection(Constants.PollingJobSettings).Get<List<PostgreSqlTransferJobSettings>>();
var options = jobOptionsList.First(x => x.Name == jobSettings.Name);
return options.Target?.Type;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Transporter.PostgreSQLAdapter.Configs.Interim.Interfaces;

namespace Transporter.PostgreSQLAdapter.Configs.Interim.Implementations
{
public class PostgreSqlInterimOptions : IPostgreSqlInterimOptions
{
public string Table { get; set; }
public string Schema { get; set; }
public string ConnectionString { get; set; }
public long BatchQuantity { get; set; }
public string DataSourceName { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Transporter.PostgreSQLAdapter.Configs.Interim.Interfaces;

namespace Transporter.PostgreSQLAdapter.Configs.Interim.Implementations
{
public class PostgreSqlInterimSettings : IPostgreSqlInterimSettings
{
public PostgreSqlInterimSettings()
{
Options = new PostgreSqlInterimOptions();
}

public IPostgreSqlInterimOptions Options { get; set; }
public string Type { get; set; }

public override string ToString()
{
return $"Type : {Type} Options : {Options}";
}

public string Host { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Transporter.PostgreSQLAdapter.Configs.Interim.Interfaces
{
public interface IPostgreSqlInterimOptions
{
public string Table { get; set; }
public string Schema { get; set; }
public string ConnectionString { get; set; }
public long BatchQuantity { get; set; }
public string DataSourceName { get; set; }
}
}
Loading

0 comments on commit ede143b

Please sign in to comment.