From 2cbf23dc8ebf0823958769ff2bdefde70a1ac6d9 Mon Sep 17 00:00:00 2001 From: "Arooshi Avasthy (from Dev Box)" Date: Fri, 9 Aug 2024 00:57:19 -0700 Subject: [PATCH 1/3] Add serializer and unit tests --- .../src/ThinClientTransportSerializer.cs | 285 ++++++++++++++++++ .../ThinClientTransportSerializerTests.cs | 279 +++++++++++++++++ 2 files changed, 564 insertions(+) create mode 100644 Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs create mode 100644 Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs diff --git a/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs new file mode 100644 index 0000000000..e354938347 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs @@ -0,0 +1,285 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Buffers; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Collections; + + /// + /// blabla + /// + internal static class ThinClientTransportSerializer + { + public const string RoutedViaProxy = "x-ms-thinclient-route-via-proxy"; + public const string ProxyStartEpk = "x-ms-thinclient-range-min"; + public const string ProxyEndEpk = "x-ms-thinclient-range-max"; + + public const string ProxyOperationType = "x-ms-thinclient-proxy-operation-type"; + public const string ProxyResourceType = "x-ms-thinclient-proxy-resource-type"; + + private static readonly PartitionKeyDefinition HashV2SinglePath; + + static ThinClientTransportSerializer() + { + HashV2SinglePath = new PartitionKeyDefinition + { + Kind = PartitionKind.Hash, + Version = Documents.PartitionKeyDefinitionVersion.V2, + }; + HashV2SinglePath.Paths.Add("/id"); + } + + /// + /// Wrapper to expose a public bufferprovider for the RNTBD stack. + /// +#pragma warning disable CA1034 // Nested types should not be visible + public sealed class BufferProviderWrapper +#pragma warning restore CA1034 // Nested types should not be visible + { + internal BufferProvider Provider { get; set; } = new (); + } + + /// + /// Serialize the Proxy request to the RNTBD protocol format. + /// Today this takes the HttprequestMessage and reconstructs the DSR. + /// If the SDK can push properties to the HttpRequestMessage then the handler above having + /// the DSR can allow us to push that directly to the serialization. + /// + public static async Task SerializeProxyRequestAsync( + BufferProviderWrapper bufferProvider, + string accountName, + HttpRequestMessage requestMessage) + { + // Skip this and use the original DSR. + OperationType operationType = (OperationType)Enum.Parse(typeof(OperationType), requestMessage.Headers.GetValues(ProxyOperationType).First()); + ResourceType resourceType = (ResourceType)Enum.Parse(typeof(ResourceType), requestMessage.Headers.GetValues(ProxyResourceType).First()); + + Guid activityId = Guid.Parse(requestMessage.Headers.GetValues(HttpConstants.HttpHeaders.ActivityId).First()); + + Stream requestStream = null; + if (requestMessage.Content != null) + { + requestStream = await requestMessage.Content.ReadAsStreamAsync(); + } + + RequestNameValueCollection dictionaryCollection = new RequestNameValueCollection(); + foreach (KeyValuePair> header in requestMessage.Headers) + { + dictionaryCollection.Set(header.Key, string.Join(",", header.Value)); + } + + using DocumentServiceRequest request = new (operationType, resourceType, requestMessage.RequestUri.PathAndQuery, + requestStream, AuthorizationTokenType.PrimaryMasterKey, + dictionaryCollection); + + if (operationType.IsPointOperation()) + { + string partitionKey = request.Headers.Get(HttpConstants.HttpHeaders.PartitionKey); + + if (string.IsNullOrEmpty(partitionKey)) + { + throw new InternalServerErrorException(); + } + + string epk = GetEffectivePartitionKeyHash(partitionKey); + + request.Properties = new Dictionary + { + { "x-ms-effective-partition-key", HexStringUtility.HexStringToBytes(epk) } + }; + } + else if (request.Headers[ProxyStartEpk] != null) + { + // Re-add EPK headers removed by RequestInvokerHandler through Properties + request.Properties = new Dictionary + { + { WFConstants.BackendHeaders.StartEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyStartEpk]) }, + { WFConstants.BackendHeaders.EndEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyEndEpk]) } + }; + + request.Headers.Add(HttpConstants.HttpHeaders.ReadFeedKeyType, RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString()); + request.Headers.Add(HttpConstants.HttpHeaders.StartEpk, request.Headers[ProxyStartEpk]); + request.Headers.Add(HttpConstants.HttpHeaders.EndEpk, request.Headers[ProxyEndEpk]); + } + + await request.EnsureBufferedBodyAsync(); + + using Documents.Rntbd.TransportSerialization.SerializedRequest serializedRequest = + Documents.Rntbd.TransportSerialization.BuildRequestForProxy(request, + new ResourceOperation(operationType, resourceType), + activityId, + bufferProvider.Provider, + accountName, + out _, + out _); + + // TODO: consider using the SerializedRequest directly. + MemoryStream memoryStream = new MemoryStream(serializedRequest.RequestSize); + await serializedRequest.CopyToStreamAsync(memoryStream); + memoryStream.Position = 0; + return memoryStream; + } + + public static string GetEffectivePartitionKeyHash(string partitionJson) + { + return Documents.PartitionKey.FromJsonString(partitionJson).InternalKey.GetEffectivePartitionKeyString(HashV2SinglePath); + } + + /// + /// Deserialize the Proxy Response from the RNTBD protocol format to the Http format needed by the caller. + /// Today this takes the HttpResponseMessage and reconstructs the modified Http response. + /// + public static async Task ConvertProxyResponseAsync(HttpResponseMessage responseMessage) + { + using Stream responseStream = await responseMessage.Content.ReadAsStreamAsync(); + + (StatusCodes status, byte[] metadata) = await ThinClientTransportSerializer.ReadHeaderAndMetadataAsync(responseStream); + + if (responseMessage.StatusCode != (HttpStatusCode)status) + { + throw new InternalServerErrorException("Status code mismatch"); + } + + Rntbd.BytesDeserializer bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length); + if (!Documents.Rntbd.HeadersTransportSerialization.TryParseMandatoryResponseHeaders(ref bytesDeserializer, out bool payloadPresent, out _)) + { + throw new InternalServerErrorException("Length mismatch"); + } + + MemoryStream bodyStream = null; + if (payloadPresent) + { + int length = await ThinClientTransportSerializer.ReadBodyLengthAsync(responseStream); + bodyStream = new MemoryStream(length); + await responseStream.CopyToAsync(bodyStream); + bodyStream.Position = 0; + } + + // TODO(Perf): Clean this up. + bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length); + StoreResponse storeResponse = Documents.Rntbd.TransportSerialization.MakeStoreResponse( + status, + Guid.NewGuid(), + bodyStream, + HttpConstants.Versions.CurrentVersion, + ref bytesDeserializer); + + HttpResponseMessage response = new HttpResponseMessage((HttpStatusCode)storeResponse.StatusCode) + { + RequestMessage = responseMessage.RequestMessage + }; + + if (bodyStream != null) + { + response.Content = new StreamContent(bodyStream); + } + + foreach (string header in storeResponse.Headers.Keys()) + { + if (header == HttpConstants.HttpHeaders.SessionToken) + { + string newSessionToken = storeResponse.PartitionKeyRangeId + ":" + storeResponse.Headers.Get(header); + response.Headers.TryAddWithoutValidation(header, newSessionToken); + } + else + { + response.Headers.TryAddWithoutValidation(header, storeResponse.Headers.Get(header)); + } + } + + response.Headers.TryAddWithoutValidation(RoutedViaProxy, "1"); + return response; + } + + private static async Task<(StatusCodes, byte[] metadata)> ReadHeaderAndMetadataAsync(Stream stream) + { + byte[] header = ArrayPool.Shared.Rent(24); + const int headerLength = 24; + try + { + int headerRead = 0; + while (headerRead < headerLength) + { + int read = 0; + read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + headerRead += read; + } + + uint totalLength = BitConverter.ToUInt32(header, 0); + StatusCodes status = (StatusCodes)BitConverter.ToUInt32(header, 4); + + if (totalLength < headerLength) + { + throw new InternalServerErrorException("Length mismatch"); + } + + int metadataLength = (int)totalLength - headerLength; + byte[] metadata = new byte[metadataLength]; + int responseMetadataRead = 0; + while (responseMetadataRead < metadataLength) + { + int read = 0; + read = await stream.ReadAsync(metadata, responseMetadataRead, metadataLength - responseMetadataRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + responseMetadataRead += read; + } + + return (status, metadata); + } + finally + { + ArrayPool.Shared.Return(header); + } + } + + private static async Task ReadBodyLengthAsync(Stream stream) + { + byte[] header = ArrayPool.Shared.Rent(4); + const int headerLength = 4; + try + { + int headerRead = 0; + while (headerRead < headerLength) + { + int read = 0; + read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + headerRead += read; + } + + return BitConverter.ToInt32(header, 0); + } + finally + { + ArrayPool.Shared.Return(header); + } + + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs new file mode 100644 index 0000000000..7940162508 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs @@ -0,0 +1,279 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Buffers; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Collections; + + internal static class ThinClientTransportSerializer + { + public const string RoutedViaProxy = "x-ms-thinclient-route-via-proxy"; + public const string ProxyStartEpk = "x-ms-thinclient-range-min"; + public const string ProxyEndEpk = "x-ms-thinclient-range-max"; + + public const string ProxyOperationType = "x-ms-thinclient-proxy-operation-type"; + public const string ProxyResourceType = "x-ms-thinclient-proxy-resource-type"; + + private static readonly PartitionKeyDefinition HashV2SinglePath; + + static ThinClientTransportSerializer() + { + HashV2SinglePath = new PartitionKeyDefinition + { + Kind = PartitionKind.Hash, + Version = Documents.PartitionKeyDefinitionVersion.V2, + }; + HashV2SinglePath.Paths.Add("/id"); + } + + /// + /// Wrapper to expose a public buffer provider for the RNTBD stack. + /// + public sealed class BufferProviderWrapper + { + internal BufferProvider Provider { get; set; } = new(); + } + + /// + /// Serialize the Proxy request to the RNTBD protocol format. + /// Today this takes the HttprequestMessage and reconstructs the DSR. + /// If the SDK can push properties to the HttpRequestMessage then the handler above having + /// the DSR can allow us to push that directly to the serialization. + /// + public static async Task SerializeProxyRequestAsync( + BufferProviderWrapper bufferProvider, + string accountName, + HttpRequestMessage requestMessage) + { + OperationType operationType = Enum.Parse(requestMessage.Headers.GetValues(ProxyOperationType).First()); + ResourceType resourceType = Enum.Parse(requestMessage.Headers.GetValues(ProxyResourceType).First()); + + Guid activityId = Guid.Parse(requestMessage.Headers.GetValues(HttpConstants.HttpHeaders.ActivityId).First()); + + Stream requestStream = null; + if (requestMessage.Content != null) + { + requestStream = await requestMessage.Content.ReadAsStreamAsync(); + } + + RequestNameValueCollection dictionaryCollection = new(); + foreach (KeyValuePair> header in requestMessage.Headers) + { + dictionaryCollection.Set(header.Key, string.Join(",", header.Value)); + } + + using DocumentServiceRequest request = new( + operationType, + resourceType, + requestMessage.RequestUri.PathAndQuery, + requestStream, + AuthorizationTokenType.PrimaryMasterKey, + dictionaryCollection); + + if (operationType.IsPointOperation()) + { + string partitionKey = request.Headers.Get(HttpConstants.HttpHeaders.PartitionKey); + + if (string.IsNullOrEmpty(partitionKey)) + { + throw new InternalServerErrorException("Partition key is missing or empty."); + } + + string epk = GetEffectivePartitionKeyHash(partitionKey); + + request.Properties = new Dictionary + { + { "x-ms-effective-partition-key", HexStringUtility.HexStringToBytes(epk) } + }; + } + else if (request.Headers[ProxyStartEpk] != null) + { + // Re-add EPK headers removed by RequestInvokerHandler through Properties + request.Properties = new Dictionary + { + { WFConstants.BackendHeaders.StartEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyStartEpk]) }, + { WFConstants.BackendHeaders.EndEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyEndEpk]) } + }; + + request.Headers.Add(HttpConstants.HttpHeaders.ReadFeedKeyType, RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString()); + request.Headers.Add(HttpConstants.HttpHeaders.StartEpk, request.Headers[ProxyStartEpk]); + request.Headers.Add(HttpConstants.HttpHeaders.EndEpk, request.Headers[ProxyEndEpk]); + } + + await request.EnsureBufferedBodyAsync(); + + using Documents.Rntbd.TransportSerialization.SerializedRequest serializedRequest = + Documents.Rntbd.TransportSerialization.BuildRequestForProxy( + request, + new ResourceOperation(operationType, resourceType), + activityId, + bufferProvider.Provider, + accountName, + out _, + out _); + + MemoryStream memoryStream = new(serializedRequest.RequestSize); + await serializedRequest.CopyToStreamAsync(memoryStream); + memoryStream.Position = 0; + return memoryStream; + } + + public static string GetEffectivePartitionKeyHash(string partitionJson) + { + return Documents.PartitionKey.FromJsonString(partitionJson).InternalKey.GetEffectivePartitionKeyString(HashV2SinglePath); + } + + /// + /// Deserialize the Proxy Response from the RNTBD protocol format to the Http format needed by the caller. + /// Today this takes the HttpResponseMessage and reconstructs the modified Http response. + /// + public static async Task ConvertProxyResponseAsync(HttpResponseMessage responseMessage) + { + using Stream responseStream = await responseMessage.Content.ReadAsStreamAsync(); + + (StatusCodes status, byte[] metadata) = await ReadHeaderAndMetadataAsync(responseStream); + + if (responseMessage.StatusCode != (HttpStatusCode)status) + { + throw new InternalServerErrorException("Status code mismatch"); + } + + Rntbd.BytesDeserializer bytesDeserializer = new(metadata, metadata.Length); + if (!Documents.Rntbd.HeadersTransportSerialization.TryParseMandatoryResponseHeaders(ref bytesDeserializer, out bool payloadPresent, out _)) + { + throw new InternalServerErrorException("Length mismatch"); + } + + MemoryStream bodyStream = null; + if (payloadPresent) + { + int length = await ReadBodyLengthAsync(responseStream); + bodyStream = new MemoryStream(length); + await responseStream.CopyToAsync(bodyStream); + bodyStream.Position = 0; + } + + // TODO: Clean this up. + bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length); + StoreResponse storeResponse = Documents.Rntbd.TransportSerialization.MakeStoreResponse( + status, + Guid.NewGuid(), + bodyStream, + HttpConstants.Versions.CurrentVersion, + ref bytesDeserializer); + + HttpResponseMessage response = new((HttpStatusCode)storeResponse.StatusCode) + { + RequestMessage = responseMessage.RequestMessage + }; + + if (bodyStream != null) + { + response.Content = new StreamContent(bodyStream); + } + + foreach (string header in storeResponse.Headers.Keys()) + { + if (header == HttpConstants.HttpHeaders.SessionToken) + { + string newSessionToken = $"{storeResponse.PartitionKeyRangeId}:{storeResponse.Headers.Get(header)}"; + response.Headers.TryAddWithoutValidation(header, newSessionToken); + } + else + { + response.Headers.TryAddWithoutValidation(header, storeResponse.Headers.Get(header)); + } + } + + response.Headers.TryAddWithoutValidation(RoutedViaProxy, "1"); + return response; + } + + private static async Task<(StatusCodes, byte[] metadata)> ReadHeaderAndMetadataAsync(Stream stream) + { + byte[] header = ArrayPool.Shared.Rent(24); + const int headerLength = 24; + try + { + int headerRead = 0; + while (headerRead < headerLength) + { + int read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected end of stream while reading header bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + headerRead += read; + } + + uint totalLength = BitConverter.ToUInt32(header, 0); + StatusCodes status = (StatusCodes)BitConverter.ToUInt32(header, 4); + + if (totalLength < headerLength) + { + throw new InternalServerErrorException("Header length mismatch"); + } + + int metadataLength = (int)totalLength - headerLength; + byte[] metadata = new byte[metadataLength]; + int responseMetadataRead = 0; + while (responseMetadataRead < metadataLength) + { + int read = await stream.ReadAsync(metadata, responseMetadataRead, metadataLength - responseMetadataRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected end of stream while reading metadata bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + responseMetadataRead += read; + } + + return (status, metadata); + } + finally + { + ArrayPool.Shared.Return(header); + } + } + + private static async Task ReadBodyLengthAsync(Stream stream) + { + byte[] header = ArrayPool.Shared.Rent(4); + const int headerLength = 4; + try + { + int headerRead = 0; + while (headerRead < headerLength) + { + int read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected end of stream while reading body length", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + headerRead += read; + } + + return BitConverter.ToInt32(header, 0); + } + finally + { + ArrayPool.Shared.Return(header); + } + } + } +} \ No newline at end of file From 7a12b1a0a1b52c60bc5050be9f4f328bffbd36e4 Mon Sep 17 00:00:00 2001 From: "Arooshi Avasthy (from Dev Box)" Date: Fri, 9 Aug 2024 12:52:07 -0700 Subject: [PATCH 2/3] Transport serializer tests --- .../src/Handler/HandlerConstants.cs | 9 +- .../src/ThinClientTransportSerializer.cs | 558 +++++++++--------- .../ThinClientTransportSerializerTests.cs | 352 ++++------- 3 files changed, 406 insertions(+), 513 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Handler/HandlerConstants.cs b/Microsoft.Azure.Cosmos/src/Handler/HandlerConstants.cs index 8003ab7ad9..70c083c4a1 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/HandlerConstants.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/HandlerConstants.cs @@ -7,6 +7,13 @@ internal static class HandlerConstants { public const string StartEpkString = "x-ms-start-epk-string"; public const string EndEpkString = "x-ms-end-epk-string"; - public const string ResourceUri = "x-ms-resource-uri"; + public const string ResourceUri = "x-ms-resource-uri"; + + public const string RoutedViaProxy = "x-ms-thinclient-route-via-proxy"; + public const string ProxyStartEpk = "x-ms-thinclient-range-min"; + public const string ProxyEndEpk = "x-ms-thinclient-range-max"; + + public const string ProxyOperationType = "x-ms-thinclient-proxy-operation-type"; + public const string ProxyResourceType = "x-ms-thinclient-proxy-resource-type"; } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs index e354938347..95a4170e85 100644 --- a/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs +++ b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs @@ -1,285 +1,279 @@ -//------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -//------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos -{ - using System; - using System.Buffers; - using System.Collections.Generic; - using System.IO; - using System.Linq; - using System.Net; - using System.Net.Http; - using System.Threading.Tasks; - using Microsoft.Azure.Documents; - using Microsoft.Azure.Documents.Collections; - - /// - /// blabla - /// - internal static class ThinClientTransportSerializer - { - public const string RoutedViaProxy = "x-ms-thinclient-route-via-proxy"; - public const string ProxyStartEpk = "x-ms-thinclient-range-min"; - public const string ProxyEndEpk = "x-ms-thinclient-range-max"; - - public const string ProxyOperationType = "x-ms-thinclient-proxy-operation-type"; - public const string ProxyResourceType = "x-ms-thinclient-proxy-resource-type"; - - private static readonly PartitionKeyDefinition HashV2SinglePath; - - static ThinClientTransportSerializer() - { - HashV2SinglePath = new PartitionKeyDefinition - { - Kind = PartitionKind.Hash, - Version = Documents.PartitionKeyDefinitionVersion.V2, - }; - HashV2SinglePath.Paths.Add("/id"); - } - - /// - /// Wrapper to expose a public bufferprovider for the RNTBD stack. - /// -#pragma warning disable CA1034 // Nested types should not be visible - public sealed class BufferProviderWrapper -#pragma warning restore CA1034 // Nested types should not be visible - { - internal BufferProvider Provider { get; set; } = new (); - } - - /// - /// Serialize the Proxy request to the RNTBD protocol format. - /// Today this takes the HttprequestMessage and reconstructs the DSR. - /// If the SDK can push properties to the HttpRequestMessage then the handler above having - /// the DSR can allow us to push that directly to the serialization. - /// - public static async Task SerializeProxyRequestAsync( - BufferProviderWrapper bufferProvider, - string accountName, - HttpRequestMessage requestMessage) - { - // Skip this and use the original DSR. - OperationType operationType = (OperationType)Enum.Parse(typeof(OperationType), requestMessage.Headers.GetValues(ProxyOperationType).First()); - ResourceType resourceType = (ResourceType)Enum.Parse(typeof(ResourceType), requestMessage.Headers.GetValues(ProxyResourceType).First()); - - Guid activityId = Guid.Parse(requestMessage.Headers.GetValues(HttpConstants.HttpHeaders.ActivityId).First()); - - Stream requestStream = null; - if (requestMessage.Content != null) - { - requestStream = await requestMessage.Content.ReadAsStreamAsync(); - } - - RequestNameValueCollection dictionaryCollection = new RequestNameValueCollection(); - foreach (KeyValuePair> header in requestMessage.Headers) - { - dictionaryCollection.Set(header.Key, string.Join(",", header.Value)); - } - - using DocumentServiceRequest request = new (operationType, resourceType, requestMessage.RequestUri.PathAndQuery, - requestStream, AuthorizationTokenType.PrimaryMasterKey, - dictionaryCollection); - - if (operationType.IsPointOperation()) - { - string partitionKey = request.Headers.Get(HttpConstants.HttpHeaders.PartitionKey); - - if (string.IsNullOrEmpty(partitionKey)) - { - throw new InternalServerErrorException(); - } - - string epk = GetEffectivePartitionKeyHash(partitionKey); - - request.Properties = new Dictionary - { - { "x-ms-effective-partition-key", HexStringUtility.HexStringToBytes(epk) } - }; - } - else if (request.Headers[ProxyStartEpk] != null) - { - // Re-add EPK headers removed by RequestInvokerHandler through Properties - request.Properties = new Dictionary - { - { WFConstants.BackendHeaders.StartEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyStartEpk]) }, - { WFConstants.BackendHeaders.EndEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyEndEpk]) } - }; - - request.Headers.Add(HttpConstants.HttpHeaders.ReadFeedKeyType, RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString()); - request.Headers.Add(HttpConstants.HttpHeaders.StartEpk, request.Headers[ProxyStartEpk]); - request.Headers.Add(HttpConstants.HttpHeaders.EndEpk, request.Headers[ProxyEndEpk]); - } - - await request.EnsureBufferedBodyAsync(); - - using Documents.Rntbd.TransportSerialization.SerializedRequest serializedRequest = - Documents.Rntbd.TransportSerialization.BuildRequestForProxy(request, - new ResourceOperation(operationType, resourceType), - activityId, - bufferProvider.Provider, - accountName, - out _, - out _); - - // TODO: consider using the SerializedRequest directly. - MemoryStream memoryStream = new MemoryStream(serializedRequest.RequestSize); - await serializedRequest.CopyToStreamAsync(memoryStream); - memoryStream.Position = 0; - return memoryStream; - } - - public static string GetEffectivePartitionKeyHash(string partitionJson) - { - return Documents.PartitionKey.FromJsonString(partitionJson).InternalKey.GetEffectivePartitionKeyString(HashV2SinglePath); - } - - /// - /// Deserialize the Proxy Response from the RNTBD protocol format to the Http format needed by the caller. - /// Today this takes the HttpResponseMessage and reconstructs the modified Http response. - /// - public static async Task ConvertProxyResponseAsync(HttpResponseMessage responseMessage) - { - using Stream responseStream = await responseMessage.Content.ReadAsStreamAsync(); - - (StatusCodes status, byte[] metadata) = await ThinClientTransportSerializer.ReadHeaderAndMetadataAsync(responseStream); - - if (responseMessage.StatusCode != (HttpStatusCode)status) - { - throw new InternalServerErrorException("Status code mismatch"); - } - - Rntbd.BytesDeserializer bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length); - if (!Documents.Rntbd.HeadersTransportSerialization.TryParseMandatoryResponseHeaders(ref bytesDeserializer, out bool payloadPresent, out _)) - { - throw new InternalServerErrorException("Length mismatch"); - } - - MemoryStream bodyStream = null; - if (payloadPresent) - { - int length = await ThinClientTransportSerializer.ReadBodyLengthAsync(responseStream); - bodyStream = new MemoryStream(length); - await responseStream.CopyToAsync(bodyStream); - bodyStream.Position = 0; - } - - // TODO(Perf): Clean this up. - bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length); - StoreResponse storeResponse = Documents.Rntbd.TransportSerialization.MakeStoreResponse( - status, - Guid.NewGuid(), - bodyStream, - HttpConstants.Versions.CurrentVersion, - ref bytesDeserializer); - - HttpResponseMessage response = new HttpResponseMessage((HttpStatusCode)storeResponse.StatusCode) +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Buffers; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Collections; + + /// + /// The ThinClientTransportSerializer class provides methods for serializing and deserializing proxy requests and responses + /// to and from the RNTBD (Remote Network Transport Binary Data) protocol format. This class is used internally within the + /// Azure Cosmos DB SDK to handle communication with the backend services. + /// + internal static class ThinClientTransportSerializer + { + private static readonly PartitionKeyDefinition HashV2SinglePath; + + static ThinClientTransportSerializer() + { + HashV2SinglePath = new PartitionKeyDefinition + { + Kind = PartitionKind.Hash, + Version = Documents.PartitionKeyDefinitionVersion.V2, + }; + HashV2SinglePath.Paths.Add("/id"); + } + + /// + /// Wrapper to expose a public bufferprovider for the RNTBD stack. + /// + public sealed class BufferProviderWrapper + { + internal BufferProvider Provider { get; set; } = new (); + } + + /// + /// Serialize the Proxy request to the RNTBD protocol format. + /// Today this takes the HttprequestMessage and reconstructs the DSR. + /// If the SDK can push properties to the HttpRequestMessage then the handler above having + /// the DSR can allow us to push that directly to the serialization. + /// + public static async Task SerializeProxyRequestAsync( + BufferProviderWrapper bufferProvider, + string accountName, + HttpRequestMessage requestMessage) + { + // Skip this and use the original DSR. + OperationType operationType = (OperationType)Enum.Parse(typeof(OperationType), requestMessage.Headers.GetValues(HandlerConstants.ProxyOperationType).First()); + ResourceType resourceType = (ResourceType)Enum.Parse(typeof(ResourceType), requestMessage.Headers.GetValues(HandlerConstants.ProxyResourceType).First()); + + Guid activityId = Guid.Parse(requestMessage.Headers.GetValues(HttpConstants.HttpHeaders.ActivityId).First()); + + Stream requestStream = null; + if (requestMessage.Content != null) + { + requestStream = await requestMessage.Content.ReadAsStreamAsync(); + } + + RequestNameValueCollection dictionaryCollection = new (); + foreach (KeyValuePair> header in requestMessage.Headers) + { + dictionaryCollection.Set(header.Key, string.Join(",", header.Value)); + } + + using DocumentServiceRequest request = new ( + operationType, + resourceType, + requestMessage.RequestUri.PathAndQuery, + requestStream, + AuthorizationTokenType.PrimaryMasterKey, + dictionaryCollection); + + if (operationType.IsPointOperation()) + { + string partitionKey = request.Headers.Get(HttpConstants.HttpHeaders.PartitionKey); + + if (string.IsNullOrEmpty(partitionKey)) + { + throw new InternalServerErrorException("Partition key is missing or empty."); + } + + string epk = GetEffectivePartitionKeyHash(partitionKey); + + request.Properties = new Dictionary + { + { "x-ms-effective-partition-key", HexStringUtility.HexStringToBytes(epk) } + }; + } + else if (request.Headers[HandlerConstants.ProxyStartEpk] != null) + { + // Re-add EPK headers removed by RequestInvokerHandler through Properties + request.Properties = new Dictionary + { + { WFConstants.BackendHeaders.StartEpkHash, HexStringUtility.HexStringToBytes(request.Headers[HandlerConstants.ProxyStartEpk]) }, + { WFConstants.BackendHeaders.EndEpkHash, HexStringUtility.HexStringToBytes(request.Headers[HandlerConstants.ProxyEndEpk]) } + }; + + request.Headers.Add(HttpConstants.HttpHeaders.ReadFeedKeyType, RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString()); + request.Headers.Add(HttpConstants.HttpHeaders.StartEpk, request.Headers[HandlerConstants.ProxyStartEpk]); + request.Headers.Add(HttpConstants.HttpHeaders.EndEpk, request.Headers[HandlerConstants.ProxyEndEpk]); + } + + await request.EnsureBufferedBodyAsync(); + + using Documents.Rntbd.TransportSerialization.SerializedRequest serializedRequest = + Documents.Rntbd.TransportSerialization.BuildRequestForProxy( + request, + new ResourceOperation(operationType, resourceType), + activityId, + bufferProvider.Provider, + accountName, + out _, + out _); + + // TODO: consider using the SerializedRequest directly. + MemoryStream memoryStream = new MemoryStream(serializedRequest.RequestSize); + await serializedRequest.CopyToStreamAsync(memoryStream); + memoryStream.Position = 0; + return memoryStream; + } + + public static string GetEffectivePartitionKeyHash(string partitionJson) + { + return Documents.PartitionKey.FromJsonString(partitionJson).InternalKey.GetEffectivePartitionKeyString(HashV2SinglePath); + } + + /// + /// Deserialize the Proxy Response from the RNTBD protocol format to the Http format needed by the caller. + /// Today this takes the HttpResponseMessage and reconstructs the modified Http response. + /// + public static async Task ConvertProxyResponseAsync(HttpResponseMessage responseMessage) + { + using Stream responseStream = await responseMessage.Content.ReadAsStreamAsync(); + + (StatusCodes status, byte[] metadata) = await ReadHeaderAndMetadataAsync(responseStream); + + if (responseMessage.StatusCode != (HttpStatusCode)status) + { + throw new InternalServerErrorException("Status code mismatch"); + } + + Rntbd.BytesDeserializer bytesDeserializer = new (metadata, metadata.Length); + if (!Documents.Rntbd.HeadersTransportSerialization.TryParseMandatoryResponseHeaders(ref bytesDeserializer, out bool payloadPresent, out _)) + { + throw new InternalServerErrorException("Length mismatch"); + } + + MemoryStream bodyStream = null; + if (payloadPresent) + { + int length = await ReadBodyLengthAsync(responseStream); + bodyStream = new MemoryStream(length); + await responseStream.CopyToAsync(bodyStream); + bodyStream.Position = 0; + } + + // TODO: Clean this up. + bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length); + StoreResponse storeResponse = Documents.Rntbd.TransportSerialization.MakeStoreResponse( + status, + Guid.NewGuid(), + bodyStream, + HttpConstants.Versions.CurrentVersion, + ref bytesDeserializer); + + HttpResponseMessage response = new ((HttpStatusCode)storeResponse.StatusCode) { RequestMessage = responseMessage.RequestMessage }; - - if (bodyStream != null) - { - response.Content = new StreamContent(bodyStream); - } - - foreach (string header in storeResponse.Headers.Keys()) - { - if (header == HttpConstants.HttpHeaders.SessionToken) - { - string newSessionToken = storeResponse.PartitionKeyRangeId + ":" + storeResponse.Headers.Get(header); - response.Headers.TryAddWithoutValidation(header, newSessionToken); - } - else - { - response.Headers.TryAddWithoutValidation(header, storeResponse.Headers.Get(header)); - } - } - - response.Headers.TryAddWithoutValidation(RoutedViaProxy, "1"); - return response; - } - - private static async Task<(StatusCodes, byte[] metadata)> ReadHeaderAndMetadataAsync(Stream stream) - { - byte[] header = ArrayPool.Shared.Rent(24); - const int headerLength = 24; - try - { - int headerRead = 0; - while (headerRead < headerLength) - { - int read = 0; - read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); - - if (read == 0) - { - throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); - } - - headerRead += read; - } - - uint totalLength = BitConverter.ToUInt32(header, 0); - StatusCodes status = (StatusCodes)BitConverter.ToUInt32(header, 4); - - if (totalLength < headerLength) - { - throw new InternalServerErrorException("Length mismatch"); - } - - int metadataLength = (int)totalLength - headerLength; - byte[] metadata = new byte[metadataLength]; - int responseMetadataRead = 0; - while (responseMetadataRead < metadataLength) - { - int read = 0; - read = await stream.ReadAsync(metadata, responseMetadataRead, metadataLength - responseMetadataRead); - - if (read == 0) - { - throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); - } - - responseMetadataRead += read; - } - - return (status, metadata); - } - finally - { - ArrayPool.Shared.Return(header); - } - } - - private static async Task ReadBodyLengthAsync(Stream stream) - { - byte[] header = ArrayPool.Shared.Rent(4); - const int headerLength = 4; - try - { - int headerRead = 0; - while (headerRead < headerLength) - { - int read = 0; - read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); - - if (read == 0) - { - throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); - } - - headerRead += read; - } - - return BitConverter.ToInt32(header, 0); - } - finally - { - ArrayPool.Shared.Return(header); - } - - } - } -} + + if (bodyStream != null) + { + response.Content = new StreamContent(bodyStream); + } + + foreach (string header in storeResponse.Headers.Keys()) + { + if (header == HttpConstants.HttpHeaders.SessionToken) + { + string newSessionToken = $"{storeResponse.PartitionKeyRangeId}:{storeResponse.Headers.Get(header)}"; + response.Headers.TryAddWithoutValidation(header, newSessionToken); + } + else + { + response.Headers.TryAddWithoutValidation(header, storeResponse.Headers.Get(header)); + } + } + + response.Headers.TryAddWithoutValidation(HandlerConstants.RoutedViaProxy, "1"); + return response; + } + + private static async Task<(StatusCodes, byte[] metadata)> ReadHeaderAndMetadataAsync(Stream stream) + { + byte[] header = ArrayPool.Shared.Rent(24); + const int headerLength = 24; + try + { + int headerRead = 0; + while (headerRead < headerLength) + { + int read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected end of stream while reading header bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + headerRead += read; + } + + uint totalLength = BitConverter.ToUInt32(header, 0); + StatusCodes status = (StatusCodes)BitConverter.ToUInt32(header, 4); + + if (totalLength < headerLength) + { + throw new InternalServerErrorException("Header length mismatch"); + } + + int metadataLength = (int)totalLength - headerLength; + byte[] metadata = new byte[metadataLength]; + int responseMetadataRead = 0; + while (responseMetadataRead < metadataLength) + { + int read = await stream.ReadAsync(metadata, responseMetadataRead, metadataLength - responseMetadataRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected end of stream while reading metadata bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + responseMetadataRead += read; + } + + return (status, metadata); + } + finally + { + ArrayPool.Shared.Return(header); + } + } + + private static async Task ReadBodyLengthAsync(Stream stream) + { + byte[] header = ArrayPool.Shared.Rent(4); + const int headerLength = 4; + try + { + int headerRead = 0; + while (headerRead < headerLength) + { + int read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected end of stream while reading body length", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + headerRead += read; + } + + return BitConverter.ToInt32(header, 0); + } + finally + { + ArrayPool.Shared.Return(header); + } + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs index 7940162508..35f0eec695 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs @@ -2,277 +2,169 @@ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos +namespace Microsoft.Azure.Cosmos.Tests { + using Moq; using System; - using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Net.Http; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Collections; + using global::Microsoft.VisualStudio.TestTools.UnitTesting; - internal static class ThinClientTransportSerializer + namespace Microsoft.Azure.Cosmos.Tests { - public const string RoutedViaProxy = "x-ms-thinclient-route-via-proxy"; - public const string ProxyStartEpk = "x-ms-thinclient-range-min"; - public const string ProxyEndEpk = "x-ms-thinclient-range-max"; - - public const string ProxyOperationType = "x-ms-thinclient-proxy-operation-type"; - public const string ProxyResourceType = "x-ms-thinclient-proxy-resource-type"; - - private static readonly PartitionKeyDefinition HashV2SinglePath; - - static ThinClientTransportSerializer() - { - HashV2SinglePath = new PartitionKeyDefinition - { - Kind = PartitionKind.Hash, - Version = Documents.PartitionKeyDefinitionVersion.V2, - }; - HashV2SinglePath.Paths.Add("/id"); - } - - /// - /// Wrapper to expose a public buffer provider for the RNTBD stack. - /// - public sealed class BufferProviderWrapper - { - internal BufferProvider Provider { get; set; } = new(); - } - - /// - /// Serialize the Proxy request to the RNTBD protocol format. - /// Today this takes the HttprequestMessage and reconstructs the DSR. - /// If the SDK can push properties to the HttpRequestMessage then the handler above having - /// the DSR can allow us to push that directly to the serialization. - /// - public static async Task SerializeProxyRequestAsync( - BufferProviderWrapper bufferProvider, - string accountName, - HttpRequestMessage requestMessage) + [TestClass] + public class ThinClientTransportSerializerTests { - OperationType operationType = Enum.Parse(requestMessage.Headers.GetValues(ProxyOperationType).First()); - ResourceType resourceType = Enum.Parse(requestMessage.Headers.GetValues(ProxyResourceType).First()); + private readonly Mock mockBufferProviderWrapper; + private readonly string testAccountName = "testAccount"; + private readonly Uri testUri = new Uri("http://localhost/dbs/db1/colls/coll1/docs/doc1"); - Guid activityId = Guid.Parse(requestMessage.Headers.GetValues(HttpConstants.HttpHeaders.ActivityId).First()); - - Stream requestStream = null; - if (requestMessage.Content != null) + public ThinClientTransportSerializerTests() { - requestStream = await requestMessage.Content.ReadAsStreamAsync(); + this.mockBufferProviderWrapper = new Mock(); } - RequestNameValueCollection dictionaryCollection = new(); - foreach (KeyValuePair> header in requestMessage.Headers) - { - dictionaryCollection.Set(header.Key, string.Join(",", header.Value)); + [TestMethod] + public async Task SerializeProxyRequestAsync_ShouldSerializeRequest() + { + // Arrange + HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, this.testUri); + requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "Read"); + requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.PartitionKey, "[\"testPartitionKey\"]"); + + // Act + Stream result = await ThinClientTransportSerializer.SerializeProxyRequestAsync( + this.mockBufferProviderWrapper.Object, + this.testAccountName, + requestMessage); + + // Assert + Assert.IsNotNull(result); + Assert.IsInstanceOfType(result, typeof(Stream)); } - using DocumentServiceRequest request = new( - operationType, - resourceType, - requestMessage.RequestUri.PathAndQuery, - requestStream, - AuthorizationTokenType.PrimaryMasterKey, - dictionaryCollection); - - if (operationType.IsPointOperation()) - { - string partitionKey = request.Headers.Get(HttpConstants.HttpHeaders.PartitionKey); - - if (string.IsNullOrEmpty(partitionKey)) - { - throw new InternalServerErrorException("Partition key is missing or empty."); - } - - string epk = GetEffectivePartitionKeyHash(partitionKey); - - request.Properties = new Dictionary - { - { "x-ms-effective-partition-key", HexStringUtility.HexStringToBytes(epk) } - }; + [TestMethod] + public async Task SerializeProxyRequestAsync_ThrowsException_WhenPartitionKeyIsMissing() + { + // Arrange + HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, this.testUri); + requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "Read"); + requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); + + // Act & Assert + await Assert.ThrowsExceptionAsync(() => + ThinClientTransportSerializer.SerializeProxyRequestAsync( + this.mockBufferProviderWrapper.Object, + this.testAccountName, + requestMessage)); } - else if (request.Headers[ProxyStartEpk] != null) - { - // Re-add EPK headers removed by RequestInvokerHandler through Properties - request.Properties = new Dictionary - { - { WFConstants.BackendHeaders.StartEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyStartEpk]) }, - { WFConstants.BackendHeaders.EndEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyEndEpk]) } - }; - request.Headers.Add(HttpConstants.HttpHeaders.ReadFeedKeyType, RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString()); - request.Headers.Add(HttpConstants.HttpHeaders.StartEpk, request.Headers[ProxyStartEpk]); - request.Headers.Add(HttpConstants.HttpHeaders.EndEpk, request.Headers[ProxyEndEpk]); + [TestMethod] + public async Task SerializeProxyRequestAsync_InvalidOperationType_ThrowsException() + { + // Arrange + HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, this.testUri); + requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "InvalidOperation"); + requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.PartitionKey, "[\"testPartitionKey\"]"); + + // Act & Assert + await Assert.ThrowsExceptionAsync(() => + ThinClientTransportSerializer.SerializeProxyRequestAsync( + this.mockBufferProviderWrapper.Object, + this.testAccountName, + requestMessage)); } - await request.EnsureBufferedBodyAsync(); - - using Documents.Rntbd.TransportSerialization.SerializedRequest serializedRequest = - Documents.Rntbd.TransportSerialization.BuildRequestForProxy( - request, - new ResourceOperation(operationType, resourceType), - activityId, - bufferProvider.Provider, - accountName, - out _, - out _); - - MemoryStream memoryStream = new(serializedRequest.RequestSize); - await serializedRequest.CopyToStreamAsync(memoryStream); - memoryStream.Position = 0; - return memoryStream; - } - - public static string GetEffectivePartitionKeyHash(string partitionJson) - { - return Documents.PartitionKey.FromJsonString(partitionJson).InternalKey.GetEffectivePartitionKeyString(HashV2SinglePath); - } - - /// - /// Deserialize the Proxy Response from the RNTBD protocol format to the Http format needed by the caller. - /// Today this takes the HttpResponseMessage and reconstructs the modified Http response. - /// - public static async Task ConvertProxyResponseAsync(HttpResponseMessage responseMessage) - { - using Stream responseStream = await responseMessage.Content.ReadAsStreamAsync(); - - (StatusCodes status, byte[] metadata) = await ReadHeaderAndMetadataAsync(responseStream); - - if (responseMessage.StatusCode != (HttpStatusCode)status) + [TestMethod] + public async Task SerializeProxyRequestAsync_WithRequestBody_ShouldSerializeRequest() { - throw new InternalServerErrorException("Status code mismatch"); - } - - Rntbd.BytesDeserializer bytesDeserializer = new(metadata, metadata.Length); - if (!Documents.Rntbd.HeadersTransportSerialization.TryParseMandatoryResponseHeaders(ref bytesDeserializer, out bool payloadPresent, out _)) - { - throw new InternalServerErrorException("Length mismatch"); + // Arrange + HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, this.testUri) + { + Content = new StringContent("{ \"key\": \"value\" }") + }; + requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "Create"); + requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.PartitionKey, "[\"testPartitionKey\"]"); + + // Act + Stream result = await ThinClientTransportSerializer.SerializeProxyRequestAsync( + this.mockBufferProviderWrapper.Object, + this.testAccountName, + requestMessage); + + // Assert + Assert.IsNotNull(result); + Assert.IsInstanceOfType(result, typeof(Stream)); + Assert.IsTrue(result.Length > 0); } - MemoryStream bodyStream = null; - if (payloadPresent) + [TestMethod] + public async Task ConvertProxyResponseAsync_WithPayload_ShouldConvertResponse() { - int length = await ReadBodyLengthAsync(responseStream); - bodyStream = new MemoryStream(length); - await responseStream.CopyToAsync(bodyStream); - bodyStream.Position = 0; - } + // Arrange + MemoryStream content = new MemoryStream(); + StreamWriter writer = new StreamWriter(content); + await writer.WriteAsync("payload content"); + await writer.FlushAsync(); + content.Position = 0; - // TODO: Clean this up. - bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length); - StoreResponse storeResponse = Documents.Rntbd.TransportSerialization.MakeStoreResponse( - status, - Guid.NewGuid(), - bodyStream, - HttpConstants.Versions.CurrentVersion, - ref bytesDeserializer); - - HttpResponseMessage response = new((HttpStatusCode)storeResponse.StatusCode) - { - RequestMessage = responseMessage.RequestMessage - }; + HttpResponseMessage responseMessage = new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StreamContent(content) + }; - if (bodyStream != null) - { - response.Content = new StreamContent(bodyStream); - } + // Act + HttpResponseMessage result = await ThinClientTransportSerializer.ConvertProxyResponseAsync(responseMessage); - foreach (string header in storeResponse.Headers.Keys()) - { - if (header == HttpConstants.HttpHeaders.SessionToken) - { - string newSessionToken = $"{storeResponse.PartitionKeyRangeId}:{storeResponse.Headers.Get(header)}"; - response.Headers.TryAddWithoutValidation(header, newSessionToken); - } - else - { - response.Headers.TryAddWithoutValidation(header, storeResponse.Headers.Get(header)); - } + // Assert + Assert.IsNotNull(result); + Assert.AreEqual(HttpStatusCode.OK, result.StatusCode); + Assert.AreEqual("payload content", await result.Content.ReadAsStringAsync()); } - response.Headers.TryAddWithoutValidation(RoutedViaProxy, "1"); - return response; - } - - private static async Task<(StatusCodes, byte[] metadata)> ReadHeaderAndMetadataAsync(Stream stream) - { - byte[] header = ArrayPool.Shared.Rent(24); - const int headerLength = 24; - try + [TestMethod] + public async Task ConvertProxyResponseAsync_ShouldConvertResponse() { - int headerRead = 0; - while (headerRead < headerLength) + // Arrange + HttpResponseMessage responseMessage = new HttpResponseMessage(HttpStatusCode.OK) { - int read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); - - if (read == 0) - { - throw new DocumentClientException("Unexpected end of stream while reading header bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); - } + Content = new StreamContent(new MemoryStream()) + }; - headerRead += read; - } + // Act + HttpResponseMessage result = await ThinClientTransportSerializer.ConvertProxyResponseAsync(responseMessage); - uint totalLength = BitConverter.ToUInt32(header, 0); - StatusCodes status = (StatusCodes)BitConverter.ToUInt32(header, 4); - - if (totalLength < headerLength) - { - throw new InternalServerErrorException("Header length mismatch"); - } - - int metadataLength = (int)totalLength - headerLength; - byte[] metadata = new byte[metadataLength]; - int responseMetadataRead = 0; - while (responseMetadataRead < metadataLength) - { - int read = await stream.ReadAsync(metadata, responseMetadataRead, metadataLength - responseMetadataRead); - - if (read == 0) - { - throw new DocumentClientException("Unexpected end of stream while reading metadata bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); - } - - responseMetadataRead += read; - } - - return (status, metadata); + // Assert + Assert.IsNotNull(result); + Assert.AreEqual(responseMessage.StatusCode, result.StatusCode); } - finally - { - ArrayPool.Shared.Return(header); - } - } - private static async Task ReadBodyLengthAsync(Stream stream) - { - byte[] header = ArrayPool.Shared.Rent(4); - const int headerLength = 4; - try + [TestMethod] + public async Task ConvertProxyResponseAsync_StatusCodeMismatch_ThrowsException() { - int headerRead = 0; - while (headerRead < headerLength) + // Arrange + HttpResponseMessage responseMessage = new HttpResponseMessage(HttpStatusCode.BadRequest) { - int read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); - - if (read == 0) - { - throw new DocumentClientException("Unexpected end of stream while reading body length", HttpStatusCode.Gone, SubStatusCodes.Unknown); - } + Content = new StreamContent(new MemoryStream()) + }; - headerRead += read; - } - - return BitConverter.ToInt32(header, 0); - } - finally - { - ArrayPool.Shared.Return(header); + // Act & Assert + await Assert.ThrowsExceptionAsync(() => + ThinClientTransportSerializer.ConvertProxyResponseAsync(responseMessage)); } } } From 900d54afcce5f7dc7f4da994425ce46d8ebf8152 Mon Sep 17 00:00:00 2001 From: Arooshi Avasthy Date: Mon, 12 Aug 2024 10:36:59 -0700 Subject: [PATCH 3/3] Updated Tests --- .../src/ThinClientTransportSerializer.cs | 2 +- .../ThinClientTransportSerializerTests.cs | 220 +++++++----------- 2 files changed, 82 insertions(+), 140 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs index 95a4170e85..bff6c91e9b 100644 --- a/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs +++ b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs @@ -37,7 +37,7 @@ static ThinClientTransportSerializer() /// /// Wrapper to expose a public bufferprovider for the RNTBD stack. /// - public sealed class BufferProviderWrapper + public class BufferProviderWrapper { internal BufferProvider Provider { get; set; } = new (); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs index 35f0eec695..a111434b2c 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ThinClientTransportSerializerTests.cs @@ -15,157 +15,99 @@ namespace Microsoft.Azure.Cosmos.Tests using Microsoft.Azure.Cosmos; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Collections; - using global::Microsoft.VisualStudio.TestTools.UnitTesting; + using Microsoft.VisualStudio.TestTools.UnitTesting; - namespace Microsoft.Azure.Cosmos.Tests + [TestClass] + public class ThinClientTransportSerializerTests { - [TestClass] - public class ThinClientTransportSerializerTests - { - private readonly Mock mockBufferProviderWrapper; - private readonly string testAccountName = "testAccount"; - private readonly Uri testUri = new Uri("http://localhost/dbs/db1/colls/coll1/docs/doc1"); + private readonly Mock mockBufferProviderWrapper; + private readonly string testAccountName = "testAccount"; + private readonly Uri testUri = new Uri("http://localhost/dbs/db1/colls/coll1/docs/doc1"); - public ThinClientTransportSerializerTests() - { - this.mockBufferProviderWrapper = new Mock(); - } + public ThinClientTransportSerializerTests() + { + this.mockBufferProviderWrapper = new Mock(); + } - [TestMethod] - public async Task SerializeProxyRequestAsync_ShouldSerializeRequest() - { - // Arrange - HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, this.testUri); - requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "Read"); - requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); - requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); - requestMessage.Headers.Add(HttpConstants.HttpHeaders.PartitionKey, "[\"testPartitionKey\"]"); + [TestMethod] + public async Task SerializeProxyRequestAsync_ShouldSerializeRequest() + { + // Arrange + HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, this.testUri); + requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "Read"); + requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.PartitionKey, "[\"testPartitionKey\"]"); + + // Act + Stream result = await ThinClientTransportSerializer.SerializeProxyRequestAsync( + this.mockBufferProviderWrapper.Object, + this.testAccountName, + requestMessage); + + // Assert + Assert.IsNotNull(result); + Assert.IsInstanceOfType(result, typeof(Stream)); + } - // Act - Stream result = await ThinClientTransportSerializer.SerializeProxyRequestAsync( + [TestMethod] + public async Task SerializeProxyRequestAsync_ThrowsException_WhenPartitionKeyIsMissing() + { + // Arrange + HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, this.testUri); + requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "Read"); + requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); + + // Act & Assert + await Assert.ThrowsExceptionAsync(() => + ThinClientTransportSerializer.SerializeProxyRequestAsync( this.mockBufferProviderWrapper.Object, this.testAccountName, - requestMessage); - - // Assert - Assert.IsNotNull(result); - Assert.IsInstanceOfType(result, typeof(Stream)); - } - - [TestMethod] - public async Task SerializeProxyRequestAsync_ThrowsException_WhenPartitionKeyIsMissing() - { - // Arrange - HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, this.testUri); - requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "Read"); - requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); - requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); - - // Act & Assert - await Assert.ThrowsExceptionAsync(() => - ThinClientTransportSerializer.SerializeProxyRequestAsync( - this.mockBufferProviderWrapper.Object, - this.testAccountName, - requestMessage)); - } - - [TestMethod] - public async Task SerializeProxyRequestAsync_InvalidOperationType_ThrowsException() - { - // Arrange - HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, this.testUri); - requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "InvalidOperation"); - requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); - requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); - requestMessage.Headers.Add(HttpConstants.HttpHeaders.PartitionKey, "[\"testPartitionKey\"]"); - - // Act & Assert - await Assert.ThrowsExceptionAsync(() => - ThinClientTransportSerializer.SerializeProxyRequestAsync( - this.mockBufferProviderWrapper.Object, - this.testAccountName, - requestMessage)); - } - - [TestMethod] - public async Task SerializeProxyRequestAsync_WithRequestBody_ShouldSerializeRequest() - { - // Arrange - HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, this.testUri) - { - Content = new StringContent("{ \"key\": \"value\" }") - }; - requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "Create"); - requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); - requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); - requestMessage.Headers.Add(HttpConstants.HttpHeaders.PartitionKey, "[\"testPartitionKey\"]"); + requestMessage)); + } - // Act - Stream result = await ThinClientTransportSerializer.SerializeProxyRequestAsync( + [TestMethod] + public async Task SerializeProxyRequestAsync_InvalidOperationType_ThrowsException() + { + // Arrange + HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, this.testUri); + requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "InvalidOperation"); + requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.PartitionKey, "[\"testPartitionKey\"]"); + + // Act & Assert + await Assert.ThrowsExceptionAsync(() => + ThinClientTransportSerializer.SerializeProxyRequestAsync( this.mockBufferProviderWrapper.Object, this.testAccountName, - requestMessage); - - // Assert - Assert.IsNotNull(result); - Assert.IsInstanceOfType(result, typeof(Stream)); - Assert.IsTrue(result.Length > 0); - } - - [TestMethod] - public async Task ConvertProxyResponseAsync_WithPayload_ShouldConvertResponse() - { - // Arrange - MemoryStream content = new MemoryStream(); - StreamWriter writer = new StreamWriter(content); - await writer.WriteAsync("payload content"); - await writer.FlushAsync(); - content.Position = 0; - - HttpResponseMessage responseMessage = new HttpResponseMessage(HttpStatusCode.OK) - { - Content = new StreamContent(content) - }; - - // Act - HttpResponseMessage result = await ThinClientTransportSerializer.ConvertProxyResponseAsync(responseMessage); - - // Assert - Assert.IsNotNull(result); - Assert.AreEqual(HttpStatusCode.OK, result.StatusCode); - Assert.AreEqual("payload content", await result.Content.ReadAsStringAsync()); - } - - [TestMethod] - public async Task ConvertProxyResponseAsync_ShouldConvertResponse() - { - // Arrange - HttpResponseMessage responseMessage = new HttpResponseMessage(HttpStatusCode.OK) - { - Content = new StreamContent(new MemoryStream()) - }; - - // Act - HttpResponseMessage result = await ThinClientTransportSerializer.ConvertProxyResponseAsync(responseMessage); - - // Assert - Assert.IsNotNull(result); - Assert.AreEqual(responseMessage.StatusCode, result.StatusCode); - } + requestMessage)); + } - [TestMethod] - public async Task ConvertProxyResponseAsync_StatusCodeMismatch_ThrowsException() + [TestMethod] + public async Task SerializeProxyRequestAsync_WithRequestBody_ShouldSerializeRequest() + { + // Arrange + HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, this.testUri) { - // Arrange - HttpResponseMessage responseMessage = new HttpResponseMessage(HttpStatusCode.BadRequest) - { - Content = new StreamContent(new MemoryStream()) - }; - - // Act & Assert - await Assert.ThrowsExceptionAsync(() => - ThinClientTransportSerializer.ConvertProxyResponseAsync(responseMessage)); - } + Content = new StringContent("{ \"key\": \"value\" }") + }; + requestMessage.Headers.Add(HandlerConstants.ProxyOperationType, "Create"); + requestMessage.Headers.Add(HandlerConstants.ProxyResourceType, "Document"); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, Guid.NewGuid().ToString()); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.PartitionKey, "[\"testPartitionKey\"]"); + + // Act + Stream result = await ThinClientTransportSerializer.SerializeProxyRequestAsync( + this.mockBufferProviderWrapper.Object, + this.testAccountName, + requestMessage); + + // Assert + Assert.IsNotNull(result); + Assert.IsInstanceOfType(result, typeof(Stream)); + Assert.IsTrue(result.Length > 0); } } } \ No newline at end of file