Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Binary Encoding: Adds Binary Encoding Support for Point Operations #4652

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2cdf16b
Code changes to support binary encoding for point operations.
kundadebdatta Aug 23, 2024
387005b
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Sep 18, 2024
c0e5952
Code changes to introduce cosmos buffered stream wrapper.
kundadebdatta Sep 25, 2024
07eff74
Code changes to remove unnecessary using statement.
kundadebdatta Sep 25, 2024
265a7d3
Code changes to remove the pooling logic.
kundadebdatta Sep 25, 2024
bcdc75e
Code changes to port fixes into newtonsoft reader and writer to addre…
kundadebdatta Sep 25, 2024
4e35d0f
Code changes to set inner stream on disposal of buffered stream.
kundadebdatta Sep 26, 2024
4ac933f
Minor cosmetic code changes.
kundadebdatta Sep 26, 2024
8033075
Code changes to use clonable stream instead of buffered stream.
kundadebdatta Sep 27, 2024
4359dc8
Revert "Code changes to use clonable stream instead of buffered stream."
kundadebdatta Oct 1, 2024
88d5431
Code changes to use clonable stream from request handler when the out…
kundadebdatta Oct 2, 2024
abbcb11
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 2, 2024
16583c3
Code changes to address review comments.
kundadebdatta Oct 2, 2024
b3da27d
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 2, 2024
7fc7407
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 3, 2024
c275cf1
Code changes to fix test failures.
kundadebdatta Oct 3, 2024
da1f135
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 7, 2024
fbf9ffc
Code changes to address review comments.
kundadebdatta Oct 7, 2024
96c8581
Adding more stream tests.
kundadebdatta Oct 7, 2024
2baa15c
Code changes to fix batch item emulator tests.
kundadebdatta Oct 7, 2024
a97101a
Code changes to use cloneable stream in buffered stream.
kundadebdatta Oct 9, 2024
6981e01
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 9, 2024
9ed19b6
Code changes to fix build failures.
kundadebdatta Oct 9, 2024
dbc9596
Code changes to match ContainerCore.Items to master
kundadebdatta Oct 9, 2024
3929404
Code changes to fix build failures.
kundadebdatta Oct 9, 2024
b8151e0
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 9, 2024
cc67f67
Code changes to address review comments.
kundadebdatta Oct 10, 2024
de8aab6
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 10, 2024
d72dbe7
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 10, 2024
c932a1f
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 10, 2024
8855ebd
Code changes to fix container creation using binary encoding.
kundadebdatta Oct 16, 2024
7009570
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 18, 2024
f67ae93
Code changes to add internal types in a common place.
kundadebdatta Oct 18, 2024
b2b6af7
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 22, 2024
aea8a3a
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 23, 2024
3245664
Code changes to remove ContentSerializationFormat. Addressing minor r…
kundadebdatta Oct 23, 2024
cce11dd
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ public override async Task<ResponseMessage> SendAsync(
request.Headers.Add(HttpConstants.HttpHeaders.Prefer, HttpConstants.HttpHeaderValues.PreferReturnMinimal);
}

if (ConfigurationManager.IsBinaryEncodingEnabled()
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
&& request.OperationType.IsPointOperation()
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
&& request.ResourceType == ResourceType.Document)
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
{
request.Headers.Add(HttpConstants.HttpHeaders.SupportedSerializationFormats, SupportedSerializationFormats.CosmosBinary.ToString());
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
if (request.Content != null)
{
request.Headers.Add(HttpConstants.HttpHeaders.ContentSerializationFormat, SupportedSerializationFormats.CosmosBinary.ToString());
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
}
}

