Skip to content

Commit

Permalink
SynchronizationContext and blocking calls (#1078)
Browse files Browse the repository at this point in the history
* Adding context

* Adding netfx test project

* conditional Task.Run

* Using TaskHelper

* Wrapping more calls

* yml update

* Adding query in test

* Undoing

* Removing duplicates

* Removing more duplicates

* Adding UTs for TaskHelper

* Removing unneeded file

* Undoing more changes

* missing indent

* Changing assemblyname

* Testing another SNK

* YML changes

* AssemblyName

* Testing properties

* Testing with 471

* Testing netcore

* net461

* Original setup

* Testing with vstest

* paths

* Using full path

* Updating vstest

* Forcing version

* Refactoring

* Undoing other changes

* Parameters

* breaklines

* Undoing some changes

* Inline wrapper not conditional

* Fixing tests

* internal properties

* Fixing tests

* Fixing more tests

* Fixing tests
  • Loading branch information
ealsur authored and j82w committed Dec 11, 2019
1 parent 736369f commit 232e180
Show file tree
Hide file tree
Showing 36 changed files with 1,172 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public ChangeFeedProcessorBuilder WithLeaseContainer(Container leaseContainer)
if (this.leaseContainer != null) throw new InvalidOperationException("The builder already defined a lease container.");
if (this.LeaseStoreManager != null) throw new InvalidOperationException("The builder already defined an in-memory lease container instance.");

this.leaseContainer = (ContainerCore)leaseContainer;
this.leaseContainer = (ContainerInlineCore)leaseContainer;
return this;
}

Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public virtual Task<AccountProperties> ReadAccountAsync()
/// <returns>Cosmos database proxy</returns>
public virtual Database GetDatabase(string id)
{
return new DatabaseCore(this.ClientContext, id);
return new DatabaseInlineCore(new DatabaseCore(this.ClientContext, id));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;

// This class acts as a wrapper for environments that use SynchronizationContext.
internal sealed class ConflictsInlineCore : Conflicts
{
private readonly ConflictsCore conflicts;

internal ConflictsInlineCore(ConflictsCore conflicts)
{
if (conflicts == null)
{
throw new ArgumentNullException(nameof(conflicts));
}

this.conflicts = conflicts;
}

public override Task<ResponseMessage> DeleteAsync(
ConflictProperties conflict,
PartitionKey partitionKey,
CancellationToken cancellationToken = default(CancellationToken))
{
return TaskHelper.RunInlineIfNeededAsync(() => this.conflicts.DeleteAsync(conflict, partitionKey, cancellationToken));
}

public override FeedIterator GetConflictQueryStreamIterator(
string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.conflicts.GetConflictQueryStreamIterator(queryText, continuationToken, requestOptions);
}

public override FeedIterator<T> GetConflictQueryIterator<T>(
string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.conflicts.GetConflictQueryIterator<T>(queryText, continuationToken, requestOptions);
}

public override FeedIterator GetConflictQueryStreamIterator(
QueryDefinition queryDefinition,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.conflicts.GetConflictQueryStreamIterator(queryDefinition, continuationToken, requestOptions);
}

public override FeedIterator<T> GetConflictQueryIterator<T>(
QueryDefinition queryDefinition,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.conflicts.GetConflictQueryIterator<T>(queryDefinition, continuationToken, requestOptions);
}

public override Task<ItemResponse<T>> ReadCurrentAsync<T>(
ConflictProperties cosmosConflict,
PartitionKey partitionKey,
CancellationToken cancellationToken = default(CancellationToken))
{
return TaskHelper.RunInlineIfNeededAsync(() => this.conflicts.ReadCurrentAsync<T>(cosmosConflict, partitionKey, cancellationToken));
}

public override T ReadConflictContent<T>(ConflictProperties cosmosConflict)
{
return this.conflicts.ReadConflictContent<T>(cosmosConflict);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ internal ContainerCore(
id: containerId);

this.Database = database;
this.Conflicts = new ConflictsCore(this.ClientContext, this);
this.Scripts = new ScriptsCore(this, this.ClientContext);
this.Conflicts = new ConflictsInlineCore(new ConflictsCore(this.ClientContext, this));
this.Scripts = new ScriptsInlineCore(new ScriptsCore(this, this.ClientContext));
this.cachedUriSegmentWithoutId = this.GetResourceSegmentUriWithoutId();
this.queryClient = cosmosQueryClient ?? new CosmosQueryClientCore(this.ClientContext, this);
this.BatchExecutor = this.InitializeBatchExecutorForContainer();
Expand Down
252 changes: 252 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// This class acts as a wrapper for environments that use SynchronizationContext.
internal sealed partial class ContainerInlineCore : Container
{
private readonly ContainerCore container;

public override string Id => this.container.Id;

public override Conflicts Conflicts => this.container.Conflicts;

public override Scripts.Scripts Scripts => this.container.Scripts;

internal CosmosClientContext ClientContext => this.container.ClientContext;

internal Uri LinkUri => this.container.LinkUri;

internal ContainerInlineCore(ContainerCore container)
{
if (container == null)
{
throw new ArgumentNullException(nameof(container));
}

this.container = container;
}

public override Task<ContainerResponse> ReadContainerAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadContainerAsync(requestOptions, cancellationToken));
}

public override Task<ResponseMessage> ReadContainerStreamAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadContainerStreamAsync(requestOptions, cancellationToken));
}

