Skip to content

Commit

Permalink
Optimize send feature
Browse files Browse the repository at this point in the history
  • Loading branch information
HakanL committed Mar 1, 2024
1 parent 06ca901 commit d265316
Showing 1 changed file with 97 additions and 3 deletions.
100 changes: 97 additions & 3 deletions src/Haukcode.sACN/SACNClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ public SendData()
}
}

public class OutputData : IDisposable
{
public Stopwatch LastSent = Stopwatch.StartNew();

public double AgeMS => LastSent.Elapsed.TotalMilliseconds;

public IMemoryOwner<byte> SendData;

public void Dispose()
{
SendData.Dispose();
}
}

private const int ReceiveBufferSize = 20480;
private const int SendBufferSize = 1024;
private static readonly IPEndPoint _blankEndpoint = new(IPAddress.Any, 0);
Expand All @@ -70,6 +84,7 @@ public SendData()
private int droppedPackets;
private int slowSends;
private readonly HashSet<(IPAddress Destination, ushort UniverseId)> usedDestinations = new();
private readonly Dictionary<(IPAddress Destination, ushort UniverseId), OutputData> outputDataPerDestination = new();

public SACNClient(Guid senderId, string senderName, IPAddress localAddress, int port = 5568)
{
Expand Down Expand Up @@ -188,6 +203,8 @@ private void ConfigureSendSocket(Socket socket)

public string SenderName { get; }

public bool OptimizeSend { get; set; }

public IObservable<Exception> OnError => this.errorSubject.AsObservable();

public SendStatistics SendStatistics
Expand Down Expand Up @@ -269,7 +286,7 @@ private async Task Receiver()
}
catch (Exception ex)
{
if (!(ex is OperationCanceledException))
if (ex is not OperationCanceledException)
{
this.errorSubject.OnNext(ex);
}
Expand Down Expand Up @@ -302,8 +319,10 @@ private async Task Sender()

var socketData = GetSendSocket(sendData.UniverseId);

var destination = sendData.Destination ?? socketData.Destination;

var watch = Stopwatch.StartNew();
await socketData.Socket.SendToAsync(sendData.Data.Memory[..sendData.DataLength], SocketFlags.None, sendData.Destination ?? socketData.Destination);
await socketData.Socket.SendToAsync(sendData.Data.Memory[..sendData.DataLength], SocketFlags.None, destination);
watch.Stop();

if (watch.ElapsedMilliseconds > 20)
Expand Down Expand Up @@ -389,11 +408,18 @@ public void DropDMXUniverse(ushort universeId)
/// <param name="startCode">Start code (default 0)</param>
public void SendMulticast(ushort universeId, ReadOnlyMemory<byte> data, byte priority = 100, ushort syncAddress = 0, byte startCode = 0)
{
// See if we should send
if (SkipSending(null, universeId, data))
return;

byte sequenceId = GetNewSequenceId(universeId);

var packet = new SACNDataPacket(universeId, SenderName, SenderId, sequenceId, data, priority, syncAddress, startCode);

SendPacket(universeId, packet);

var outputData = UpdateOutputData(null, universeId);
data.CopyTo(outputData.SendData.Memory);
}

/// <summary>
Expand All @@ -406,11 +432,18 @@ public void SendMulticast(ushort universeId, ReadOnlyMemory<byte> data, byte pri
/// <param name="startCode">Start code (default 0)</param>
public void SendUnicast(IPAddress address, ushort universeId, ReadOnlyMemory<byte> data, byte priority = 100, ushort syncAddress = 0, byte startCode = 0)
{
// See if we should send
if (SkipSending(address, universeId, data))
return;

byte sequenceId = GetNewSequenceId(universeId);

var packet = new SACNDataPacket(universeId, SenderName, SenderId, sequenceId, data, priority, syncAddress, startCode);

SendPacket(universeId, address, packet);

var outputData = UpdateOutputData(address, universeId);
data.CopyTo(outputData.SendData.Memory);
}

/// <summary>
Expand Down Expand Up @@ -484,7 +517,14 @@ public void SendPacket(ushort universeId, IPAddress destination, SACNPacket pack
this.usedDestinations.Add((destination, universeId));

if (IsOperational)
{
this.sendQueue.Add(newSendData);
}
else
{
// Clear queue
while (this.sendQueue.TryTake(out _)) ;
}
}

/// <summary>
Expand All @@ -507,11 +547,13 @@ public void SendPacket(ushort universeId, SACNPacket packet)

this.usedDestinations.Add((null, universeId));
if (IsOperational)
{
this.sendQueue.Add(newSendData);
}
else
{
// Clear queue
while (this.sendQueue.TryTake(out _));
while (this.sendQueue.TryTake(out _)) ;
}

//var socketData = GetSendSocket(universeId);
Expand All @@ -525,6 +567,53 @@ public void SendPacket(ushort universeId, SACNPacket packet)
//}
}

private bool SkipSending(IPAddress destination, ushort universeId, ReadOnlyMemory<byte> data)
{
if (!OptimizeSend)
return false;

OutputData outputData;
lock (this.lockObject)
{
if (!this.outputDataPerDestination.TryGetValue((destination, universeId), out outputData))
return false;
}

if (outputData.AgeMS > 1_000)
// Send at least once every second
return false;

// Check if the data has changed
if (data.Length != outputData.SendData.Memory.Length)
return false;

if (data.Span.SequenceEqual(outputData.SendData.Memory.Span))
// Identical
return true;

return false;
}

private OutputData UpdateOutputData(IPAddress destination, ushort universeId)
{
lock (this.lockObject)
{
if (!this.outputDataPerDestination.TryGetValue((null, universeId), out var outputData))
{
outputData = new OutputData
{
SendData = this.memoryPool.Rent(512)
};

this.outputDataPerDestination.Add((null, universeId), outputData);
}

outputData.LastSent.Restart();

return outputData;
}
}

public void WarmUpSockets(IEnumerable<ushort> universeIds)
{
foreach (ushort universeId in universeIds)
Expand Down Expand Up @@ -596,6 +685,11 @@ public void Dispose()

this.listenSocket.Close();
this.listenSocket.Dispose();

foreach (var kvp in this.outputDataPerDestination)
{
kvp.Value.Dispose();
}
}
}
}

0 comments on commit d265316

Please sign in to comment.