Skip to content

Commit

Permalink
Merge pull request #171 from prooph/event_bus_fixes
Browse files Browse the repository at this point in the history
Fix Bugs in Event Bus
  • Loading branch information
codeliner authored Oct 23, 2017
2 parents a9727d2 + 1ed3321 commit c6dcdb3
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 9 deletions.
36 changes: 34 additions & 2 deletions src/EventBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class EventBus extends MessageBus
*/
protected $collectExceptions = false;

/**
* @var array
*/
protected $collectedExceptions = [];

public function __construct(ActionEventEmitter $actionEventEmitter = null)
{
parent::__construct($actionEventEmitter);
Expand Down Expand Up @@ -58,12 +63,29 @@ function (ActionEvent $actionEvent): void {
$actionEvent->setParam(self::EVENT_PARAM_MESSAGE_HANDLED, true);
}

if (count($caughtExceptions)) {
throw EventListenerException::collected(...$caughtExceptions);
foreach ($caughtExceptions as $ex) {
$this->collectedExceptions[] = $ex;
}
},
self::PRIORITY_INVOKE_HANDLER
);

$this->events->attachListener(
self::EVENT_FINALIZE,
function (ActionEvent $actionEvent): void {
$target = $actionEvent->getTarget();

if (empty($target->collectedExceptions)) {
return;
}

$exceptions = $target->collectedExceptions;
$target->collectedExceptions = [];

$actionEvent->setParam(MessageBus::EVENT_PARAM_EXCEPTION, EventListenerException::collected(...$exceptions));
},
1000
);
}

/**
Expand Down Expand Up @@ -99,4 +121,14 @@ public function disableCollectExceptions(): void
{
$this->collectExceptions = false;
}

public function isCollectingExceptions(): bool
{
return $this->collectExceptions;
}

public function addCollectedException(\Throwable $e): void
{
$this->collectedExceptions[] = $e;
}
}
19 changes: 14 additions & 5 deletions src/Plugin/InvokeStrategy/OnEventStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,24 @@ public function attachToMessageBus(MessageBus $messageBus): void
$this->listenerHandlers[] = $messageBus->attach(
MessageBus::EVENT_DISPATCH,
function (ActionEvent $actionEvent): void {
if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) {
return;
}

$target = $actionEvent->getTarget();
$message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE);
$handlers = $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []);

foreach ($handlers as $handler) {
$handler->onEvent($message);
if (is_callable($handler) || ! is_object($handler) || ! is_callable([$handler, 'onEvent'])) {
continue;
}

try {
$handler->onEvent($message);
} catch (\Throwable $e) {
if ($target->isCollectingExceptions()) {
$target->addCollectedException($e);
} else {
throw $e;
}
}
}

$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true);
Expand Down
2 changes: 1 addition & 1 deletion tests/EventBusTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public function it_collects_exceptions_if_mode_is_enabled(): void
MessageBus::EVENT_DISPATCH,
function (ActionEvent $e) use ($handler, $errorProducer): void {
if ($e->getParam(MessageBus::EVENT_PARAM_MESSAGE_NAME) === CustomMessage::class) {
$e->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [$handler, $errorProducer, $handler]);
$e->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [$handler, $errorProducer, $handler]);
}
},
MessageBus::PRIORITY_ROUTE
Expand Down
36 changes: 36 additions & 0 deletions tests/Mock/CustomMessageEventHandler2.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php
/**
* This file is part of the prooph/service-bus.
* (c) 2014-2017 prooph software GmbH <[email protected]>
* (c) 2015-2017 Sascha-Oliver Prolic <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace ProophTest\ServiceBus\Mock;

final class CustomMessageEventHandler2
{
private $lastMessage;

private $invokeCounter = 0;

public function on($message): void
{
$this->lastMessage = $message;
$this->invokeCounter++;
}

public function getLastMessage()
{
return $this->lastMessage;
}

public function getInvokeCounter(): int
{
return $this->invokeCounter;
}
}
21 changes: 21 additions & 0 deletions tests/Mock/CustomMessageEventHandlerThrowingExceptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php
/**
* This file is part of the prooph/service-bus.
* (c) 2014-2017 prooph software GmbH <[email protected]>
* (c) 2015-2017 Sascha-Oliver Prolic <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace ProophTest\ServiceBus\Mock;

final class CustomMessageEventHandlerThrowingExceptions
{
public function onEvent($message): void
{
throw new \Exception('bar');
}
}
52 changes: 52 additions & 0 deletions tests/Mock/CustomOnEventStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php
/**
* This file is part of the prooph/service-bus.
* (c) 2014-2017 prooph software GmbH <[email protected]>
* (c) 2015-2017 Sascha-Oliver Prolic <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace ProophTest\ServiceBus\Mock;

use Prooph\Common\Event\ActionEvent;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\MessageBus;
use Prooph\ServiceBus\Plugin\AbstractPlugin;

final class CustomOnEventStrategy extends AbstractPlugin
{
public function attachToMessageBus(MessageBus $messageBus): void
{
$this->listenerHandlers[] = $messageBus->attach(
MessageBus::EVENT_DISPATCH,
function (ActionEvent $actionEvent): void {
$target = $actionEvent->getTarget();
$message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE);
$handlers = $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []);

foreach ($handlers as $handler) {
if (is_callable($handler) || ! is_object($handler) || ! is_callable([$handler, 'on'])) {
continue;
}

try {
$handler->on($message);
} catch (\Throwable $e) {
if ($target->isCollectingExceptions()) {
$target->addCollectedException($e);
} else {
throw $e;
}
}
}

$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true);
},
MessageBus::PRIORITY_INVOKE_HANDLER
);
}
}
147 changes: 146 additions & 1 deletion tests/Plugin/InvokeStrategy/OnEventStrategyTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
use PHPUnit\Framework\TestCase;
use Prooph\Common\Event\DefaultListenerHandler;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Exception\EventListenerException;
use Prooph\ServiceBus\Exception\MessageDispatchException;
use Prooph\ServiceBus\Plugin\InvokeStrategy\OnEventStrategy;
use Prooph\ServiceBus\Plugin\ListenerExceptionCollectionMode;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use ProophTest\ServiceBus\Mock\CustomInvokableMessageHandler;
use ProophTest\ServiceBus\Mock\CustomMessage;
use ProophTest\ServiceBus\Mock\CustomMessageEventHandler;
use ProophTest\ServiceBus\Mock\CustomMessageEventHandler2;
use ProophTest\ServiceBus\Mock\CustomMessageEventHandlerThrowingExceptions;
use ProophTest\ServiceBus\Mock\CustomOnEventStrategy;
use Prophecy\Argument;

class OnEventStrategyTest extends TestCase
Expand Down Expand Up @@ -60,7 +66,7 @@ public function it_can_be_attached_to_event_bus(): void
->shouldBeCalled()
->willReturn(
new DefaultListenerHandler(
function () {
function (): void {
}
)
);
Expand Down Expand Up @@ -91,4 +97,143 @@ public function it_should_not_handle_already_processed_messages(): void
$this->assertSame($customEvent, $callableHandler->getLastMessage());
$this->assertSame(1, $callableHandler->getInvokeCounter());
}

/**
* @test
*/
public function it_should_still_work_with_callables(): void
{
$eventBus = new EventBus();

$onEventStrategy = new OnEventStrategy();
$onEventStrategy->attachToMessageBus($eventBus);

$handler = new CustomMessageEventHandler();

$result = false;

$router = new EventRouter();
$router->route(CustomMessage::class)
->to(function (CustomMessage $message) use (&$result): void {
$result = true;
})
->andTo($handler);

$router->attachToMessageBus($eventBus);

$eventBus->dispatch(new CustomMessage('some text'));

$this->assertTrue($result);
$this->assertSame(1, $handler->getInvokeCounter());
}

