diff --git a/src/Haukcode.sACN/SACNClient.cs b/src/Haukcode.sACN/SACNClient.cs index 1a8db94..6ea2960 100644 --- a/src/Haukcode.sACN/SACNClient.cs +++ b/src/Haukcode.sACN/SACNClient.cs @@ -46,6 +46,20 @@ public SendData() } } + public class OutputData : IDisposable + { + public Stopwatch LastSent = Stopwatch.StartNew(); + + public double AgeMS => LastSent.Elapsed.TotalMilliseconds; + + public IMemoryOwner SendData; + + public void Dispose() + { + SendData.Dispose(); + } + } + private const int ReceiveBufferSize = 20480; private const int SendBufferSize = 1024; private static readonly IPEndPoint _blankEndpoint = new(IPAddress.Any, 0); @@ -70,6 +84,7 @@ public SendData() private int droppedPackets; private int slowSends; private readonly HashSet<(IPAddress Destination, ushort UniverseId)> usedDestinations = new(); + private readonly Dictionary<(IPAddress Destination, ushort UniverseId), OutputData> outputDataPerDestination = new(); public SACNClient(Guid senderId, string senderName, IPAddress localAddress, int port = 5568) { @@ -188,6 +203,8 @@ private void ConfigureSendSocket(Socket socket) public string SenderName { get; } + public bool OptimizeSend { get; set; } + public IObservable OnError => this.errorSubject.AsObservable(); public SendStatistics SendStatistics @@ -269,7 +286,7 @@ private async Task Receiver() } catch (Exception ex) { - if (!(ex is OperationCanceledException)) + if (ex is not OperationCanceledException) { this.errorSubject.OnNext(ex); } @@ -302,8 +319,10 @@ private async Task Sender() var socketData = GetSendSocket(sendData.UniverseId); + var destination = sendData.Destination ?? socketData.Destination; + var watch = Stopwatch.StartNew(); - await socketData.Socket.SendToAsync(sendData.Data.Memory[..sendData.DataLength], SocketFlags.None, sendData.Destination ?? socketData.Destination); + await socketData.Socket.SendToAsync(sendData.Data.Memory[..sendData.DataLength], SocketFlags.None, destination); watch.Stop(); if (watch.ElapsedMilliseconds > 20) @@ -389,11 +408,18 @@ public void DropDMXUniverse(ushort universeId) /// Start code (default 0) public void SendMulticast(ushort universeId, ReadOnlyMemory data, byte priority = 100, ushort syncAddress = 0, byte startCode = 0) { + // See if we should send + if (SkipSending(null, universeId, data)) + return; + byte sequenceId = GetNewSequenceId(universeId); var packet = new SACNDataPacket(universeId, SenderName, SenderId, sequenceId, data, priority, syncAddress, startCode); SendPacket(universeId, packet); + + var outputData = UpdateOutputData(null, universeId); + data.CopyTo(outputData.SendData.Memory); } /// @@ -406,11 +432,18 @@ public void SendMulticast(ushort universeId, ReadOnlyMemory data, byte pri /// Start code (default 0) public void SendUnicast(IPAddress address, ushort universeId, ReadOnlyMemory data, byte priority = 100, ushort syncAddress = 0, byte startCode = 0) { + // See if we should send + if (SkipSending(address, universeId, data)) + return; + byte sequenceId = GetNewSequenceId(universeId); var packet = new SACNDataPacket(universeId, SenderName, SenderId, sequenceId, data, priority, syncAddress, startCode); SendPacket(universeId, address, packet); + + var outputData = UpdateOutputData(address, universeId); + data.CopyTo(outputData.SendData.Memory); } /// @@ -484,7 +517,14 @@ public void SendPacket(ushort universeId, IPAddress destination, SACNPacket pack this.usedDestinations.Add((destination, universeId)); if (IsOperational) + { this.sendQueue.Add(newSendData); + } + else + { + // Clear queue + while (this.sendQueue.TryTake(out _)) ; + } } /// @@ -507,11 +547,13 @@ public void SendPacket(ushort universeId, SACNPacket packet) this.usedDestinations.Add((null, universeId)); if (IsOperational) + { this.sendQueue.Add(newSendData); + } else { // Clear queue - while (this.sendQueue.TryTake(out _)); + while (this.sendQueue.TryTake(out _)) ; } //var socketData = GetSendSocket(universeId); @@ -525,6 +567,53 @@ public void SendPacket(ushort universeId, SACNPacket packet) //} } + private bool SkipSending(IPAddress destination, ushort universeId, ReadOnlyMemory data) + { + if (!OptimizeSend) + return false; + + OutputData outputData; + lock (this.lockObject) + { + if (!this.outputDataPerDestination.TryGetValue((destination, universeId), out outputData)) + return false; + } + + if (outputData.AgeMS > 1_000) + // Send at least once every second + return false; + + // Check if the data has changed + if (data.Length != outputData.SendData.Memory.Length) + return false; + + if (data.Span.SequenceEqual(outputData.SendData.Memory.Span)) + // Identical + return true; + + return false; + } + + private OutputData UpdateOutputData(IPAddress destination, ushort universeId) + { + lock (this.lockObject) + { + if (!this.outputDataPerDestination.TryGetValue((null, universeId), out var outputData)) + { + outputData = new OutputData + { + SendData = this.memoryPool.Rent(512) + }; + + this.outputDataPerDestination.Add((null, universeId), outputData); + } + + outputData.LastSent.Restart(); + + return outputData; + } + } + public void WarmUpSockets(IEnumerable universeIds) { foreach (ushort universeId in universeIds) @@ -596,6 +685,11 @@ public void Dispose() this.listenSocket.Close(); this.listenSocket.Dispose(); + + foreach (var kvp in this.outputDataPerDestination) + { + kvp.Value.Dispose(); + } } } }