From 23daae1a6e4dd25e35e8b3f12ca5b4eaa9d38873 Mon Sep 17 00:00:00 2001 From: Takayasu Oyama Date: Mon, 25 Mar 2024 17:15:37 +0900 Subject: [PATCH] feature: INSERT OR UPDATE (upsert) support (#203) --- CHANGELOG.md | 5 ++ phpstan.neon | 3 ++ src/Connection.php | 58 +++++++++++++++++++-- src/Query/Builder.php | 28 ++++++++++ src/Query/Concerns/UsesMutations.php | 22 +++++--- src/Query/Grammar.php | 9 ++++ tests/Query/BuilderTest.php | 77 ++++++++++++++++++++++++++-- 7 files changed, 187 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b8ce019..37cc8e97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# v7.2.0 (Not released yet) + +Added +- Support for `Query\Builder::upsert()` (#203) + # v7.1.0 (2024-03-11) Changed diff --git a/phpstan.neon b/phpstan.neon index d42e5cff..e0d4e541 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -41,3 +41,6 @@ parameters: - message: "#^Method Colopl\\\\Spanner\\\\SpannerServiceProvider\\:\\:parseConfig\\(\\) should return array\\{name\\: string, instance\\: string, database\\: string, prefix\\: string, cache_path\\: string|null, session_pool\\: array\\\\} but returns non\\-empty\\-array\\\\.$#" count: 1 path: src/SpannerServiceProvider.php + - message: "#^Expression on left side of \\?\\? is not nullable\\.$#" + count: 1 + path: src/Connection.php diff --git a/src/Connection.php b/src/Connection.php index 2876c052..4a1f115a 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -28,6 +28,7 @@ use Exception; use Generator; use Google\Cloud\Core\Exception\AbortedException; +use Google\Cloud\Core\Exception\ConflictException; use Google\Cloud\Core\Exception\GoogleException; use Google\Cloud\Core\Exception\NotFoundException; use Google\Cloud\Spanner\Database; @@ -323,11 +324,9 @@ public function affectingStatement($query, $bindings = []): int throw new RuntimeException('Tried to run update outside of transaction! Affecting statements must be done inside a transaction'); } - $rowCount = $transaction->executeUpdate($query, ['parameters' => $this->prepareBindings($bindings)]); - - $this->recordsHaveBeenModified($rowCount > 0); - - return $rowCount; + return $this->shouldRunAsBatchDml($query) + ? $this->executeBatchDml($transaction, $query, $bindings) + : $this->executeDml($transaction, $query, $bindings); }); }; @@ -595,6 +594,55 @@ protected function executePartitionedQuery(string $query, array $options): Gener } } + /** + * @param Transaction $transaction + * @param string $query + * @param list $bindings + * @return int + */ + protected function executeDml(Transaction $transaction, string $query, array $bindings = []): int + { + $rowCount = $transaction->executeUpdate($query, ['parameters' => $this->prepareBindings($bindings)]); + $this->recordsHaveBeenModified($rowCount > 0); + return $rowCount; + } + + /** + * @param Transaction $transaction + * @param string $query + * @param list $bindings + * @return int + */ + protected function executeBatchDml(Transaction $transaction, string $query, array $bindings = []): int + { + $result = $transaction->executeUpdateBatch([ + ['sql' => $query, 'parameters' => $this->prepareBindings($bindings)] + ]); + + $error = $result->error(); + if ($error !== null) { + throw new ConflictException( + $error['status']['message'] ?? '', + $error['status']['code'] ?? 0, + null, + ['details' => $error['details'] ?? []], + ); + } + + $rowCount = array_sum($result->rowCounts() ?? []); + $this->recordsHaveBeenModified($rowCount > 0); + return $rowCount; + } + + /** + * @param string $query + * @return bool + */ + protected function shouldRunAsBatchDml(string $query): bool + { + return stripos($query, 'insert or ') === 0; + } + /** * @template T * @param Closure(): T $callback diff --git a/src/Query/Builder.php b/src/Query/Builder.php index 948df960..7f13f86b 100644 --- a/src/Query/Builder.php +++ b/src/Query/Builder.php @@ -17,6 +17,7 @@ namespace Colopl\Spanner\Query; +use Closure; use Colopl\Spanner\Connection; use Illuminate\Contracts\Support\Arrayable; use Illuminate\Database\Query\Builder as BaseBuilder; @@ -55,6 +56,33 @@ public function updateOrInsert(array $attributes, array $values = []) return (bool) $this->take(1)->update(Arr::except($values, array_keys($attributes))); } + /** + * @inheritDoc + */ + public function upsert(array $values, $uniqueBy = [], $update = null) + { + if (empty($values)) { + return 0; + } + + if (! array_is_list($values)) { + $values = [$values]; + } else { + foreach ($values as $key => $value) { + ksort($value); + + $values[$key] = $value; + } + } + + $this->applyBeforeQueryCallbacks(); + + return $this->connection->affectingStatement( + $this->grammar->compileUpsert($this, $values, [], []), + $this->cleanBindings(Arr::flatten($values, 1)), + ); + } + /** * @inheritDoc */ diff --git a/src/Query/Concerns/UsesMutations.php b/src/Query/Concerns/UsesMutations.php index 7d1afbf4..56f11522 100644 --- a/src/Query/Concerns/UsesMutations.php +++ b/src/Query/Concerns/UsesMutations.php @@ -26,21 +26,21 @@ trait UsesMutations { /** - * @param array $values + * @param array $values * @return void */ public function insertUsingMutation(array $values) { - $this->connection->insertUsingMutation($this->from, $values); + $this->connection->insertUsingMutation($this->getTableName(), $values); } /** - * @param array $values + * @param array $values * @return void */ public function updateUsingMutation(array $values) { - $this->connection->updateUsingMutation($this->from, $values); + $this->connection->updateUsingMutation($this->getTableName(), $values); } /** @@ -49,15 +49,23 @@ public function updateUsingMutation(array $values) */ public function insertOrUpdateUsingMutation(array $values) { - $this->connection->insertOrUpdateUsingMutation($this->from, $values); + $this->connection->insertOrUpdateUsingMutation($this->getTableName(), $values); } /** - * @param array|KeySet $keys + * @param list|KeySet $keys * @return void */ public function deleteUsingMutation($keys) { - $this->connection->deleteUsingMutation($this->from, $keys); + $this->connection->deleteUsingMutation($this->getTableName(), $keys); + } + + /** + * @return string + */ + private function getTableName(): string + { + return (string) $this->getGrammar()->getValue($this->from); } } diff --git a/src/Query/Grammar.php b/src/Query/Grammar.php index bc2c9af3..6927adbc 100644 --- a/src/Query/Grammar.php +++ b/src/Query/Grammar.php @@ -22,6 +22,7 @@ use Illuminate\Database\Query\Builder; use Illuminate\Contracts\Database\Query\Expression; use Illuminate\Database\Query\Grammars\Grammar as BaseGrammar; +use Illuminate\Support\Str; use RuntimeException; class Grammar extends BaseGrammar @@ -29,6 +30,14 @@ class Grammar extends BaseGrammar use MarksAsNotSupported; use SharedGrammarCalls; + /** + * @inheritDoc + */ + public function compileUpsert(Builder $query, array $values, array $uniqueBy, array $update) + { + return Str::replaceFirst('insert', 'insert or update', $this->compileInsert($query, $values)); + } + /** * @inheritDoc */ diff --git a/tests/Query/BuilderTest.php b/tests/Query/BuilderTest.php index 8cadfc97..a6790737 100644 --- a/tests/Query/BuilderTest.php +++ b/tests/Query/BuilderTest.php @@ -22,12 +22,11 @@ use Colopl\Spanner\Schema\Blueprint; use Colopl\Spanner\Tests\TestCase; use Colopl\Spanner\TimestampBound\ExactStaleness; +use Google\Cloud\Core\Exception\ConflictException; use Google\Cloud\Spanner\Bytes; use Google\Cloud\Spanner\Duration; -use Illuminate\Support\Str; use Illuminate\Database\QueryException; use Illuminate\Support\Carbon; -use Illuminate\Support\Collection; use LogicException; use const Grpc\STATUS_ALREADY_EXISTS; @@ -578,6 +577,78 @@ public function testInsertDatetime(): void $this->assertSame($carbonMax->getTimestamp(), $insertedTimestamp->getTimestamp()); } + public function test_upsert_single_row(): void + { + $table = __FUNCTION__; + $conn = $this->getDefaultConnection(); + $sb = $conn->getSchemaBuilder(); + $sb->create($table, function (Blueprint $table) { + $table->integer('id')->primary(); + $table->string('s', 1); + }); + + $query = $conn->table($table); + $this->assertSame(1, $query->upsert(['id' => 1, 's' => 'a'])); + $this->assertSame(1, $query->upsert(['id' => 1, 's' => 'b'])); + + $this->assertSame(['id' => 1, 's' => 'b'], (array) $query->sole()); + } + + public function test_upsert_multi_row(): void + { + $table = __FUNCTION__; + $conn = $this->getDefaultConnection(); + $sb = $conn->getSchemaBuilder(); + $sb->create($table, function (Blueprint $table) { + $table->integer('id')->primary(); + $table->string('s', 1); + }); + + $query = $conn->table($table); + + // insert + $this->assertSame(2, $query->upsert([ + ['id' => 1, 's' => 'a'], + ['id' => 2, 's' => 'b'], + ])); + + // update (no change x1, change x1, insert x1) + $this->assertSame(3, $query->upsert([ + ['id' => 1, 's' => 'a'], + ['id' => 2, 's' => '_'], + ['id' => 3, 's' => 'c'], + ])); + + $this->assertSame(['a', '_', 'c'], $query->orderBy('id')->pluck('s')->all()); + } + + public function test_upsert_throw_error(): void + { + $table = __FUNCTION__; + $conn = $this->getDefaultConnection(); + $sb = $conn->getSchemaBuilder(); + $sb->create($table, function (Blueprint $table) { + $table->integer('id')->primary(); + $table->string('s', 1); + }); + + $query = $conn->table($table); + $exceptionThrown = false; + try { + $query->upsert([ + ['id' => 1, 's' => 'a'], + ['id' => 2, 's' => 'bb'], + ]); + } catch (QueryException $e) { + $this->assertSame(9, $e->getCode()); + $this->assertStringContainsString('New value exceeds the maximum size limit', $e->getMessage()); + $exceptionThrown = true; + } + + $this->assertTrue($exceptionThrown); + $this->assertSame(0, $query->count()); + } + public function testWhereDatetime(): void { date_default_timezone_set('Asia/Tokyo'); @@ -818,7 +889,7 @@ public function test_whereLike(): void $caughtException = $ex; } if (getenv('SPANNER_EMULATOR_HOST')) { - $this->assertStringContainsString('INTERNAL', $caughtException?->getMessage()); + $this->assertStringContainsString('Invalid UTF-8', $caughtException?->getMessage()); } else { $this->assertStringContainsString('Invalid request proto: an error was encountered during deserialization of the request proto.', $caughtException?->getMessage()); }