/**
* @test
*/
public function it_should_still_work_with_callables_and_collect_all_exceptions(): void
{
$eventBus = new EventBus();

$exceptionModePlugin = new ListenerExceptionCollectionMode();
$exceptionModePlugin->attachToMessageBus($eventBus);

$onEventStrategy = new OnEventStrategy();
$onEventStrategy->attachToMessageBus($eventBus);

$handler = new CustomMessageEventHandlerThrowingExceptions();

$router = new EventRouter();
$router->route(CustomMessage::class)
->to(function (CustomMessage $message): void {
throw new \Exception('foo');
})
->andTo($handler);

$router->attachToMessageBus($eventBus);

$ex = null;

try {
$eventBus->dispatch(new CustomMessage('some text'));
} catch (MessageDispatchException $ex) {
$ex = $ex->getPrevious();
}

$this->assertNotNull($ex);
$this->assertInstanceOf(EventListenerException::class, $ex);
$this->assertCount(2, $ex->listenerExceptions());
}

/**
* @test
*/
public function it_should_still_work_with_callables_and_collect_all_exceptions_part2(): void
{
$eventBus = new EventBus();

$exceptionModePlugin = new ListenerExceptionCollectionMode();
$exceptionModePlugin->attachToMessageBus($eventBus);

$onEventStrategy = new OnEventStrategy();
$onEventStrategy->attachToMessageBus($eventBus);

$handler = new CustomMessageEventHandlerThrowingExceptions();

$router = new EventRouter();
$router->route(CustomMessage::class)
->to(function (CustomMessage $message): void {
throw new \Exception('foo');
})
->andTo($handler)
->andTo($handler);

$router->attachToMessageBus($eventBus);

$ex = null;

try {
$eventBus->dispatch(new CustomMessage('some text'));
} catch (MessageDispatchException $ex) {
$ex = $ex->getPrevious();
}

$this->assertNotNull($ex);
$this->assertInstanceOf(EventListenerException::class, $ex);
$this->assertCount(3, $ex->listenerExceptions());
}

/**
* @test
*/
public function it_should_still_work_with_callables_and_other_strategies(): void
{
$eventBus = new EventBus();

$onEventStrategy = new OnEventStrategy();
$onEventStrategy->attachToMessageBus($eventBus);

$secondOnEventStrategy = new CustomOnEventStrategy();
$secondOnEventStrategy->attachToMessageBus($eventBus);

$handler = new CustomMessageEventHandler();
$handler2 = new CustomMessageEventHandler2();

$result = false;

$router = new EventRouter();
$router->route(CustomMessage::class)
->to(function (CustomMessage $message) use (&$result): void {
$result = true;
})
->andTo($handler)
->andTo($handler2);

$router->attachToMessageBus($eventBus);

$eventBus->dispatch(new CustomMessage('some text'));

$this->assertTrue($result);
$this->assertSame(1, $handler->getInvokeCounter());
$this->assertSame(1, $handler2->getInvokeCounter());
}
}

0 comments on commit c6dcdb3

Please sign in to comment.