public override Task<ContainerResponse> ReplaceContainerAsync(
ContainerProperties containerProperties,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceContainerAsync(containerProperties, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> ReplaceContainerStreamAsync(
ContainerProperties containerProperties,
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceContainerStreamAsync(containerProperties, requestOptions, cancellationToken));
}

public override Task<ContainerResponse> DeleteContainerAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.DeleteContainerAsync(requestOptions, cancellationToken));
}

public override Task<ResponseMessage> DeleteContainerStreamAsync(
ContainerRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.DeleteContainerStreamAsync(requestOptions, cancellationToken));
}

public override Task<int?> ReadThroughputAsync(CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadThroughputAsync(cancellationToken));
}

public override Task<ThroughputResponse> ReadThroughputAsync(
RequestOptions requestOptions,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadThroughputAsync(requestOptions, cancellationToken));
}

public override Task<ThroughputResponse> ReplaceThroughputAsync(
int throughput,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceThroughputAsync(throughput, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> CreateItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.CreateItemStreamAsync(streamPayload, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> CreateItemAsync<T>(T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.CreateItemAsync<T>(item, partitionKey, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> ReadItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadItemStreamAsync(id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> ReadItemAsync<T>(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReadItemAsync<T>(id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> UpsertItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.UpsertItemStreamAsync(streamPayload, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> UpsertItemAsync<T>(
T item,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.UpsertItemAsync<T>(item, partitionKey, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> ReplaceItemStreamAsync(
Stream streamPayload,
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceItemStreamAsync(streamPayload, id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> ReplaceItemAsync<T>(
T item,
string id,
PartitionKey? partitionKey = null,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.ReplaceItemAsync<T>(item, id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ResponseMessage> DeleteItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.DeleteItemStreamAsync(id, partitionKey, requestOptions, cancellationToken));
}

public override Task<ItemResponse<T>> DeleteItemAsync<T>(
string id,
PartitionKey partitionKey,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
return TaskHelper.RunInlineIfNeededAsync(() => this.container.DeleteItemAsync<T>(id, partitionKey, requestOptions, cancellationToken));
}

public override FeedIterator GetItemQueryStreamIterator(
QueryDefinition queryDefinition,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemQueryStreamIterator(queryDefinition, continuationToken, requestOptions);
}

public override FeedIterator<T> GetItemQueryIterator<T>(
QueryDefinition queryDefinition,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemQueryIterator<T>(queryDefinition, continuationToken, requestOptions);
}

public override FeedIterator GetItemQueryStreamIterator(string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemQueryStreamIterator(queryText, continuationToken, requestOptions);
}

public override FeedIterator<T> GetItemQueryIterator<T>(
string queryText = null,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemQueryIterator<T>(queryText, continuationToken, requestOptions);
}

public override IOrderedQueryable<T> GetItemLinqQueryable<T>(bool allowSynchronousQueryExecution = false,
string continuationToken = null,
QueryRequestOptions requestOptions = null)
{
return this.container.GetItemLinqQueryable<T>(allowSynchronousQueryExecution, continuationToken, requestOptions);
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
string processorName,
ChangesHandler<T> onChangesDelegate)
{
return this.container.GetChangeFeedProcessorBuilder<T>(processorName, onChangesDelegate);
}

public override ChangeFeedProcessorBuilder GetChangeFeedEstimatorBuilder(string processorName,
ChangesEstimationHandler estimationDelegate,
TimeSpan? estimationPeriod = null)
{
return this.container.GetChangeFeedEstimatorBuilder(processorName, estimationDelegate, estimationPeriod);
}

public override TransactionalBatch CreateTransactionalBatch(PartitionKey partitionKey)
{
return this.container.CreateTransactionalBatch(partitionKey);
}

public static implicit operator ContainerCore(ContainerInlineCore containerInlineCore) => containerInlineCore.container;
}
}
8 changes: 4 additions & 4 deletions Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ public override Container GetContainer(string id)
throw new ArgumentNullException(nameof(id));
}

return new ContainerCore(
return new ContainerInlineCore(new ContainerCore(
this.ClientContext,
this,
id);
id));
}

public override Task<ResponseMessage> CreateContainerStreamAsync(
Expand Down Expand Up @@ -315,10 +315,10 @@ public override User GetUser(string id)
throw new ArgumentNullException(nameof(id));
}

return new UserCore(
return new UserInlineCore(new UserCore(
this.ClientContext,
this,
id);
id));
}

public Task<ResponseMessage> CreateUserStreamAsync(
Expand Down
Loading

0 comments on commit 232e180

Please sign in to comment.