Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Users/philipthomas msft/new avad #4595

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a71d474
initial checkin
philipthomas-MSFT May 16, 2024
749c7ff
feedRangeFetails added to headers as a tuple. may created a public ty…
philipthomas-MSFT May 30, 2024
158b26d
FeedRangeDetail type in place of tuple
philipthomas-MSFT May 30, 2024
5472e18
some tests but impl is incomplete
philipthomas-MSFT Jun 3, 2024
1889f0f
merge
philipthomas-MSFT Jul 2, 2024
5e3f277
merge
philipthomas-MSFT Jul 2, 2024
02b220e
revert after bad merge
philipthomas-MSFT Jul 2, 2024
ae7ea26
revert after a bad merge
philipthomas-MSFT Jul 2, 2024
003da7e
revert after a bad merge
philipthomas-MSFT Jul 2, 2024
4af2d91
changes
philipthomas-MSFT Jul 8, 2024
96262c0
more test to come
philipthomas-MSFT Jul 15, 2024
00e32d3
add a test for FeedRange on Header, Find overlapping ranges using Par…
philipthomas-MSFT Jul 16, 2024
8fc8608
removing the type FeedRangeDetails, collectionRId was not needed.
philipthomas-MSFT Jul 16, 2024
d62f05f
mark as Preview
philipthomas-MSFT Jul 16, 2024
76ee8f0
FindOverlappingRanges test. Just FeedRange.
philipthomas-MSFT Jul 17, 2024
5b2cd52
The start, not finished, test for container.FindOverlappingRangesAsync
philipthomas-MSFT Jul 17, 2024
18ce956
partition key tests
philipthomas-MSFT Jul 18, 2024
b1ed9a1
more testing. still not satisfied. will do more until the end of week.
philipthomas-MSFT Jul 18, 2024
df0eb32
more test changes.
philipthomas-MSFT Jul 22, 2024
77be0f9
add partitionkey to test
philipthomas-MSFT Jul 22, 2024
570d5ea
Merge remote-tracking branch 'origin/master' into users/philipthomas-…
philipthomas-MSFT Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

Expand Down Expand Up @@ -59,13 +58,15 @@ public static ChangeFeedPartitionKeyResultSetIteratorCore Create(
container: container,
mode: mode,
changeFeedStartFrom: startFrom,
options: requestOptions);
options: requestOptions,
feedRangeEpk: lease?.FeedRange is FeedRangeEpk epk ? epk : default);
}

private readonly CosmosClientContext clientContext;

private readonly ChangeFeedRequestOptions changeFeedOptions;
private readonly ChangeFeedMode mode;
private readonly FeedRangeEpk feedRangeEpk;

private ChangeFeedStartFrom changeFeedStartFrom;
private bool hasMoreResultsInternal;
Expand All @@ -74,13 +75,15 @@ private ChangeFeedPartitionKeyResultSetIteratorCore(
ContainerInternal container,
ChangeFeedMode mode,
ChangeFeedStartFrom changeFeedStartFrom,
ChangeFeedRequestOptions options)
ChangeFeedRequestOptions options,
FeedRangeEpk feedRangeEpk)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.mode = mode;
this.changeFeedStartFrom = changeFeedStartFrom ?? throw new ArgumentNullException(nameof(changeFeedStartFrom));
this.clientContext = this.container.ClientContext;
this.changeFeedOptions = options;
this.feedRangeEpk = feedRangeEpk;
}

public override bool HasMoreResults => this.hasMoreResultsInternal;
Expand Down Expand Up @@ -139,7 +142,10 @@ public override async Task<ResponseMessage> ReadNextAsync(ITrace trace, Cancella
this.hasMoreResultsInternal = responseMessage.IsSuccessStatusCode;
responseMessage.Headers.ContinuationToken = etag;
this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, (FeedRangeInternal)this.changeFeedStartFrom.FeedRange);


// Set the FeedRangeEpk response header.
responseMessage.Headers.FeedRangeEpk = this.feedRangeEpk;

return responseMessage;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ public abstract class ChangeFeedProcessorContext
/// <summary>
/// Gets the headers related to the service response that provided the changes.
/// </summary>
public abstract Headers Headers { get; }
public abstract Headers Headers { get; }

#if PREVIEW
public
#else
internal
#endif
abstract FeedRange FeedRange { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ internal ChangeFeedObserverContextCore(
public CosmosDiagnostics Diagnostics => this.responseMessage.Diagnostics;

public Headers Headers => this.responseMessage.Headers;

#if PREVIEW
public
#else
internal
#endif
FeedRange FeedRange => new FeedRangeEpk(this.responseMessage.Headers.FeedRangeEpk.Range);

public async Task CheckpointAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ public ChangeFeedProcessorContextCore(ChangeFeedObserverContextCore changeFeedOb

public override CosmosDiagnostics Diagnostics => this.changeFeedObserverContextCore.Diagnostics;

public override Headers Headers => this.changeFeedObserverContextCore.Headers;
public override Headers Headers => this.changeFeedObserverContextCore.Headers;

#if PREVIEW
public
#else
internal
#endif
override FeedRange FeedRange => this.changeFeedObserverContextCore.FeedRange;
}
}
13 changes: 12 additions & 1 deletion Microsoft.Azure.Cosmos/src/Headers/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,17 @@ internal static SubStatusCodes GetSubStatusCodes(string value)
}

