Skip to content

Commit

Permalink
feature: INSERT OR UPDATE (upsert) support (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
taka-oyama authored Mar 25, 2024
1 parent 092b135 commit 23daae1
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 15 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# v7.2.0 (Not released yet)

Added
- Support for `Query\Builder::upsert()` (#203)

# v7.1.0 (2024-03-11)

Changed
Expand Down
3 changes: 3 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ parameters:
- message: "#^Method Colopl\\\\Spanner\\\\SpannerServiceProvider\\:\\:parseConfig\\(\\) should return array\\{name\\: string, instance\\: string, database\\: string, prefix\\: string, cache_path\\: string|null, session_pool\\: array\\<string, mixed\\>\\} but returns non\\-empty\\-array\\<string, mixed\\>\\.$#"
count: 1
path: src/SpannerServiceProvider.php
- message: "#^Expression on left side of \\?\\? is not nullable\\.$#"
count: 1
path: src/Connection.php
58 changes: 53 additions & 5 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Exception;
use Generator;
use Google\Cloud\Core\Exception\AbortedException;
use Google\Cloud\Core\Exception\ConflictException;
use Google\Cloud\Core\Exception\GoogleException;
use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\Spanner\Database;
Expand Down Expand Up @@ -323,11 +324,9 @@ public function affectingStatement($query, $bindings = []): int
throw new RuntimeException('Tried to run update outside of transaction! Affecting statements must be done inside a transaction');
}

$rowCount = $transaction->executeUpdate($query, ['parameters' => $this->prepareBindings($bindings)]);

$this->recordsHaveBeenModified($rowCount > 0);

return $rowCount;
return $this->shouldRunAsBatchDml($query)
? $this->executeBatchDml($transaction, $query, $bindings)
: $this->executeDml($transaction, $query, $bindings);
});
};

Expand Down Expand Up @@ -595,6 +594,55 @@ protected function executePartitionedQuery(string $query, array $options): Gener
}
}

/**
* @param Transaction $transaction
* @param string $query
* @param list<mixed> $bindings
* @return int
*/
protected function executeDml(Transaction $transaction, string $query, array $bindings = []): int
{
$rowCount = $transaction->executeUpdate($query, ['parameters' => $this->prepareBindings($bindings)]);
$this->recordsHaveBeenModified($rowCount > 0);
return $rowCount;
}

/**
* @param Transaction $transaction
* @param string $query
* @param list<mixed> $bindings
* @return int
*/
protected function executeBatchDml(Transaction $transaction, string $query, array $bindings = []): int
{
$result = $transaction->executeUpdateBatch([
['sql' => $query, 'parameters' => $this->prepareBindings($bindings)]
]);

$error = $result->error();
if ($error !== null) {
throw new ConflictException(
$error['status']['message'] ?? '',
$error['status']['code'] ?? 0,
null,
['details' => $error['details'] ?? []],
);
}

$rowCount = array_sum($result->rowCounts() ?? []);
$this->recordsHaveBeenModified($rowCount > 0);
return $rowCount;
}

/**
* @param string $query
* @return bool
*/
protected function shouldRunAsBatchDml(string $query): bool
{
return stripos($query, 'insert or ') === 0;
}

/**
* @template T
* @param Closure(): T $callback
Expand Down
28 changes: 28 additions & 0 deletions src/Query/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Colopl\Spanner\Query;

use Closure;
use Colopl\Spanner\Connection;
use Illuminate\Contracts\Support\Arrayable;
use Illuminate\Database\Query\Builder as BaseBuilder;
Expand Down Expand Up @@ -55,6 +56,33 @@ public function updateOrInsert(array $attributes, array $values = [])
return (bool) $this->take(1)->update(Arr::except($values, array_keys($attributes)));
}

/**
* @inheritDoc
*/
public function upsert(array $values, $uniqueBy = [], $update = null)
{
if (empty($values)) {
return 0;
}

if (! array_is_list($values)) {
$values = [$values];
} else {
foreach ($values as $key => $value) {
ksort($value);

$values[$key] = $value;
}
}

$this->applyBeforeQueryCallbacks();

return $this->connection->affectingStatement(
$this->grammar->compileUpsert($this, $values, [], []),
$this->cleanBindings(Arr::flatten($values, 1)),
);
}

