Skip to content

Commit

Permalink
Cosmos DB: Fix Hierarchical PK support (#871)
Browse files Browse the repository at this point in the history
* Bump package

* bump sdk

* Adding print

* Clear log

* Disable test

* Debugging

* Removing changes

* re-adding logs

* more logs

* Adding more logs

* more logs

* Removing all logs finally
  • Loading branch information
ealsur authored Sep 22, 2023
1 parent 4d9b059 commit ad089b5
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 61 deletions.
2 changes: 1 addition & 1 deletion build/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<!-- Extensions can have independent versions and only increment when released -->
<Version>3.0.0$(VersionSuffix)</Version>
<ExtensionsVersion>5.0.0$(VersionSuffix)</ExtensionsVersion> <!-- WebJobs.Extensions -->
<CosmosDBVersion>4.3.1$(VersionSuffix)</CosmosDBVersion>
<CosmosDBVersion>4.4.0$(VersionSuffix)</CosmosDBVersion>
<HttpVersion>3.2.0$(VersionSuffix)</HttpVersion>
<MobileAppsVersion>3.0.0$(VersionSuffix)</MobileAppsVersion>
<SendGridVersion>3.0.3$(VersionSuffix)</SendGridVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<WarningsAsErrors />
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.33.0" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.35.4" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.37" />
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.1.0" />
Expand Down
135 changes: 76 additions & 59 deletions test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBEndToEndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,66 +33,66 @@ public class CosmosDBEndToEndTests
[Fact]
public async Task CosmosDBEndToEnd()
{
_loggerProvider.ClearAllLogMessages();
using (var host = await StartHostAsync(typeof(EndToEndTestClass)))
{
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>(), DatabaseName, CollectionName);

// Call the outputs function directly, which will write out 3 documents
// using with the 'input' property set to the value we provide.
var input = Guid.NewGuid().ToString();
var parameter = new Dictionary<string, object>();
parameter["input"] = input;
try
{
// Call the outputs function directly, which will write out 3 documents
// using with the 'input' property set to the value we provide.
var input = Guid.NewGuid().ToString();
var parameter = new Dictionary<string, object>();
parameter["input"] = input;

await host.GetJobHost().CallAsync(nameof(EndToEndTestClass.Outputs), parameter);
await host.GetJobHost().CallAsync(nameof(EndToEndTestClass.Outputs), parameter);

// Also insert a new Document so we can query on it.
var response = await client.GetContainer(DatabaseName, CollectionName).UpsertItemAsync<Item>(new Item() { Id = Guid.NewGuid().ToString() });
// Also insert a new Document so we can query on it.
var response = await client.GetContainer(DatabaseName, CollectionName).UpsertItemAsync<Item>(new Item() { Id = Guid.NewGuid().ToString() });

// Now craft a queue message to send to the Inputs, which will pull these documents.
var queueInput = new QueueItem
{
DocumentId = response.Resource.Id,
Input = input
};
// Now craft a queue message to send to the Inputs, which will pull these documents.
var queueInput = new QueueItem
{
DocumentId = response.Resource.Id,
Input = input
};

parameter.Clear();
parameter["item"] = JsonConvert.SerializeObject(queueInput);
parameter.Clear();
parameter["item"] = JsonConvert.SerializeObject(queueInput);

await host.GetJobHost().CallAsync(nameof(EndToEndTestClass.Inputs), parameter);
await host.GetJobHost().CallAsync(nameof(EndToEndTestClass.Inputs), parameter);

await TestHelpers.Await(() =>
{
var logMessages = _loggerProvider.GetAllLogMessages();
return logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")) == 4
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger with string called!")) == 4
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger with retry called!")) == 8
&& logMessages.Count(p => p.Exception != null && p.Exception.InnerException.Message.Contains("Test exception") && !p.Category.StartsWith("Host.Results")) > 0;
});

// Make sure the Options were logged. Just check a few values.
string optionsMessage = _loggerProvider.GetAllLogMessages()
.Single(m => m.Category == "Microsoft.Azure.WebJobs.Hosting.OptionsLoggingService" && m.FormattedMessage.StartsWith(nameof(CosmosDBOptions)))
.FormattedMessage;
JObject loggedOptions = JObject.Parse(optionsMessage.Substring(optionsMessage.IndexOf(Environment.NewLine)));
Assert.Null(loggedOptions["ConnectionMode"].Value<string>());

// Clean-up leases
Container leaseContainer = client.GetContainer(DatabaseName, LeaseCollectionName);
using FeedIterator<JObject> leaseIterator = leaseContainer.GetItemQueryIterator<JObject>();
while (leaseIterator.HasMoreResults)
{
FeedResponse<JObject> leaseIteratorResponse = await leaseIterator.ReadNextAsync();
foreach (JObject lease in leaseIteratorResponse)
await TestHelpers.Await(() =>
{
await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), new PartitionKey(lease.Value<string>("id")));
}
var logMessages = _loggerProvider.GetAllLogMessages();
return logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")) == 4
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger with string called!")) == 4
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger with retry called!")) == 8
&& logMessages.Count(p => p.Exception != null && p.Exception.InnerException.Message.Contains("Test exception") && !p.Category.StartsWith("Host.Results")) > 0;
});

// Make sure the Options were logged. Just check a few values.
string optionsMessage = _loggerProvider.GetAllLogMessages()
.Single(m => m.Category == "Microsoft.Azure.WebJobs.Hosting.OptionsLoggingService" && m.FormattedMessage.StartsWith(nameof(CosmosDBOptions)))
.FormattedMessage;
JObject loggedOptions = JObject.Parse(optionsMessage.Substring(optionsMessage.IndexOf(Environment.NewLine)));
Assert.Null(loggedOptions["ConnectionMode"].Value<string>());
}
finally
{
// Clean-up leases
await CleanUpLeaseContainer(client);

await host.StopAsync();
}
}
}

