-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DoctrineTransactionalMessageRepository.php
55 lines (47 loc) · 1.69 KB
/
DoctrineTransactionalMessageRepository.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<?php
namespace EventSauce\MessageOutbox\DoctrineOutbox;
use Doctrine\DBAL\Connection;
use EventSauce\EventSourcing\AggregateRootId;
use EventSauce\EventSourcing\Message;
use EventSauce\EventSourcing\MessageRepository;
use EventSauce\EventSourcing\PaginationCursor;
use EventSauce\EventSourcing\UnableToPersistMessages;
use EventSauce\MessageOutbox\OutboxRepository;
use Generator;
use Throwable;
/**
 * Message repository decorator that stores messages in the wrapped message
 * repository and in the outbox repository inside a single transaction on the
 * given Doctrine DBAL connection.
 *
 * All read operations are forwarded untouched to the wrapped message repository.
 */
class DoctrineTransactionalMessageRepository implements MessageRepository
{
    /**
     * @param Connection        $connection        connection on which the transaction is opened, committed or rolled back
     * @param MessageRepository $messageRepository wrapped repository that stores the messages and serves all reads
     * @param OutboxRepository  $outboxRepository  outbox that receives the same messages within the same transaction
     */
    public function __construct(
        private Connection $connection,
        private MessageRepository $messageRepository,
        private OutboxRepository $outboxRepository,
    ) {}

    /**
     * Persists the messages to the message repository and to the outbox,
     * committing both writes together or rolling both back.
     *
     * @throws UnableToPersistMessages when starting the transaction, either write,
     *                                 or the commit fails; the causing exception is
     *                                 handed to UnableToPersistMessages::dueTo()
     */
    public function persist(Message ...$messages): void
    {
        try {
            $this->connection->beginTransaction();

            try {
                $this->messageRepository->persist(...$messages);
                $this->outboxRepository->persist(...$messages);
                $this->connection->commit();
            } catch (Throwable $exception) {
                try {
                    $this->connection->rollBack();
                } catch (Throwable) {
                    // A failing rollback (e.g. the connection is gone, or the
                    // transaction was already closed by a failed commit) must
                    // not replace the exception that caused the failure, so it
                    // is deliberately dropped in favour of the original one.
                }

                throw $exception;
            }
        } catch (Throwable $exception) {
            throw UnableToPersistMessages::dueTo('', $exception);
        }
    }

    /**
     * Forwards to the wrapped message repository's retrieveAll().
     */
    public function retrieveAll(AggregateRootId $id): Generator
    {
        return $this->messageRepository->retrieveAll($id);
    }

    /**
     * Forwards to the wrapped message repository's retrieveAllAfterVersion().
     */
    public function retrieveAllAfterVersion(AggregateRootId $id, int $aggregateRootVersion): Generator
    {
        return $this->messageRepository->retrieveAllAfterVersion($id, $aggregateRootVersion);
    }

    /**
     * Forwards to the wrapped message repository's paginate().
     */
    public function paginate(PaginationCursor $cursor): Generator
    {
        return $this->messageRepository->paginate($cursor);
    }
}