await this.ValidateAndSetConsistencyLevelAsync(request);
this.SetPriorityLevel(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public override byte[] ReadAsBytes()
public override DateTime? ReadAsDateTime()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
{
return null;
}
Expand All @@ -211,7 +211,7 @@ public override byte[] ReadAsBytes()
public override DateTimeOffset? ReadAsDateTimeOffset()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand Down Expand Up @@ -260,7 +260,7 @@ public override byte[] ReadAsBytes()
public override string ReadAsString()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand All @@ -278,7 +278,7 @@ public override string ReadAsString()
private double? ReadNumberValue()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ public override void WriteValue(bool value)
/// <param name="value">The <see cref="short"/> value to write.</param>
public override void WriteValue(short value)
{
base.WriteValue((long)value);
base.WriteValue((Int16)value);
this.jsonWriter.WriteInt16Value(value);
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand Down
20 changes: 20 additions & 0 deletions Microsoft.Azure.Cosmos/src/RequestOptions/ItemRequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,26 @@ public ConsistencyLevel? ConsistencyLevel
/// </remarks>
public DedicatedGatewayRequestOptions DedicatedGatewayRequestOptions { get; set; }

/// <summary>
/// Gets or sets the boolean to enable binary response for point operations like Create, Upsert, Read, Patch, and Replace.
/// Setting this option to true will cause the response to be in binary format, which can reduce networking and CPU load
/// by not sending the resource back over the network and serializing it on the client.
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// ItemRequestOptions requestOptions = new ItemRequestOptions() { EnableBinaryResponseOnPointOperations = true };
/// ItemResponse itemResponse = await this.container.CreateItemAsync<ToDoActivity>(tests, new PartitionKey(test.status), requestOptions);
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
/// Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode);
/// Assert.IsNotNull(itemResponse.Resource);
/// ]]>
/// </code>
/// </example>
/// <remarks>
/// This is optimal for workloads where the returned resource can be processed in binary format.
/// </remarks>
internal bool EnableBinaryResponseOnPointOperations { get; set; }
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
Expand Down
121 changes: 92 additions & 29 deletions Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ public async Task<ResponseMessage> CreateItemStreamAsync(
ITrace trace,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Create,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
}

public async Task<ItemResponse<T>> CreateItemAsync<T>(
Expand All @@ -79,7 +81,8 @@ public async Task<ItemResponse<T>> CreateItemAsync<T>(
itemId: null,
item: item,
operationType: OperationType.Create,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -92,15 +95,17 @@ public async Task<ResponseMessage> ReadItemStreamAsync(
ITrace trace,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public async Task<ItemResponse<T>> ReadItemAsync<T>(
Expand All @@ -115,7 +120,8 @@ public async Task<ItemResponse<T>> ReadItemAsync<T>(
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -136,7 +142,9 @@ public async Task<ResponseMessage> UpsertItemStreamAsync(
operationType: OperationType.Upsert,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public async Task<ItemResponse<T>> UpsertItemAsync<T>(
Expand All @@ -156,7 +164,8 @@ public async Task<ItemResponse<T>> UpsertItemAsync<T>(
itemId: null,
item: item,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -178,7 +187,9 @@ public async Task<ResponseMessage> ReplaceItemStreamAsync(
operationType: OperationType.Replace,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public async Task<ItemResponse<T>> ReplaceItemAsync<T>(
Expand All @@ -204,7 +215,8 @@ public async Task<ItemResponse<T>> ReplaceItemAsync<T>(
itemId: id,
item: item,
operationType: OperationType.Replace,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -217,15 +229,17 @@ public async Task<ResponseMessage> DeleteItemStreamAsync(
ITrace trace,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public async Task<ItemResponse<T>> DeleteItemAsync<T>(
Expand All @@ -240,7 +254,8 @@ public async Task<ItemResponse<T>> DeleteItemAsync<T>(
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand Down Expand Up @@ -828,7 +843,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
string itemId,
T item,
OperationType operationType,
ItemRequestOptions requestOptions,
ItemRequestOptions requestOptions,
JsonSerializationFormat? targetRequestSerializationFormat,
ITrace trace,
CancellationToken cancellationToken)
{
Expand All @@ -840,7 +856,7 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
Stream itemStream;
using (trace.StartChild("ItemSerialize"))
{
itemStream = this.ClientContext.SerializerCore.ToStream<T>(item);
itemStream = this.ClientContext.SerializerCore.ToStream<T>(item);
}

// User specified PK value, no need to extract it
Expand All @@ -853,7 +869,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
operationType,
requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: targetRequestSerializationFormat);
}

PartitionKeyMismatchRetryPolicy requestRetryPolicy = null;
Expand All @@ -866,7 +883,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
itemId,
itemStream,
operationType,
requestOptions,
requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand Down Expand Up @@ -897,7 +915,9 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
OperationType operationType,
ItemRequestOptions requestOptions,
ITrace trace,
CancellationToken cancellationToken)
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
CancellationToken cancellationToken,
JsonSerializationFormat? targetRequestSerializationFormat,
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
JsonSerializationFormat? targetResponseSerializationFormat = default)
{
if (trace == null)
{
Expand All @@ -910,8 +930,18 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
}

ContainerInternal.ValidatePartitionKey(partitionKey, requestOptions);
string resourceUri = this.GetResourceUri(requestOptions, operationType, itemId);

string resourceUri = this.GetResourceUri(requestOptions, operationType, itemId);

// Convert Text to Binary Stream.
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
if (CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
expectedSerializationFormat: JsonSerializationFormat.Text,
targetSerializationFormat: targetRequestSerializationFormat,
inputStream: streamPayload,
outputStream: out Stream outputRequestStream))
{
streamPayload = outputRequestStream;
}

ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: resourceUri,
resourceType: ResourceType.Document,
Expand All @@ -923,9 +953,20 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
streamPayload: streamPayload,
requestEnricher: null,
trace: trace,
cancellationToken: cancellationToken);

return responseMessage;
cancellationToken: cancellationToken);

// Convert Binary Stream to Text.
if ((requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
&& CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
expectedSerializationFormat: JsonSerializationFormat.Binary,
targetSerializationFormat: targetResponseSerializationFormat,
inputStream: responseMessage?.Content,
outputStream: out Stream outputResponseStream))
{
responseMessage.Content = outputResponseStream;
}

return responseMessage;
}

public override async Task<PartitionKey> GetPartitionKeyValueFromStreamAsync(
Expand Down Expand Up @@ -1120,13 +1161,14 @@ public async Task<ItemResponse<T>> PatchItemAsync<T>(
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(responseMessage);
}

public Task<ResponseMessage> PatchItemStreamAsync(
public async Task<ResponseMessage> PatchItemStreamAsync(
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
string id,
PartitionKey partitionKey,
IReadOnlyList<PatchOperation> patchOperations,
ITrace trace,
PatchItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default,
JsonSerializationFormat? targetResponseSerializationFormat = default)
{
if (trace == null)
{
Expand Down Expand Up @@ -1159,8 +1201,8 @@ public Task<ResponseMessage> PatchItemStreamAsync(
{
patchOperationsStream = this.ClientContext.SerializerCore.ToStream(new PatchSpec(patchOperations, requestOptions));
}

return this.ClientContext.ProcessResourceOperationStreamAsync(
ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.GetResourceUri(
requestOptions,
OperationType.Patch,
Expand All @@ -1174,7 +1216,19 @@ public Task<ResponseMessage> PatchItemStreamAsync(
streamPayload: patchOperationsStream,
requestEnricher: null,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken);

if ((requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
&& CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
expectedSerializationFormat: JsonSerializationFormat.Binary,
targetSerializationFormat: targetResponseSerializationFormat,
inputStream: responseMessage?.Content,
outputStream: out Stream outputResponseStream))
{
responseMessage.Content = outputResponseStream;
}

return responseMessage;
}

public Task<ResponseMessage> PatchItemStreamAsync(
Expand Down Expand Up @@ -1217,7 +1271,9 @@ public Task<ResponseMessage> PatchItemStreamAsync(
operationType: OperationType.Patch,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
Expand Down Expand Up @@ -1252,6 +1308,13 @@ private ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderPrivate(
container: this,
changeFeedProcessor: changeFeedProcessor,
applyBuilderConfiguration: changeFeedProcessor.ApplyBuildConfiguration).WithChangeFeedMode(mode);
}

private static JsonSerializationFormat GetTargetRequestSerializationFormat()
{
return ConfigurationManager.IsBinaryEncodingEnabled()
? JsonSerializationFormat.Binary
: JsonSerializationFormat.Text;
}
}
}
Loading
Loading