[Fact]
public async Task CosmosDBEndToEndCancellation()
{
_loggerProvider.ClearAllLogMessages();
using (var host = await StartHostAsync(typeof(EndToEndCancellationTestClass)))
{
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>(), DatabaseName, CollectionName);
Expand All @@ -110,25 +110,23 @@ public async Task CosmosDBEndToEndCancellation()
{
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>(), DatabaseName, CollectionName);

await TestHelpers.Await(() =>
try
{
var logMessages = _loggerProvider.GetAllLogMessages();
return logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")) > 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger canceled!")) == 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Saw the first document again!")) == 1
&& logMessages.Count(p => p.Exception is TaskCanceledException) > 0;
});

// Clean-up leases
Container leaseContainer = client.GetContainer(DatabaseName, LeaseCollectionName);
using FeedIterator<JObject> leaseIterator = leaseContainer.GetItemQueryIterator<JObject>();
while (leaseIterator.HasMoreResults)
await TestHelpers.Await(() =>
{
var logMessages = _loggerProvider.GetAllLogMessages();
return logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger called!")) > 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Trigger canceled!")) == 1
&& logMessages.Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains("Saw the first document again!")) == 1
&& logMessages.Count(p => p.Exception is TaskCanceledException) > 0;
});
}
finally
{
FeedResponse<JObject> leaseIteratorResponse = await leaseIterator.ReadNextAsync();
foreach (JObject lease in leaseIteratorResponse)
{
await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), new PartitionKey(lease.Value<string>("id")));
}
// Clean-up leases
await CleanUpLeaseContainer(client);

await host.StopAsync();
}
}
}
Expand All @@ -151,6 +149,25 @@ public static async Task<CosmosClient> InitializeDocumentClientAsync(IConfigurat
return client;
}

public static async Task CleanUpLeaseContainer(CosmosClient client)
{
Container leaseContainer = client.GetContainer(DatabaseName, LeaseCollectionName);
using FeedIterator<JObject> leaseIterator = leaseContainer.GetItemQueryIterator<JObject>();
while (leaseIterator.HasMoreResults)
{
FeedResponse<JObject> leaseIteratorResponse = await leaseIterator.ReadNextAsync();
foreach (JObject lease in leaseIteratorResponse)
{
ResponseMessage delete = await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), new PartitionKey(lease.Value<string>("id")));
if (delete.StatusCode == System.Net.HttpStatusCode.NotFound)
{
// Support old non-partitioned lease container in CI
await leaseContainer.DeleteItemStreamAsync(lease.Value<string>("id"), PartitionKey.None);
}
}
}
}

private async Task<IHost> StartHostAsync(Type testType)
{
ExplicitTypeLocator locator = new ExplicitTypeLocator(testType);
Expand Down

0 comments on commit ad089b5

Please sign in to comment.