Description
Hello,
Using v2.1.3 of Amphp/Mysql, I need to execute queries on multiple db servers, with the following requirements:
- queries run in parallel: one query per db server
- if one or more server is in error (network issue, server down, sql error...) OR one or more servers take more than a given time to respond: ignore the response, execution should continue with the results from the other servers not in error or timeout
- always close the opened db connection
The current implementation uses Amp\Promise\timeout
to wrap the execution of each query, and AmpPromise\any
combinator to execute the promises in parallel:
echo 'Begin' . PHP_EOL;
$start = microtime(true);
$globalPromise = Amp\call(function () use ($start) {
$servers = ['server1', 'server2'];
$user = 'db_user';
$pass = 'db_pass';
$dbName = 'db_name';
$promises = [];
foreach ($servers as $server) {
// Create the promise for a single server
$serverPromise = Amp\call(function () use ($start, $server, $user, $pass, $dbName): \Generator {
try {
echo "Executing {$server} promise" . PHP_EOL;
// 1 second query for server1, slow 100 seconds query for server2
$query = $server === 'server1' ? 'SELECT SLEEP(1)' : 'SELECT SLEEP(100)';
// Retrieve the db connection
$serverConfig = Amp\Mysql\ConnectionConfig::fromString("host={$server};user={$user};pass={$pass};db={$dbName}");
$db = Amp\Mysql\pool($serverConfig);
// Wrap the execution of the query in a 2 seconds timeout promise
$timeoutDbPromise = Amp\Promise\timeout($db->execute($query), 2000);
// Return the result when the promise resolves
return yield $timeoutDbPromise;
} catch (\Throwable $th) {
// Catch connection errors, sql errors, Amp\TimeoutException...
$errorTime = microtime(true) - $start;
echo 'Caught: "' . $th->getMessage() . '" on ' . $server . ' after ' . $errorTime . PHP_EOL;
// Rethrow to be handled by Amp\Promise\any combinator below
throw $th;
} finally {
// Always try to close the connection to the db to prevent clogging mysql with useless connections
if (isset($db)) {
$db->close(); // However this does not close the connection when there is still a pending query
}
}
});
// Add the server promise to the list of all promises
$promises[] = $serverPromise;
}
// Wait for all promises to settle and return the result
[$errors, $resultSets] = yield Amp\Promise\any($promises);
return [$errors, $resultSets];
});
$res = Amp\Promise\wait($globalPromise);
$time = microtime(true) - $start;
echo 'Got ' . count($res[1]) . ' results and ' . count($res[0]) . ' errors' . PHP_EOL;
echo "Done in {$time}" . PHP_EOL;
This will output
Begin
Executing server1 promise
Executing server2 promise
Caught: "Operation timed out" on server2 after 2.0047299861908
Got 1 result and 1 error
Done in 2.004860162735
The only thing that is not working as expected is that when $db->close
is called in the finally, it does not close the connection if a query is still pending.
In this test case example with SLEEP()
, the long query is still visible in mysql SHOW FULL PROCESSLIST
, and after some time the connection is aborted and SHOW GLOBAL STATUS
's Aborted_clients
is incremented by 1.
(In my practical case the issue is even worse because the table is in a metadata lock state so the query end up 'waiting for metadata lock', and the connection is never aborted: the query stays in queue until the lock is released, meanwhile other queries try to get connections to the db and in the end the server reaches 'Too many connections')
Looking into the internals of Amphp\Mysql
, it seems the $db->close()
method ends up calling Amp\Mysql\Internal\Processor->sendClose()
which itself calls startCommand()
with a callback to actually send the COM_QUIT signal to the server. startCommand
appends the task to the queue, but as the initial request is still pending, it seems the callback is never called and the connection is left open.
Is there a way to force the connection to close even when there is a pending query?
Or is there something wrong with this implementation?
Thanks