Skip to content

Commit

Permalink
Revert "segregate TLS and non TLS paths"
Browse files Browse the repository at this point in the history
This reverts commit 363bfb9.
  • Loading branch information
badrishc committed Dec 23, 2024
1 parent 363bfb9 commit 2b901d5
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 91 deletions.
4 changes: 2 additions & 2 deletions benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public override void Dispose()

public override bool TryClose() => throw new NotImplementedException();

public void Send(Request request)
public async ValueTask Send(Request request)
{
networkReceiveBuffer = request.buffer;
unsafe { networkReceiveBufferPtr = request.bufferPtr; }

OnNetworkReceive(request.buffer.Length);
await OnNetworkReceiveAsync(request.buffer.Length);

Debug.Assert(networkBytesRead == 0);
Debug.Assert(networkReadHead == 0);
Expand Down
4 changes: 2 additions & 2 deletions benchmark/BDN.benchmark/Network/BasicOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public override void GlobalSetup()
}

[Benchmark]
public void InlinePing()
public async ValueTask InlinePing()
{
Send(ping);
await Send(ping);
}
}
}
5 changes: 4 additions & 1 deletion benchmark/BDN.benchmark/Network/NetworkBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ public virtual void GlobalCleanup()
server.Dispose();
}

protected void Send(Request request) => networkHandler.Send(request);
protected ValueTask Send(Request request)
{
return networkHandler.Send(request);
}

protected unsafe void SetupOperation(ref Request request, ReadOnlySpan<byte> operation, int batchSize = batchSize)
{
Expand Down
40 changes: 20 additions & 20 deletions benchmark/BDN.benchmark/Network/RawStringOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,63 +65,63 @@ public override void GlobalSetup()
}

[Benchmark]
public void Set()
public async ValueTask Set()
{
Send(set);
await Send(set);
}

[Benchmark]
public void SetEx()
public async ValueTask SetEx()
{
Send(setex);
await Send(setex);
}

[Benchmark]
public void SetNx()
public async ValueTask SetNx()
{
Send(setnx);
await Send(setnx);
}

[Benchmark]
public void SetXx()
public async ValueTask SetXx()
{
Send(setxx);
await Send(setxx);
}

[Benchmark]
public void GetFound()
public async ValueTask GetFound()
{
Send(getf);
await Send(getf);
}

[Benchmark]
public void GetNotFound()
public async ValueTask GetNotFound()
{
Send(getnf);
await Send(getnf);
}

[Benchmark]
public void Increment()
public async ValueTask Increment()
{
Send(incr);
await Send(incr);
}

[Benchmark]
public void Decrement()
public async ValueTask Decrement()
{
Send(decr);
await Send(decr);
}

[Benchmark]
public void IncrementBy()
public async ValueTask IncrementBy()
{
Send(incrby);
await Send(incrby);
}

[Benchmark]
public void DecrementBy()
public async ValueTask DecrementBy()
{
Send(decrby);
await Send(decrby);
}
}
}
64 changes: 31 additions & 33 deletions libs/common/Networking/NetworkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,38 +267,11 @@ async Task AuthenticateAsClientAsync(SslClientAuthenticationOptions sslClientOpt
}
}

public void OnNetworkReceive(int bytesTransferred)
{
networkBytesRead += bytesTransferred;
transportReceiveBuffer = networkReceiveBuffer;
unsafe
{
transportReceiveBufferPtr = networkReceiveBufferPtr;
}
transportBytesRead = networkBytesRead;

// We do not have an active read task, so we will process on the network thread
Process();
EndTransformNetworkToTransport();
UpdateNetworkBuffers();
}

private void UpdateNetworkBuffers()
{
// Shift network buffer after processing is done
if (networkReadHead > 0)
ShiftNetworkReceiveBuffer();

// Double network buffer if out of space after processing is complete
if (networkBytesRead == networkReceiveBuffer.Length)
DoubleNetworkReceiveBuffer();
}

