diff --git a/.github/actions/build-with-plugins/action.yml b/.github/actions/build-with-plugins/action.yml index 9aa8f88..f68a76d 100644 --- a/.github/actions/build-with-plugins/action.yml +++ b/.github/actions/build-with-plugins/action.yml @@ -151,6 +151,21 @@ runs: -p:PublishReadyToRun=false \ -p:PublishTrimmed=false \ -p:Version=${{ inputs.build-version }} + - name: Build PostgreSQL Extension + shell: bash + run: | + dotnet publish \ + Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj \ + --configuration Release \ + --output ${{ inputs.platform-short }}/Extensions \ + --self-contained false \ + --runtime ${{ inputs.runtime }} \ + -p:PublishSingleFile=false \ + -p:DebugType=embedded \ + -p:EnableCompressionInSingleFile=true \ + -p:PublishReadyToRun=false \ + -p:PublishTrimmed=false \ + -p:Version=${{ inputs.build-version }} - name: Upload package uses: actions/upload-artifact@v3 with: diff --git a/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj b/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj index c9d8254..67838d7 100644 --- a/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj +++ b/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj @@ -19,15 +19,15 @@ - - + + - + diff --git a/Core/Cosmos.DataTransfer.Core/migrationsettings.json b/Core/Cosmos.DataTransfer.Core/migrationsettings.json index 82ea817..a5a6272 100644 --- a/Core/Cosmos.DataTransfer.Core/migrationsettings.json +++ b/Core/Cosmos.DataTransfer.Core/migrationsettings.json @@ -1,9 +1,11 @@ { - "Source": null, - "Sink": null, - "SourceSettings": { + "Source": "", + "Sink": "", + "SourceSettings": { + }, - "SinkSettings": { + "SinkSettings": { + }, "Operations": [ //{ diff --git a/CosmosDbDataMigrationTool.sln b/CosmosDbDataMigrationTool.sln index 92b59f9..10126ef 100644 --- a/CosmosDbDataMigrationTool.sln +++ b/CosmosDbDataMigrationTool.sln @@ -39,7 +39,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Mongo", "Mongo", "{F18E789A Extensions\Mongo\README.md = Extensions\Mongo\README.md EndProjectSection EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.MongoExtension", "Extensions\Mongo\Cosmos.DataTransfer.MongoExtension\Cosmos.DataTransfer.MongoExtension.csproj", "{F6EAC33B-9F7D-433B-9328-622FB8938C24}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.MongoVectorExtension", "Extensions\Mongo\Cosmos.DataTransfer.MongoVectorExtension\Cosmos.DataTransfer.MongoVectorExtension.csproj", "{F6EAC33B-9F7D-433B-9328-622FB8938C24}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.JsonExtension.UnitTests", "Extensions\Json\Cosmos.DataTransfer.JsonExtension.UnitTests\Cosmos.DataTransfer.JsonExtension.UnitTests.csproj", "{ED1E375E-A5A3-47EA-A7D5-07344C7E152F}" EndProject @@ -87,7 +87,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Csv", "Csv", "{39930280-DA2 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.CsvExtension", "Extensions\Csv\Cosmos.DataTransfer.CsvExtension\Cosmos.DataTransfer.CsvExtension.csproj", "{6A3FB90C-B837-4724-A406-214D4CEA686F}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cosmos.DataTransfer.CsvExtension.UnitTests", "Extensions\Csv\Cosmos.DataTransfer.CsvExtension.UnitTests\Cosmos.DataTransfer.CsvExtension.UnitTests.csproj", "{40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.CsvExtension.UnitTests", "Extensions\Csv\Cosmos.DataTransfer.CsvExtension.UnitTests\Cosmos.DataTransfer.CsvExtension.UnitTests.csproj", "{40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}" +EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{BCBBAF22-0CB5-416B-8C80-03AB2FC4D0A0}" ProjectSection(SolutionItems) = preProject Contributing.md = Contributing.md @@ -95,6 +96,15 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution README.md = README.md EndProjectSection EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.PostgresqlExtension", "Extensions\PostgreSQL\Cosmos.DataTransfer.PostgresqlExtension.csproj", "{85820167-DB94-458B-B09B-9E823996C692}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PostgreSQL", "PostgreSQL", "{1B927C5F-50FC-42A6-BAF6-B00E6D760543}" + ProjectSection(SolutionItems) = preProject + Extensions\PostgreSQL\README.md = Extensions\PostgreSQL\README.md + EndProjectSection +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.MongoExtension", "Extensions\Mongo\Cosmos.DataTransfer.MongoExtension\Cosmos.DataTransfer.MongoExtension.csproj", "{31BC84E1-55E5-45AA-BFAC-90732F20588B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -181,6 +191,14 @@ Global {40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}.Debug|Any CPU.Build.0 = Debug|Any CPU {40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}.Release|Any CPU.ActiveCfg = Release|Any CPU {40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}.Release|Any CPU.Build.0 = Release|Any CPU + {85820167-DB94-458B-B09B-9E823996C692}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {85820167-DB94-458B-B09B-9E823996C692}.Debug|Any CPU.Build.0 = Debug|Any CPU + {85820167-DB94-458B-B09B-9E823996C692}.Release|Any CPU.ActiveCfg = Release|Any CPU + {85820167-DB94-458B-B09B-9E823996C692}.Release|Any CPU.Build.0 = Release|Any CPU + {31BC84E1-55E5-45AA-BFAC-90732F20588B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {31BC84E1-55E5-45AA-BFAC-90732F20588B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {31BC84E1-55E5-45AA-BFAC-90732F20588B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {31BC84E1-55E5-45AA-BFAC-90732F20588B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -212,6 +230,9 @@ Global {39930280-DA29-4814-837B-FA7F252EB3EC} = {A8A1CEAB-2D82-460C-9B86-74ABD17CD201} {6A3FB90C-B837-4724-A406-214D4CEA686F} = {39930280-DA29-4814-837B-FA7F252EB3EC} {40AD8890-BD78-48F5-AE76-2C2FC6F15B7E} = {39930280-DA29-4814-837B-FA7F252EB3EC} + {85820167-DB94-458B-B09B-9E823996C692} = {1B927C5F-50FC-42A6-BAF6-B00E6D760543} + {1B927C5F-50FC-42A6-BAF6-B00E6D760543} = {A8A1CEAB-2D82-460C-9B86-74ABD17CD201} + {31BC84E1-55E5-45AA-BFAC-90732F20588B} = {F18E789A-D32D-48D3-B75F-1196D7215F74} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {662B3F27-70D8-45E6-A1C0-1438A9C8A542} diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj index 06801d6..277532c 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj @@ -8,7 +8,7 @@ - + diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj new file mode 100644 index 0000000..9ef48ac --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj @@ -0,0 +1,26 @@ + + + + net6.0 + enable + enable + Exe + + + + + + + + + + + + + + + + + + + diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs new file mode 100644 index 0000000..c9d671f --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs @@ -0,0 +1,89 @@ +using System.ComponentModel.Composition; +using Azure; +using Azure.AI.OpenAI; +using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.MongoExtension; +using Cosmos.DataTransfer.MongoVectorExtension.Settings; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using MongoDB.Bson; + +namespace Cosmos.DataTransfer.MongoVectorExtension; +[Export(typeof(IDataSinkExtension))] +public class MongoVectorDataSinkExtension : IDataSinkExtensionWithSettings +{ + public string DisplayName => $"MongoDB-Vector{ExtensionExtensions.BetaExtensionTag}"; + + public async Task WriteAsync(IAsyncEnumerable dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default) + { + var settings = config.Get(); + settings.Validate(); + + if (!string.IsNullOrEmpty(settings.ConnectionString) && !string.IsNullOrEmpty(settings.DatabaseName) && !string.IsNullOrEmpty(settings.Collection)) + { + var Isembeddingsetsvalid = false; + var client = new OpenAIClient(""); + if (settings.GenerateEmbedding.HasValue && settings.GenerateEmbedding.Value && settings.SourcePropEmbedding != null && settings.DestPropEmbedding != null) + { + if (!string.IsNullOrEmpty(settings.OpenAIUrl) && !string.IsNullOrEmpty(settings.OpenAIKey) && !string.IsNullOrEmpty(settings.OpenAIDeploymentName)) + { + client = new OpenAIClient(new Uri(settings.OpenAIUrl), new AzureKeyCredential(settings.OpenAIKey)); + Isembeddingsetsvalid = true; + logger.LogInformation("OpenAI Embedding settings are valid."); + } + } + + var context = new Context(settings.ConnectionString, settings.DatabaseName); + var repo = context.GetRepository(settings.Collection); + var batchSize = settings.BatchSize ?? 1000; + var objects = new List(); + int itemCount = 0; + await foreach (var item in dataItems.WithCancellation(cancellationToken)) + { + var dict = item.BuildDynamicObjectTree(); + + if (Isembeddingsetsvalid) + { + var valtoemb = item.GetValue(settings.SourcePropEmbedding)?.ToString(); + if (!string.IsNullOrEmpty(valtoemb) && valtoemb?.Length < 8192) + { + var options = new EmbeddingsOptions() + { + DeploymentName = settings.OpenAIDeploymentName, + Input = { valtoemb } + }; + var vector = await client.GetEmbeddingsAsync(options,cancellationToken); + if (vector != null) + { + dict?.TryAdd(settings.DestPropEmbedding, vector.Value.Data[0].Embedding.ToArray()); + } + } + } + objects.Add(new BsonDocument(dict)); + itemCount++; + + if (objects.Count == batchSize) + { + await repo.AddRange(objects); + logger.LogInformation("Added {ItemCount} items to collection '{Collection}'", itemCount, settings.Collection); + objects.Clear(); + } + } + + if (objects.Any()) + { + await repo.AddRange(objects); + } + + if (itemCount > 0) + logger.LogInformation("Added {ItemCount} total items to collection '{Collection}'", itemCount, settings.Collection); + else + logger.LogWarning("No items added to collection '{Collection}'", settings.Collection); + } + } + + public IEnumerable GetSettings() + { + yield return new MongoVectorSinkSettings(); + } +} diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Program.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Program.cs new file mode 100644 index 0000000..90fe8a7 --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Program.cs @@ -0,0 +1 @@ +Console.WriteLine("Starting Mongo extension"); diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml new file mode 100644 index 0000000..789090b --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml @@ -0,0 +1,24 @@ + + + + + Debug + Any CPU + ..\..\..\Core\Cosmos.DataTransfer.Core\bin\Debug\net6.0\Extensions + FileSystem + <_TargetId>Folder + net6.0 + false + + + Release + Any CPU + ..\..\..\Core\Cosmos.DataTransfer.Core\bin\Release\net6.0\Extensions + FileSystem + <_TargetId>Folder + net6.0 + false + + \ No newline at end of file diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs new file mode 100644 index 0000000..4148263 --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs @@ -0,0 +1,21 @@ +using System.ComponentModel.DataAnnotations; +using Cosmos.DataTransfer.MongoExtension.Settings; + +namespace Cosmos.DataTransfer.MongoVectorExtension.Settings; +public class MongoVectorSinkSettings : MongoBaseSettings +{ + [Required] + public string? Collection { get; set; } + + public int? BatchSize { get; set; } + + public bool? GenerateEmbedding { get; set; } + + public string? OpenAIUrl { get; set; } + public string? OpenAIKey { get; set; } + + // name of the deployment for text-embedding-ada-002 + public string? OpenAIDeploymentName { get; set; } + public string? SourcePropEmbedding { get; set; } + public string? DestPropEmbedding { get; set; } +} diff --git a/Extensions/Mongo/README.md b/Extensions/Mongo/README.md index 621027b..f491f2e 100644 --- a/Extensions/Mongo/README.md +++ b/Extensions/Mongo/README.md @@ -20,11 +20,47 @@ Source and sink settings require both `ConnectionString` and `DatabaseName` para ### Sink +```json +{ + "ConnectionString": "", + "DatabaseName: "", + "Collection": "" +} +``` + +# MongoDB Vector Extension (Beta) + +The MongoDB Vector extension is a Sink only extension that builds on the MongoDB extension by providing additional capabilities for generating embeddings using Azure OpenAI APIs. + +> **Note**: When specifying the MongoDB Vector extension as the Sink property in configuration, utilize the name **MongoDB-Vector(beta)**. + +## Settings + +The settings are based on the MongoDB extension settings with additional parameters for generating embeddings. + +### Additional Sink Settings + +The sink settings require the following additional parameters: + +- `GenerateEmbedding`: If set to true, the sink will generate embeddings for the records before writing them to the database. The sink requires the `OpenAIUrl`, `OpenAIKey`, and `OpenAIDeploymentModel` parameters to be set. Following paramaters are required if this is true +- `OpenAIUrl`: The URL of the OpenAI API +- `OpenAIKey`: The API key for the OpenAI API +- `OpenAIDeploymentModel`: The deployment model to use for the OpenAI API +- `SourcePropEmbedding`: The property in the source data that should be used to generate the embeddings +- `DestPropEmbedding`: New property name that will be added to the source data with the generated embeddings + ```json { "ConnectionString": "", "DatabaseName: "", "Collection": "", - "BatchSize: 100 + "BatchSize: 100, + "GenerateEmbedding": true | false + "OpenAIUrl": "", + "OpenAIKey": "", + "OpenAIDeploymentModel": "", + "SourcePropEmbedding": "", + "DestPropEmbedding": "" } ``` + diff --git a/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj b/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj new file mode 100644 index 0000000..d96d74c --- /dev/null +++ b/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj @@ -0,0 +1,22 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + + + + + diff --git a/Extensions/PostgreSQL/PostgreDataCol.cs b/Extensions/PostgreSQL/PostgreDataCol.cs new file mode 100644 index 0000000..9cf23ff --- /dev/null +++ b/Extensions/PostgreSQL/PostgreDataCol.cs @@ -0,0 +1,141 @@ +using Microsoft.VisualBasic; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Cosmos.DataTransfer.PostgresqlExtension +{ + public class PostgreDataCol + { + public string ColumnName { get; set; } + public Type ColumnType { get; set; } + + public NpgsqlTypes.NpgsqlDbType PostgreType { get; set; } + + public PostgreDataCol(string colname, Type coltype) + { + ColumnName = colname; + ColumnType = coltype; + PostgreType = Convert(coltype); + } + + public PostgreDataCol(string colname, NpgsqlTypes.NpgsqlDbType postgreType) + { + ColumnName = colname; + PostgreType = postgreType; + ColumnType = Convert(postgreType); + } + + public PostgreDataCol(string colname, string postgredatatye) + { + ColumnName = colname; + PostgreType = Convert(postgredatatye); + ColumnType = Convert(PostgreType); + } + + public PostgreDataCol() + { + } + + public Dictionary SparseColumnData { get; } = new Dictionary(); + + + + + public void AddColumnValue(long row, object? value) + { + if (value == null) + { + return; + } + + SparseColumnData[row] = value; + } + + public Type Convert(NpgsqlTypes.NpgsqlDbType coltype) + { + return coltype switch + { + NpgsqlTypes.NpgsqlDbType.Varchar => typeof(string), + NpgsqlTypes.NpgsqlDbType.Integer => typeof(int), + NpgsqlTypes.NpgsqlDbType.Bigint => typeof(long), + NpgsqlTypes.NpgsqlDbType.Boolean => typeof(bool), + NpgsqlTypes.NpgsqlDbType.Timestamp => typeof(DateTime), + NpgsqlTypes.NpgsqlDbType.Double => typeof(double), + NpgsqlTypes.NpgsqlDbType.Real => typeof(float), + NpgsqlTypes.NpgsqlDbType.Numeric => typeof(decimal), + NpgsqlTypes.NpgsqlDbType.Bytea => typeof(byte[]), + NpgsqlTypes.NpgsqlDbType.Uuid => typeof(Guid), + NpgsqlTypes.NpgsqlDbType.Char => typeof(char), + NpgsqlTypes.NpgsqlDbType.Interval => typeof(TimeSpan), + NpgsqlTypes.NpgsqlDbType.TimestampTz => typeof(DateTimeOffset), + NpgsqlTypes.NpgsqlDbType.Smallint => typeof(short), + NpgsqlTypes.NpgsqlDbType.Unknown => typeof(DBNull), + _ => typeof(DBNull), + }; + } + + public NpgsqlTypes.NpgsqlDbType Convert(string postgredattype) + { + return postgredattype.ToLower() switch + { + "varchar" =>NpgsqlTypes.NpgsqlDbType.Varchar, + "int8" => NpgsqlTypes.NpgsqlDbType.Bigint, + "int4" => NpgsqlTypes.NpgsqlDbType.Integer, + "int2" => NpgsqlTypes.NpgsqlDbType.Smallint, + "bool" => NpgsqlTypes.NpgsqlDbType.Boolean, + "timestamp" => NpgsqlTypes.NpgsqlDbType.Timestamp, + "timestamptz" => NpgsqlTypes.NpgsqlDbType.TimestampTz, + "float8" => NpgsqlTypes.NpgsqlDbType.Double, + "float4" => NpgsqlTypes.NpgsqlDbType.Real, + "numeric" => NpgsqlTypes.NpgsqlDbType.Numeric, + "bytea" => NpgsqlTypes.NpgsqlDbType.Bytea, + "char" => NpgsqlTypes.NpgsqlDbType.Char, + "interval" => NpgsqlTypes.NpgsqlDbType.Interval, + "int2vector"=> NpgsqlTypes.NpgsqlDbType.Array, + "jsonb" => NpgsqlTypes.NpgsqlDbType.Jsonb, + "name" => NpgsqlTypes.NpgsqlDbType.Name, + "oid" => NpgsqlTypes.NpgsqlDbType.Oid, + "text" => NpgsqlTypes.NpgsqlDbType.Text, + "unknown" =>NpgsqlTypes.NpgsqlDbType.Unknown, + _ => NpgsqlTypes.NpgsqlDbType.Unknown, + }; + } + + public NpgsqlTypes.NpgsqlDbType Convert(Type coltype) + { + if (coltype.Name == "Missing") + { + return //NpgsqlTypes.NpgsqlDbType.Varchar; + NpgsqlTypes.NpgsqlDbType.Unknown; + } + return coltype switch + { + var type when type == typeof(string) => NpgsqlTypes.NpgsqlDbType.Varchar, + var type when type == typeof(int) => NpgsqlTypes.NpgsqlDbType.Integer, + var type when type == typeof(long) => NpgsqlTypes.NpgsqlDbType.Bigint, + var type when type == typeof(bool) => NpgsqlTypes.NpgsqlDbType.Boolean, + var type when type == typeof(DateTime) => NpgsqlTypes.NpgsqlDbType.Timestamp, + var type when type == typeof(double) => NpgsqlTypes.NpgsqlDbType.Double, + var type when type == typeof(float) => NpgsqlTypes.NpgsqlDbType.Real, + var type when type == typeof(decimal) => NpgsqlTypes.NpgsqlDbType.Numeric, + var type when type == typeof(byte[]) => NpgsqlTypes.NpgsqlDbType.Bytea, + var type when type == typeof(Guid) => NpgsqlTypes.NpgsqlDbType.Uuid, + var type when type == typeof(char) => NpgsqlTypes.NpgsqlDbType.Char, + var type when type == typeof(TimeSpan) => NpgsqlTypes.NpgsqlDbType.Interval, + var type when type == typeof(DateTimeOffset) => NpgsqlTypes.NpgsqlDbType.TimestampTz, + var type when type == typeof(short) => NpgsqlTypes.NpgsqlDbType.Smallint, + var type when type == typeof(uint) => NpgsqlTypes.NpgsqlDbType.Integer, + var type when type == typeof(ushort) => NpgsqlTypes.NpgsqlDbType.Smallint, + var type when type == typeof(ulong) => NpgsqlTypes.NpgsqlDbType.Bigint, + var type when type == typeof(sbyte) => NpgsqlTypes.NpgsqlDbType.Smallint, + var type when type == typeof(byte) => NpgsqlTypes.NpgsqlDbType.Smallint, + var type when type == typeof(char[]) => NpgsqlTypes.NpgsqlDbType.Text, + var type when type == typeof(char?) => NpgsqlTypes.NpgsqlDbType.Char, + _ => NpgsqlTypes.NpgsqlDbType.Unknown, + }; + } + } +} diff --git a/Extensions/PostgreSQL/PostgreDictionaryDataItem.cs b/Extensions/PostgreSQL/PostgreDictionaryDataItem.cs new file mode 100644 index 0000000..a668304 --- /dev/null +++ b/Extensions/PostgreSQL/PostgreDictionaryDataItem.cs @@ -0,0 +1,27 @@ +using Cosmos.DataTransfer.Interfaces; + +namespace Cosmos.DataTransfer.PostgresqlExtension +{ + public class PostgreDictionaryDataItem : IDataItem + { + public IDictionary Columns { get; set; } + + public PostgreDictionaryDataItem(IDictionary columns) + { + Columns = columns; + } + public IEnumerable GetFieldNames() + { + return Columns.Keys; + } + + public object? GetValue(string fieldName) + { + if (!Columns.TryGetValue(fieldName, out var value)) + { + return null; + } + return value; + } + } +} diff --git a/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs b/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs new file mode 100644 index 0000000..8f16dce --- /dev/null +++ b/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs @@ -0,0 +1,197 @@ +using System.ComponentModel.Composition; +using System.Data; +using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.PostgresqlExtension.Settings; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Npgsql; + +namespace Cosmos.DataTransfer.PostgresqlExtension +{ + [Export(typeof(IDataSinkExtension))] + public class PostgresqlDataSinkExtension : IDataSinkExtensionWithSettings + { + public string DisplayName => "PostgreSQL"; + + public IEnumerable GetSettings() + { + yield return new PostgreSinkSettings(); + } + + public async Task WriteAsync(IAsyncEnumerable dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default) + { + var settings = config.Get(); + settings.Validate(); + + var cols = await FindPostgreDataTypes(dataItems, cancellationToken); + NpgsqlConnection con = new(settings.ConnectionString); + + if (settings.AppendDataToTable == true && !string.IsNullOrEmpty(settings.TableName)) + { + var destcols = LoadTableSchema(con, settings.TableName); + cols = MapDataTypes(destcols, cols); + } + else if (settings.DropAndCreateTable == true) + { + DropTable(con, settings.TableName); + CreateTable(con, settings.TableName, cols); + } + con.Open(); + using (var writer = con.BeginBinaryImport(GenerateInsertCommand(settings.TableName, cols))) + { + await foreach (var row in dataItems) + { + await writer.StartRowAsync(cancellationToken).ConfigureAwait(false); + foreach (var item in cols) + { + try + { + await writer.WriteAsync(row.GetValue(item.ColumnName), item.PostgreType, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + logger.LogError(ex, "Error writing to database"); + } + } + } + await writer.CompleteAsync(cancellationToken).ConfigureAwait(false); + } + con.Close(); + } + + private async Task> FindPostgreDataTypes(IAsyncEnumerable dataItems, CancellationToken cancellationToken = default) + { + List postgreDataCols = new(); + await foreach (var item in dataItems) + { + var fieldNames = item.GetFieldNames(); + int row = 0; + foreach (var col in fieldNames) + { + var current = postgreDataCols.FirstOrDefault(c => c.ColumnName == col); + var colval = item.GetValue(col); + var coltype = Type.Missing.GetType(); + if (colval != null) + { + coltype = colval.GetType(); + } + if (current == null) + { + var newcol = new PostgreDataCol(col, coltype); + newcol.AddColumnValue(row, colval); + postgreDataCols.Add(newcol); + + } + else + { + if (current.PostgreType == NpgsqlTypes.NpgsqlDbType.Unknown && coltype?.Name != "Missing") + { + var newcol = new PostgreDataCol(col, coltype); + postgreDataCols[row] = newcol; + } + } + row++; + } + } + return postgreDataCols; + } + + private List MapDataTypes(List dest, List source) + { + var temp = new List(); + foreach (var item in dest) + { + bool found = false; + foreach (var col in source) + { + if (item.ColumnName.ToLower() == col.ColumnName.ToLower()) + { + temp.Add(new PostgreDataCol() + { + ColumnName = col.ColumnName, + ColumnType = item.ColumnType, + PostgreType = item.PostgreType + }); + found = true; + break; + } + } + if (!found) + { + throw new Exception($"Column '{item.ColumnName}' does not exist in the source."); + } + } + return temp; + } + + private static void CreateTable(NpgsqlConnection con, string tableName, List cols) + { + //NpgsqlConnection con = new(connectionString); + var createtxt = $"CREATE TABLE {tableName}("; + foreach (var item in cols) + { + createtxt += $"{item.ColumnName} {item.PostgreType},"; + if (cols.Last() == item) + { + createtxt = createtxt.TrimEnd(','); + } + } + createtxt += ")"; + con.Open(); + using (var cmd = new NpgsqlCommand(createtxt, con)) + { + cmd.ExecuteNonQuery(); + } + con.Close(); + } + + private static void DropTable(NpgsqlConnection con, string tableName) + { + con.Open(); + using (var cmd = new NpgsqlCommand($"DROP TABLE IF EXISTS {tableName}", con)) + { + cmd.ExecuteNonQuery(); + } + con.Close(); + } + + private static List LoadTableSchema(NpgsqlConnection con, string tableName) + { + var temp = new List(); + con.Open(); + var dt = new DataTable(); + using (var cmd = new NpgsqlCommand($"SELECT column_name, udt_name FROM information_schema.columns WHERE table_name = '{tableName}'", con)) + using (var reader = cmd.ExecuteReader()) + { + dt.Load(reader); + } + foreach (DataRow row in dt.Rows) + { + if (row != null) + { + var newcol = new PostgreDataCol(row["column_name"]?.ToString(), row["udt_name"]?.ToString()); + temp.Add(newcol); + } + } + + con.Close(); + return temp; + } + + private static string GenerateInsertCommand(string tablename, List cols) + { + var colstxt = ""; + foreach (var item in cols) + { + colstxt += $"{item.ColumnName},"; + if (cols.Last() == item) + { + colstxt = colstxt.TrimEnd(','); + } + } + return $"COPY {tablename}({colstxt}) FROM STDIN(FORMAT BINARY)"; + } + + + } +} diff --git a/Extensions/PostgreSQL/PostgresqlDataSourceExtension.cs b/Extensions/PostgreSQL/PostgresqlDataSourceExtension.cs new file mode 100644 index 0000000..3ca9095 --- /dev/null +++ b/Extensions/PostgreSQL/PostgresqlDataSourceExtension.cs @@ -0,0 +1,47 @@ +using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.PostgresqlExtension.Settings; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Npgsql; +using System.ComponentModel.Composition; +using System.Runtime.CompilerServices; + +namespace Cosmos.DataTransfer.PostgresqlExtension; +[Export(typeof(IDataSourceExtension))] + +internal class PostgresqlDataSourceExtension : IDataSourceExtensionWithSettings +{ + public string DisplayName => "PostgreSQL"; + + public IEnumerable GetSettings() + { + yield return new PostgreSourceSettings(); + } + + public async IAsyncEnumerable ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var settings = config.Get(); + settings.Validate(); + + await using var connection = new NpgsqlConnection(settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + await using var command = new NpgsqlCommand(settings.QueryText, connection); + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + while (await reader.ReadAsync(cancellationToken)) + { + var columns = await reader.GetColumnSchemaAsync(cancellationToken); + Dictionary fields = new(); + foreach (var column in columns) + { + var value = column.ColumnOrdinal.HasValue ? reader[column.ColumnOrdinal.Value] : reader[column.ColumnName]; + if (value == DBNull.Value) + { + value = null; + } + fields[column.ColumnName] = value; + } + yield return new DictionaryDataItem(fields); + } + } +} + diff --git a/Extensions/PostgreSQL/Program.cs b/Extensions/PostgreSQL/Program.cs new file mode 100644 index 0000000..864ddc4 --- /dev/null +++ b/Extensions/PostgreSQL/Program.cs @@ -0,0 +1 @@ +Console.WriteLine(); \ No newline at end of file diff --git a/Extensions/PostgreSQL/Properties/PublishProfiles/FolderProfile.pubxml b/Extensions/PostgreSQL/Properties/PublishProfiles/FolderProfile.pubxml new file mode 100644 index 0000000..d6854c4 --- /dev/null +++ b/Extensions/PostgreSQL/Properties/PublishProfiles/FolderProfile.pubxml @@ -0,0 +1,15 @@ + + + + + Release + Any CPU + ..\..\Core\Cosmos.DataTransfer.Core\bin\Debug\net6.0\Extensions + FileSystem + <_TargetId>Folder + net6.0 + false + + \ No newline at end of file diff --git a/Extensions/PostgreSQL/README.md b/Extensions/PostgreSQL/README.md new file mode 100644 index 0000000..b15dea8 --- /dev/null +++ b/Extensions/PostgreSQL/README.md @@ -0,0 +1,35 @@ +# PostgreSQL Extension + +The PostgreSQL data transfer extension provides source and sink capabilities for reading from and writing to table data in PostgreSQL Server. + +> **Note**: When specifying the PostgreSQL extension as the Source or Sink property in configuration, utilize the name **PostgreSQL**. + +## Settings + +Source and sink settings both require a `ConnectionString` parameter. Specify the database name in the connection string. + +Source settings also require a `QueryText` parameter to define the data to select from SQL. This can combine data from multiple tables, views, etc. but should produce a single result set. + +### Source + +```json +{ + "ConnectionString": "", + "QueryText": "" +} +``` + +Sink settings require a `TableName` to define where to insert data. +- `AppendDataToTable`: Set to true to use table's schema and append data to the table. +- `DropAndCreateTable`: Set to true to drop and recreate the table. Schema will be guessed from the source data. + +### Sink + +```json +{ + "ConnectionString": "", + "TableName": "", + "AppendDataToTable": true | false, + "DropAndCreateTable": true | false +} +``` \ No newline at end of file diff --git a/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs b/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs new file mode 100644 index 0000000..dc02d49 --- /dev/null +++ b/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs @@ -0,0 +1,13 @@ +using System.ComponentModel.DataAnnotations; +using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.Interfaces.Manifest; + +namespace Cosmos.DataTransfer.PostgresqlExtension.Settings +{ + public class PostgreBaseSettings : IDataExtensionSettings + { + [Required] + [SensitiveValue] + public string? ConnectionString { get; set; } + } +} \ No newline at end of file diff --git a/Extensions/PostgreSQL/Settings/PostgreSinkSettings.cs b/Extensions/PostgreSQL/Settings/PostgreSinkSettings.cs new file mode 100644 index 0000000..3ab0bde --- /dev/null +++ b/Extensions/PostgreSQL/Settings/PostgreSinkSettings.cs @@ -0,0 +1,14 @@ +using System.ComponentModel.DataAnnotations; + +namespace Cosmos.DataTransfer.PostgresqlExtension.Settings +{ + + public class PostgreSinkSettings : PostgreBaseSettings + { + [Required] + public string TableName { get; set; } + public bool? AppendDataToTable { get; set; } + public bool? DropAndCreateTable { get; set; } + + } +} \ No newline at end of file diff --git a/Extensions/PostgreSQL/Settings/PostgreSourceSettings.cs b/Extensions/PostgreSQL/Settings/PostgreSourceSettings.cs new file mode 100644 index 0000000..96549f3 --- /dev/null +++ b/Extensions/PostgreSQL/Settings/PostgreSourceSettings.cs @@ -0,0 +1,15 @@ +using Cosmos.DataTransfer.Interfaces.Manifest; +using System.ComponentModel.DataAnnotations; + +namespace Cosmos.DataTransfer.PostgresqlExtension.Settings +{ + public class PostgreSourceSettings:PostgreBaseSettings + { + [Required] + [SensitiveValue] + public string? ConnectionString { get; set; } + + [Required] + public string? QueryText { get; set; } + } +} diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj index 3031dea..5800558 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj @@ -8,7 +8,7 @@ - +