Skip to content

Commit

Permalink
Improve performance for state update handling (#3635)
Browse files Browse the repository at this point in the history
* Improve threading in EventHandler
* refactor pagechangelistener to event
* One executor per subscriber type, not per subscriber

Signed-off-by: Jan N. Klug <[email protected]>
  • Loading branch information
J-N-K authored Jul 2, 2023
1 parent 6b91416 commit a656073
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.openhab.core.model.sitemap.sitemap.Widget;
import org.openhab.core.thing.events.ChannelDescriptionChangedEvent;
import org.openhab.core.ui.items.ItemUIRegistry;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
Expand Down Expand Up @@ -72,6 +74,7 @@ public class SitemapSubscriptionService implements ModelRepositoryChangeListener
private static final int DEFAULT_MAX_SUBSCRIPTIONS = 50;

private final Logger logger = LoggerFactory.getLogger(SitemapSubscriptionService.class);
private final BundleContext bundleContext;

public interface SitemapSubscriptionCallback {

Expand All @@ -94,25 +97,25 @@ public interface SitemapSubscriptionCallback {
private final Map<String, Instant> creationInstants = new ConcurrentHashMap<>();

/* sitemap+page -> listener */
private final Map<String, PageChangeListener> pageChangeListeners = new ConcurrentHashMap<>();
private final Map<String, ListenerRecord> pageChangeListeners = new ConcurrentHashMap<>();

/* Max number of subscriptions at the same time */
private int maxSubscriptions = DEFAULT_MAX_SUBSCRIPTIONS;

@Activate
public SitemapSubscriptionService(Map<String, Object> config, final @Reference ItemUIRegistry itemUIRegistry) {
applyConfig(config);
public SitemapSubscriptionService(Map<String, Object> config, final @Reference ItemUIRegistry itemUIRegistry,
BundleContext bundleContext) {
this.itemUIRegistry = itemUIRegistry;
this.bundleContext = bundleContext;
applyConfig(config);
}

@Deactivate
protected void deactivate() {
pageOfSubscription.clear();
callbacks.clear();
creationInstants.clear();
for (PageChangeListener listener : pageChangeListeners.values()) {
listener.dispose();
}
pageChangeListeners.values().forEach(l -> l.serviceRegistration.unregister());
pageChangeListeners.clear();
}

Expand Down Expand Up @@ -150,7 +153,7 @@ protected void removeSitemapProvider(SitemapProvider provider) {
* Creates a new subscription with the given id.
*
* @param callback an instance that should receive the events
* @returns a unique id that identifies the subscription or null if the limit of subscriptions is already reached
* @return a unique id that identifies the subscription or null if the limit of subscriptions is already reached
*/
public @Nullable String createSubscription(SitemapSubscriptionCallback callback) {
if (maxSubscriptions >= 0 && callbacks.size() >= maxSubscriptions) {
Expand All @@ -176,9 +179,9 @@ public void removeSubscription(String subscriptionId) {
String sitemapPage = pageOfSubscription.remove(subscriptionId);
if (sitemapPage != null && !pageOfSubscription.values().contains(sitemapPage)) {
// this was the only subscription listening on this page, so we can dispose the listener
PageChangeListener listener = pageChangeListeners.remove(sitemapPage);
ListenerRecord listener = pageChangeListeners.remove(sitemapPage);
if (listener != null) {
listener.dispose();
listener.serviceRegistration().unregister();
}
}
logger.debug("Removed subscription with id {} ({} active subscriptions)", subscriptionId, callbacks.size());
Expand Down Expand Up @@ -249,13 +252,14 @@ public void setPageId(String subscriptionId, String sitemapName, String pageId)
}

private void addCallbackToListener(String sitemapName, String pageId, SitemapSubscriptionCallback callback) {
PageChangeListener listener = pageChangeListeners.get(getValue(sitemapName, pageId));
if (listener == null) {
// there is no listener for this page yet, so let's try to create one
listener = new PageChangeListener(sitemapName, pageId, itemUIRegistry, collectWidgets(sitemapName, pageId));
pageChangeListeners.put(getValue(sitemapName, pageId), listener);
}
listener.addCallback(callback);
ListenerRecord listener = pageChangeListeners.computeIfAbsent(getValue(sitemapName, pageId), v -> {
PageChangeListener newListener = new PageChangeListener(sitemapName, pageId, itemUIRegistry,
collectWidgets(sitemapName, pageId));
ServiceRegistration<?> registration = bundleContext.registerService(EventSubscriber.class.getName(),
newListener, null);
return new ListenerRecord(newListener, registration);
});
listener.pageChangeListener().addCallback(callback);
}

private EList<Widget> collectWidgets(String sitemapName, String pageId) {
Expand All @@ -278,12 +282,12 @@ private EList<Widget> collectWidgets(String sitemapName, String pageId) {
}

private void removeCallbackFromListener(String sitemapPage, SitemapSubscriptionCallback callback) {
PageChangeListener oldListener = pageChangeListeners.get(sitemapPage);
ListenerRecord oldListener = pageChangeListeners.get(sitemapPage);
if (oldListener != null) {
oldListener.removeCallback(callback);
if (!pageOfSubscription.values().contains(sitemapPage)) {
oldListener.pageChangeListener().removeCallback(callback);
if (!pageOfSubscription.containsValue(sitemapPage)) {
// no other callbacks are left here, so we can safely dispose the listener
oldListener.dispose();
oldListener.serviceRegistration().unregister();
pageChangeListeners.remove(sitemapPage);
}
}
Expand Down Expand Up @@ -311,14 +315,14 @@ public void modelChanged(String modelName, EventType type) {

String changedSitemapName = modelName.substring(0, modelName.length() - SITEMAP_SUFFIX.length());

for (Entry<String, PageChangeListener> listenerEntry : pageChangeListeners.entrySet()) {
for (Entry<String, ListenerRecord> listenerEntry : pageChangeListeners.entrySet()) {
String sitemapWithPage = listenerEntry.getKey();
String sitemapName = extractSitemapName(sitemapWithPage);
String pageId = extractPageId(sitemapWithPage);

if (sitemapName.equals(changedSitemapName)) {
EList<Widget> widgets = collectWidgets(sitemapName, pageId);
listenerEntry.getValue().sitemapContentChanged(widgets);
listenerEntry.getValue().pageChangeListener().sitemapContentChanged(widgets);
}
}
}
Expand All @@ -336,9 +340,7 @@ public void checkAliveClients() {
}
}
// Send an ALIVE event to all subscribers to trigger an exception for dead subscribers
for (Entry<String, PageChangeListener> listenerEntry : pageChangeListeners.entrySet()) {
listenerEntry.getValue().sendAliveEvent();
}
pageChangeListeners.values().forEach(l -> l.pageChangeListener().sendAliveEvent());
}

@Override
Expand All @@ -355,19 +357,22 @@ public void receive(Event event) {
// members and predictions aren't really possible in that case (or at least would be highly complex).
return;
}
for (PageChangeListener pageChangeListener : pageChangeListeners.values()) {
for (ListenerRecord listener : pageChangeListeners.values()) {
if (prediction.isConfirmation()) {
pageChangeListener.keepCurrentState(item);
listener.pageChangeListener().keepCurrentState(item);
} else {
pageChangeListener.changeStateTo(item, prediction.getPredictedState());
listener.pageChangeListener().changeStateTo(item, prediction.getPredictedState());
}
}
} else if (event instanceof ChannelDescriptionChangedEvent channelDescriptionChangedEvent) {
channelDescriptionChangedEvent.getLinkedItemNames().forEach(itemName -> {
for (PageChangeListener pageChangeListener : pageChangeListeners.values()) {
pageChangeListener.descriptionChanged(itemName);
for (ListenerRecord listener : pageChangeListeners.values()) {
listener.pageChangeListener().descriptionChanged(itemName);
}
});
}
}

private record ListenerRecord(PageChangeListener pageChangeListener, ServiceRegistration<?> serviceRegistration) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.eclipse.emf.common.util.EList;
import org.openhab.core.common.ThreadPoolManager;
import org.openhab.core.events.Event;
import org.openhab.core.events.EventSubscriber;
import org.openhab.core.io.rest.core.item.EnrichedItemDTOMapper;
import org.openhab.core.io.rest.sitemap.SitemapSubscriptionService.SitemapSubscriptionCallback;
import org.openhab.core.items.GenericItem;
import org.openhab.core.items.GroupItem;
import org.openhab.core.items.Item;
import org.openhab.core.items.ItemNotFoundException;
import org.openhab.core.items.StateChangeListener;
import org.openhab.core.items.events.GroupStateUpdatedEvent;
import org.openhab.core.items.events.ItemEvent;
import org.openhab.core.items.events.ItemStateChangedEvent;
import org.openhab.core.library.CoreItemFactory;
import org.openhab.core.model.sitemap.sitemap.Chart;
import org.openhab.core.model.sitemap.sitemap.ColorArray;
Expand All @@ -45,7 +48,7 @@
* @author Kai Kreuzer - Initial contribution
* @author Laurent Garnier - Added support for icon color
*/
public class PageChangeListener implements StateChangeListener {
public class PageChangeListener implements EventSubscriber {

private static final int REVERT_INTERVAL = 300;
private final ScheduledExecutorService scheduler = ThreadPoolManager
Expand All @@ -55,6 +58,7 @@ public class PageChangeListener implements StateChangeListener {
private final ItemUIRegistry itemUIRegistry;
private EList<Widget> widgets;
private Set<Item> items;
private final HashSet<String> filterItems = new HashSet<>();
private final List<SitemapSubscriptionCallback> callbacks = Collections.synchronizedList(new ArrayList<>());
private Set<SitemapSubscriptionCallback> distinctCallbacks = Collections.emptySet();

Expand All @@ -75,23 +79,10 @@ public PageChangeListener(String sitemapName, String pageId, ItemUIRegistry item
}

private void updateItemsAndWidgets(EList<Widget> widgets) {
if (this.widgets != null) {
// cleanup statechange listeners in case widgets were removed
items = getAllItems(this.widgets);
for (Item item : items) {
if (item instanceof GenericItem genericItem) {
genericItem.removeStateChangeListener(this);
}
}
}

this.widgets = widgets;
items = getAllItems(widgets);
for (Item item : items) {
if (item instanceof GenericItem genericItem) {
genericItem.addStateChangeListener(this);
}
}
filterItems.clear();
filterItems.addAll(items.stream().map(Item::getName).collect(Collectors.toSet()));
}

public String getSitemapName() {
Expand All @@ -113,19 +104,6 @@ public void removeCallback(SitemapSubscriptionCallback callback) {
distinctCallbacks = new HashSet<>(callbacks);
}

/**
* Disposes this instance and releases all resources.
*/
public void dispose() {
for (Item item : items) {
if (item instanceof GenericItem genericItem) {
genericItem.removeStateChangeListener(this);
} else if (item instanceof GroupItem groupItem) {
groupItem.removeStateChangeListener(this);
}
}
}

/**
* Collects all items that are represented by a given list of widgets
*
Expand Down Expand Up @@ -182,26 +160,6 @@ private void constructAndSendEvents(Item item, State newState) {
}
}

@Override
public void stateChanged(Item item, State oldState, State newState) {
// For all items except group, send an event only when the event state is changed.
if (item instanceof GroupItem) {
return;
}
constructAndSendEvents(item, newState);
}

@Override
public void stateUpdated(Item item, State state) {
// For group item only, send an event each time the event state is updated.
// It allows updating the group label while the group state is unchanged,
// for example the count in label for Group:Switch:OR
if (!(item instanceof GroupItem)) {
return;
}
constructAndSendEvents(item, state);
}

public void keepCurrentState(Item item) {
scheduler.schedule(() -> {
constructAndSendEvents(item, item.getState());
Expand Down Expand Up @@ -338,4 +296,24 @@ private Set<SitemapEvent> constructSitemapEventsForUpdatedDescr(Item item, List<
}
return events;
}

@Override
public Set<String> getSubscribedEventTypes() {
return Set.of(ItemStateChangedEvent.TYPE, GroupStateUpdatedEvent.TYPE);
}

@Override
public void receive(Event event) {
if (event instanceof ItemEvent itemEvent && filterItems.contains(itemEvent.getItemName())) {
Item item = itemUIRegistry.get(itemEvent.getItemName());
if (item == null) {
return;
}
if (event instanceof GroupStateUpdatedEvent groupStateUpdatedEvent) {
constructAndSendEvents(item, groupStateUpdatedEvent.getItemState());
} else if (event instanceof ItemStateChangedEvent itemStateChangedEvent) {
constructAndSendEvents(item, itemStateChangedEvent.getItemState());
}
}
}
}
Loading

0 comments on commit a656073

Please sign in to comment.