Skip to content

Commit

Permalink
added EventBus
Browse files Browse the repository at this point in the history
  • Loading branch information
hohwille committed Oct 4, 2023
1 parent a077e11 commit 957fced
Show file tree
Hide file tree
Showing 13 changed files with 990 additions and 90 deletions.
12 changes: 12 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,16 @@
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>Java module providing the event pattern as generic reusable API and implementation.</description>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>mmm-base</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
-->
</dependencies>
</project>
266 changes: 266 additions & 0 deletions core/src/main/java/io/github/mmm/event/AbstractEventBus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/* Copyright (c) The m-m-m Team, Licensed under the Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0 */
package io.github.mmm.event;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import io.github.mmm.base.exception.GlobalExceptionHandler;

/**
* This is the default implementation of {@link EventBus}.
*/
public abstract class AbstractEventBus implements EventBus {

// private static final Logger LOG = LoggerFactory.getLogger(AbstractEventBus.class);

@SuppressWarnings("rawtypes")
private final Map<Class<?>, EventDispatcher> eventType2dispatcherMap;

private final Queue<Object> eventQueue;

/** The {@link GlobalExceptionHandler}. */
protected final GlobalExceptionHandler errorHandler;

/**
* The constructor.
*/
public AbstractEventBus() {

this(null);
}

/**
* The constructor.
*
* @param errorHandler the {@link GlobalExceptionHandler} instance.
*/
protected AbstractEventBus(GlobalExceptionHandler errorHandler) {

super();
this.eventType2dispatcherMap = new ConcurrentHashMap<>();
this.eventQueue = new ConcurrentLinkedQueue<>();
if (errorHandler == null) {
this.errorHandler = io.github.mmm.base.exception.GlobalExceptionHandlerAccess.get();
} else {
this.errorHandler = errorHandler;
}
}

@Override
public void sendEvent(Object event) {

Objects.requireNonNull(event);
this.eventQueue.add(event);
triggerDispatchEvents();
}

/**
* Called from {@link #sendEvent(Object)} to ensure {@link #dispatchEvents()} is triggered. This can be done
* synchronous or asynchronous.
*/
protected abstract void triggerDispatchEvents();

/**
* Dispatches all events in the event queue.
*/
protected void dispatchEvents() {

while (true) {
Object event = this.eventQueue.poll();
if (event == null) {
return;
}
dispatchEvent(event);
}
}

/**
* Dispatches the given event.
*
* @param <E> is the generic type of {@code event}.
* @param event is the event to dispatch.
*/
protected <E> void dispatchEvent(E event) {

@SuppressWarnings("unchecked")
Class<E> eventType = (Class<E>) event.getClass();
EventDispatcher<E> eventDispatcher = getEventDispatcherOrNull(eventType);
boolean dispatched = false;
if (eventDispatcher != null) {
dispatched = eventDispatcher.fireEvent(event);
}
if (!dispatched) {
handleUndispatchedEvent(event);
}
}

/**
* Called if an event was {@link #sendEvent(Object) send} but not dispatched to any
* {@link #addListener(Class, EventListener) registered listener}.
*
* @param event is the un-dispatched event.
*/
protected void handleUndispatchedEvent(Object event) {

// LOG.warn("Event send with no responsible listener registered: {}", event);
}

/**
* Gets or creates the {@link EventDispatcher} for the given {@code eventType}.
*
* @param <E> is the generic type of {@code eventType}.
* @param eventType is the {@link Class} reflecting the event.
* @return the {@link EventDispatcher} responsible for the given {@code eventType}.
*/
@SuppressWarnings("unchecked")
protected <E> EventDispatcher<E> getEventDispatcherRequired(Class<E> eventType) {

EventDispatcher<?> dispatcher = this.eventType2dispatcherMap.get(eventType);
if (dispatcher == null) {
Class<?> type = eventType.getSuperclass();
EventDispatcher<?> parent;
if (type != null) {
parent = getEventDispatcherRequired(type);
} else {
parent = null;
}
dispatcher = this.eventType2dispatcherMap.computeIfAbsent(eventType, t -> new EventDispatcher<>(parent));
}
return (EventDispatcher<E>) dispatcher;
}

/**
* Gets the most specific {@link EventDispatcher} responsible the given {@code eventType}.
*
* @param <E> is the generic type of {@code eventType}.
* @param eventType is the {@link Class} reflecting the event.
* @return the most specific {@link EventDispatcher} responsible for the given {@code eventType}. May be {@code null}
* if no {@link EventListener} is {@link #addListener(Class, EventListener) registered} for a compatible
* {@code eventType}.
*/
@SuppressWarnings("unchecked")
protected <E> EventDispatcher<E> getEventDispatcherOrNull(Class<E> eventType) {

Class<?> type = eventType;
EventDispatcher<?> dispatcher = this.eventType2dispatcherMap.get(eventType);
while ((dispatcher == null) && (type != null)) {
type = type.getSuperclass();
dispatcher = this.eventType2dispatcherMap.get(type);
}
return (EventDispatcher<E>) dispatcher;
}

@Override
public <E> void addListener(Class<E> eventType, EventListener<E> listener) {

Objects.requireNonNull(eventType);
Objects.requireNonNull(listener);
if (eventType.isInterface()) {
throw new UnsupportedOperationException(
"This EventBus implementation does not support interfaces as event type: " + eventType.getName());
}
EventDispatcher<E> eventDispatcher = getEventDispatcherRequired(eventType);
eventDispatcher.addListener(listener);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public <E> boolean removeListener(Class<E> eventType, EventListener<E> listener) {

boolean removed = false;
if (eventType == null) {
for (EventDispatcher<?> dispatcher : this.eventType2dispatcherMap.values()) {
boolean currentRemoved = dispatcher.removeListener((EventListener) listener);
if (currentRemoved) {
removed = true;
}
}
} else {
EventDispatcher<E> dispatcher = this.eventType2dispatcherMap.get(eventType);
if (dispatcher != null) {
return dispatcher.removeListener(listener);
}
}
return removed;
}

/**
* A dispatcher for all {@link EventListener}s of a particular {@link EventBus#addListener(Class, EventListener) event
* type}.
*
* @param <E> type of the {@link EventListener#onEvent(Object) events}.
*/
protected class EventDispatcher<E> extends AbstractEventSource<E, EventListener<E>> {

/** @see #onEvent(Object, Collection) */
private final EventDispatcher<? super E> parentDispatcher;

/** @see #fireEvent(Object, Collection) */
private final Collection<EventListener<E>> listeners;

/**
* The constructor.
*
* @param parent is the {@link EventDispatcher} responsible for the super-class or {@code null} if this is the root
* {@link EventDispatcher} responsible for {@link Object}.
*/
public EventDispatcher(EventDispatcher<? super E> parent) {

this(parent, new ConcurrentLinkedQueue<>());
}

/**
* The constructor.
*
* @param parent is the {@link EventDispatcher} responsible for the super-class or {@code null} if this is the root
* {@link EventDispatcher} responsible for {@link Object}.
* @param listeners the empty {@link Collection} implementation for the {@link EventListener}s.
*/
protected EventDispatcher(EventDispatcher<? super E> parent, Collection<EventListener<E>> listeners) {

super();
this.parentDispatcher = parent;
this.listeners = listeners;
}

@Override
protected void doAddListener(EventListener<E> listener) {

this.listeners.add(listener);
}

@Override
public boolean removeListener(EventListener<E> listener) {

return this.listeners.remove(listener);
}

@Override
protected boolean fireEvent(E event) {

boolean dispatched = false;
for (EventListener<E> listener : this.listeners) {
try {
listener.onEvent(event);
dispatched = true;
} catch (Throwable exception) {
AbstractEventBus.this.errorHandler.handleError(event, exception);
}
}
if (this.parentDispatcher != null) {
boolean superDispatched = this.parentDispatcher.fireEvent(event);
if (superDispatched) {
dispatched = true;
}
}
return dispatched;
}

}

}
73 changes: 73 additions & 0 deletions core/src/main/java/io/github/mmm/event/AbstractEventSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/* Copyright (c) The m-m-m Team, Licensed under the Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0 */
package io.github.mmm.event;

/**
* Implementation of {@link EventSource}.
*
* @param <E> the type of the events to {@link EventListener#onEvent(Object) send}.
* @param <L> the type of the {@link EventListener listeners}.
* @since 1.0.0
*/
public abstract class AbstractEventSender<E, L extends EventListener<?/* super E */> >
extends AbstractEventSource<E, L> {

private EventSourceAdapter<E, L> eventAdapter;

/**
* The constructor.
*/
public AbstractEventSender() {

super();
this.eventAdapter = EventSourceAdapter.empty();
}

@Override
protected void doAddListener(EventListener<E> listener) {

this.eventAdapter = this.eventAdapter.addListener(listener);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public boolean removeListener(L listener) {

EventSourceAdapter<E, L> adapter = this.eventAdapter.removeListener((EventListener) listener);
if (adapter == null) {
return false;
}
this.eventAdapter = adapter;
return true;
}

/**
* @return {@code true} if at least one {@link EventListener} is {@link #addListener(EventListener, boolean)
* registered}, {@code false} otherwise.
*/
protected boolean hasListeners() {

return this.eventAdapter.hasListeners();
}

/**
* @return the {@link EventSourceAdapter}.
*/
protected EventSourceAdapter<E, L> getEventAdapter() {

return this.eventAdapter;
}

/**
* @param event the event to {@link EventListener#onEvent(Object) send} to all {@link #addListener(EventListener)
* registered} {@link EventListener}s.
* @return {@code true} if the event has actually been dispatched, {@code false} otherwise (no listener was
* {@link #addListener(EventListener) registered} for the event).
*/
@Override
protected boolean fireEvent(E event) {

return this.eventAdapter.fireEvent(event);
}

}
Loading

0 comments on commit 957fced

Please sign in to comment.