From 957fcedc8b54c3ac334c52ca531595ef1befe95e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Hohwiller?= Date: Wed, 4 Oct 2023 22:59:58 +0200 Subject: [PATCH] added EventBus --- core/pom.xml | 12 + .../io/github/mmm/event/AbstractEventBus.java | 266 ++++++++++++++++ .../github/mmm/event/AbstractEventSender.java | 73 +++++ .../github/mmm/event/AbstractEventSource.java | 77 +++-- .../java/io/github/mmm/event/EventBus.java | 87 ++++++ .../io/github/mmm/event/EventBusAccess.java | 26 ++ .../github/mmm/event/EventSourceAdapter.java | 87 ++---- .../github/mmm/event/impl/EventBusImpl.java | 47 +++ .../mmm/event/impl/WeakEventListener.java | 6 +- core/src/main/java/module-info.java | 30 +- .../mmm/event/impl/EventBusImplTest.java | 74 +++++ .../github/mmm/event/impl/EventBusTest.java | 287 ++++++++++++++++++ pom.xml | 8 + 13 files changed, 990 insertions(+), 90 deletions(-) create mode 100644 core/src/main/java/io/github/mmm/event/AbstractEventBus.java create mode 100644 core/src/main/java/io/github/mmm/event/AbstractEventSender.java create mode 100644 core/src/main/java/io/github/mmm/event/EventBus.java create mode 100644 core/src/main/java/io/github/mmm/event/EventBusAccess.java create mode 100644 core/src/main/java/io/github/mmm/event/impl/EventBusImpl.java create mode 100644 core/src/test/java/io/github/mmm/event/impl/EventBusImplTest.java create mode 100644 core/src/test/java/io/github/mmm/event/impl/EventBusTest.java diff --git a/core/pom.xml b/core/pom.xml index 08e4138..2afce96 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -11,4 +11,16 @@ jar ${project.artifactId} Java module providing the event pattern as generic reusable API and implementation. + + + ${project.groupId} + mmm-base + + + diff --git a/core/src/main/java/io/github/mmm/event/AbstractEventBus.java b/core/src/main/java/io/github/mmm/event/AbstractEventBus.java new file mode 100644 index 0000000..7d1ee24 --- /dev/null +++ b/core/src/main/java/io/github/mmm/event/AbstractEventBus.java @@ -0,0 +1,266 @@ +/* Copyright (c) The m-m-m Team, Licensed under the Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0 */ +package io.github.mmm.event; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +import io.github.mmm.base.exception.GlobalExceptionHandler; + +/** + * This is the default implementation of {@link EventBus}. + */ +public abstract class AbstractEventBus implements EventBus { + + // private static final Logger LOG = LoggerFactory.getLogger(AbstractEventBus.class); + + @SuppressWarnings("rawtypes") + private final Map, EventDispatcher> eventType2dispatcherMap; + + private final Queue eventQueue; + + /** The {@link GlobalExceptionHandler}. */ + protected final GlobalExceptionHandler errorHandler; + + /** + * The constructor. + */ + public AbstractEventBus() { + + this(null); + } + + /** + * The constructor. + * + * @param errorHandler the {@link GlobalExceptionHandler} instance. + */ + protected AbstractEventBus(GlobalExceptionHandler errorHandler) { + + super(); + this.eventType2dispatcherMap = new ConcurrentHashMap<>(); + this.eventQueue = new ConcurrentLinkedQueue<>(); + if (errorHandler == null) { + this.errorHandler = io.github.mmm.base.exception.GlobalExceptionHandlerAccess.get(); + } else { + this.errorHandler = errorHandler; + } + } + + @Override + public void sendEvent(Object event) { + + Objects.requireNonNull(event); + this.eventQueue.add(event); + triggerDispatchEvents(); + } + + /** + * Called from {@link #sendEvent(Object)} to ensure {@link #dispatchEvents()} is triggered. This can be done + * synchronous or asynchronous. + */ + protected abstract void triggerDispatchEvents(); + + /** + * Dispatches all events in the event queue. + */ + protected void dispatchEvents() { + + while (true) { + Object event = this.eventQueue.poll(); + if (event == null) { + return; + } + dispatchEvent(event); + } + } + + /** + * Dispatches the given event. + * + * @param is the generic type of {@code event}. + * @param event is the event to dispatch. + */ + protected void dispatchEvent(E event) { + + @SuppressWarnings("unchecked") + Class eventType = (Class) event.getClass(); + EventDispatcher eventDispatcher = getEventDispatcherOrNull(eventType); + boolean dispatched = false; + if (eventDispatcher != null) { + dispatched = eventDispatcher.fireEvent(event); + } + if (!dispatched) { + handleUndispatchedEvent(event); + } + } + + /** + * Called if an event was {@link #sendEvent(Object) send} but not dispatched to any + * {@link #addListener(Class, EventListener) registered listener}. + * + * @param event is the un-dispatched event. + */ + protected void handleUndispatchedEvent(Object event) { + + // LOG.warn("Event send with no responsible listener registered: {}", event); + } + + /** + * Gets or creates the {@link EventDispatcher} for the given {@code eventType}. + * + * @param is the generic type of {@code eventType}. + * @param eventType is the {@link Class} reflecting the event. + * @return the {@link EventDispatcher} responsible for the given {@code eventType}. + */ + @SuppressWarnings("unchecked") + protected EventDispatcher getEventDispatcherRequired(Class eventType) { + + EventDispatcher dispatcher = this.eventType2dispatcherMap.get(eventType); + if (dispatcher == null) { + Class type = eventType.getSuperclass(); + EventDispatcher parent; + if (type != null) { + parent = getEventDispatcherRequired(type); + } else { + parent = null; + } + dispatcher = this.eventType2dispatcherMap.computeIfAbsent(eventType, t -> new EventDispatcher<>(parent)); + } + return (EventDispatcher) dispatcher; + } + + /** + * Gets the most specific {@link EventDispatcher} responsible the given {@code eventType}. + * + * @param is the generic type of {@code eventType}. + * @param eventType is the {@link Class} reflecting the event. + * @return the most specific {@link EventDispatcher} responsible for the given {@code eventType}. May be {@code null} + * if no {@link EventListener} is {@link #addListener(Class, EventListener) registered} for a compatible + * {@code eventType}. + */ + @SuppressWarnings("unchecked") + protected EventDispatcher getEventDispatcherOrNull(Class eventType) { + + Class type = eventType; + EventDispatcher dispatcher = this.eventType2dispatcherMap.get(eventType); + while ((dispatcher == null) && (type != null)) { + type = type.getSuperclass(); + dispatcher = this.eventType2dispatcherMap.get(type); + } + return (EventDispatcher) dispatcher; + } + + @Override + public void addListener(Class eventType, EventListener listener) { + + Objects.requireNonNull(eventType); + Objects.requireNonNull(listener); + if (eventType.isInterface()) { + throw new UnsupportedOperationException( + "This EventBus implementation does not support interfaces as event type: " + eventType.getName()); + } + EventDispatcher eventDispatcher = getEventDispatcherRequired(eventType); + eventDispatcher.addListener(listener); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public boolean removeListener(Class eventType, EventListener listener) { + + boolean removed = false; + if (eventType == null) { + for (EventDispatcher dispatcher : this.eventType2dispatcherMap.values()) { + boolean currentRemoved = dispatcher.removeListener((EventListener) listener); + if (currentRemoved) { + removed = true; + } + } + } else { + EventDispatcher dispatcher = this.eventType2dispatcherMap.get(eventType); + if (dispatcher != null) { + return dispatcher.removeListener(listener); + } + } + return removed; + } + + /** + * A dispatcher for all {@link EventListener}s of a particular {@link EventBus#addListener(Class, EventListener) event + * type}. + * + * @param type of the {@link EventListener#onEvent(Object) events}. + */ + protected class EventDispatcher extends AbstractEventSource> { + + /** @see #onEvent(Object, Collection) */ + private final EventDispatcher parentDispatcher; + + /** @see #fireEvent(Object, Collection) */ + private final Collection> listeners; + + /** + * The constructor. + * + * @param parent is the {@link EventDispatcher} responsible for the super-class or {@code null} if this is the root + * {@link EventDispatcher} responsible for {@link Object}. + */ + public EventDispatcher(EventDispatcher parent) { + + this(parent, new ConcurrentLinkedQueue<>()); + } + + /** + * The constructor. + * + * @param parent is the {@link EventDispatcher} responsible for the super-class or {@code null} if this is the root + * {@link EventDispatcher} responsible for {@link Object}. + * @param listeners the empty {@link Collection} implementation for the {@link EventListener}s. + */ + protected EventDispatcher(EventDispatcher parent, Collection> listeners) { + + super(); + this.parentDispatcher = parent; + this.listeners = listeners; + } + + @Override + protected void doAddListener(EventListener listener) { + + this.listeners.add(listener); + } + + @Override + public boolean removeListener(EventListener listener) { + + return this.listeners.remove(listener); + } + + @Override + protected boolean fireEvent(E event) { + + boolean dispatched = false; + for (EventListener listener : this.listeners) { + try { + listener.onEvent(event); + dispatched = true; + } catch (Throwable exception) { + AbstractEventBus.this.errorHandler.handleError(event, exception); + } + } + if (this.parentDispatcher != null) { + boolean superDispatched = this.parentDispatcher.fireEvent(event); + if (superDispatched) { + dispatched = true; + } + } + return dispatched; + } + + } + +} \ No newline at end of file diff --git a/core/src/main/java/io/github/mmm/event/AbstractEventSender.java b/core/src/main/java/io/github/mmm/event/AbstractEventSender.java new file mode 100644 index 0000000..8d55ad7 --- /dev/null +++ b/core/src/main/java/io/github/mmm/event/AbstractEventSender.java @@ -0,0 +1,73 @@ +/* Copyright (c) The m-m-m Team, Licensed under the Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0 */ +package io.github.mmm.event; + +/** + * Implementation of {@link EventSource}. + * + * @param the type of the events to {@link EventListener#onEvent(Object) send}. + * @param the type of the {@link EventListener listeners}. + * @since 1.0.0 + */ +public abstract class AbstractEventSender > + extends AbstractEventSource { + + private EventSourceAdapter eventAdapter; + + /** + * The constructor. + */ + public AbstractEventSender() { + + super(); + this.eventAdapter = EventSourceAdapter.empty(); + } + + @Override + protected void doAddListener(EventListener listener) { + + this.eventAdapter = this.eventAdapter.addListener(listener); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public boolean removeListener(L listener) { + + EventSourceAdapter adapter = this.eventAdapter.removeListener((EventListener) listener); + if (adapter == null) { + return false; + } + this.eventAdapter = adapter; + return true; + } + + /** + * @return {@code true} if at least one {@link EventListener} is {@link #addListener(EventListener, boolean) + * registered}, {@code false} otherwise. + */ + protected boolean hasListeners() { + + return this.eventAdapter.hasListeners(); + } + + /** + * @return the {@link EventSourceAdapter}. + */ + protected EventSourceAdapter getEventAdapter() { + + return this.eventAdapter; + } + + /** + * @param event the event to {@link EventListener#onEvent(Object) send} to all {@link #addListener(EventListener) + * registered} {@link EventListener}s. + * @return {@code true} if the event has actually been dispatched, {@code false} otherwise (no listener was + * {@link #addListener(EventListener) registered} for the event). + */ + @Override + protected boolean fireEvent(E event) { + + return this.eventAdapter.fireEvent(event); + } + +} diff --git a/core/src/main/java/io/github/mmm/event/AbstractEventSource.java b/core/src/main/java/io/github/mmm/event/AbstractEventSource.java index af0c965..7b8ee87 100644 --- a/core/src/main/java/io/github/mmm/event/AbstractEventSource.java +++ b/core/src/main/java/io/github/mmm/event/AbstractEventSource.java @@ -2,6 +2,8 @@ * http://www.apache.org/licenses/LICENSE-2.0 */ package io.github.mmm.event; +import io.github.mmm.event.impl.WeakEventListener; + /** * Implementation of {@link EventSource}. * @@ -11,60 +13,79 @@ */ public abstract class AbstractEventSource > implements EventSource { - private EventSourceAdapter eventAdapter; - /** * The constructor. */ public AbstractEventSource() { super(); - this.eventAdapter = EventSourceAdapter.empty(); } - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public void addListener(L listener, boolean weak) { + /** + * @param listener the {@link EventListener} to wrap (potentially). + * @param weak - {@code true} to wrap as {@link WeakEventListener}, {@code false} otherwise. + * @return the given {@link EventListener} or a {@link WeakEventListener} wrapping it in case {@code weak} is + * {@code true}. + * @see EventSource#addListener(EventListener, boolean) + */ + protected final EventListener wrap(EventListener listener, boolean weak) { - this.eventAdapter = this.eventAdapter.addListener((EventListener) listener, weak); + if (weak) { + return new WeakEventListener<>(this, listener); + } + return listener; } - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public boolean removeListener(L listener) { + /** + * @param type of the event. + * @param listener the {@link EventListener} to unwrap (potentially). + * @return the given {@link EventListener} or the unwrapped {@link EventListener} if a {@link WeakEventListener} was + * given. + */ + protected static EventListener unwrap(EventListener listener) { - EventSourceAdapter adapter = this.eventAdapter.removeListener((EventListener) listener); - if (adapter == null) { - return false; + if (listener == null) { + return null; + } else if (listener instanceof WeakEventListener) { + return ((WeakEventListener) listener).ref.get(); } - this.eventAdapter = adapter; - return true; + return listener; } /** - * @return {@code true} if at least one {@link EventListener} is {@link #addListener(EventListener, boolean) - * registered}, {@code false} otherwise. + * @param listener2remove the {@link EventListener} to {@link #removeListener(EventListener) remove}. + * @param registeredListener the {@link #addListener(EventListener) registered} {@link EventListener} to match with. + * @return {@code true} if the given {@link EventListener}s match, {@code false} otherwise. */ - protected boolean hasListeners() { + protected static boolean matches(EventListener listener2remove, EventListener registeredListener) { - return this.eventAdapter.hasListeners(); + registeredListener = AbstractEventSource.unwrap(registeredListener); + if (listener2remove == registeredListener) { + return true; + } else if ((registeredListener.isMatchedUsingEquals()) && registeredListener.equals(listener2remove)) { + return true; + } + return false; } - /** - * @return the {@link EventSourceAdapter}. - */ - protected EventSourceAdapter getEventAdapter() { + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void addListener(L listener, boolean weak) { - return this.eventAdapter; + doAddListener(wrap((EventListener) listener, weak)); } + /** + * @param listener the {@link EventListener} to add. + */ + protected abstract void doAddListener(EventListener listener); + /** * @param event the event to {@link EventListener#onEvent(Object) send} to all {@link #addListener(EventListener) * registered} {@link EventListener}s. + * @return {@code true} if the event has actually been dispatched, {@code false} otherwise (no listener was + * {@link #addListener(EventListener) registered} for the event). */ - protected void fireEvent(E event) { - - this.eventAdapter.fireEvent(event); - } + protected abstract boolean fireEvent(E event); } diff --git a/core/src/main/java/io/github/mmm/event/EventBus.java b/core/src/main/java/io/github/mmm/event/EventBus.java new file mode 100644 index 0000000..9791b24 --- /dev/null +++ b/core/src/main/java/io/github/mmm/event/EventBus.java @@ -0,0 +1,87 @@ +/* Copyright (c) The m-m-m Team, Licensed under the Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0 */ +package io.github.mmm.event; + +/** + * This is the interface for an event bus. An event bus is a central place for {@link #sendEvent(Object) sending}, + * {@link #addListener(Class, EventListener) listening} to and {@link EventListener#onEvent(Object) receiving} events. + * The {@link EventBus} allows to communicate between loosely coupled components in a smart and efficient way: + *
    + *
  • A component sending events only needs to know the {@link EventBus} but not the receivers of the events.
  • + *
  • A component receiving events only needs to know the {@link EventBus} but not the sender of the events.
  • + *
+ * This way components can communicate via events without compile time dependency between them. All they need to see is + * the {@link EventBus} and the event itself.
+ * ATTENTION:
+ * This interface is designed as a stable API and standard for an event bus. However, there can be various + * implementations of this interface with different aspects regarding concurrency, polymorphism, performance, etc. While + * this interface will remain stable, we might change internals of the {@link EventBus} implementation. Further, you may + * want to choose a different implementation of {@link EventBus} when you are inside a front-end application (UI) or a + * back-end application (server). Therefore, it is possible to provide your own implementation of {@link EventBus} as a + * Java module via {@link java.util.ServiceLoader}.
+ * NOTE:
+ * The loose coupling makes flows less easy to see, understand and debug. You should only consider this approach for + * components that should be decoupled by design. Do not get confused by the beauty of the event-bus pattern and avoid + * using it where straight method calls should be preferred.
+ * E.g. if you have a user-interface with a navigation sub-dialog and various other dialogs they should communicate via + * {@link EventBus} to update their views accordingly. However, a business component responsible to read and write + * addresses may get the requirement that in case of a change of an address some logic from the domain of another + * component should be invoked and that might even reject the change. In the latter case {@link EventBus} is the wrong + * choice. + * + * @see EventBusAccess#get() + */ +public interface EventBus { + + /** + * This method sends an event to all {@link #addListener(Class, EventListener) suitable registered listeners}. + * + * @param event is the event to send. Technically such event may be any Object such as a {@link String}. However, it + * is strongly recommended to create explicit value classes named with the suffix "Event". The easiest way to + * create your own event type is to use a Java {@link Record}. Please note that it may seem more easy to use + * data-objects directly as event, e.g. for the selection of a user, you may send the user object itself as + * event. However, later you may notice that you also need to send an event if a user is deleted or created and + * your design and semantic of events will be flawed. Therefore it is strongly to create a + * {@code UserSelectionEvent} {@link Class} or {@link Record} containing the selected user or its unique + * identifier in that example from the beginning to ensure a design for extension and flexibility. + */ + void sendEvent(Object event); + + /** + * This method registers a listener that is interested in events. + * + * @param is the type of the events to listen to. + * @param eventType is the {@link Class} reflecting the events to listen to. Typically this should be the exact type + * of some event sent via {@link #sendEvent(Object)}. However, polymorphic implementations of {@link EventBus} + * will also support event inheritance and allow you to register an {@link EventListener} for a + * {@link Class#getSuperclass() super-class} of an event type. + * @param listener is the {@link EventListener} that shall be {@link EventListener#onEvent(Object) notified} if an + * event of the given {@link Class} is {@link #sendEvent(Object) send}. + */ + void addListener(Class eventType, EventListener listener); + + /** + * This method removes a listener. If the listener was not {@link #addListener(Class, EventListener) registered} + * before this method will have no effect. + * + * @param is the type of the events to listen to. + * @param eventType is the {@link Class} reflecting the events to listen to. + * @param listener is the {@link EventListener} to remove. + * @return {@code true} if the given {@code listener} has successfully been removed, {@code false} if the + * {@code listener} was NOT {@link #addListener(Class, EventListener) registered}. + */ + boolean removeListener(Class eventType, EventListener listener); + + /** + * This method removes a listener. If the listener was not registered before this method will have no effect. + * + * @param listener is the {@link EventListener} to remove. + * @return {@code true} if the given {@code listener} has successfully been removed, {@code false} if the + * {@code listener} was NOT {@link #addListener(Class, EventListener) registered}. + */ + default boolean removeListener(EventListener listener) { + + return removeListener(null, listener); + } + +} \ No newline at end of file diff --git a/core/src/main/java/io/github/mmm/event/EventBusAccess.java b/core/src/main/java/io/github/mmm/event/EventBusAccess.java new file mode 100644 index 0000000..503c89f --- /dev/null +++ b/core/src/main/java/io/github/mmm/event/EventBusAccess.java @@ -0,0 +1,26 @@ +package io.github.mmm.event; + +import java.util.ServiceLoader; + +import io.github.mmm.base.config.ServiceHelper; + +/** + * Class giving {@link #get() global access} to the {@link EventBus}. + */ +public final class EventBusAccess { + + private static final EventBus EVENT_BUS = ServiceHelper.singleton(ServiceLoader.load(EventBus.class), false); + + private EventBusAccess() { + + } + + /** + * @return the {@link EventBus} instance. + */ + public static EventBus get() { + + return EVENT_BUS; + } + +} diff --git a/core/src/main/java/io/github/mmm/event/EventSourceAdapter.java b/core/src/main/java/io/github/mmm/event/EventSourceAdapter.java index c3c0d57..6f9c842 100644 --- a/core/src/main/java/io/github/mmm/event/EventSourceAdapter.java +++ b/core/src/main/java/io/github/mmm/event/EventSourceAdapter.java @@ -24,43 +24,11 @@ public abstract class EventSourceAdapter> { super(); } - boolean matches(EventListener listener2remove, EventListener registeredListener) { - - registeredListener = unwrap(registeredListener); - if (listener2remove == registeredListener) { - return true; - } else if ((registeredListener.isMatchedUsingEquals()) && registeredListener.equals(listener2remove)) { - return true; - } - return false; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - EventListener wrap(EventListener listener, boolean weak) { - - if (weak) { - return new WeakEventListener(this, listener); - } - return listener; - } - - @SuppressWarnings("unchecked") - > T unwrap(EventListener listener) { - - if (listener == null) { - return null; - } else if (listener instanceof WeakEventListener) { - return (T) ((WeakEventListener) listener).ref.get(); - } - return (T) listener; - } - /** * @param listener - see {@link EventSource#addListener(EventListener)}. - * @param weak - see {@link EventSource#addListener(EventListener, boolean)}. * @return this adapter itself or a new instance capable to handle more listeners. */ - public abstract EventSourceAdapter addListener(EventListener listener, boolean weak); + public abstract EventSourceAdapter addListener(EventListener listener); /** * @param listener - see {@link EventSource#removeListener(EventListener)}. @@ -70,23 +38,27 @@ > T unwrap(EventListener listener) public abstract EventSourceAdapter removeListener(EventListener listener); /** - * @param event the event to {@link EventListener#onEvent(Object) send} to all - * {@link #addListener(EventListener, boolean) registered} {@link EventListener}s. + * @param event the event to {@link EventListener#onEvent(Object) send} to all {@link #addListener(EventListener) + * registered} {@link EventListener}s. + * @return {@code true} if the event has actually been dispatched, {@code false} otherwise (no listener was + * {@link #addListener(EventListener) registered} for the event). */ - public abstract void fireEvent(E event); + public abstract boolean fireEvent(E event); - void fireEvent(E event, EventListener listener) { + boolean fireEvent(E event, EventListener listener) { try { listener.onEvent(event); + return true; } catch (Exception e) { Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e); } + return false; } /** - * @return {@code true} if at least one {@link EventListener} is {@link #addListener(EventListener, boolean) - * registered}, {@code false} otherwise. + * @return {@code true} if at least one {@link EventListener} is {@link #addListener(EventListener) registered}, + * {@code false} otherwise. */ public boolean hasListeners() { @@ -94,7 +66,7 @@ public boolean hasListeners() { } /** - * @return the number of {@link #addListener(EventListener, boolean) registered} {@link EventListener}s. + * @return the number of {@link #addListener(EventListener) registered} {@link EventListener}s. */ public abstract int getListenerCount(); @@ -132,9 +104,9 @@ private Empty() { } @Override - public EventSourceAdapter addListener(EventListener listener, boolean weak) { + public EventSourceAdapter addListener(EventListener listener) { - return new Single<>(wrap(listener, weak)); + return new Single<>(listener); } @Override @@ -144,8 +116,9 @@ public EventSourceAdapter removeListener(EventListener listener) { } @Override - public void fireEvent(Object event) { + public boolean fireEvent(Object event) { + return false; } @Override @@ -185,25 +158,25 @@ private Single(EventListener listener) { } @Override - public EventSourceAdapter addListener(EventListener eventListener, boolean weak) { + public EventSourceAdapter addListener(EventListener eventListener) { - return new Multi<>(this.listener, wrap(eventListener, weak)); + return new Multi<>(this.listener, eventListener); } @SuppressWarnings("unchecked") @Override public EventSourceAdapter removeListener(EventListener eventListener) { - if (matches(eventListener, this.listener)) { + if (AbstractEventSource.matches(eventListener, this.listener)) { return EMPTY; } return null; } @Override - public void fireEvent(E event) { + public boolean fireEvent(E event) { - fireEvent(event, this.listener); + return fireEvent(event, this.listener); } @Override @@ -218,11 +191,12 @@ public int getListenerCount() { return 1; } + @SuppressWarnings("unchecked") @Override public L getListener(int index) { if (index == 0) { - return unwrap(this.listener); + return (L) AbstractEventSource.unwrap(this.listener); } return null; } @@ -254,7 +228,7 @@ private Multi(EventListener first, EventListener second) { } @Override - public EventSourceAdapter addListener(EventListener listener, boolean weak) { + public EventSourceAdapter addListener(EventListener listener) { int oldCapacity = this.listeners.length; if (this.locked) { @@ -275,7 +249,7 @@ public EventSourceAdapter addListener(EventListener listener, b public EventSourceAdapter removeListener(EventListener listener) { for (int i = 0; i < this.listenerCount; i++) { - if (matches(listener, this.listeners[i])) { + if (AbstractEventSource.matches(listener, this.listeners[i])) { if (this.listenerCount == 2) { return new Single<>(this.listeners[1 - i]); } else { @@ -300,16 +274,21 @@ public EventSourceAdapter removeListener(EventListener listener } @Override - public void fireEvent(E event) { + public boolean fireEvent(E event) { + boolean dispatched = false; try { this.locked = true; for (int i = 0; i < this.listenerCount; i++) { - fireEvent(event, this.listeners[i]); + boolean send = fireEvent(event, this.listeners[i]); + if (send) { + dispatched = true; + } } } finally { this.locked = false; } + return dispatched; } @Override @@ -328,7 +307,7 @@ public int getListenerCount() { public L getListener(int index) { if ((index >= 0) && (index < this.listenerCount)) { - return unwrap(this.listeners[index]); + return (L) AbstractEventSource.unwrap(this.listeners[index]); } return null; } diff --git a/core/src/main/java/io/github/mmm/event/impl/EventBusImpl.java b/core/src/main/java/io/github/mmm/event/impl/EventBusImpl.java new file mode 100644 index 0000000..f03acd0 --- /dev/null +++ b/core/src/main/java/io/github/mmm/event/impl/EventBusImpl.java @@ -0,0 +1,47 @@ +/* Copyright (c) The m-m-m Team, Licensed under the Apache License, Version 2.0 + * http://www.apache.org/licenses/LICENSE-2.0 */ +package io.github.mmm.event.impl; + +import io.github.mmm.base.exception.GlobalExceptionHandler; +import io.github.mmm.event.AbstractEventBus; +import io.github.mmm.event.EventBus; + +/** + * This is the default implementation of {@link EventBus}. + */ +public class EventBusImpl extends AbstractEventBus { + + private volatile boolean dispatching; + + /** + * The constructor. + */ + public EventBusImpl() { + + this(null); + } + + /** + * The constructor. + * + * @param errorHandler the {@link GlobalExceptionHandler} instance. + */ + protected EventBusImpl(GlobalExceptionHandler errorHandler) { + + super(errorHandler); + } + + @Override + protected void triggerDispatchEvents() { + + // not synchronized - may call dispatchEvents() parallel + // since we are using a concurrent queue, events will get properly dispatched + // in the worst case and event could overtake if all goes badly wrong... + if (!this.dispatching) { + this.dispatching = true; + dispatchEvents(); + this.dispatching = false; + } + } + +} diff --git a/core/src/main/java/io/github/mmm/event/impl/WeakEventListener.java b/core/src/main/java/io/github/mmm/event/impl/WeakEventListener.java index 3591e95..c9da5bd 100644 --- a/core/src/main/java/io/github/mmm/event/impl/WeakEventListener.java +++ b/core/src/main/java/io/github/mmm/event/impl/WeakEventListener.java @@ -6,6 +6,7 @@ import java.util.Objects; import io.github.mmm.event.EventListener; +import io.github.mmm.event.EventSource; import io.github.mmm.event.EventSourceAdapter; /** @@ -17,7 +18,8 @@ */ public class WeakEventListener implements EventListener { - private final EventSourceAdapter source; + @SuppressWarnings("rawtypes") + private final EventSource source; /** The {@link WeakReference}. */ public final WeakReference> ref; @@ -28,7 +30,7 @@ public class WeakEventListener implements EventListener { * @param source the {@link EventSourceAdapter}. * @param listener the original listener to wrap. */ - public WeakEventListener(EventSourceAdapter source, EventListener listener) { + public WeakEventListener(EventSource source, EventListener listener) { super(); Objects.requireNonNull(source, "source"); diff --git a/core/src/main/java/module-info.java b/core/src/main/java/module-info.java index 4d5d9ea..0e8fa51 100644 --- a/core/src/main/java/module-info.java +++ b/core/src/main/java/module-info.java @@ -6,8 +6,16 @@ * Provides generic, reusable infrastructure to define, send and receive events.
* *

Event


- * This module provides a minimalistic but powerful API for the event pattern. To get started, here is a simple example - * how you can define your custom event handler interface: + * This module provides a minimalistic but powerful API for the event pattern. You may define any kind of custom event + * like this: + * + *
+ * // you may also use a record instead of a class...
+ * public class MyEvent {
+ * }
+ * 
+ * + * To get started, here is a simple example how you can define your custom event handler interface: * *
  * @FunctionalInterface
@@ -15,9 +23,6 @@
  * }
  * 