return null;
}
}

#if PREVIEW
public
#else
internal
#endif
virtual FeedRangeEpk FeedRangeEpk
{
get;
set;
}
}
}
19 changes: 18 additions & 1 deletion Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,24 @@ public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder(
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint(
string processorName,
ChangeFeedStreamHandlerWithManualCheckpoint onChangesDelegate);
ChangeFeedStreamHandlerWithManualCheckpoint onChangesDelegate);

/// <summary>
/// Takes a given list of ranges and find overlapping ranges for the given partition key.
/// </summary>
/// <param name="partitionKey">A given partition key.</param>
/// <param name="feedRanges">A given list of ranges.</param>
/// <param name="cancellationToken"></param>
/// <returns>A list of overlapping ranges for the the given partition key.</returns>
public abstract Task<IReadOnlyList<Cosmos.FeedRange>> FindOverlappingRangesAsync(Cosmos.PartitionKey partitionKey, IReadOnlyList<Cosmos.FeedRange> feedRanges, CancellationToken cancellationToken = default);

/// <summary>
/// Takes a given list of ranges and find overlapping ranges for the given feed range.
/// </summary>
/// <param name="feedRange">A given feed range.</param>
/// <param name="feedRanges">A given list of ranges.</param>
/// <returns>A list of overlapping ranges for the the given feed range epk.</returns>
public abstract IReadOnlyList<Cosmos.FeedRange> FindOverlappingRanges(Cosmos.FeedRange feedRange, IReadOnlyList<Cosmos.FeedRange> feedRanges);

#if PREVIEW
/// <summary>
Expand Down
76 changes: 70 additions & 6 deletions Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed;
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Routing;
Expand Down Expand Up @@ -256,7 +253,7 @@ public Task<ResponseMessage> ReplaceContainerStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
}
}

public async Task<IReadOnlyList<FeedRange>> GetFeedRangesAsync(
ITrace trace,
Expand Down Expand Up @@ -699,6 +696,73 @@ public override FeedIterator<T> GetChangeFeedIteratorWithQuery<T>(
return new FeedIteratorCore<T>(
changeFeedIteratorCore,
responseCreator: this.ClientContext.ResponseFactory.CreateChangeFeedUserTypeResponse<T>);
}
}

public override async Task<IReadOnlyList<Cosmos.FeedRange>> FindOverlappingRangesAsync(
Cosmos.PartitionKey partitionKey,
IReadOnlyList<Cosmos.FeedRange> feedRanges,
CancellationToken cancellationToken = default)
{
List<FeedRange> overlappingRanges = new ();

foreach (Range<string> range in ContainerCore.ConvertToRange(feedRanges))
{
if (Range<string>.CheckOverlapping(
range1: await this.ConvertToRangeAsync(
partitionKey: partitionKey,
cancellationToken: cancellationToken),
range2: range))
{
overlappingRanges.Add(new FeedRangeEpk(range));
}
}

return overlappingRanges;
}

public override IReadOnlyList<Cosmos.FeedRange> FindOverlappingRanges(
Cosmos.FeedRange feedRange,
IReadOnlyList<Cosmos.FeedRange> feedRanges)
{
List<FeedRange> overlappingRanges = new List<FeedRange>();

foreach (Range<string> range in ContainerCore.ConvertToRange(feedRanges))
{
if (Range<string>.CheckOverlapping(
range1: ContainerCore.ConvertToRange(feedRange),
range2: range))
{
overlappingRanges.Add(new FeedRangeEpk(range));
}
}

return overlappingRanges;
}

private async Task<Range<string>> ConvertToRangeAsync(PartitionKey partitionKey, CancellationToken cancellationToken)
{
PartitionKeyDefinition partitionKeyDefinition = await this.GetPartitionKeyDefinitionAsync(cancellationToken);
Range<string> range = Range<string>.GetPointRange(partitionKey.InternalKey.GetEffectivePartitionKeyString(partitionKeyDefinition));

return range;
}

private static IEnumerable<Range<string>> ConvertToRange(IReadOnlyList<FeedRange> fromFeedRanges)
{
foreach (FeedRange fromFeedRange in fromFeedRanges)
{
yield return ContainerCore.ConvertToRange(fromFeedRange);
}
}

private static Range<string> ConvertToRange(FeedRange fromFeedRange)
{
if (fromFeedRange is not FeedRangeEpk feedRangeEpk)
{
return default;
}

return feedRangeEpk.Range;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,26 @@ public override Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
requestOptions: requestOptions,
task: (trace) => base.DeleteAllItemsByPartitionKeyStreamAsync(partitionKey, trace, requestOptions, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response));
}
}

public override async Task<IReadOnlyList<FeedRange>> FindOverlappingRangesAsync(
Cosmos.PartitionKey partitionKey,
IReadOnlyList<FeedRange> feedRanges,
CancellationToken cancellationToken = default)
{
return await base.FindOverlappingRangesAsync(
partitionKey: partitionKey,
feedRanges: feedRanges,
cancellationToken: cancellationToken);
}

public override IReadOnlyList<FeedRange> FindOverlappingRanges(
Cosmos.FeedRange feedRange,
IReadOnlyList<Cosmos.FeedRange> feedRanges)
{
return base.FindOverlappingRanges(
feedRange: feedRange,
feedRanges: feedRanges);
}
}
}
Loading
Loading