diff --git a/bench.sh b/bench.sh index 60e87e5..a9dfca1 100755 --- a/bench.sh +++ b/bench.sh @@ -5,7 +5,7 @@ USAGE="Usage: bench.sh --driver [arangodb | postgres | mysql | mariadb] [--strat DRIVER= STREAM_STRATEGY= -while [[ ${1} ]]; do +while [ ${1} ]; do case "${1}" in --driver) DRIVER=${2} @@ -40,6 +40,8 @@ export DRIVER=${DRIVER} export STREAM_STRATEGY=${STRATEGY} echo "" +php -v + echo "Starting benchmark ${DRIVER}!" php src/prepare.php php src/benchmark.php @@ -54,44 +56,33 @@ php src/prepare.php WRITER_COUNTER=0 WRITER_ITERATIONS=10 -start=$(adjtimex | awk '/(time.tv_sec|time.tv_usec):/ { printf("%07d", $2) }') +read up rest logs/writer${WRITER_COUNTER}${type}.log & + php src/writer.php writer${WRITER_COUNTER} ${type} | tee logs/writer${WRITER_COUNTER}${type}.log & done WRITER_COUNTER=$((WRITER_COUNTER + 1)) done for type in user post todo blog comment all do - php src/projector.php projectors${type} ${type} >logs/projector${type}.log & + php src/projector.php projectors${type} ${type} | tee logs/projector${type}.log & done echo "Waiting ... stay patient!" wait -end=$(adjtimex | awk '/(time.tv_sec|time.tv_usec):/ { printf("%07d", $2) }') - -WRITER_COUNTER=0 -while [ ${WRITER_COUNTER} -lt ${WRITER_ITERATIONS} ]; do - for type in user post todo blog comment - do - cat logs/writer${WRITER_COUNTER}${type}.log - done - - WRITER_COUNTER=$((WRITER_COUNTER + 1)) -done - -for type in user post todo blog comment all -do - cat logs/projector${type}.log -done +read up rest createProjection('all_projection'); + $projection = $projectionManager->createProjection( + 'all_projection', + [ + PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE => 50, + PdoEventStoreProjector::OPTION_CACHE_SIZE => 50, + PdoEventStoreProjector::OPTION_SLEEP => 10000, + PdoEventStoreProjector::OPTION_PCNTL_DISPATCH => true, + ] + ); $projection ->init(function (): array { return ['count' => 0]; @@ -57,12 +66,13 @@ public function run() $avg = $this->stopAt / $time; outputText("Projection $this->id read $readEvents events"); - outputText("Projection $this->id used $time seconds, avg $avg events/second"); + outputText("Projection $this->id used $time seconds, avg $avg events/second " . getMemoryConsumption()); outputText("Projection $this->id checking integrity ...", true, ''); Assertion::eq($readEvents, $stopAt, 'Number of all projected events invalid: Value "%s" does not equal expected value "%s".'); outputText(" ok\n", false); } catch (\Throwable $e) { echo $e->getMessage() . PHP_EOL . $e->getTraceAsString(); + throw $e; } } } diff --git a/src/CategoryProjector.php b/src/CategoryProjector.php index 39f5d8e..379df2c 100644 --- a/src/CategoryProjector.php +++ b/src/CategoryProjector.php @@ -5,6 +5,7 @@ namespace Prooph\EventStoreBenchmarks; use Prooph\Common\Messaging\Message; +use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector; use Prooph\EventStore\Util\Assertion; use Ramsey\Uuid\Uuid; @@ -37,7 +38,15 @@ public function run() $start = \microtime(true); $uuid = Uuid::uuid4()->toString(); - $projection = $projectionManager->createProjection('category_projection_' . $uuid); + $projection = $projectionManager->createProjection( + 'category_projection_' . $uuid, + [ + PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE => 50, + PdoEventStoreProjector::OPTION_CACHE_SIZE => 50, + PdoEventStoreProjector::OPTION_SLEEP => 10000, + PdoEventStoreProjector::OPTION_PCNTL_DISPATCH => true, + ] + ); $projection ->init(function (): array { return ['count' => 0]; @@ -61,12 +70,13 @@ public function run() $avg = $this->stopAt / $time; outputText("Projection $this->id read $readEvents events"); - outputText("Projection $this->id used $time seconds, avg $avg events/second"); + outputText("Projection $this->id used $time seconds, avg $avg events/second " . getMemoryConsumption()); outputText("Projection $this->id checking integrity ...", true, ''); Assertion::eq($readEvents, 2500, 'Number of category projected events invalid: Value "%s" does not equal expected value "%s".'); outputText(" ok\n", false); } catch (\Throwable $e) { echo $e->getMessage() . PHP_EOL . $e->getTraceAsString(); + throw $e; } } } diff --git a/src/StreamCreator.php b/src/StreamCreator.php index f5daca0..eec9d7b 100644 --- a/src/StreamCreator.php +++ b/src/StreamCreator.php @@ -41,16 +41,24 @@ public function run() $start = \microtime(true); + $streamName = new StreamName($this->category . '-' . $this->id); + for ($i = 0; $i < $this->executions; $i++) { $count += $this->numberOfEvents; - $streamName = $this->category . '-' . Uuid::uuid4()->toString(); $events = createTestEvents(testPayload(), $this->numberOfEvents); if ($eventStore instanceof TransactionalEventStore) { $eventStore->beginTransaction(); } - $eventStore->create(new Stream(new StreamName($streamName), \SplFixedArray::fromArray($events))); + if (getenv('STREAM_STRATEGY') === 'Aggregate') { + $streamName = $this->category . '-' . Uuid::uuid4()->toString(); + $eventStore->create(new Stream(new StreamName($streamName), \SplFixedArray::fromArray($events))); + } elseif ($i === 0 && $eventStore->hasStream($streamName) === false) { + $eventStore->create(new Stream($streamName, \SplFixedArray::fromArray($events))); + } else { + $eventStore->appendTo($streamName, \SplFixedArray::fromArray($events)); + } if ($eventStore instanceof TransactionalEventStore) { $eventStore->commit(); @@ -65,12 +73,13 @@ public function run() $avg = ($this->executions * $this->numberOfEvents) / $time; outputText("Writer $this->id-$this->category wrote $this->eventsWritten events"); - outputText("Writer $this->id-$this->category used $time seconds, avg $avg events/second"); + outputText("Writer $this->id-$this->category used $time seconds, avg $avg events/second " . getMemoryConsumption()); outputText("Writer $this->id checking integrity ...", true, ''); Assertion::eq($count, $this->numberOfEvents * $this->executions, 'Number of writer events invalid: Value "%s" does not equal expected value "%s".'); outputText(" ok\n", false); } catch (\Throwable $e) { echo $e->getMessage() . PHP_EOL . $e->getTraceAsString(); + throw $e; } } } diff --git a/src/benchmark.php b/src/benchmark.php index 433b539..0096075 100644 --- a/src/benchmark.php +++ b/src/benchmark.php @@ -7,6 +7,7 @@ use Dotenv\Dotenv; use Prooph\Common\Messaging\Message; use Prooph\EventStore\EventStore; +use Prooph\EventStore\Pdo\Projection\PdoEventStoreProjector; use Prooph\EventStore\Projection\ProjectionManager; use Prooph\EventStore\Stream; use Prooph\EventStore\StreamName; @@ -143,7 +144,7 @@ $eventsPerSecond = 2500 / $time; outputText("test 3 using $name took $time seconds"); - outputText("test 3 using $name writes $eventsPerSecond events per second\n"); + outputText("test 3 using $name writes $eventsPerSecond events per second"); outputText('test 3 checking integrity ...', true, ''); checkWriteIntegrity($eventStore, $numberStreams[$name], $numberEvents[$name]); outputText(" ok\n", false); @@ -178,7 +179,15 @@ foreach ($projectionManagers as $name => $projectionManager) { /* @var ProjectionManager $projectionManager */ - $projection = $projectionManager->createProjection('test_projection_5'); + $projection = $projectionManager->createProjection( + 'test_projection_5', + [ + PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE => 50, + PdoEventStoreProjector::OPTION_CACHE_SIZE => 50, + PdoEventStoreProjector::OPTION_SLEEP => 10000, + PdoEventStoreProjector::OPTION_PCNTL_DISPATCH => true, + ] + ); $projection ->init(function (): array { return ['count' => 0]; @@ -213,7 +222,15 @@ foreach ($streamNamesTest1[$name] as $streamName) { $streamNames[] = $streamName->toString(); } - $projection = $projectionManager->createProjection('test_projection_6'); + $projection = $projectionManager->createProjection( + 'test_projection_6', + [ + PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE => 50, + PdoEventStoreProjector::OPTION_CACHE_SIZE => 50, + PdoEventStoreProjector::OPTION_SLEEP => 10000, + PdoEventStoreProjector::OPTION_PCNTL_DISPATCH => true, + ] + ); $projection ->init(function (): array { return ['count' => 0]; @@ -235,4 +252,5 @@ outputText('test 6 checking integrity ...', true, ''); Assertion::eq($projection->getState()['count'], 1000, 'Number of projected events invalid: Value "%s" does not equal expected value "%s".'); outputText(" ok\n", false); + outputText( "Mem usage/peak: " . getMemoryConsumption() . PHP_EOL, false); } diff --git a/src/functions.php b/src/functions.php index f996cf3..1e18dd2 100644 --- a/src/functions.php +++ b/src/functions.php @@ -4,12 +4,22 @@ namespace Prooph\EventStoreBenchmarks; -use ArangoDb\Connection; +use ArangoDb\Handler\Statement; +use ArangoDb\Http\Client; +use ArangoDb\Http\ClientOptions; +use ArangoDb\Http\TransactionalClient; +use ArangoDb\Http\TypeSupport; +use ArangoDb\Statement\ArrayStreamHandlerFactory; +use ArangoDb\Statement\StreamHandlerFactoryInterface; +use ArangoDb\Type\Batch; +use ArangoDb\Type\Database; +use Laminas\Diactoros\Request; +use Laminas\Diactoros\Response; +use Laminas\Diactoros\StreamFactory; use PDO; use Prooph\Common\Messaging\FQCNMessageFactory; -use Prooph\EventStore\ArangoDb\EventStore as ArangoDbEventStore; +use Prooph\EventStore\ArangoDb\ArangoDbTransactionalEventStore as ArangoDbEventStore; use Prooph\EventStore\ArangoDb\Projection\ProjectionManager as ArangoDbProjectionManager; -use Prooph\EventStore\ArangoDb\Type\DeleteCollection; use Prooph\EventStore\EventStore; use Prooph\EventStore\Pdo\MariaDbEventStore; use Prooph\EventStore\Pdo\MySqlEventStore; @@ -21,9 +31,17 @@ use Prooph\EventStore\StreamName; use Prooph\EventStore\Util\Assertion; use ProophTest\EventStore\Mock\TestDomainEvent; -use function Prooph\EventStore\ArangoDb\Fn\eventStreamsBatch; -use function Prooph\EventStore\ArangoDb\Fn\execute; -use function Prooph\EventStore\ArangoDb\Fn\projectionsBatch; +use Psr\Http\Message\RequestFactoryInterface; +use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseFactoryInterface; +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\StreamFactoryInterface; +use function Prooph\EventStore\ArangoDb\Func\eventStreamsBatch; +use function Prooph\EventStore\ArangoDb\Func\projectionsBatch; + +$whoops = new \Whoops\Run; +$whoops->pushHandler(new \Whoops\Handler\PlainTextHandler()); +$whoops->register(); function testDatabases(): array { @@ -106,14 +124,10 @@ function createConnection(string $driver) return new PDO("pgsql:host=$host;port=$port;dbname=$dbName;options='--client_encoding=\"$charset\"'", $username, $password); case 'arangodb': - $connection = new Connection( - [ - Connection::HOST => \getenv('ARANGODB_HOST'), - Connection::MAX_CHUNK_SIZE => 64, - Connection::VST_VERSION => Connection::VST_VERSION_11, - ] + $connection = new TransactionalClient( + getArangoDbHttpClient(), + getResponseFactory() ); - $connection->connect(); return $connection; } @@ -135,21 +149,20 @@ function createDatabase($connection, string $driver, string $dbName): void break; case 'arangodb': - $result = $connection->get('/_api/collection?excludeSystem=1'); + // need own client to create database + $client = getArangoDbHttpClient(true); + $response = $client->sendType(Database::create($dbName)); - $collections = \json_decode($result->getBody(), true); - - if (\count($collections['result']) > 1) { - execute($connection, - null, - ...\array_map(function ($col) { - return DeleteCollection::with($col['name']); - }, $collections['result']) - ); + if ($response->getStatusCode() !== 201) { + throw new \RuntimeException('Database could not be created'); } - - execute($connection, null, ...eventStreamsBatch()); - execute($connection, null, ...projectionsBatch()); + $connection->sendType( + Batch::fromTypes(...eventStreamsBatch()) + ); + $connection->sendType( + Batch::fromTypes(...projectionsBatch()) + ); + sleep(10); break; default: throw new \RuntimeException(\sprintf('Driver "%s" not supported', $driver)); @@ -175,17 +188,13 @@ function destroyDatabase($connection, string $driver, string $dbName): void $connection->exec('DROP TABLE projections;'); break; case 'arangodb': - $result = $connection->get('/_api/collection?excludeSystem=1'); - - $collections = \json_decode($result->getBody(), true); + $type = 'application/' . (\getenv('USE_VPACK') === 'true' ? 'x-velocypack' : 'json'); + // need own client to create database + $client = getArangoDbHttpClient(true); + $response = $client->sendType(Database::delete($dbName)); - if (\count($collections['result']) > 1) { - execute($connection, - null, - ...\array_map(function ($col) { - return DeleteCollection::with($col['name']); - }, $collections['result']) - ); + if ($response->getStatusCode() !== 200) { + throw new \RuntimeException('Database could not be created'); } break; default: @@ -218,6 +227,7 @@ function createEventStore(string $driver, $connection): EventStore return new ArangoDbEventStore( new FQCNMessageFactory(), $connection, + new Statement(getArangoDbHttpClient(), getStreamHandlerFactory()), createStreamStrategy($driver) ); } @@ -244,7 +254,8 @@ function createProjectionManager(EventStore $eventStore, string $driver, $connec case 'arangodb': return new ArangoDbProjectionManager( $eventStore, - $connection + $connection, + new Statement($connection, getStreamHandlerFactory()) ); } } @@ -328,3 +339,72 @@ function outputText(string $text, bool $useDate = true, string $lineEnding = PHP echo $text . $lineEnding; } } + +function getArangoDbHttpClient($useSystemDatabase = false): TypeSupport +{ + $options = [ + ClientOptions::OPTION_ENDPOINT => getenv('ARANGODB_HOST'), + ClientOptions::OPTION_RECONNECT => true, + ]; + + if ($useSystemDatabase === false) { + $options[ClientOptions::OPTION_DATABASE] = getenv('ARANGODB_DB'); + } + + return new Client( + $options, + getRequestFactory(), + getResponseFactory(), + getStreamFactory() + ); +} + +function getResponseFactory(): ResponseFactoryInterface +{ + return new class implements ResponseFactoryInterface + { + public function createResponse(int $code = 200, string $reasonPhrase = ''): ResponseInterface + { + $response = new Response(); + + if ($reasonPhrase !== '') { + return $response->withStatus($code, $reasonPhrase); + } + + return $response->withStatus($code); + } + }; +} + +function getRequestFactory(): RequestFactoryInterface +{ + return new class implements RequestFactoryInterface + { + public function createRequest(string $method, $uri): RequestInterface + { + $type = 'application/json'; + + $request = new Request($uri, $method); + $request = $request->withAddedHeader('Content-Type', $type); + return $request->withAddedHeader('Accept', $type); + } + }; +} + +function getStreamFactory(): StreamFactoryInterface +{ + return new StreamFactory(); +} + +function getStreamHandlerFactory(): StreamHandlerFactoryInterface +{ + return new ArrayStreamHandlerFactory(); +} + +function getMemoryConsumption(): string +{ + $memUsage = memory_get_usage(); + $memPeak = memory_get_peak_usage(); + + return '(' . round($memUsage / 1024 / 1024, 2) . 'MB / ' . round($memPeak / 1024 / 1024, 2) . ' MB)'; +}