Skip to content

Commit

Permalink
Merge pull request #7 from hirethunk/chris/ver-51-braindead-concurrency
Browse files Browse the repository at this point in the history
VER-51: Very basic concurrency guards
  • Loading branch information
DanielCoulbourne authored Nov 16, 2023
2 parents b0e395d + 089a405 commit 9237fd5
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/Exceptions/ConcurrencyException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

namespace Thunk\Verbs\Exceptions;

use RuntimeException;

class ConcurrencyException extends RuntimeException
{
}
55 changes: 55 additions & 0 deletions src/Lifecycle/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
use Glhd\Bits\Bits;
use Glhd\Bits\Snowflake;
use Illuminate\Database\Eloquent\Builder;
use Illuminate\Database\Query\Builder as BaseBuilder;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\LazyCollection;
use Ramsey\Uuid\UuidInterface;
use Symfony\Component\Uid\AbstractUid;
use Thunk\Verbs\Event;
use Thunk\Verbs\Exceptions\ConcurrencyException;
use Thunk\Verbs\Facades\Verbs;
use Thunk\Verbs\Models\VerbEvent;
use Thunk\Verbs\Models\VerbStateEvent;
Expand Down Expand Up @@ -36,12 +40,63 @@ public function read(
return VerbEvent::query()->lazyById();
}

/** @param Event[] $events */
public function write(array $events): bool
{
if (empty($events)) {
return true;
}

$this->guardAgainstConcurrentWrites($events);

return VerbEvent::insert(static::formatForWrite($events))
&& VerbStateEvent::insert(static::formatRelationshipsForWrite($events));
}

/** @param Event[] $events */
protected function guardAgainstConcurrentWrites(array $events): void
{
$max_event_ids = new Collection();

$query = VerbStateEvent::query()->toBase();

$query->select([
'state_type',
'state_id',
DB::raw(sprintf(
'max(%s) as %s',
$query->getGrammar()->wrap('event_id'),
$query->getGrammar()->wrapTable('max_event_id')
)),
]);

$query->orderBy('id');

$query->where(function (BaseBuilder $query) use ($events, $max_event_ids) {
foreach ($events as $event) {
foreach ($event->states() as $state) {
if (! $max_event_ids->has($key = $state::class.$state->id)) {
$query->orWhere(function (BaseBuilder $query) use ($state) {
$query->where('state_type', $state::class);
$query->where('state_id', $state->id);
});
$max_event_ids->put($key, $state->last_event_id);
}
}
}
});

$query->each(function ($result) use ($max_event_ids) {
$key = data_get($result, 'state_type').data_get($result, 'state_id');
$max_written_id = (int) data_get($result, 'max_event_id');
$max_expected_id = $max_event_ids->get($key, 0);

if ($max_written_id > $max_expected_id) {
throw new ConcurrencyException("An event with ID {$max_written_id} has been written to the database, which is higher than {$max_expected_id}, which is in memory.");
}
});
}

/** @param Event[] $event_objects */
protected static function formatForWrite(array $event_objects): array
{
Expand Down
6 changes: 6 additions & 0 deletions src/Support/StateCollection.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
use Illuminate\Support\Collection;
use Thunk\Verbs\State;

/**
* @template TKey of array-key
*
* @implements \ArrayAccess<TKey, State>
* @implements \Illuminate\Support\Enumerable<TKey, State>
*/
class StateCollection extends Collection
{
protected array $aliases = [];
Expand Down
54 changes: 54 additions & 0 deletions tests/Unit/ConcurrencyTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

use Thunk\Verbs\Event;
use Thunk\Verbs\Exceptions\ConcurrencyException;
use Thunk\Verbs\Lifecycle\EventStore;
use Thunk\Verbs\Models\VerbEvent;
use Thunk\Verbs\State;
use Thunk\Verbs\Support\StateCollection;

it('does not throw on sequential events', function () {
$store = app(EventStore::class);

$event = new ConcurrencyTestEvent();
$event->id = 1;
ConcurrencyTestState::singleton()->last_event_id = 1;

$store->write([$event]);

$event2 = new ConcurrencyTestEvent();
$event2->id = 2;
ConcurrencyTestState::singleton()->last_event_id = 2;

$store->write([$event2]);

expect(VerbEvent::count())->toBe(2);
});

it('throws on non-sequential events', function () {
$store = app(EventStore::class);

$event = new ConcurrencyTestEvent();
$event->id = 2;
ConcurrencyTestState::singleton()->last_event_id = 2;

$store->write([$event]);

$event2 = new ConcurrencyTestEvent();
$event2->id = 1;
ConcurrencyTestState::singleton()->last_event_id = 1;

$store->write([$event2]);
})->throws(ConcurrencyException::class);

class ConcurrencyTestEvent extends Event
{
public function states(): StateCollection
{
return StateCollection::make([ConcurrencyTestState::singleton()]);
}
}

class ConcurrencyTestState extends State
{
}

0 comments on commit 9237fd5

Please sign in to comment.