/**
* @inheritDoc
*/
Expand Down
22 changes: 15 additions & 7 deletions src/Query/Concerns/UsesMutations.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@
trait UsesMutations
{
/**
* @param array $values
* @param array<string, mixed> $values
* @return void
*/
public function insertUsingMutation(array $values)
{
$this->connection->insertUsingMutation($this->from, $values);
$this->connection->insertUsingMutation($this->getTableName(), $values);
}

/**
* @param array $values
* @param array<string, mixed> $values
* @return void
*/
public function updateUsingMutation(array $values)
{
$this->connection->updateUsingMutation($this->from, $values);
$this->connection->updateUsingMutation($this->getTableName(), $values);
}

/**
Expand All @@ -49,15 +49,23 @@ public function updateUsingMutation(array $values)
*/
public function insertOrUpdateUsingMutation(array $values)
{
$this->connection->insertOrUpdateUsingMutation($this->from, $values);
$this->connection->insertOrUpdateUsingMutation($this->getTableName(), $values);
}

/**
* @param array|KeySet $keys
* @param list<string>|KeySet $keys
* @return void
*/
public function deleteUsingMutation($keys)
{
$this->connection->deleteUsingMutation($this->from, $keys);
$this->connection->deleteUsingMutation($this->getTableName(), $keys);
}

/**
* @return string
*/
private function getTableName(): string
{
return (string) $this->getGrammar()->getValue($this->from);
}
}
9 changes: 9 additions & 0 deletions src/Query/Grammar.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,22 @@
use Illuminate\Database\Query\Builder;
use Illuminate\Contracts\Database\Query\Expression;
use Illuminate\Database\Query\Grammars\Grammar as BaseGrammar;
use Illuminate\Support\Str;
use RuntimeException;

class Grammar extends BaseGrammar
{
use MarksAsNotSupported;
use SharedGrammarCalls;

/**
* @inheritDoc
*/
public function compileUpsert(Builder $query, array $values, array $uniqueBy, array $update)
{
return Str::replaceFirst('insert', 'insert or update', $this->compileInsert($query, $values));
}

/**
* @inheritDoc
*/
Expand Down
77 changes: 74 additions & 3 deletions tests/Query/BuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
use Colopl\Spanner\Schema\Blueprint;
use Colopl\Spanner\Tests\TestCase;
use Colopl\Spanner\TimestampBound\ExactStaleness;
use Google\Cloud\Core\Exception\ConflictException;
use Google\Cloud\Spanner\Bytes;
use Google\Cloud\Spanner\Duration;
use Illuminate\Support\Str;
use Illuminate\Database\QueryException;
use Illuminate\Support\Carbon;
use Illuminate\Support\Collection;
use LogicException;

use const Grpc\STATUS_ALREADY_EXISTS;
Expand Down Expand Up @@ -578,6 +577,78 @@ public function testInsertDatetime(): void
$this->assertSame($carbonMax->getTimestamp(), $insertedTimestamp->getTimestamp());
}

public function test_upsert_single_row(): void
{
$table = __FUNCTION__;
$conn = $this->getDefaultConnection();
$sb = $conn->getSchemaBuilder();
$sb->create($table, function (Blueprint $table) {
$table->integer('id')->primary();
$table->string('s', 1);
});

$query = $conn->table($table);
$this->assertSame(1, $query->upsert(['id' => 1, 's' => 'a']));
$this->assertSame(1, $query->upsert(['id' => 1, 's' => 'b']));

$this->assertSame(['id' => 1, 's' => 'b'], (array) $query->sole());
}

public function test_upsert_multi_row(): void
{
$table = __FUNCTION__;
$conn = $this->getDefaultConnection();
$sb = $conn->getSchemaBuilder();
$sb->create($table, function (Blueprint $table) {
$table->integer('id')->primary();
$table->string('s', 1);
});

$query = $conn->table($table);

// insert
$this->assertSame(2, $query->upsert([
['id' => 1, 's' => 'a'],
['id' => 2, 's' => 'b'],
]));

// update (no change x1, change x1, insert x1)
$this->assertSame(3, $query->upsert([
['id' => 1, 's' => 'a'],
['id' => 2, 's' => '_'],
['id' => 3, 's' => 'c'],
]));

$this->assertSame(['a', '_', 'c'], $query->orderBy('id')->pluck('s')->all());
}

public function test_upsert_throw_error(): void
{
$table = __FUNCTION__;
$conn = $this->getDefaultConnection();
$sb = $conn->getSchemaBuilder();
$sb->create($table, function (Blueprint $table) {
$table->integer('id')->primary();
$table->string('s', 1);
});

$query = $conn->table($table);
$exceptionThrown = false;
try {
$query->upsert([
['id' => 1, 's' => 'a'],
['id' => 2, 's' => 'bb'],
]);
} catch (QueryException $e) {
$this->assertSame(9, $e->getCode());
$this->assertStringContainsString('New value exceeds the maximum size limit', $e->getMessage());
$exceptionThrown = true;
}

$this->assertTrue($exceptionThrown);
$this->assertSame(0, $query->count());
}

public function testWhereDatetime(): void
{
date_default_timezone_set('Asia/Tokyo');
Expand Down Expand Up @@ -818,7 +889,7 @@ public function test_whereLike(): void
$caughtException = $ex;
}
if (getenv('SPANNER_EMULATOR_HOST')) {
$this->assertStringContainsString('INTERNAL', $caughtException?->getMessage());
$this->assertStringContainsString('Invalid UTF-8', $caughtException?->getMessage());
} else {
$this->assertStringContainsString('Invalid request proto: an error was encountered during deserialization of the request proto.', $caughtException?->getMessage());
}
Expand Down

0 comments on commit 23daae1

Please sign in to comment.