Skip to content

Commit

Permalink
Merge pull request #279 from AArnott/fix276
Browse files Browse the repository at this point in the history
JsonMessageFormatter GC pressure fixes
  • Loading branch information
AArnott authored Jun 2, 2019
2 parents 5a6cc9b + 53b04f4 commit 90afe62
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 60 deletions.
48 changes: 34 additions & 14 deletions src/StreamJsonRpc.Tests/JsonMessageFormatterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,33 @@ public void ProtocolVersion_RejectsOtherVersions()
[Fact]
public void EncodingProperty_UsedToFormat()
{
JsonRpcRequest msg = new JsonRpcRequest { Method = "a" };
var builder = new Sequence<byte>();
var formatter = new JsonMessageFormatter();
var msg = new JsonRpcRequest { Method = "a" };

formatter.Encoding = Encoding.ASCII;
formatter.Serialize(builder, msg);
long asciiLength = builder.AsReadOnlySequence.Length;
var readMsg = (JsonRpcRequest)formatter.Deserialize(builder.AsReadOnlySequence);
Assert.Equal(msg.Method, readMsg.Method);
var formatter = new JsonMessageFormatter(Encoding.ASCII);
long asciiLength = MeasureLength(msg, formatter);

builder.Reset();
formatter.Encoding = Encoding.UTF32;
formatter.Serialize(builder, msg);
long utf32Length = builder.AsReadOnlySequence.Length;
readMsg = (JsonRpcRequest)formatter.Deserialize(builder.AsReadOnlySequence);
Assert.Equal(msg.Method, readMsg.Method);
var utf32Length = MeasureLength(msg, formatter);
Assert.Equal(asciiLength * 4, utf32Length - Encoding.UTF32.GetPreamble().Length);
}

[Fact]
public void EncodingPreambleWrittenOnlyOncePerMessage()
{
// Contrive a very long message, designed to exceed any buffer that would be used internally by the formatter.
// The goal here is to result in multiple write operations in order to coerce a second preamble to be written if there were a bug.
var msg = new JsonRpcRequest { Method = new string('a', 16 * 1024) };

Assert.Equal(utf32Length - Encoding.UTF32.GetPreamble().Length, asciiLength * 4);
var formatter = new JsonMessageFormatter(Encoding.ASCII);
long asciiLength = MeasureLength(msg, formatter);

formatter.Encoding = Encoding.UTF32;
var utf32Length = MeasureLength(msg, formatter);
Assert.Equal(asciiLength * 4, utf32Length - Encoding.UTF32.GetPreamble().Length);

// Measure UTF32 again to verify the length doesn't change (and the preamble is thus applied to each message).
var utf32Length2 = MeasureLength(msg, formatter);
Assert.Equal(utf32Length, utf32Length2);
}

[Fact]
Expand All @@ -102,4 +111,15 @@ public void JTokenParserHonorsSettingsOnSerializer()
Assert.IsType<string>(value);
Assert.Equal("2019-01-29T03:37:28.4433841Z", value);
}

private static long MeasureLength(JsonRpcRequest msg, JsonMessageFormatter formatter)
{
var builder = new Sequence<byte>();
formatter.Serialize(builder, msg);
var length = builder.AsReadOnlySequence.Length;
var readMsg = (JsonRpcRequest)formatter.Deserialize(builder.AsReadOnlySequence);
Assert.Equal(msg.Method, readMsg.Method);

return length;
}
}
15 changes: 12 additions & 3 deletions src/StreamJsonRpc/HeaderDelimitedMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public class HeaderDelimitedMessageHandler : PipeMessageHandler
private static readonly byte[] ContentTypeHeaderName = HeaderEncoding.GetBytes(ContentTypeHeaderNameText);
private static readonly byte[] CrlfBytes = HeaderEncoding.GetBytes("\r\n");

/// <summary>
/// The <see cref="IBufferWriter{T}"/> sent to the <see cref="TextFormatter"/> to write the message.
/// </summary>
private readonly Sequence<byte> contentSequenceBuilder = new Sequence<byte>(ArrayPool<byte>.Shared);

/// <summary>
/// Backing field for <see cref="SubType"/>.
/// </summary>
Expand Down Expand Up @@ -222,10 +227,10 @@ unsafe int WriteHeaderText(string value, Span<byte> memory)

