From a656073352ee49bebbc358c2e2603de811ff0b21 Mon Sep 17 00:00:00 2001 From: J-N-K Date: Sun, 2 Jul 2023 12:12:53 +0200 Subject: [PATCH] Improve performance for state update handling (#3635) * Improve threading in EventHandler * refactor pagechangelistener to event * One executor per subscriber type, not per subscriber Signed-off-by: Jan N. Klug --- .../sitemap/SitemapSubscriptionService.java | 65 +++++++------- .../sitemap/internal/PageChangeListener.java | 82 +++++++----------- .../sitemap/internal/SitemapResource.java | 84 ++++++++++--------- .../sitemap/internal/SitemapResourceTest.java | 19 +++-- .../core/internal/events/EventHandler.java | 38 +++++++-- 5 files changed, 152 insertions(+), 136 deletions(-) diff --git a/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/SitemapSubscriptionService.java b/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/SitemapSubscriptionService.java index 70bafa9e4bf..d788c1762ea 100644 --- a/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/SitemapSubscriptionService.java +++ b/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/SitemapSubscriptionService.java @@ -42,6 +42,8 @@ import org.openhab.core.model.sitemap.sitemap.Widget; import org.openhab.core.thing.events.ChannelDescriptionChangedEvent; import org.openhab.core.ui.items.ItemUIRegistry; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; @@ -72,6 +74,7 @@ public class SitemapSubscriptionService implements ModelRepositoryChangeListener private static final int DEFAULT_MAX_SUBSCRIPTIONS = 50; private final Logger logger = LoggerFactory.getLogger(SitemapSubscriptionService.class); + private final BundleContext bundleContext; public interface SitemapSubscriptionCallback { @@ -94,15 +97,17 @@ public interface SitemapSubscriptionCallback { private final Map creationInstants = new ConcurrentHashMap<>(); /* sitemap+page -> listener */ - private final Map pageChangeListeners = new ConcurrentHashMap<>(); + private final Map pageChangeListeners = new ConcurrentHashMap<>(); /* Max number of subscriptions at the same time */ private int maxSubscriptions = DEFAULT_MAX_SUBSCRIPTIONS; @Activate - public SitemapSubscriptionService(Map config, final @Reference ItemUIRegistry itemUIRegistry) { - applyConfig(config); + public SitemapSubscriptionService(Map config, final @Reference ItemUIRegistry itemUIRegistry, + BundleContext bundleContext) { this.itemUIRegistry = itemUIRegistry; + this.bundleContext = bundleContext; + applyConfig(config); } @Deactivate @@ -110,9 +115,7 @@ protected void deactivate() { pageOfSubscription.clear(); callbacks.clear(); creationInstants.clear(); - for (PageChangeListener listener : pageChangeListeners.values()) { - listener.dispose(); - } + pageChangeListeners.values().forEach(l -> l.serviceRegistration.unregister()); pageChangeListeners.clear(); } @@ -150,7 +153,7 @@ protected void removeSitemapProvider(SitemapProvider provider) { * Creates a new subscription with the given id. * * @param callback an instance that should receive the events - * @returns a unique id that identifies the subscription or null if the limit of subscriptions is already reached + * @return a unique id that identifies the subscription or null if the limit of subscriptions is already reached */ public @Nullable String createSubscription(SitemapSubscriptionCallback callback) { if (maxSubscriptions >= 0 && callbacks.size() >= maxSubscriptions) { @@ -176,9 +179,9 @@ public void removeSubscription(String subscriptionId) { String sitemapPage = pageOfSubscription.remove(subscriptionId); if (sitemapPage != null && !pageOfSubscription.values().contains(sitemapPage)) { // this was the only subscription listening on this page, so we can dispose the listener - PageChangeListener listener = pageChangeListeners.remove(sitemapPage); + ListenerRecord listener = pageChangeListeners.remove(sitemapPage); if (listener != null) { - listener.dispose(); + listener.serviceRegistration().unregister(); } } logger.debug("Removed subscription with id {} ({} active subscriptions)", subscriptionId, callbacks.size()); @@ -249,13 +252,14 @@ public void setPageId(String subscriptionId, String sitemapName, String pageId) } private void addCallbackToListener(String sitemapName, String pageId, SitemapSubscriptionCallback callback) { - PageChangeListener listener = pageChangeListeners.get(getValue(sitemapName, pageId)); - if (listener == null) { - // there is no listener for this page yet, so let's try to create one - listener = new PageChangeListener(sitemapName, pageId, itemUIRegistry, collectWidgets(sitemapName, pageId)); - pageChangeListeners.put(getValue(sitemapName, pageId), listener); - } - listener.addCallback(callback); + ListenerRecord listener = pageChangeListeners.computeIfAbsent(getValue(sitemapName, pageId), v -> { + PageChangeListener newListener = new PageChangeListener(sitemapName, pageId, itemUIRegistry, + collectWidgets(sitemapName, pageId)); + ServiceRegistration registration = bundleContext.registerService(EventSubscriber.class.getName(), + newListener, null); + return new ListenerRecord(newListener, registration); + }); + listener.pageChangeListener().addCallback(callback); } private EList collectWidgets(String sitemapName, String pageId) { @@ -278,12 +282,12 @@ private EList collectWidgets(String sitemapName, String pageId) { } private void removeCallbackFromListener(String sitemapPage, SitemapSubscriptionCallback callback) { - PageChangeListener oldListener = pageChangeListeners.get(sitemapPage); + ListenerRecord oldListener = pageChangeListeners.get(sitemapPage); if (oldListener != null) { - oldListener.removeCallback(callback); - if (!pageOfSubscription.values().contains(sitemapPage)) { + oldListener.pageChangeListener().removeCallback(callback); + if (!pageOfSubscription.containsValue(sitemapPage)) { // no other callbacks are left here, so we can safely dispose the listener - oldListener.dispose(); + oldListener.serviceRegistration().unregister(); pageChangeListeners.remove(sitemapPage); } } @@ -311,14 +315,14 @@ public void modelChanged(String modelName, EventType type) { String changedSitemapName = modelName.substring(0, modelName.length() - SITEMAP_SUFFIX.length()); - for (Entry listenerEntry : pageChangeListeners.entrySet()) { + for (Entry listenerEntry : pageChangeListeners.entrySet()) { String sitemapWithPage = listenerEntry.getKey(); String sitemapName = extractSitemapName(sitemapWithPage); String pageId = extractPageId(sitemapWithPage); if (sitemapName.equals(changedSitemapName)) { EList widgets = collectWidgets(sitemapName, pageId); - listenerEntry.getValue().sitemapContentChanged(widgets); + listenerEntry.getValue().pageChangeListener().sitemapContentChanged(widgets); } } } @@ -336,9 +340,7 @@ public void checkAliveClients() { } } // Send an ALIVE event to all subscribers to trigger an exception for dead subscribers - for (Entry listenerEntry : pageChangeListeners.entrySet()) { - listenerEntry.getValue().sendAliveEvent(); - } + pageChangeListeners.values().forEach(l -> l.pageChangeListener().sendAliveEvent()); } @Override @@ -355,19 +357,22 @@ public void receive(Event event) { // members and predictions aren't really possible in that case (or at least would be highly complex). return; } - for (PageChangeListener pageChangeListener : pageChangeListeners.values()) { + for (ListenerRecord listener : pageChangeListeners.values()) { if (prediction.isConfirmation()) { - pageChangeListener.keepCurrentState(item); + listener.pageChangeListener().keepCurrentState(item); } else { - pageChangeListener.changeStateTo(item, prediction.getPredictedState()); + listener.pageChangeListener().changeStateTo(item, prediction.getPredictedState()); } } } else if (event instanceof ChannelDescriptionChangedEvent channelDescriptionChangedEvent) { channelDescriptionChangedEvent.getLinkedItemNames().forEach(itemName -> { - for (PageChangeListener pageChangeListener : pageChangeListeners.values()) { - pageChangeListener.descriptionChanged(itemName); + for (ListenerRecord listener : pageChangeListeners.values()) { + listener.pageChangeListener().descriptionChanged(itemName); } }); } } + + private record ListenerRecord(PageChangeListener pageChangeListener, ServiceRegistration serviceRegistration) { + } } diff --git a/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/internal/PageChangeListener.java b/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/internal/PageChangeListener.java index 3252af2f6bd..30aa6bc5e92 100644 --- a/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/internal/PageChangeListener.java +++ b/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/internal/PageChangeListener.java @@ -20,16 +20,19 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.eclipse.emf.common.util.EList; import org.openhab.core.common.ThreadPoolManager; +import org.openhab.core.events.Event; +import org.openhab.core.events.EventSubscriber; import org.openhab.core.io.rest.core.item.EnrichedItemDTOMapper; import org.openhab.core.io.rest.sitemap.SitemapSubscriptionService.SitemapSubscriptionCallback; -import org.openhab.core.items.GenericItem; -import org.openhab.core.items.GroupItem; import org.openhab.core.items.Item; import org.openhab.core.items.ItemNotFoundException; -import org.openhab.core.items.StateChangeListener; +import org.openhab.core.items.events.GroupStateUpdatedEvent; +import org.openhab.core.items.events.ItemEvent; +import org.openhab.core.items.events.ItemStateChangedEvent; import org.openhab.core.library.CoreItemFactory; import org.openhab.core.model.sitemap.sitemap.Chart; import org.openhab.core.model.sitemap.sitemap.ColorArray; @@ -45,7 +48,7 @@ * @author Kai Kreuzer - Initial contribution * @author Laurent Garnier - Added support for icon color */ -public class PageChangeListener implements StateChangeListener { +public class PageChangeListener implements EventSubscriber { private static final int REVERT_INTERVAL = 300; private final ScheduledExecutorService scheduler = ThreadPoolManager @@ -55,6 +58,7 @@ public class PageChangeListener implements StateChangeListener { private final ItemUIRegistry itemUIRegistry; private EList widgets; private Set items; + private final HashSet filterItems = new HashSet<>(); private final List callbacks = Collections.synchronizedList(new ArrayList<>()); private Set distinctCallbacks = Collections.emptySet(); @@ -75,23 +79,10 @@ public PageChangeListener(String sitemapName, String pageId, ItemUIRegistry item } private void updateItemsAndWidgets(EList widgets) { - if (this.widgets != null) { - // cleanup statechange listeners in case widgets were removed - items = getAllItems(this.widgets); - for (Item item : items) { - if (item instanceof GenericItem genericItem) { - genericItem.removeStateChangeListener(this); - } - } - } - this.widgets = widgets; items = getAllItems(widgets); - for (Item item : items) { - if (item instanceof GenericItem genericItem) { - genericItem.addStateChangeListener(this); - } - } + filterItems.clear(); + filterItems.addAll(items.stream().map(Item::getName).collect(Collectors.toSet())); } public String getSitemapName() { @@ -113,19 +104,6 @@ public void removeCallback(SitemapSubscriptionCallback callback) { distinctCallbacks = new HashSet<>(callbacks); } - /** - * Disposes this instance and releases all resources. - */ - public void dispose() { - for (Item item : items) { - if (item instanceof GenericItem genericItem) { - genericItem.removeStateChangeListener(this); - } else if (item instanceof GroupItem groupItem) { - groupItem.removeStateChangeListener(this); - } - } - } - /** * Collects all items that are represented by a given list of widgets * @@ -182,26 +160,6 @@ private void constructAndSendEvents(Item item, State newState) { } } - @Override - public void stateChanged(Item item, State oldState, State newState) { - // For all items except group, send an event only when the event state is changed. - if (item instanceof GroupItem) { - return; - } - constructAndSendEvents(item, newState); - } - - @Override - public void stateUpdated(Item item, State state) { - // For group item only, send an event each time the event state is updated. - // It allows updating the group label while the group state is unchanged, - // for example the count in label for Group:Switch:OR - if (!(item instanceof GroupItem)) { - return; - } - constructAndSendEvents(item, state); - } - public void keepCurrentState(Item item) { scheduler.schedule(() -> { constructAndSendEvents(item, item.getState()); @@ -338,4 +296,24 @@ private Set constructSitemapEventsForUpdatedDescr(Item item, List< } return events; } + + @Override + public Set getSubscribedEventTypes() { + return Set.of(ItemStateChangedEvent.TYPE, GroupStateUpdatedEvent.TYPE); + } + + @Override + public void receive(Event event) { + if (event instanceof ItemEvent itemEvent && filterItems.contains(itemEvent.getItemName())) { + Item item = itemUIRegistry.get(itemEvent.getItemName()); + if (item == null) { + return; + } + if (event instanceof GroupStateUpdatedEvent groupStateUpdatedEvent) { + constructAndSendEvents(item, groupStateUpdatedEvent.getItemState()); + } else if (event instanceof ItemStateChangedEvent itemStateChangedEvent) { + constructAndSendEvents(item, itemStateChangedEvent.getItemState()); + } + } + } } diff --git a/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/internal/SitemapResource.java b/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/internal/SitemapResource.java index 84d3823ff44..2c92feed0f7 100644 --- a/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/internal/SitemapResource.java +++ b/bundles/org.openhab.core.io.rest.sitemap/src/main/java/org/openhab/core/io/rest/sitemap/internal/SitemapResource.java @@ -21,10 +21,12 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import java.util.stream.Collectors; import javax.annotation.security.RolesAllowed; import javax.servlet.http.HttpServletRequest; @@ -56,6 +58,8 @@ import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.auth.Role; import org.openhab.core.common.ThreadPoolManager; +import org.openhab.core.events.Event; +import org.openhab.core.events.EventSubscriber; import org.openhab.core.io.rest.JSONResponse; import org.openhab.core.io.rest.LocaleService; import org.openhab.core.io.rest.RESTConstants; @@ -67,7 +71,8 @@ import org.openhab.core.items.GenericItem; import org.openhab.core.items.Item; import org.openhab.core.items.ItemNotFoundException; -import org.openhab.core.items.StateChangeListener; +import org.openhab.core.items.events.ItemEvent; +import org.openhab.core.items.events.ItemStateChangedEvent; import org.openhab.core.library.CoreItemFactory; import org.openhab.core.library.types.HSBType; import org.openhab.core.model.sitemap.SitemapProvider; @@ -127,7 +132,7 @@ * @author Laurent Garnier - Added support for icon color * @author Mark Herwege - Added pattern and unit fields */ -@Component(service = RESTResource.class) +@Component(service = { RESTResource.class, EventSubscriber.class }) @JaxrsResource @JaxrsName(SitemapResource.PATH_SITEMAPS) @JaxrsApplicationSelect("(" + JaxrsWhiteboardConstants.JAX_RS_NAME + "=" + RESTConstants.JAX_RS_NAME + ")") @@ -137,7 +142,7 @@ @Tag(name = SitemapResource.PATH_SITEMAPS) @NonNullByDefault public class SitemapResource - implements RESTResource, SitemapSubscriptionCallback, SseBroadcaster.Listener { + implements RESTResource, SitemapSubscriptionCallback, SseBroadcaster.Listener, EventSubscriber { private final Logger logger = LoggerFactory.getLogger(SitemapResource.class); @@ -177,6 +182,7 @@ public class SitemapResource .getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON); private @Nullable ScheduledFuture cleanSubscriptionsJob; + private Set stateChangeListeners = new CopyOnWriteArraySet<>(); @Activate public SitemapResource( // @@ -686,13 +692,11 @@ private boolean blockUnlessChangeOccurs(String sitemapname, String pageId) { private boolean waitForChanges(EList widgets) { long startTime = (new Date()).getTime(); boolean timeout = false; - BlockingStateChangeListener listener = new BlockingStateChangeListener(); - // let's get all items for these widgets - Set items = getAllItems(widgets); - for (GenericItem item : items) { - item.addStateChangeListener(listener); - } - while (!listener.hasChangeOccurred() && !timeout) { + Set items = getAllItems(widgets).stream().map(Item::getName).collect(Collectors.toSet()); + BlockingStateChangeListener listener = new BlockingStateChangeListener(items); + stateChangeListeners.add(listener); + + while (!listener.hasChanged() && !timeout) { timeout = (new Date()).getTime() - startTime > TIMEOUT_IN_MS; try { Thread.sleep(300); @@ -701,9 +705,8 @@ private boolean waitForChanges(EList widgets) { break; } } - for (GenericItem item : items) { - item.removeStateChangeListener(listener); - } + + stateChangeListeners.remove(listener); return timeout; } @@ -782,34 +785,16 @@ private Set getItemsInColorCond(EList colorList) { return items; } - /** - * This is a state change listener, which is merely used to determine, if a - * state change has occurred on one of a list of items. - * - * @author Kai Kreuzer - Initial contribution - * - */ - private static class BlockingStateChangeListener implements StateChangeListener { - - private boolean changed = false; - - @Override - public void stateChanged(Item item, State oldState, State newState) { - changed = true; - } - - /** - * determines, whether a state change has occurred since its creation - * - * @return true, if a state has changed - */ - public boolean hasChangeOccurred() { - return changed; - } + @Override + public Set getSubscribedEventTypes() { + return Set.of(ItemStateChangedEvent.TYPE); + } - @Override - public void stateUpdated(Item item, State state) { - // ignore if the state did not change + @Override + public void receive(Event event) { + if (event instanceof ItemEvent itemEvent) { + String itemName = itemEvent.getItemName(); + stateChangeListeners.forEach(l -> l.itemChanged(itemName)); } } @@ -855,4 +840,23 @@ public void sseEventSinkRemoved(SseEventSink sink, SseSinkInfo info) { subscriptions.removeSubscription(info.subscriptionId); knownSubscriptions.remove(info.subscriptionId); } + + private static class BlockingStateChangeListener { + private final Set items; + private boolean changed = false; + + public BlockingStateChangeListener(Set items) { + this.items = items; + } + + public void itemChanged(String item) { + if (items.contains(item)) { + changed = true; + } + } + + public boolean hasChanged() { + return changed; + } + } } diff --git a/bundles/org.openhab.core.io.rest.sitemap/src/test/java/org/openhab/core/io/rest/sitemap/internal/SitemapResourceTest.java b/bundles/org.openhab.core.io.rest.sitemap/src/test/java/org/openhab/core/io/rest/sitemap/internal/SitemapResourceTest.java index fe384aeabb0..b8a0b0da6ab 100644 --- a/bundles/org.openhab.core.io.rest.sitemap/src/test/java/org/openhab/core/io/rest/sitemap/internal/SitemapResourceTest.java +++ b/bundles/org.openhab.core.io.rest.sitemap/src/test/java/org/openhab/core/io/rest/sitemap/internal/SitemapResourceTest.java @@ -18,7 +18,6 @@ import static org.hamcrest.collection.IsEmptyCollection.empty; import static org.mockito.Mockito.*; -import java.math.BigDecimal; import java.util.Collection; import java.util.List; import java.util.Locale; @@ -45,7 +44,7 @@ import org.openhab.core.io.rest.sitemap.SitemapSubscriptionService; import org.openhab.core.items.GenericItem; import org.openhab.core.items.ItemNotFoundException; -import org.openhab.core.library.types.DecimalType; +import org.openhab.core.items.events.ItemEvent; import org.openhab.core.library.types.OnOffType; import org.openhab.core.library.types.PercentType; import org.openhab.core.model.sitemap.SitemapProvider; @@ -158,11 +157,13 @@ public void whenSitemapsAreProvidedShouldReturnSitemapBeans() { @Test public void whenLongPollingShouldObserveItems() { + ItemEvent itemEvent = mock(ItemEvent.class); + when(itemEvent.getItemName()).thenReturn(item.getName()); new Thread(() -> { try { Thread.sleep(STATE_UPDATE_WAIT_TIME); // wait for the #getPageData call and listeners to attach to the // item - item.setState(PercentType.ZERO); + sitemapResource.receive(itemEvent); } catch (InterruptedException e) { } }).start(); @@ -180,11 +181,13 @@ public void whenLongPollingShouldObserveItems() { @Test public void whenLongPollingShouldObserveItemsFromVisibilityRules() { + ItemEvent itemEvent = mock(ItemEvent.class); + when(itemEvent.getItemName()).thenReturn(visibilityRuleItem.getName()); new Thread(() -> { try { Thread.sleep(STATE_UPDATE_WAIT_TIME); // wait for the #getPageData call and listeners to attach to the // item - visibilityRuleItem.setState(new DecimalType(BigDecimal.ONE)); + sitemapResource.receive(itemEvent); } catch (InterruptedException e) { } }).start(); @@ -202,11 +205,13 @@ public void whenLongPollingShouldObserveItemsFromVisibilityRules() { @Test public void whenLongPollingShouldObserveItemsFromLabelColorConditions() { + ItemEvent itemEvent = mock(ItemEvent.class); + when(itemEvent.getItemName()).thenReturn(labelColorItem.getName()); new Thread(() -> { try { Thread.sleep(STATE_UPDATE_WAIT_TIME); // wait for the #getPageData call and listeners to attach to the // item - labelColorItem.setState(new DecimalType(BigDecimal.ONE)); + sitemapResource.receive(itemEvent); } catch (InterruptedException e) { } }).start(); @@ -224,11 +229,13 @@ public void whenLongPollingShouldObserveItemsFromLabelColorConditions() { @Test public void whenLongPollingShouldObserveItemsFromValueColorConditions() { + ItemEvent itemEvent = mock(ItemEvent.class); + when(itemEvent.getItemName()).thenReturn(valueColorItem.getName()); new Thread(() -> { try { Thread.sleep(STATE_UPDATE_WAIT_TIME); // wait for the #getPageData call and listeners to attach to the // item - valueColorItem.setState(new DecimalType(BigDecimal.ONE)); + sitemapResource.receive(itemEvent); } catch (InterruptedException e) { } }).start(); diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java index 4217bec0a24..aa8cbd624e3 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java @@ -13,14 +13,18 @@ package org.openhab.core.internal.events; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -47,9 +51,7 @@ public class EventHandler implements AutoCloseable { private final Map> typedEventSubscribers; private final Map typedEventFactories; - private final ScheduledExecutorService watcher = Executors - .newSingleThreadScheduledExecutor(new NamedThreadFactory("eventwatcher")); - private final ExecutorService executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("eventexecutor")); + private final Map, ExecutorRecord> executors = new HashMap<>(); /** * Create a new event handler. @@ -63,10 +65,19 @@ public EventHandler(final Map> typedEventSubscriber this.typedEventFactories = typedEventFactories; } + private synchronized ExecutorRecord createExecutorRecord(Class subscriber) { + return new ExecutorRecord( + Executors.newSingleThreadExecutor(new NamedThreadFactory("eventexecutor-" + executors.size())), + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("eventwatcher-" + executors.size())), + new AtomicInteger()); + } + @Override public void close() { - watcher.shutdownNow(); - executor.shutdownNow(); + executors.values().forEach(r -> { + r.executor.shutdownNow(); + r.watcher.shutdownNow(); + }); } public void handleEvent(org.osgi.service.event.Event osgiEvent) { @@ -140,8 +151,16 @@ private synchronized void dispatchEvent(final Set eventSubscrib EventFilter filter = eventSubscriber.getEventFilter(); if (filter == null || filter.apply(event)) { logger.trace("Delegate event to subscriber ({}).", eventSubscriber.getClass()); - executor.submit(() -> { - ScheduledFuture logTimeout = watcher.schedule( + ExecutorRecord executorRecord = Objects.requireNonNull( + executors.computeIfAbsent(eventSubscriber.getClass(), this::createExecutorRecord)); + int queueSize = executorRecord.count().incrementAndGet(); + if (queueSize > 1000) { + logger.warn( + "The queue for a subscriber of type '{}' exceeds 1000 elements. System may be unstable.", + eventSubscriber.getClass()); + } + CompletableFuture.runAsync(() -> { + ScheduledFuture logTimeout = executorRecord.watcher().schedule( () -> logger.warn("Dispatching event to subscriber '{}' takes more than {}ms.", eventSubscriber, EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS), EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS, TimeUnit.MILLISECONDS); @@ -152,10 +171,13 @@ private synchronized void dispatchEvent(final Set eventSubscrib EventSubscriber.class.getName(), ex.getMessage(), ex); } logTimeout.cancel(false); - }); + }, executorRecord.executor()).thenRun(executorRecord.count::decrementAndGet); } else { logger.trace("Skip event subscriber ({}) because of its filter.", eventSubscriber.getClass()); } } } + + private record ExecutorRecord(ExecutorService executor, ScheduledExecutorService watcher, AtomicInteger count) { + } }