Skip to content
This repository has been archived by the owner on Jul 25, 2021. It is now read-only.

Commit

Permalink
Upgraded to 0.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
frankdejonge committed Oct 4, 2019
1 parent 569383a commit 3cda407
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 17 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
}
},
"require": {
"eventsauce/eventsauce": "^0.6.0",
"eventsauce/eventsauce": "^0.7.0",
"doctrine/dbal": "^2.5",
"ramsey/uuid": "^3.6",
"ext-json": "*"
Expand Down
54 changes: 39 additions & 15 deletions src/DoctrineMessageRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace EventSauce\DoctrineMessageRepository;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Statement;
use Doctrine\DBAL\Driver\Statement;
use EventSauce\EventSourcing\AggregateRootId;
use EventSauce\EventSourcing\Header;
use EventSauce\EventSourcing\Message;
Expand Down Expand Up @@ -62,7 +62,7 @@ public function persist(Message ... $messages)
$timeOfRecordingColumn = 'time_of_recording_' . $index;
$payloadColumn = 'payload_' . $index;
$values[] = "(:{$eventIdColumn}, :{$eventTypeColumn}, :{$aggregateRootIdColumn}, :{$aggregateRootVersionColumn}, :{$timeOfRecordingColumn}, :{$payloadColumn})";
$params[$aggregateRootVersionColumn] = $params['headers'][Header::AGGREGATE_ROOT_VERSION] ?? 0;
$params[$aggregateRootVersionColumn] = $payload['headers'][Header::AGGREGATE_ROOT_VERSION] ?? 0;
$params[$timeOfRecordingColumn] = $payload['headers'][Header::TIME_OF_RECORDING];
$params[$eventIdColumn] = $payload['headers'][Header::EVENT_ID] = $payload['headers'][Header::EVENT_ID] ?? Uuid::uuid4()->toString();
$params[$payloadColumn] = json_encode($payload, $this->jsonEncodeOptions);
Expand All @@ -83,7 +83,6 @@ protected function baseSql(string $tableName): string

public function retrieveAll(AggregateRootId $id): Generator
{
/** @var Statement $stm */
$stm = $this->connection->createQueryBuilder()
->select('payload')
->from($this->tableName)
Expand All @@ -92,18 +91,7 @@ public function retrieveAll(AggregateRootId $id): Generator
->setParameter('aggregate_root_id', $id->toString())
->execute();

while ($payload = $stm->fetchColumn()) {
$messages = $this->serializer->unserializePayload(json_decode($payload, true));

/* @var Message $message */
foreach ($messages as $message) {
yield $message;
}
}

return isset($message)
? $message->header(Header::AGGREGATE_ROOT_VERSION) ?: 0
: 0;
return $this->yieldMessagesForResult($stm);
}

public function retrieveEverything(): Generator
Expand All @@ -119,4 +107,40 @@ public function retrieveEverything(): Generator
yield from $this->serializer->unserializePayload(json_decode($payload, true));
}
}

public function retrieveAllAfterVersion(AggregateRootId $id, int $aggregateRootVersion): Generator
{
/** @var Statement $stm */
$stm = $this->connection->createQueryBuilder()
->select('payload')
->from($this->tableName)
->where('aggregate_root_id = :aggregate_root_id')
->where('aggregate_root_version > :aggregate_root_version')
->orderBy('aggregate_root_version', 'ASC')
->setParameter('aggregate_root_id', $id->toString())
->setParameter('aggregate_root_version', $aggregateRootVersion)
->execute();

return $this->yieldMessagesForResult($stm);
}

/**
* @param Statement $stm
* @return Generator|int
*/
private function yieldMessagesForResult(Statement $stm)
{
while ($payload = $stm->fetchColumn()) {
$messages = $this->serializer->unserializePayload(json_decode($payload, true));

/* @var Message $message */
foreach ($messages as $message) {
yield $message;
}
}

return isset($message)
? $message->header(Header::AGGREGATE_ROOT_VERSION) ?: 0
: 0;
}
}
31 changes: 30 additions & 1 deletion tests/DoctrineIntegrationTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ public function it_works()
$message = $this->decorator->decorate(new Message(new TestEvent(), [
Header::EVENT_ID => $eventId,
Header::AGGREGATE_ROOT_ID => $aggregateRootId->toString(),
Header::AGGREGATE_ROOT_VERSION => 10,
]));
$this->repository->persist($message);
$retrievedMessage = iterator_to_array($this->repository->retrieveAll($aggregateRootId), false)[0];
$generator = $this->repository->retrieveAll($aggregateRootId);
$retrievedMessage = iterator_to_array($generator, false)[0];
$this->assertEquals($message, $retrievedMessage);
$this->assertEquals(10, $generator->getReturn());
}

/**
Expand All @@ -85,4 +88,30 @@ public function persisting_events_without_event_ids()
$this->assertCount(1, $persistedMessages);
$this->assertNotEquals($message, $persistedMessages[0]);
}

/**
* @test
*/
public function retrieving_messages_after_a_specific_version()
{
$aggregateRootId = UuidAggregateRootId::create();
$messages = [];
$messages[] = $this->decorator->decorate(new Message(new TestEvent(), [
Header::EVENT_ID => Uuid::uuid4()->toString(),
Header::AGGREGATE_ROOT_ID => $aggregateRootId->toString(),
Header::AGGREGATE_ROOT_VERSION => 10,
]));
$messages[] = $this->decorator->decorate(new Message(new TestEvent(), [
Header::EVENT_ID => $lastEventId = Uuid::uuid4()->toString(),
Header::AGGREGATE_ROOT_ID => $aggregateRootId->toString(),
Header::AGGREGATE_ROOT_VERSION => 11,
]));
$this->repository->persist(...$messages);
$generator = $this->repository->retrieveAllAfterVersion($aggregateRootId, 10);
/** @var Message[] $messages */
$messages = iterator_to_array($generator);
$this->assertEquals(11, $generator->getReturn());
$this->assertCount(1, $messages);
$this->assertEquals($lastEventId, $messages[0]->header(Header::EVENT_ID));
}
}

0 comments on commit 3cda407

Please sign in to comment.