cancellationToken.ThrowIfCancellationRequested();
Encoding contentEncoding = this.Encoding;
using (var contentSequenceBuilder = new Sequence<byte>())
try
{
this.Formatter.Serialize(contentSequenceBuilder, content);
ReadOnlySequence<byte> contentSequence = contentSequenceBuilder.AsReadOnlySequence;
this.Formatter.Serialize(this.contentSequenceBuilder, content);
ReadOnlySequence<byte> contentSequence = this.contentSequenceBuilder.AsReadOnlySequence;
Memory<byte> headerMemory = this.Writer.GetMemory(1024);
int bytesWritten = 0;

Expand Down Expand Up @@ -271,6 +276,10 @@ unsafe int WriteHeaderText(string value, Span<byte> memory)
contentSequence.CopyTo(contentMemory.Span);
this.Writer.Advance((int)contentSequence.Length);
}
finally
{
this.contentSequenceBuilder.Reset();
}
}

/// <summary>
Expand Down
71 changes: 61 additions & 10 deletions src/StreamJsonRpc/JsonMessageFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace StreamJsonRpc
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -41,6 +42,31 @@ public class JsonMessageFormatter : IJsonRpcMessageTextFormatter
/// </summary>
private static readonly Encoding DefaultEncoding = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);

/// <summary>
/// The <see cref="char"/> array pool to use for each <see cref="JsonTextReader"/> instance.
/// </summary>
private static readonly IArrayPool<char> JsonCharArrayPool = new JsonArrayPool<char>(ArrayPool<char>.Shared);

/// <summary>
/// An exactly default instance of the <see cref="JsonSerializer"/> to use where no special settings
/// are needed.
/// </summary>
/// <remarks>
/// This is useful when calling such APIs as <see cref="JToken.FromObject(object, JsonSerializer)"/>
/// because <see cref="JToken.FromObject(object)"/> allocates a new serializer with each invocation.
/// </remarks>
private static readonly JsonSerializer DefaultSerializer = JsonSerializer.CreateDefault();

/// <summary>
/// The reusable <see cref="TextWriter"/> to use with newtonsoft.json's serializer.
/// </summary>
private readonly BufferTextWriter bufferTextWriter = new BufferTextWriter();

/// <summary>
/// The reusable <see cref="TextReader"/> to use with newtonsoft.json's deserializer.
/// </summary>
private readonly SequenceTextReader sequenceTextReader = new SequenceTextReader();

/// <summary>
/// The version of the JSON-RPC protocol being emulated by this instance.
/// </summary>
Expand Down Expand Up @@ -189,7 +215,7 @@ public JToken Serialize(JsonRpcMessage message)
// Pre-tokenize the user data so we can use their custom converters for just their data and not for the base message.
this.TokenizeUserData(message);

var json = JToken.FromObject(message);
var json = JToken.FromObject(message, DefaultSerializer);

// Fix up dropped fields that are mandatory
if (message is Protocol.JsonRpcResult && json["result"] == null)
Expand Down Expand Up @@ -262,22 +288,22 @@ private Exception CreateProtocolNonComplianceException(JToken message, string ex

private void WriteJToken(IBufferWriter<byte> contentBuffer, JToken json)
{
using (var streamWriter = new StreamWriter(contentBuffer.AsStream(), this.Encoding, 4096))
this.bufferTextWriter.Initialize(contentBuffer, this.Encoding);
using (var jsonWriter = new JsonTextWriter(this.bufferTextWriter))
{
using (var jsonWriter = new JsonTextWriter(streamWriter))
{
json.WriteTo(jsonWriter);
jsonWriter.Flush();
}
json.WriteTo(jsonWriter);
jsonWriter.Flush();
}
}

private JToken ReadJToken(ReadOnlySequence<byte> contentBuffer, Encoding encoding)
{
Requires.NotNull(encoding, nameof(encoding));

var jsonReader = new JsonTextReader(new StreamReader(contentBuffer.AsStream(), encoding))
this.sequenceTextReader.Initialize(contentBuffer, encoding);
var jsonReader = new JsonTextReader(this.sequenceTextReader)
{
ArrayPool = JsonCharArrayPool,
CloseInput = true,
Culture = this.JsonSerializer.Culture,
DateFormatString = this.JsonSerializer.DateFormatString,
Expand All @@ -286,8 +312,16 @@ private JToken ReadJToken(ReadOnlySequence<byte> contentBuffer, Encoding encodin
FloatParseHandling = this.JsonSerializer.FloatParseHandling,
MaxDepth = this.JsonSerializer.MaxDepth,
};
JToken json = JToken.ReadFrom(jsonReader);
return json;
try
{
JToken json = JToken.ReadFrom(jsonReader);
return json;
}
finally
{
// Return rented arrays
jsonReader.Close();
}
}

/// <summary>
Expand Down Expand Up @@ -467,5 +501,22 @@ public override T GetResult<T>()
return result.ToObject<T>(this.jsonSerializer);
}
}

