Skip to content

Commit

Permalink
Fix bug in StreamingUtf8JsonReader, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
peterrsongg committed Nov 21, 2024
1 parent fe0da63 commit 3124916
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ public Utf8JsonReader Reader
}

private Stream _stream;
// due to all the resizing that happens with the byte array, pooling is not used here because the
// byte array returned by Array.Resize is not owned by the pool.
private byte[] _buffer;

public StreamingUtf8JsonReader(Stream stream)
{
_stream = stream;
_buffer = ArrayPool<byte>.Shared.Rent(4096);
// the default for STJ's deserializer is 16,384 but we'll leave this at 4096 for now.
_buffer = new byte[4096];
// need to initialize the reader even if the buffer is empty because auto-default of unassigned fields is only
// supported in C# 11+
_reader = new Utf8JsonReader(_buffer);
Expand Down Expand Up @@ -95,10 +98,15 @@ public void PassReaderByRef(RefAction action)
/// <returns>true if there is more data, false otherwise.</returns>
public bool Read()
{
// hasMoreData can return false if the value starts in one buffer and leaks into the next buffer
bool hasMoreData = _reader.Read();
if (!hasMoreData)

while (!hasMoreData)
{
GetMoreBytesFromStream(_stream, ref _buffer, ref _reader);
if (_reader.IsFinalBlock)
break;

hasMoreData = _reader.Read();
}

Expand All @@ -109,24 +117,31 @@ public bool Read()
private static void GetMoreBytesFromStream(Stream stream, ref byte[] buffer, ref Utf8JsonReader reader)
{
int bytesRead;
bool resized = false;
// if Read() returned false and we are here that means that we couldn't fully parse the JSON token
// because it was too large to fit in the remainder of the buffer.
if (reader.BytesConsumed < buffer.Length)
{
ReadOnlySpan<byte> leftover = buffer.AsSpan((int)reader.BytesConsumed);
ReadOnlySpan<byte> leftover = buffer.AsSpan().Slice((int)reader.BytesConsumed);
int previousBufferLength = buffer.Length;

if (leftover.Length == buffer.Length)
{
resized = true;
Array.Resize(ref buffer, buffer.Length * 2);
}

leftover.CopyTo(buffer);
bytesRead = stream.Read(buffer, leftover.Length, buffer.Length - leftover.Length);
// remove null bytes if they exist, since we don't know when the stream will end and we could have doubled the buffer size.
// otherwise the json reader will throw an exception.
if (resized)
Array.Resize(ref buffer, bytesRead + previousBufferLength);
}
else
{
bytesRead = stream.Read(buffer, 0, buffer.Length);
}
if (bytesRead == 0)
ArrayPool<byte>.Shared.Return(buffer);

reader = new Utf8JsonReader(buffer, isFinalBlock: bytesRead == 0, reader.CurrentState);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using AWSSDK_DotNet.CommonTest.Utils;
using AWSSDK_DotNet.UnitTests;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.IO;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Amazon.Runtime.Internal.Transform;
namespace AWSSDK.UnitTests
{
/// <summary>
/// Protocol Tests already exists to test the marshalling and unmarhsalling of request and responses in json, but they don't test very
/// large payloads, which would trigger the logic for
/// This class just tests the wrapper class StreamingUtf8JsonReader.
/// </summary>
[TestClass]
public class StreamingUtf8JsonReaderTests
{
[TestMethod]
public void HandlesUtf8BOM()
{
// we can't use reflection to access the private fields of StreamingUtf8JsonReader since it is a ref struct so we have to test it this way.
var a = Convert.ToByte('{');
var b = Convert.ToByte('x');
var c = Convert.ToByte(':');
var d = Convert.ToByte('y');
var e = Convert.ToByte('}');

byte[] payload = new byte[] { 0xEF, 0xBB, 0xBF, a, b, c ,d, e};
MemoryStream stream = new MemoryStream(payload);
StreamingUtf8JsonReader reader = new StreamingUtf8JsonReader(stream);
bool firstIteration = true;
while (reader.Read())
{
if (firstIteration)
{
// make sure the BOM was removed
Assert.IsTrue(reader.Reader.TokenType == JsonTokenType.StartObject);
firstIteration = false;
return;
}
}
}
// This method tests that if the json token starts in one buffer but continues into the next buffer
// the reader can handle parsing it correctly.
[TestMethod]
public void Utf8JsonReaderHandlesJsonTokenThatSpansMultipleBuffers()
{
// Arrange
// here we're creating a json string that is greater than 4096 bytes to test the GetMoreBytesFromStream logic
var sb = new StringBuilder();
sb.Append("{ \"key\": \"");
sb.Append(new string('x', 7500)); // String with 5000 'x' characters
sb.Append("\" }");
string largeJson = sb.ToString();

byte[] payload = Encoding.UTF8.GetBytes(largeJson);
using (var stream = new MemoryStream(payload))
{
var reader = new StreamingUtf8JsonReader(stream);
string key = null, value = null;

while (reader.Read())
{
if (reader.Reader.TokenType == JsonTokenType.PropertyName)
{
key = reader.Reader.GetString();
}
else if (reader.Reader.TokenType == JsonTokenType.String)
{
value = reader.Reader.GetString();
}
}
Assert.AreEqual<string>("key", key);
Assert.AreEqual<string>(new string('x', 7500), value);
}
}
}
}

0 comments on commit 3124916

Please sign in to comment.