/// <summary>
/// On network receive
/// </summary>
/// <param name="bytesTransferred">Number of bytes transferred</param>
public async ValueTask OnNetworkReceiveWithTLS(int bytesTransferred)
public async ValueTask OnNetworkReceiveAsync(int bytesTransferred)
{
// Wait for SslStream async processing to complete, if any (e.g., authentication phase)
while (readerStatus == TlsReaderStatus.Active)
Expand All @@ -310,10 +283,26 @@ public async ValueTask OnNetworkReceiveWithTLS(int bytesTransferred)
switch (readerStatus)
{
case TlsReaderStatus.Rest:
readerStatus = TlsReaderStatus.Active;
Read();
while (readerStatus == TlsReaderStatus.Active)
await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
// Synchronously try to process the received data
if (sslStream == null)
{
transportReceiveBuffer = networkReceiveBuffer;
unsafe
{
transportReceiveBufferPtr = networkReceiveBufferPtr;
}
transportBytesRead = networkBytesRead;

// We do not have an active read task, so we will process on the network thread
Process();
}
else
{
readerStatus = TlsReaderStatus.Active;
Read();
while (readerStatus == TlsReaderStatus.Active)
await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
}
break;
case TlsReaderStatus.Waiting:
// We have a ReadAsync task waiting for new data, set it to active status
Expand All @@ -331,7 +320,16 @@ public async ValueTask OnNetworkReceiveWithTLS(int bytesTransferred)
}

Debug.Assert(readerStatus != TlsReaderStatus.Active);
UpdateNetworkBuffers();

EndTransformNetworkToTransport();

// Shift network buffer after processing is done
if (networkReadHead > 0)
ShiftNetworkReceiveBuffer();

// Double network buffer if out of space after processing is complete
if (networkBytesRead == networkReceiveBuffer.Length)
DoubleNetworkReceiveBuffer();
}

[MethodImpl(MethodImplOptions.NoInlining)]
Expand Down
36 changes: 3 additions & 33 deletions libs/common/Networking/TcpNetworkHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public abstract class TcpNetworkHandlerBase<TServerHook, TNetworkSender> : Netwo
readonly string remoteEndpoint;
readonly string localEndpoint;
int closeRequested;
readonly bool useTLS;

/// <summary>
/// Constructor
Expand All @@ -40,7 +39,7 @@ public TcpNetworkHandlerBase(TServerHook serverHook, TNetworkSender networkSende

remoteEndpoint = socket.RemoteEndPoint is IPEndPoint remote ? $"{remote.Address}:{remote.Port}" : "";
localEndpoint = socket.LocalEndPoint is IPEndPoint local ? $"{local.Address}:{local.Port}" : "";
this.useTLS = useTLS;

AllocateNetworkReceiveBuffer();
}

Expand Down Expand Up @@ -138,36 +137,7 @@ void Dispose(SocketAsyncEventArgs e)
void RecvEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
// Complete receive event and release thread while we process data async
if (this.useTLS)
{
_ = HandleReceiveAsync(sender, e);
}
else
{
HandleReceiveSync(sender, e);
}
}

private void HandleReceiveSync(object sender, SocketAsyncEventArgs e)
{
try
{
do
{
if (e.BytesTransferred == 0 || e.SocketError != SocketError.Success || serverHook.Disposed)
{
// No more things to receive
Dispose(e);
break;
}
OnNetworkReceive(e.BytesTransferred);
e.SetBuffer(networkReceiveBuffer, networkBytesRead, networkReceiveBuffer.Length - networkBytesRead);
} while (!e.AcceptSocket.ReceiveAsync(e));
}
catch (Exception ex)
{
HandleReceiveFailure(ex, e);
}
_ = HandleReceiveAsync(sender, e);
}

private async ValueTask HandleReceiveAsync(object sender, SocketAsyncEventArgs e)
Expand All @@ -182,7 +152,7 @@ private async ValueTask HandleReceiveAsync(object sender, SocketAsyncEventArgs e
Dispose(e);
break;
}
var receiveTask = OnNetworkReceiveWithTLS(e.BytesTransferred);
var receiveTask = OnNetworkReceiveAsync(e.BytesTransferred);
if (!receiveTask.IsCompletedSuccessfully)
{
await receiveTask;
Expand Down

0 comments on commit 2b901d5

Please sign in to comment.