/// <summary>
/// Adapts the .NET <see cref="ArrayPool{T}" /> to Newtonsoft.Json's <see cref="IArrayPool{T}" /> interface.
/// </summary>
private class JsonArrayPool<T> : IArrayPool<T>
{
private readonly ArrayPool<T> arrayPool;

internal JsonArrayPool(ArrayPool<T> arrayPool)
{
this.arrayPool = arrayPool ?? throw new ArgumentNullException(nameof(arrayPool));
}

public T[] Rent(int minimumLength) => this.arrayPool.Rent(minimumLength);

public void Return(T[] array) => this.arrayPool.Return(array);
}
}
}
2 changes: 1 addition & 1 deletion src/StreamJsonRpc/JsonRpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ private JsonRpcError CreateError(JsonRpcRequest request, Exception exception)
};
}

private async Task<JsonRpcMessage> DispatchIncomingRequestAsync(JsonRpcRequest request)
private async ValueTask<JsonRpcMessage> DispatchIncomingRequestAsync(JsonRpcRequest request)
{
Requires.NotNull(request, nameof(request));

Expand Down
56 changes: 25 additions & 31 deletions src/StreamJsonRpc/MessageHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,22 @@ public async ValueTask<JsonRpcMessage> ReadAsync(CancellationToken cancellationT
cancellationToken.ThrowIfCancellationRequested();
Verify.NotDisposed(this);

using (var cts = CancellationTokenSource.CreateLinkedTokenSource(this.DisposalToken, cancellationToken))
try
{
try
{
JsonRpcMessage result = await this.ReadCoreAsync(cts.Token).ConfigureAwait(false);
return result;
}
catch (InvalidOperationException ex) when (cancellationToken.IsCancellationRequested)
{
// PipeReader.ReadAsync can throw InvalidOperationException in a race where PipeReader.Complete() has been
// called but we haven't noticed the CancellationToken was canceled yet.
throw new OperationCanceledException("Reading failed during cancellation.", ex, cancellationToken);
}
catch (ObjectDisposedException)
{
// If already canceled, throw that instead of ObjectDisposedException.
cancellationToken.ThrowIfCancellationRequested();
throw;
}
JsonRpcMessage result = await this.ReadCoreAsync(cancellationToken).ConfigureAwait(false);
return result;
}
catch (InvalidOperationException ex) when (cancellationToken.IsCancellationRequested)
{
// PipeReader.ReadAsync can throw InvalidOperationException in a race where PipeReader.Complete() has been
// called but we haven't noticed the CancellationToken was canceled yet.
throw new OperationCanceledException("Reading failed during cancellation.", ex, cancellationToken);
}
catch (ObjectDisposedException)
{
// If already canceled, throw that instead of ObjectDisposedException.
cancellationToken.ThrowIfCancellationRequested();
throw;
}
}

Expand All @@ -127,24 +124,21 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
cancellationToken.ThrowIfCancellationRequested();
Verify.NotDisposed(this);

using (var cts = CancellationTokenSource.CreateLinkedTokenSource(this.DisposalToken, cancellationToken))
try
{
try
using (await this.sendingSemaphore.EnterAsync(cancellationToken).ConfigureAwait(false))
{
using (await this.sendingSemaphore.EnterAsync(cts.Token).ConfigureAwait(false))
{
cts.Token.ThrowIfCancellationRequested();
await this.WriteCoreAsync(content, cts.Token).ConfigureAwait(false);
await this.FlushAsync(cts.Token).ConfigureAwait(false);
}
}
catch (ObjectDisposedException)
{
// If already canceled, throw that instead of ObjectDisposedException.
cancellationToken.ThrowIfCancellationRequested();
throw;
await this.WriteCoreAsync(content, cancellationToken).ConfigureAwait(false);
await this.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (ObjectDisposedException)
{
// If already canceled, throw that instead of ObjectDisposedException.
cancellationToken.ThrowIfCancellationRequested();
throw;
}
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/StreamJsonRpc/StreamJsonRpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<PackageReference Include="System.Net.WebSockets" Version="4.3.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.2" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.3" />
<PackageReference Include="Nerdbank.Streams" Version="2.1.37" />
<PackageReference Include="Nerdbank.Streams" Version="2.2.26" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net46' ">
<AdditionalFiles Include="PublicAPI.Shipped.txt" />
Expand Down

0 comments on commit 90afe62

Please sign in to comment.