From 057e3dca9db9bd6ec56540ac581d7676a9d4c637 Mon Sep 17 00:00:00 2001 From: Roman Ettlinger Date: Sun, 15 Sep 2024 19:53:58 +0200 Subject: [PATCH 1/8] improve publishing operation to avoid discarding values when a large queueSize is used for DataValues / Events or a large number of monitored items is created in a single subscription --- .../Subscription/IMonitoredItem.cs | 5 ++- .../Subscription/MonitoredItem.cs | 45 ++++++++++++------- .../Subscription/Subscription.cs | 22 +++++++-- 3 files changed, 50 insertions(+), 22 deletions(-) diff --git a/Libraries/Opc.Ua.Server/Subscription/IMonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/IMonitoredItem.cs index 6f8730294..603899f19 100644 --- a/Libraries/Opc.Ua.Server/Subscription/IMonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/IMonitoredItem.cs @@ -168,7 +168,7 @@ public interface IEventMonitoredItem : IMonitoredItem /// Publishes all available event notifications. /// /// True if the caller should re-queue the item for publishing after the next interval elaspses. - bool Publish(OperationContext context, Queue notifications); + bool Publish(OperationContext context, Queue notifications, uint maxNotificationsPerPublish); /// /// Modifies the attributes for monitored item. @@ -212,7 +212,8 @@ public interface IDataChangeMonitoredItem : IMonitoredItem bool Publish( OperationContext context, Queue notifications, - Queue diagnostics); + Queue diagnostics, + uint maxNotificationsPerPublish); } /// diff --git a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs index cefd14eef..7f1fa74c6 100644 --- a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs @@ -30,7 +30,6 @@ using System; using System.Collections.Generic; using System.Globalization; -using System.Threading; using System.Xml; namespace Opc.Ua.Server @@ -1127,7 +1126,7 @@ private void IncrementSampleTime() /// /// Publishes all available event notifications. /// - public virtual bool Publish(OperationContext context, Queue notifications) + public virtual bool Publish(OperationContext context, Queue notifications, uint maxNotificationsPerPublish) { if (context == null) throw new ArgumentNullException(nameof(context)); if (notifications == null) throw new ArgumentNullException(nameof(notifications)); @@ -1190,8 +1189,18 @@ public virtual bool Publish(OperationContext context, Queue noti notifications.Enqueue(overflowEvent); } + + int notificationCount = overflowEvent != null ? 1 : 0; + for (int ii = 0; ii < m_events.Count; ii++) { + //stop publishing if maxNotificationsPerPublish is reached + if (notificationCount >= maxNotificationsPerPublish) + { + break; + } + + EventFieldList fields = (EventFieldList)m_events[ii]; // apply any diagnostic masks. @@ -1208,9 +1217,9 @@ public virtual bool Publish(OperationContext context, Queue noti } notifications.Enqueue(m_events[ii]); - } - m_events.Clear(); + m_events.RemoveAt(ii); + } // place event at the end of the queue. if (overflowEvent != null && !m_discardOldest) @@ -1221,13 +1230,15 @@ public virtual bool Publish(OperationContext context, Queue noti Utils.LogTrace(Utils.TraceMasks.OperationDetail, "MONITORED ITEM: Publish(QueueSize={0})", notifications.Count); } + bool moreValuesToPublish = m_events?.Count > 0; + // reset state variables. m_overflow = false; - m_readyToPublish = false; - m_readyToTrigger = false; + m_readyToPublish = moreValuesToPublish; + m_readyToTrigger = moreValuesToPublish; m_triggered = false; - return false; + return moreValuesToPublish; } } @@ -1237,7 +1248,8 @@ public virtual bool Publish(OperationContext context, Queue noti public virtual bool Publish( OperationContext context, Queue notifications, - Queue diagnostics) + Queue diagnostics, + uint maxNotificationsPerPublish) { if (context == null) throw new ArgumentNullException(nameof(context)); if (notifications == null) throw new ArgumentNullException(nameof(notifications)); @@ -1279,21 +1291,19 @@ public virtual bool Publish( IncrementSampleTime(); } - - m_readyToPublish = false; - // check if queueing enabled. if (m_queue != null && (!m_resendData || m_queue.ItemsInQueue != 0)) { DataValue value = null; ServiceResult error = null; + uint notificationCount = 0; - while (m_queue.Publish(out value, out error)) + while (notificationCount < maxNotificationsPerPublish && m_queue.Publish(out value, out error)) { Publish(context, notifications, diagnostics, value, error); + notificationCount++; if (m_resendData) { - m_readyToPublish = m_queue.ItemsInQueue > 0; break; } } @@ -1306,13 +1316,16 @@ public virtual bool Publish( Publish(context, notifications, diagnostics, m_lastValue, m_lastError); } - // reset state variables. + bool moreValuesToPublish = m_queue?.ItemsInQueue > 0; + + // reset state variables. m_overflow = false; - m_readyToTrigger = false; + m_readyToPublish = moreValuesToPublish; + m_readyToTrigger = moreValuesToPublish; m_resendData = false; m_triggered = false; - return false; + return moreValuesToPublish; } } diff --git a/Libraries/Opc.Ua.Server/Subscription/Subscription.cs b/Libraries/Opc.Ua.Server/Subscription/Subscription.cs index 34557b237..bb5017297 100644 --- a/Libraries/Opc.Ua.Server/Subscription/Subscription.cs +++ b/Libraries/Opc.Ua.Server/Subscription/Subscription.cs @@ -847,23 +847,30 @@ private NotificationMessage InnerPublish( // check for monitored items that are ready to publish. LinkedListNode current = m_itemsToPublish.First; + //Limit the amount of values a monitored item publishes at once + uint maxNotificationsPerMonitoredItem = m_maxNotificationsPerPublish == 0 ? uint.MaxValue : m_maxNotificationsPerPublish * 3; + while (current != null) { LinkedListNode next = current.Next; IMonitoredItem monitoredItem = current.Value; + bool hasMoreValuesToPublish; if ((monitoredItem.MonitoredItemType & MonitoredItemTypeMask.DataChange) != 0) { - ((IDataChangeMonitoredItem)monitoredItem).Publish(context, datachanges, datachangeDiagnostics); + hasMoreValuesToPublish = ((IDataChangeMonitoredItem)monitoredItem).Publish(context, datachanges, datachangeDiagnostics, maxNotificationsPerMonitoredItem); } else { - ((IEventMonitoredItem)monitoredItem).Publish(context, events); + hasMoreValuesToPublish = ((IEventMonitoredItem)monitoredItem).Publish(context, events, maxNotificationsPerMonitoredItem); } // add back to list to check. - m_itemsToPublish.Remove(current); - m_itemsToCheck.AddLast(current); + if (!hasMoreValuesToPublish) + { + m_itemsToPublish.Remove(current); + m_itemsToCheck.AddLast(current); + } // check there are enough notifications for a message. if (m_maxNotificationsPerPublish > 0 && events.Count + datachanges.Count > m_maxNotificationsPerPublish) @@ -888,6 +895,13 @@ private NotificationMessage InnerPublish( m_diagnostics.EventNotificationsCount += (uint)(eventCount - events.Count); m_diagnostics.NotificationsCount += (uint)notificationCount; } + + //stop fetching messages from MIs when message queue is full to avoid discards + // use m_maxMessageCount - 2 to put remaining values into the last allowed message (each MI is allowed to publish 3 up to messages at once) + if (messages.Count >= m_maxMessageCount - 2) + { + break; + } } current = next; From ff23131f7dcae07a68ded9c0ee6e790aacdf9fc5 Mon Sep 17 00:00:00 2001 From: Roman Ettlinger Date: Sun, 15 Sep 2024 20:00:59 +0200 Subject: [PATCH 2/8] implement changes in custom DataChangeMonitoredItem --- .../DataChangeMonitoredItem.cs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/Applications/Quickstarts.Servers/SampleNodeManager/DataChangeMonitoredItem.cs b/Applications/Quickstarts.Servers/SampleNodeManager/DataChangeMonitoredItem.cs index c03661639..a169599b4 100644 --- a/Applications/Quickstarts.Servers/SampleNodeManager/DataChangeMonitoredItem.cs +++ b/Applications/Quickstarts.Servers/SampleNodeManager/DataChangeMonitoredItem.cs @@ -697,7 +697,7 @@ private void IncrementSampleTime() /// /// Called by the subscription to publish any notification. /// - public bool Publish(OperationContext context, Queue notifications, Queue diagnostics) + public bool Publish(OperationContext context, Queue notifications, Queue diagnostics, uint maxNotificationsPerPublish) { lock (m_lock) { @@ -715,9 +715,6 @@ public bool Publish(OperationContext context, Queue n IncrementSampleTime(); } - // update publish flag. - m_readyToPublish = false; - m_readyToTrigger = false; // check if queuing is enabled. if (m_queue != null && (!m_resendData || m_queue.ItemsInQueue != 0)) @@ -725,13 +722,15 @@ public bool Publish(OperationContext context, Queue n DataValue value = null; ServiceResult error = null; - while (m_queue.Publish(out value, out error)) + uint notificationCount = 0; + + while (notificationCount < maxNotificationsPerPublish && m_queue.Publish(out value, out error)) { Publish(context, value, error, notifications, diagnostics); + notificationCount++; if (m_resendData) { - m_readyToPublish = m_queue.ItemsInQueue > 0; break; } } @@ -741,10 +740,14 @@ public bool Publish(OperationContext context, Queue n Publish(context, m_lastValue, m_lastError, notifications, diagnostics); } + bool moreValuesToPublish = m_queue?.ItemsInQueue > 0; + // update flags + m_readyToPublish = moreValuesToPublish; + m_readyToTrigger = moreValuesToPublish; m_resendData = false; - return true; + return moreValuesToPublish; } } From 82725eeeae8c6418c9c15cd277e729d40bccd555 Mon Sep 17 00:00:00 2001 From: Roman Ettlinger Date: Thu, 19 Sep 2024 07:09:09 +0200 Subject: [PATCH 3/8] only place overflow event at the end of the event queue --- Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs index 7f1fa74c6..78dc7d0fc 100644 --- a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs @@ -1222,7 +1222,7 @@ public virtual bool Publish(OperationContext context, Queue noti } // place event at the end of the queue. - if (overflowEvent != null && !m_discardOldest) + if (overflowEvent != null && !m_discardOldest && !(m_events.Count > 0)) { notifications.Enqueue(overflowEvent); } @@ -1233,7 +1233,7 @@ public virtual bool Publish(OperationContext context, Queue noti bool moreValuesToPublish = m_events?.Count > 0; // reset state variables. - m_overflow = false; + m_overflow = m_overflow && moreValuesToPublish && !m_discardOldest; m_readyToPublish = moreValuesToPublish; m_readyToTrigger = moreValuesToPublish; m_triggered = false; From 15aa944f8a97c57f5acd7d0d17a0ca550e8a9548 Mon Sep 17 00:00:00 2001 From: Roman Ettlinger Date: Tue, 8 Oct 2024 07:29:16 +0200 Subject: [PATCH 4/8] address review feedback --- Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs | 2 +- Libraries/Opc.Ua.Server/Subscription/Subscription.cs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs index 78dc7d0fc..4b7fe8c09 100644 --- a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs @@ -1218,7 +1218,7 @@ public virtual bool Publish(OperationContext context, Queue noti notifications.Enqueue(m_events[ii]); - m_events.RemoveAt(ii); + m_events.RemoveAt(ii--); } // place event at the end of the queue. diff --git a/Libraries/Opc.Ua.Server/Subscription/Subscription.cs b/Libraries/Opc.Ua.Server/Subscription/Subscription.cs index bb5017297..eac460177 100644 --- a/Libraries/Opc.Ua.Server/Subscription/Subscription.cs +++ b/Libraries/Opc.Ua.Server/Subscription/Subscription.cs @@ -865,7 +865,10 @@ private NotificationMessage InnerPublish( hasMoreValuesToPublish = ((IEventMonitoredItem)monitoredItem).Publish(context, events, maxNotificationsPerMonitoredItem); } - // add back to list to check. + // if item has more values to publish leave it at the front of the list + // to execute publish in next cycle, no checking needed + // if no more values to publish are left add it to m_itemsToCheck + // to check status on next publish cylce if (!hasMoreValuesToPublish) { m_itemsToPublish.Remove(current); From fad3b9d186a3113bf2ba666e11f63124285a6fb3 Mon Sep 17 00:00:00 2001 From: Roman Ettlinger Date: Tue, 8 Oct 2024 11:47:12 +0200 Subject: [PATCH 5/8] fix notification count increment --- Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs index 4b7fe8c09..cd39dc748 100644 --- a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs @@ -1200,7 +1200,6 @@ public virtual bool Publish(OperationContext context, Queue noti break; } - EventFieldList fields = (EventFieldList)m_events[ii]; // apply any diagnostic masks. @@ -1217,7 +1216,7 @@ public virtual bool Publish(OperationContext context, Queue noti } notifications.Enqueue(m_events[ii]); - + notificationCount++; m_events.RemoveAt(ii--); } From 2ffc444802718fd5378f9529257da170844d378d Mon Sep 17 00:00:00 2001 From: Roman Ettlinger Date: Wed, 9 Oct 2024 19:39:18 +0200 Subject: [PATCH 6/8] fix publishing of Events --- Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs index cd39dc748..8092e0ad2 100644 --- a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs @@ -1200,7 +1200,7 @@ public virtual bool Publish(OperationContext context, Queue noti break; } - EventFieldList fields = (EventFieldList)m_events[ii]; + EventFieldList fields = m_events[ii]; // apply any diagnostic masks. for (int jj = 0; jj < fields.EventFields.Count; jj++) @@ -1209,15 +1209,12 @@ public virtual bool Publish(OperationContext context, Queue noti StatusResult result = value as StatusResult; - if (result != null) - { - result.ApplyDiagnosticMasks(context.DiagnosticsMask, context.StringTable); - } + result?.ApplyDiagnosticMasks(context.DiagnosticsMask, context.StringTable); } notifications.Enqueue(m_events[ii]); + m_events.Remove(m_events[ii]); notificationCount++; - m_events.RemoveAt(ii--); } // place event at the end of the queue. From c982aa1067e64767c2fcfa7e2cf78ce9ef7848ef Mon Sep 17 00:00:00 2001 From: Roman Ettlinger Date: Wed, 9 Oct 2024 22:22:59 +0200 Subject: [PATCH 7/8] fix event publishing add tests --- .../Subscription/MonitoredItem.cs | 15 +- .../Opc.Ua.Server.Tests/CommonTestWorkers.cs | 47 +++++ .../Opc.Ua.Server.Tests/MonitoredItemTests.cs | 194 ++++++++++++++++++ 3 files changed, 248 insertions(+), 8 deletions(-) create mode 100644 Tests/Opc.Ua.Server.Tests/MonitoredItemTests.cs diff --git a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs index 8092e0ad2..14554fb19 100644 --- a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs @@ -1178,7 +1178,7 @@ public virtual bool Publish(OperationContext context, Queue noti // fetch the event fields. overflowEvent = GetEventFields( - new FilterContext(m_server.NamespaceUris, m_server.TypeTree, Session.PreferredLocales), + new FilterContext(m_server.NamespaceUris, m_server.TypeTree, Session?.PreferredLocales), m_filterToUse as EventFilter, e); } @@ -1191,17 +1191,14 @@ public virtual bool Publish(OperationContext context, Queue noti int notificationCount = overflowEvent != null ? 1 : 0; - - for (int ii = 0; ii < m_events.Count; ii++) + int eventsToRemove = 0; + foreach (EventFieldList fields in m_events) { //stop publishing if maxNotificationsPerPublish is reached if (notificationCount >= maxNotificationsPerPublish) { break; } - - EventFieldList fields = m_events[ii]; - // apply any diagnostic masks. for (int jj = 0; jj < fields.EventFields.Count; jj++) { @@ -1212,11 +1209,13 @@ public virtual bool Publish(OperationContext context, Queue noti result?.ApplyDiagnosticMasks(context.DiagnosticsMask, context.StringTable); } - notifications.Enqueue(m_events[ii]); - m_events.Remove(m_events[ii]); + notifications.Enqueue(fields); notificationCount++; + eventsToRemove++; } + m_events.RemoveRange(0, eventsToRemove); + // place event at the end of the queue. if (overflowEvent != null && !m_discardOldest && !(m_events.Count > 0)) { diff --git a/Tests/Opc.Ua.Server.Tests/CommonTestWorkers.cs b/Tests/Opc.Ua.Server.Tests/CommonTestWorkers.cs index 423289594..5d6436610 100644 --- a/Tests/Opc.Ua.Server.Tests/CommonTestWorkers.cs +++ b/Tests/Opc.Ua.Server.Tests/CommonTestWorkers.cs @@ -375,6 +375,10 @@ public static void SubscriptionTest( QueueSize = queueSize } }); + + //add event item + itemsToCreate.Add(CreateEventMonitoredItem(queueSize, ref handleCounter)); + response = services.CreateMonitoredItems(requestHeader, id, TimestampsToReturn.Neither, itemsToCreate, out MonitoredItemCreateResultCollection itemCreateResults, out DiagnosticInfoCollection diagnosticInfos); ServerFixtureUtils.ValidateResponse(response, itemCreateResults, itemsToCreate); @@ -682,6 +686,49 @@ int samplingInterval ServerFixtureUtils.ValidateResponse(response, itemCreateResults, itemsToCreate); ServerFixtureUtils.ValidateDiagnosticInfos(diagnosticInfos, itemsToCreate, response.StringTable); } + + private static MonitoredItemCreateRequest CreateEventMonitoredItem(uint queueSize, ref uint handleCounter) + { + var whereClause = new ContentFilter(); + + whereClause.Push(FilterOperator.Equals, new FilterOperand[] { + new SimpleAttributeOperand() { + AttributeId = Attributes.Value, + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = new QualifiedNameCollection(new QualifiedName[] { "EventType" }) + }, + new LiteralOperand { + Value = new Variant(new NodeId(ObjectTypeIds.BaseEventType)) + } + }); + + var mi = new MonitoredItemCreateRequest() { + ItemToMonitor = new ReadValueId() { + AttributeId = Attributes.EventNotifier, + NodeId = ObjectIds.Server + }, + MonitoringMode = MonitoringMode.Reporting, + RequestedParameters = new MonitoringParameters() { + ClientHandle = ++handleCounter, + SamplingInterval = -1, + Filter = new ExtensionObject( + new EventFilter { + SelectClauses = new SimpleAttributeOperandCollection( + new SimpleAttributeOperand[] { + new SimpleAttributeOperand{ + AttributeId = Attributes.Value, + TypeDefinitionId = ObjectTypeIds.BaseEventType, + BrowsePath = new QualifiedNameCollection(new QualifiedName[] { BrowseNames.Message}) + } + }), + WhereClause = whereClause, + }), + DiscardOldest = true, + QueueSize = queueSize + } + }; + return mi; + } #endregion } diff --git a/Tests/Opc.Ua.Server.Tests/MonitoredItemTests.cs b/Tests/Opc.Ua.Server.Tests/MonitoredItemTests.cs new file mode 100644 index 000000000..fd6a1bb73 --- /dev/null +++ b/Tests/Opc.Ua.Server.Tests/MonitoredItemTests.cs @@ -0,0 +1,194 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using BenchmarkDotNet.Attributes; +using Microsoft.AspNetCore.Hosting.Server; +using Moq; +using NUnit.Framework; + +namespace Opc.Ua.Server.Tests +{ + /// + /// Test MonitoredItem + /// + [TestFixture, Category("MonitoredItem")] + [SetCulture("en-us"), SetUICulture("en-us")] + [Parallelizable] + [MemoryDiagnoser] + public class MonitoreItemTests + { + + #region MonitoredItemDurable + [Test] + public void CreateMI() + { + MonitoredItem monitoredItem = CreateMonitoredItem(); + Assert.That(monitoredItem, Is.Not.Null); + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0)); + + var statuscode = new ServiceResult(StatusCodes.Good); + var dataValue = new DataValue(new Variant(true)); + + monitoredItem.QueueValue(dataValue, statuscode); + + //bug in current implementation fixed with durable + //Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(1)); + + var result = new Queue(); + var result2 = new Queue(); + monitoredItem.Publish(new OperationContext(monitoredItem), result, result2, 1); + + Assert.That(result, Is.Not.Empty); + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0)); + MonitoredItemNotification publishResult = result.FirstOrDefault(); + Assert.That(publishResult?.Value, Is.EqualTo(dataValue)); + DiagnosticInfo publishErrorResult = result2.FirstOrDefault(); + Assert.That(publishErrorResult.InnerStatusCode, Is.EqualTo((StatusCode)StatusCodes.Good)); + + } + + [Test] + public void CreateEventMI() + { + MonitoredItem monitoredItem = CreateMonitoredItem(true); + Assert.That(monitoredItem, Is.Not.Null); + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0)); + + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(1)); + + + var result = new Queue(); + monitoredItem.Publish(new OperationContext(monitoredItem), result, 1); + + Assert.That(result, Is.Not.Empty); + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0)); + EventFieldList publishResult = result.FirstOrDefault(); + Assert.That(publishResult, Is.Not.Null); + Assert.That(publishResult.Handle, Is.AssignableTo(typeof(AuditUrlMismatchEventState))); + } + + [Test] + public void CreateMIQueueNoQueue() + { + + MonitoredItem monitoredItem = CreateMonitoredItem(false, 0); + + Assert.That(monitoredItem.QueueSize, Is.EqualTo(1)); + + var statuscode = new ServiceResult(StatusCodes.Good); + var dataValue = new DataValue(new Variant(true)); + + monitoredItem.QueueValue(dataValue, statuscode); + + + var result = new Queue(); + var result2 = new Queue(); + monitoredItem.Publish(new OperationContext(monitoredItem), result, result2, 1); + + Assert.That(result, Is.Not.Empty); + MonitoredItemNotification publishResult = result.FirstOrDefault(); + Assert.That(publishResult?.Value, Is.EqualTo(dataValue)); + DiagnosticInfo publishErrorResult = result2.FirstOrDefault(); + Assert.That(publishErrorResult.InnerStatusCode, Is.EqualTo((StatusCode)StatusCodes.Good)); + } + + [Test] + public void CreateEventMIOverflow() + { + MonitoredItem monitoredItem = CreateMonitoredItem(true, 2); + Assert.That(monitoredItem, Is.Not.Null); + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0)); + + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2)); + + + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2)); + + + var result = new Queue(); + monitoredItem.Publish(new OperationContext(monitoredItem), result, 3); + + Assert.That(result, Is.Not.Empty); + Assert.That(result.Count, Is.EqualTo(3)); + EventFieldList publishResult = result.LastOrDefault(); + Assert.That(publishResult, Is.Not.Null); + Assert.That(publishResult.Handle, Is.AssignableTo(typeof(EventQueueOverflowEventState))); + } + + + [Test] + public void CreateEventMIPublishPartial() + { + MonitoredItem monitoredItem = CreateMonitoredItem(true, 3); + Assert.That(monitoredItem, Is.Not.Null); + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0)); + + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(3)); + + + var result = new Queue(); + monitoredItem.Publish(new OperationContext(monitoredItem), result, 2); + + Assert.That(result, Is.Not.Empty); + Assert.That(result.Count, Is.EqualTo(2)); + EventFieldList publishResult = result.LastOrDefault(); + Assert.That(publishResult, Is.Not.Null); + Assert.That(publishResult.Handle, Is.AssignableTo(typeof(AuditUrlMismatchEventState))); + + var result2 = new Queue(); + monitoredItem.Publish(new OperationContext(monitoredItem), result2, 2); + + Assert.That(result2, Is.Not.Empty); + Assert.That(result2.Count, Is.EqualTo(1)); + EventFieldList publishResult2 = result2.LastOrDefault(); + Assert.That(publishResult2, Is.Not.Null); + Assert.That(publishResult2.Handle, Is.AssignableTo(typeof(AuditUrlMismatchEventState))); + } + #endregion + + #region private methods + private MonitoredItem CreateMonitoredItem(bool events = false, uint queueSize = 10) + { + MonitoringFilter filter = events ? new EventFilter() : new MonitoringFilter(); + + var serverMock = new Mock(); + serverMock.Setup(s => s.NamespaceUris).Returns(new NamespaceTable()); + serverMock.Setup(s => s.TypeTree).Returns(new TypeTable(new NamespaceTable())); + + var nodeMangerMock = new Mock(); + + return new MonitoredItem( + serverMock.Object, + nodeMangerMock.Object, + null, + 1, + 2, + new ReadValueId(), + DiagnosticsMasks.All, + TimestampsToReturn.Server, + MonitoringMode.Reporting, + 3, + filter, + filter, + null, + 1000.0, + queueSize, + false, + 1000 + ); + } + #endregion + } +} From 409a51b23736fe24446b1f1e6a3e87e705401f11 Mon Sep 17 00:00:00 2001 From: Roman Ettlinger Date: Thu, 10 Oct 2024 06:13:59 +0200 Subject: [PATCH 8/8] fix edge case --- .../Subscription/MonitoredItem.cs | 30 ++++--- .../Opc.Ua.Server.Tests/MonitoredItemTests.cs | 79 ++++++++++++++++++- 2 files changed, 93 insertions(+), 16 deletions(-) diff --git a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs index 14554fb19..7c45146f5 100644 --- a/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs +++ b/Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs @@ -1148,6 +1148,7 @@ public virtual bool Publish(OperationContext context, Queue noti // go to the next sampling interval. IncrementSampleTime(); + bool moreValuesToPublish = false; // publish events. if (m_events != null) { @@ -1183,14 +1184,16 @@ public virtual bool Publish(OperationContext context, Queue noti e); } + + int notificationCount = 0; + // place event at the beginning of the queue. if (overflowEvent != null && m_discardOldest) { notifications.Enqueue(overflowEvent); + notificationCount++; } - - int notificationCount = overflowEvent != null ? 1 : 0; int eventsToRemove = 0; foreach (EventFieldList fields in m_events) { @@ -1202,11 +1205,7 @@ public virtual bool Publish(OperationContext context, Queue noti // apply any diagnostic masks. for (int jj = 0; jj < fields.EventFields.Count; jj++) { - object value = fields.EventFields[jj].Value; - - StatusResult result = value as StatusResult; - - result?.ApplyDiagnosticMasks(context.DiagnosticsMask, context.StringTable); + (fields.EventFields[jj].Value as StatusResult)?.ApplyDiagnosticMasks(context.DiagnosticsMask, context.StringTable); } notifications.Enqueue(fields); @@ -1216,17 +1215,24 @@ public virtual bool Publish(OperationContext context, Queue noti m_events.RemoveRange(0, eventsToRemove); - // place event at the end of the queue. - if (overflowEvent != null && !m_discardOldest && !(m_events.Count > 0)) + moreValuesToPublish = m_events?.Count > 0; + + // place overflow event at the end of the queue if there is still space in the publish. + if (overflowEvent != null && !m_discardOldest) { - notifications.Enqueue(overflowEvent); + if (notificationCount < maxNotificationsPerPublish) + { + notifications.Enqueue(overflowEvent); + } + else + { + moreValuesToPublish = true; + } } Utils.LogTrace(Utils.TraceMasks.OperationDetail, "MONITORED ITEM: Publish(QueueSize={0})", notifications.Count); } - bool moreValuesToPublish = m_events?.Count > 0; - // reset state variables. m_overflow = m_overflow && moreValuesToPublish && !m_discardOldest; m_readyToPublish = moreValuesToPublish; diff --git a/Tests/Opc.Ua.Server.Tests/MonitoredItemTests.cs b/Tests/Opc.Ua.Server.Tests/MonitoredItemTests.cs index fd6a1bb73..4f2440df1 100644 --- a/Tests/Opc.Ua.Server.Tests/MonitoredItemTests.cs +++ b/Tests/Opc.Ua.Server.Tests/MonitoredItemTests.cs @@ -123,6 +123,74 @@ public void CreateEventMIOverflow() Assert.That(publishResult.Handle, Is.AssignableTo(typeof(EventQueueOverflowEventState))); } + [Test] + public void CreateEventMIOverflowMultiplePublish() + { + MonitoredItem monitoredItem = CreateMonitoredItem(true, 2); + Assert.That(monitoredItem, Is.Not.Null); + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0)); + + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2)); + + + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2)); + + + var result = new Queue(); + bool moreItems = monitoredItem.Publish(new OperationContext(monitoredItem), result, 2); + + Assert.That(moreItems, Is.True); + Assert.That(result, Is.Not.Empty); + Assert.That(result.Count, Is.EqualTo(2)); + EventFieldList publishResult = result.LastOrDefault(); + Assert.That(publishResult, Is.Not.Null); + Assert.That(publishResult.Handle, Is.AssignableTo(typeof(AuditUrlMismatchEventState))); + + + var result2 = new Queue(); + bool moreItems2 = monitoredItem.Publish(new OperationContext(monitoredItem), result2, 2); + + Assert.That(moreItems2, Is.False); + Assert.That(result2, Is.Not.Empty); + Assert.That(result2.Count, Is.EqualTo(1)); + EventFieldList publishResult2 = result2.FirstOrDefault(); + Assert.That(publishResult2, Is.Not.Null); + Assert.That(publishResult2.Handle, Is.AssignableTo(typeof(EventQueueOverflowEventState))); + } + + [Test] + public void CreateEventMIOverflowNoDiscard() + { + MonitoredItem monitoredItem = CreateMonitoredItem(true, 2, true); + Assert.That(monitoredItem, Is.Not.Null); + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0)); + + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2)); + + + monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null)); + + Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2)); + + + var result = new Queue(); + monitoredItem.Publish(new OperationContext(monitoredItem), result, 3); + + Assert.That(result, Is.Not.Empty); + Assert.That(result.Count, Is.EqualTo(3)); + EventFieldList publishResult = result.FirstOrDefault(); + Assert.That(publishResult, Is.Not.Null); + Assert.That(publishResult.Handle, Is.AssignableTo(typeof(EventQueueOverflowEventState))); + } + [Test] public void CreateEventMIPublishPartial() @@ -139,8 +207,10 @@ public void CreateEventMIPublishPartial() var result = new Queue(); - monitoredItem.Publish(new OperationContext(monitoredItem), result, 2); + bool moreItems = monitoredItem.Publish(new OperationContext(monitoredItem), result, 2); + + Assert.That(moreItems, Is.True); Assert.That(result, Is.Not.Empty); Assert.That(result.Count, Is.EqualTo(2)); EventFieldList publishResult = result.LastOrDefault(); @@ -148,8 +218,9 @@ public void CreateEventMIPublishPartial() Assert.That(publishResult.Handle, Is.AssignableTo(typeof(AuditUrlMismatchEventState))); var result2 = new Queue(); - monitoredItem.Publish(new OperationContext(monitoredItem), result2, 2); + bool moreItems2 = monitoredItem.Publish(new OperationContext(monitoredItem), result2, 2); + Assert.That(moreItems2, Is.False); Assert.That(result2, Is.Not.Empty); Assert.That(result2.Count, Is.EqualTo(1)); EventFieldList publishResult2 = result2.LastOrDefault(); @@ -159,7 +230,7 @@ public void CreateEventMIPublishPartial() #endregion #region private methods - private MonitoredItem CreateMonitoredItem(bool events = false, uint queueSize = 10) + private MonitoredItem CreateMonitoredItem(bool events = false, uint queueSize = 10, bool discardOldest = false) { MonitoringFilter filter = events ? new EventFilter() : new MonitoringFilter(); @@ -185,7 +256,7 @@ private MonitoredItem CreateMonitoredItem(bool events = false, uint queueSize = null, 1000.0, queueSize, - false, + discardOldest, 1000 ); }