From 294b70254c703e152471583d035dbdfbe9bdaeb0 Mon Sep 17 00:00:00 2001 From: Adam Barath Date: Mon, 18 Mar 2024 13:27:45 +0100 Subject: [PATCH 1/4] fix for CosmosDB to CosmosDB doesn't migrate "Id" field #100 https://github.com/AzureCosmosDB/data-migration-desktop-tool/issues/100 --- Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs b/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs index 93032cc..a31acd2 100644 --- a/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs +++ b/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs @@ -36,7 +36,7 @@ public static class DataItemExtensions { object? value = source.GetValue(field); var fieldName = field; - if (string.Equals(field, "id", StringComparison.CurrentCultureIgnoreCase) && requireStringId && !containsLowercaseIdField) + if (string.Equals(field, "id", StringComparison.CurrentCulture) && requireStringId && !containsLowercaseIdField) { value = value?.ToString(); fieldName = "id"; From adb140e2045a94e25ca3a6400bc2dfeaad7b2b51 Mon Sep 17 00:00:00 2001 From: Stefan Negritoiu Date: Wed, 20 Mar 2024 18:15:56 -0700 Subject: [PATCH 2/4] introduce IgnoreNullValue setting for CosmosDB sink useful to save space when storing sparse documents --- .../CosmosDataSinkExtension.cs | 2 +- .../CosmosExtensionServices.cs | 8 ++++++++ .../CosmosSinkSettings.cs | 1 + .../DataItemExtensions.cs | 11 +++++++++-- 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs index c3696c4..b8ca140 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs @@ -93,7 +93,7 @@ void ReportCount(int i) } } - var convertedObjects = dataItems.Select(di => di.BuildDynamicObjectTree(true)).Where(o => o != null).OfType(); + var convertedObjects = dataItems.Select(di => di.BuildDynamicObjectTree(requireStringId: true, ignoreNullValues: settings.IgnoreNullValues)).Where(o => o != null).OfType(); var batches = convertedObjects.Buffer(settings.BatchSize); var retry = GetRetryPolicy(settings.MaxRetryCount, settings.InitialRetryDurationMs); await foreach (var batch in batches.WithCancellation(cancellationToken)) diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs index dd6766e..610015f 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs @@ -22,6 +22,14 @@ public static CosmosClient CreateClient(CosmosSettingsBase settings, string disp AllowBulkExecution = true, EnableContentResponseOnWrite = false, }; + + if (settings is CosmosSinkSettings sinkSettings) + { + clientOptions.SerializerOptions = new CosmosSerializationOptions + { + IgnoreNullValues = sinkSettings.IgnoreNullValues + }; + } CosmosClient? cosmosClient; if (settings.UseRbacAuth) diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSinkSettings.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSinkSettings.cs index 3b1a878..a17a73d 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSinkSettings.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSinkSettings.cs @@ -15,6 +15,7 @@ public class CosmosSinkSettings : CosmosSettingsBase, IDataExtensionSettings public bool IsServerlessAccount { get; set; } = false; public bool UseSharedThroughput { get; set; } = false; public DataWriteMode WriteMode { get; set; } = DataWriteMode.Insert; + public bool IgnoreNullValues { get; set; } = false; public List? PartitionKeyPaths { get; set; } public override IEnumerable Validate(ValidationContext validationContext) diff --git a/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs b/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs index 93032cc..fdc45fd 100644 --- a/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs +++ b/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs @@ -11,10 +11,12 @@ public static class DataItemExtensions /// If true, adds a new GUID "id" field to any top level items where one is not already present. /// A dynamic object containing the entire data structure. /// The returned ExpandoObject can be used directly as an IDictionary. - public static ExpandoObject? BuildDynamicObjectTree(this IDataItem? source, bool requireStringId = false) + public static ExpandoObject? BuildDynamicObjectTree(this IDataItem? source, bool requireStringId = false, bool ignoreNullValues = false) { - if (source == null) + if (source == null) + { return null; + } var fields = source.GetFieldNames().ToList(); var item = new ExpandoObject(); @@ -35,6 +37,11 @@ public static class DataItemExtensions foreach (string field in fields) { object? value = source.GetValue(field); + if (ignoreNullValues && value == null) + { + continue; + } + var fieldName = field; if (string.Equals(field, "id", StringComparison.CurrentCultureIgnoreCase) && requireStringId && !containsLowercaseIdField) { From 963361ad15f9c6b87687ec821ff0d1ec8b6561e5 Mon Sep 17 00:00:00 2001 From: Stefan Negritoiu Date: Wed, 20 Mar 2024 19:05:16 -0700 Subject: [PATCH 3/4] include AddRate when reporting progress --- .../CosmosDataSinkExtension.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs index c3696c4..df98d3e 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs @@ -89,7 +89,7 @@ void ReportCount(int i) addedCount += i; if (addedCount % 500 == 0) { - logger.LogInformation("{AddedCount} records added after {TotalSeconds}s", addedCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}"); + logger.LogInformation("{AddedCount} records added after {TotalSeconds}s ({AddRate} records/s)", addedCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}", $"{(int)(addedCount / (timer.ElapsedMilliseconds / 1000.0))}"); } } @@ -111,7 +111,7 @@ void ReportCount(int i) throw new Exception($"Only {addedCount} of {inputCount} records were added to Cosmos"); } - logger.LogInformation("Added {AddedCount} total records in {TotalSeconds}s", addedCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}"); + logger.LogInformation("Added {AddedCount} total records in {TotalSeconds}s ({AddRate} records/s)", addedCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}", $"{(int)(addedCount / (timer.ElapsedMilliseconds / 1000.0))}"); } private static AsyncRetryPolicy GetRetryPolicy(int maxRetryCount, int initialRetryDuration) From d850a5b9cdda38a1cedcc6c03d9d02b41d3f3c6e Mon Sep 17 00:00:00 2001 From: John Bowen Date: Fri, 29 Mar 2024 12:12:54 -0700 Subject: [PATCH 4/4] Adding option to pass through alternate cased id fields --- .../CosmosDataSinkExtensionTests.cs | 78 +++++++++++++++++++ .../CosmosDataSinkExtension.cs | 5 +- .../CosmosSinkSettings.cs | 1 + Extensions/Cosmos/README.md | 3 +- .../DataItemExtensions.cs | 40 +++++++--- 5 files changed, 115 insertions(+), 12 deletions(-) diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDataSinkExtensionTests.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDataSinkExtensionTests.cs index 09505a6..c12a98b 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDataSinkExtensionTests.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDataSinkExtensionTests.cs @@ -1,4 +1,5 @@ using Cosmos.DataTransfer.Interfaces; +using Microsoft.VisualStudio.TestTools.UnitTesting; namespace Cosmos.DataTransfer.CosmosExtension.UnitTests { @@ -54,5 +55,82 @@ public void BuildDynamicObjectTree_WithNestedArrays_WorksCorrectly() Assert.AreEqual("sub2-1", secondSubArray[0].id); } + + [TestMethod] + public void BuildDynamicObjectTree_WithAnyCaseIds_UsesSourceIdValue() + { + var numeric = Random.Shared.Next(); + var lower = Guid.NewGuid().ToString(); + var upper = Guid.NewGuid().ToString(); + var mixed = Guid.NewGuid().ToString(); + var reversed = Guid.NewGuid().ToString(); + var item = new CosmosDictionaryDataItem(new Dictionary() + { + { "id", numeric }, + }); + + dynamic obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!; + Assert.AreEqual(numeric.ToString(), obj.id); + + item = new CosmosDictionaryDataItem(new Dictionary() + { + { "id", lower }, + }); + + obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!; + Assert.AreEqual(lower, obj.id); + + item = new CosmosDictionaryDataItem(new Dictionary() + { + { "ID", upper }, + }); + obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!; + Assert.AreEqual(upper, obj.id); + + item = new CosmosDictionaryDataItem(new Dictionary() + { + { "Id", mixed }, + }); + obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!; + Assert.AreEqual(mixed, obj.id); + + item = new CosmosDictionaryDataItem(new Dictionary() + { + { "iD", reversed }, + }); + obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!; + Assert.AreEqual(reversed, obj.id); + } + + [TestMethod] + public void BuildDynamicObjectTree_WithPreservedMixedCaseIds_PassesThroughSourceValues() + { + var id = Random.Shared.Next(); + var upper = Guid.NewGuid().ToString(); + var mixed = Guid.NewGuid().ToString(); + var item = new CosmosDictionaryDataItem(new Dictionary() + { + { "id", id }, + { "ID", upper }, + { "Id", mixed } + }); + + dynamic obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: true)!; + Assert.AreEqual(id.ToString(), obj.id); + Assert.AreEqual(upper, obj.ID); + Assert.AreEqual(mixed, obj.Id); + + item = new CosmosDictionaryDataItem(new Dictionary() + { + { "ID", upper }, + { "Id", mixed } + }); + obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: true)!; + Assert.AreEqual(upper, obj.ID); + Assert.AreEqual(mixed, obj.Id); + string? cosmosId = obj.id; + Assert.IsNotNull(cosmosId); + Assert.IsFalse(string.IsNullOrWhiteSpace(cosmosId)); + } } } diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs index c3696c4..93f90ff 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs @@ -93,7 +93,10 @@ void ReportCount(int i) } } - var convertedObjects = dataItems.Select(di => di.BuildDynamicObjectTree(true)).Where(o => o != null).OfType(); + var convertedObjects = dataItems + .Select(di => di.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: settings.PreserveMixedCaseIds)) + .Where(o => o != null) + .OfType(); var batches = convertedObjects.Buffer(settings.BatchSize); var retry = GetRetryPolicy(settings.MaxRetryCount, settings.InitialRetryDurationMs); await foreach (var batch in batches.WithCancellation(cancellationToken)) diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSinkSettings.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSinkSettings.cs index 3b1a878..46c6349 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSinkSettings.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSinkSettings.cs @@ -14,6 +14,7 @@ public class CosmosSinkSettings : CosmosSettingsBase, IDataExtensionSettings public bool UseAutoscaleForCreatedContainer { get; set; } = true; public bool IsServerlessAccount { get; set; } = false; public bool UseSharedThroughput { get; set; } = false; + public bool PreserveMixedCaseIds { get; set; } = false; public DataWriteMode WriteMode { get; set; } = DataWriteMode.Insert; public List? PartitionKeyPaths { get; set; } diff --git a/Extensions/Cosmos/README.md b/Extensions/Cosmos/README.md index 8d9875a..8c70e15 100644 --- a/Extensions/Cosmos/README.md +++ b/Extensions/Cosmos/README.md @@ -44,7 +44,7 @@ Or with RBAC: } ``` -Sink requires an additional `PartitionKeyPath` parameter which is used when creating the container if it does not exist. To use hierarchical partition keys, instead use the `PartitionKeyPaths` setting to supply an array of up to 3 paths. It also supports an optional `RecreateContainer` parameter (`false` by default) to delete and then recreate the container to ensure only newly imported data is present. The optional `BatchSize` parameter (100 by default) sets the number of items to accumulate before inserting. `ConnectionMode` can be set to either `Gateway` (default) or `Direct` to control how the client connects to the CosmosDB service. For situations where a container is created as part of the transfer operation `CreatedContainerMaxThroughput` (in RUs) and `UseAutoscaleForCreatedContainer` provide the initial throughput settings which will be in effect when executing the transfer. To instead use shared throughput that has been provisioned at the database level, set the `UseSharedThroughput` parameter to `true`. The optional `WriteMode` parameter specifies the type of data write to use: `InsertStream`, `Insert`, `UpsertStream`, or `Upsert`. The `IsServerlessAccount` parameter specifies whether the target account uses Serverless instead of Provisioned throughput, which affects the way containers are created. Additional parameters allow changing the behavior of the Cosmos client appropriate to your environment. +Sink requires an additional `PartitionKeyPath` parameter which is used when creating the container if it does not exist. To use hierarchical partition keys, instead use the `PartitionKeyPaths` setting to supply an array of up to 3 paths. It also supports an optional `RecreateContainer` parameter (`false` by default) to delete and then recreate the container to ensure only newly imported data is present. The optional `BatchSize` parameter (100 by default) sets the number of items to accumulate before inserting. `ConnectionMode` can be set to either `Gateway` (default) or `Direct` to control how the client connects to the CosmosDB service. For situations where a container is created as part of the transfer operation `CreatedContainerMaxThroughput` (in RUs) and `UseAutoscaleForCreatedContainer` provide the initial throughput settings which will be in effect when executing the transfer. To instead use shared throughput that has been provisioned at the database level, set the `UseSharedThroughput` parameter to `true`. The optional `WriteMode` parameter specifies the type of data write to use: `InsertStream`, `Insert`, `UpsertStream`, or `Upsert`. The `IsServerlessAccount` parameter specifies whether the target account uses Serverless instead of Provisioned throughput, which affects the way containers are created. Additional parameters allow changing the behavior of the Cosmos client appropriate to your environment. The `PreserveMixedCaseIds` parameter (`false` by default) ignores differently cased `id` fields and writes them through without modification, while generating a separate lowercased `id` field as required by Cosmos. ### Sink @@ -62,6 +62,7 @@ Sink requires an additional `PartitionKeyPath` parameter which is used when crea "CreatedContainerMaxThroughput": 1000, "UseAutoscaleForCreatedContainer": true, "WriteMode": "InsertStream", + "PreserveMixedCaseIds": false, "IsServerlessAccount": false, "UseSharedThroughput": false } diff --git a/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs b/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs index a31acd2..510ccd3 100644 --- a/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs +++ b/Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs @@ -9,37 +9,57 @@ public static class DataItemExtensions /// /// /// If true, adds a new GUID "id" field to any top level items where one is not already present. + /// If true, disregards differently cased "id" fields for purposes of required "id" and passes them through. /// A dynamic object containing the entire data structure. /// The returned ExpandoObject can be used directly as an IDictionary. - public static ExpandoObject? BuildDynamicObjectTree(this IDataItem? source, bool requireStringId = false) + public static ExpandoObject? BuildDynamicObjectTree(this IDataItem? source, bool requireStringId = false, bool preserveMixedCaseIds = false) { if (source == null) return null; var fields = source.GetFieldNames().ToList(); var item = new ExpandoObject(); - + /* * If the item contains a lowercase id field, we can take it as is. - * If we have an uppercase Id or ID field, but no lowercase id, we will rename it to id. + * If we have an uppercase Id or ID field, but no lowercase id, we will rename it to id, unless `preserveMixedCaseIds` is set to true. + * If `preserveMixedCaseIds` is set to true, any differently cased "id" fields will be passed through as normal properties with no casing change and a separate "id" will be generated. * Then it can be used i.e. as CosmosDB primary key, when `requireStringId` is set to true. */ var containsLowercaseIdField = fields.Contains("id", StringComparer.CurrentCulture); var containsAnyIdField = fields.Contains("id", StringComparer.CurrentCultureIgnoreCase); - - if (requireStringId && !containsAnyIdField) + + if (requireStringId) { - item.TryAdd("id", Guid.NewGuid().ToString()); + bool mismatchedIdCasing = preserveMixedCaseIds && !containsLowercaseIdField; + if (!containsAnyIdField || mismatchedIdCasing) + { + item.TryAdd("id", Guid.NewGuid().ToString()); + } } - + foreach (string field in fields) { object? value = source.GetValue(field); var fieldName = field; - if (string.Equals(field, "id", StringComparison.CurrentCulture) && requireStringId && !containsLowercaseIdField) + if (requireStringId && string.Equals(field, "id", StringComparison.CurrentCultureIgnoreCase)) { - value = value?.ToString(); - fieldName = "id"; + if (preserveMixedCaseIds) + { + if (string.Equals(field, "id", StringComparison.CurrentCulture)) + { + value = value?.ToString(); + } + } + else if (!containsLowercaseIdField) + { + value = value?.ToString(); + fieldName = "id"; + } + else + { + value = value?.ToString(); + } } else if (value is IDataItem child) {