* - * The event itself ({@code MyEvent}) can be any object so you can use an interface, class, or enum without any - * restrictions. - * * Your component sending events can be defined like this: * *
@@ -45,11 +50,24 @@
  * MyComponent component = new MyComponentImpl();
  * MyEventListener listener = (e) -{@literal >} System.out.println("Received event: " + e);
  * component.{@link io.github.mmm.event.EventSource#addListener(EventListener) addListener}(listener);
- * component.doSomething();
+ * component.doSomething(); // this will echo "Received event: MyEvent@..."
  * // when you are done, you can unsubscribe the listener
  * component.{@link io.github.mmm.event.EventSource#removeListener(EventListener) removeListener}(listener);
  * 
+ * + * @provides io.github.mmm.event.EventBus + * @uses io.github.mmm.event.EventBus */ module io.github.mmm.event { + + requires io.github.mmm.base; + + // requires org.slf4j; + + uses io.github.mmm.event.EventBus; + + provides io.github.mmm.event.EventBus // + with io.github.mmm.event.impl.EventBusImpl; + exports io.github.mmm.event; } diff --git a/core/src/test/java/io/github/mmm/event/impl/EventBusImplTest.java b/core/src/test/java/io/github/mmm/event/impl/EventBusImplTest.java new file mode 100644 index 0000000..68ea470 --- /dev/null +++ b/core/src/test/java/io/github/mmm/event/impl/EventBusImplTest.java @@ -0,0 +1,74 @@ +package io.github.mmm.event.impl; + +import static org.junit.jupiter.api.Assertions.assertSame; + +import java.util.LinkedList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import io.github.mmm.base.exception.GlobalExceptionHandler; +import io.github.mmm.event.EventBus; +import io.github.mmm.event.EventListener; + +/** + * Test of {@link EventBusImpl}. + */ +public class EventBusImplTest extends EventBusTest { + + /** + * @return the {@link EventBus} instance to test. + */ + @Override + protected EventBus getEventBus() { + + return new EventBusImpl(); + } + + /** + * Tests the error handling of {@link EventBusImpl}. + */ + @Test + public void testErrorHandling() { + + // given + + final List errorList = new LinkedList<>(); + final String errorEvent = "error"; + final RuntimeException error = new IllegalStateException(errorEvent); + GlobalExceptionHandler errorHandler = new GlobalExceptionHandler() { + + @Override + public void handleError(Object context, Throwable e) { + + assertThat(e).isSameAs(error); + assertSame(errorEvent, context); + errorList.add(e); + } + }; + final EventBus eventBus = new EventBusImpl(errorHandler); + + EventListener listener = new EventListener<>() { + + @Override + public void onEvent(String event) { + + if (event == errorEvent) { + throw error; + } + } + }; + eventBus.addListener(String.class, listener); + + // when + then + eventBus.sendEvent("foo"); + assertThat(errorList).isEmpty(); + eventBus.sendEvent(errorEvent); + assertThat(errorList).hasSize(1); + eventBus.sendEvent("bar"); + assertThat(errorList).hasSize(1); + eventBus.sendEvent(errorEvent); + assertThat(errorList).hasSize(2); + } + +} diff --git a/core/src/test/java/io/github/mmm/event/impl/EventBusTest.java b/core/src/test/java/io/github/mmm/event/impl/EventBusTest.java new file mode 100644 index 0000000..9e7f475 --- /dev/null +++ b/core/src/test/java/io/github/mmm/event/impl/EventBusTest.java @@ -0,0 +1,287 @@ +package io.github.mmm.event.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.LinkedList; +import java.util.List; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import io.github.mmm.event.EventBus; +import io.github.mmm.event.EventListener; + +/** + * This is the test-case for {@link EventBus} and its implementation. + */ +public abstract class EventBusTest extends Assertions { + + // private static final Logger LOG = LoggerFactory.getLogger(EventBusTest.class); + + /** The number of threads. */ + private static final int NUMBER_OF_THREADS = 10; + + /** The number of events an {@link EventThread} sends to the next thread. */ + private static final int EVENTS_PER_THREAD = 5; + + /** + * @return the {@link EventBus} instance to test. + */ + protected abstract EventBus getEventBus(); + + /** + * Tests the {@link #getEventBus() event bus} with a simple scenario. + */ + @Test + public void testSimple() { + + final EventBus eventBus = getEventBus(); + final List events = new LinkedList<>(); + final String eventLast = "terminate"; + EventListener listener = new EventListener<>() { + + @Override + public void onEvent(Object event) { + + events.add(event); + if (event == eventLast) { + boolean removed = eventBus.removeListener(this); + assertThat(removed).isTrue(); + } + } + }; + eventBus.addListener(Object.class, listener); + Object event1 = new Object(); + assertThat(events).hasSize(0); + eventBus.sendEvent(event1); + assertThat(events).hasSize(1); + String event2 = "foo"; + eventBus.sendEvent(event2); + assertThat(events).hasSize(2); + MyEvent event3 = new MyEvent(1, 1); + eventBus.sendEvent(event3); + assertThat(events).hasSize(3); + eventBus.sendEvent(eventLast); + assertThat(events).hasSize(4); + eventBus.sendEvent("event after last"); + assertThat(events).hasSize(4); + assertThat(events.get(0)).isSameAs(event1); + assertThat(events.get(1)).isSameAs(event2); + assertThat(events.get(2)).isSameAs(event3); + assertThat(events.get(3)).isSameAs(eventLast); + // listener auto-removed himself on last event + boolean removed = eventBus.removeListener(listener); + assertThat(removed).isFalse(); + } + + /** + * Tests the event bus in a real concurrent usage scenario. + * + * @throws Exception if anything goes wrong. + */ + @Test + @Disabled("Only for local testing.") + public void testConcurrent() throws Exception { + + // given + final EventBus eventBus = getEventBus(); + + EventThread[] threads = new EventThread[NUMBER_OF_THREADS]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new EventThread(eventBus, i); + } + + // when + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + eventBus.sendEvent(new MyEvent(-1, threads[0].threadId)); + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + + // then + + // + // + one for the thread itself (see EventThread.run()). + int expectedEventProcessCount = 2; + // total events: each thread receives (EVENTS_PER_THREAD + 1) initial events + // it forwards its + // 0 -> 0:0,0:1,...,0:10 + // 0 -> 0:1,0:2,...,0:10 times 10 + + // initially 1 event send explicitly from call above, forwarded to all threads + int expectedEventTotalCount = threads.length + 1; // + for (int i = 0; i < threads.length; i++) { + // each thread is sending an event for itself that is forwarded to all following threads + // and it sends EVENTS_PER_THREAD events to its next thread forwarded to all following threads + int eventCountForThread = (threads.length - i + 1) + (threads.length - i) * EVENTS_PER_THREAD; + expectedEventTotalCount = expectedEventTotalCount + eventCountForThread; + } + // expectedEventTotalCount = 626; // hack + for (int i = 0; i < threads.length; i++) { + // all threads registered themselves equivalent on the even bus and must have received the same total + // number of events. + assertEquals(expectedEventTotalCount, threads[i].eventTotalCount); + assertEquals(expectedEventProcessCount, threads[i].eventProcessCount); + expectedEventProcessCount = expectedEventProcessCount + EVENTS_PER_THREAD + 1; + } + } + + /** + * A {@link Thread} sending and receiving events for asynchronous tests. + */ + protected static class EventThread extends Thread implements EventListener { + + /** The {@link EventBus} to test. */ + private final EventBus eventBus; + + /** The sequential ID of this {@link EventThread}. */ + private final int threadId; + + /** + * A counter for the number of events that have been processed (causing an event to the following + * {@link EventThread}). + */ + private int eventProcessCount; + + /** + * The total number of events received. + */ + private int eventTotalCount; + + /** + * The constructor. + * + * @param eventBus is the {@link EventBus} instance. + * @param id is the id (sequential number) of this thread. + */ + public EventThread(EventBus eventBus, int id) { + + super("EventThread" + id); + this.eventBus = eventBus; + this.threadId = id; + this.eventBus.addListener(MyEvent.class, this); + } + + @Override + public void onEvent(MyEvent event) { + + this.eventTotalCount++; + // LOG.debug(this.threadId + ";" + event.sourceThreadId + ";" + event.targetThreadId + ";" + event.eventId); + if (event.targetThreadId == this.threadId) { + this.eventProcessCount++; + this.eventBus.sendEvent(new MyEvent(event.sourceThreadId, this.threadId + 1, event.eventId)); + } + } + + @Override + public void run() { + + // send event to ourselves + this.eventBus.sendEvent(new MyEvent(this.threadId, this.threadId)); + for (int i = 0; i < EVENTS_PER_THREAD; i++) { + // send event to next thread + this.eventBus.sendEvent(new MyEvent(this.threadId)); + } + yield(); + // this.eventBus.removeListener(this); + } + } + + /** + * A simple event for testing. + */ + protected static class MyEvent { + + /** Counter for unique {@link #eventId}. */ + private static int idCounter = 0; + + /** The ID of this event (used for copies of the event for following {@link EventThread}s). */ + private int eventId; + + /** + * The {@link EventThread#getId() id} of the thread who initially triggered the event (mainly for debugging and + * tracing). + */ + private int sourceThreadId; + + /** The {@link EventThread#getId() id} of the thread who should handle the event. */ + private int targetThreadId; + + /** + * The constructor. + * + * @param sourceThreadId - see {@link #getSourceThreadId()}. + */ + public MyEvent(int sourceThreadId) { + + this(sourceThreadId, sourceThreadId + 1); + } + + /** + * The constructor. + * + * @param sourceThreadId - see {@link #getSourceThreadId()}. + * @param targetThreadId - see {@link #getTargetThreadId()}. + */ + public MyEvent(int sourceThreadId, int targetThreadId) { + + this(sourceThreadId, targetThreadId, getNextId()); + } + + /** + * @return the next unique ID. + */ + private static synchronized int getNextId() { + + return idCounter++; + } + + /** + * The constructor. + * + * @param eventId - see {@link #getEventId()} + * @param sourceThreadId - see {@link #getSourceThreadId()}. + * @param targetThreadId - see {@link #getTargetThreadId()}. + */ + public MyEvent(int sourceThreadId, int targetThreadId, int eventId) { + + super(); + this.eventId = eventId; + this.sourceThreadId = sourceThreadId; + this.targetThreadId = targetThreadId; + } + + /** + * @return the eventId + */ + public int getEventId() { + + return this.eventId; + } + + /** + * @return the sourceThreadId + */ + public int getSourceThreadId() { + + return this.sourceThreadId; + } + + /** + * @return the threadId + */ + public int getTargetThreadId() { + + return this.targetThreadId; + } + + @Override + public String toString() { + + return "Event from " + this.sourceThreadId + " to " + this.targetThreadId + " with ID " + this.eventId; + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4eb95ed..cf1794c 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,14 @@ + + ${project.groupId} + mmm-base-parent + ${project.version} + pom + import + + ${project.groupId} mmm-event