Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #115 for mysql 8.4.0 #116

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "krowinski/php-mysql-replication",
"name": "hetao29/php-mysql-replication",
"description": "Pure PHP Implementation of MySQL replication protocol. This allow you to receive event like insert, update, delete with their data and raw SQL queries.",
"keywords": [
"mysql-replication",
Expand All @@ -17,7 +17,7 @@
"ext-json": "*",
"ext-sockets": "*",
"doctrine/collections": "^2.1",
"doctrine/dbal": "^3.8",
"doctrine/dbal": "^4.0",
"psr/log": "^3.0",
"psr/simple-cache": "^3.0",
"symfony/event-dispatcher": "^6.0|^7.0"
Expand All @@ -26,7 +26,7 @@
"kubawerlos/php-cs-fixer-custom-fixers": "^3.19",
"monolog/monolog": "^3.5",
"phpstan/phpstan": "^1.10",
"phpunit/phpunit": "^10.5",
"phpunit/phpunit": "^11.0",
"symplify/easy-coding-standard": "^12.1"
},
"license": "MIT",
Expand Down
7 changes: 6 additions & 1 deletion src/MySQLReplication/BinLog/BinLogSocketConnect.php
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,14 @@ private function getBinlogStream(): void
$this->executeSQL('SET @master_binlog_checksum = @@global.binlog_checksum');
}


if ($this->config->heartbeatPeriod > 0.00) {
// master_heartbeat_period is in nanoseconds
$this->executeSQL('SET @master_heartbeat_period = ' . $this->config->heartbeatPeriod * 1000000000);
if(version_compare($this->repository->getVersion(),"8.4.0")>=0){
$this->executeSQL('SET @source_heartbeat_period = ' . $this->config->heartbeatPeriod * 1000000000);
}else{
$this->executeSQL('SET @master_heartbeat_period = ' . $this->config->heartbeatPeriod * 1000000000);
}

$this->logger->info('Heartbeat period set to ' . $this->config->heartbeatPeriod . ' seconds');
}
Expand Down
22 changes: 16 additions & 6 deletions src/MySQLReplication/BinaryDataReader/BinaryDataReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,21 @@ public function readUInt24(): int
return $data[1] + ($data[2] << 8) + ($data[3] << 16);
}

public function readUInt64(): string
public function readUInt64(): string|int
{
return $this->unpackUInt64($this->read(self::UNSIGNED_INT64_LENGTH));
}

public function unpackUInt64(string $binary): string
public function unpackUInt64(string $binary): string|int
{
$data = self::unpack('V*', $binary);

return bcadd((string)$data[1], bcmul((string)$data[2], bcpow('2', '32')));
$num = bcadd((string)$data[1], bcmul((string)$data[2], bcpow('2', '32')));
if($num>PHP_INT_MAX || $num<PHP_INT_MIN){
return $num;
}else{
return intval($num);
}
}

public function readInt24(): int
Expand All @@ -138,11 +143,16 @@ public function readInt24(): int
return $res;
}

public function readInt64(): string
public function readInt64(): string|int
{
$data = self::unpack('V*', $this->read(self::UNSIGNED_INT64_LENGTH));

return bcadd((string)$data[1], (string)($data[2] << 32));
$num = bcadd((string)$data[1], (string)($data[2] << 32));
if($num>PHP_INT_MAX || $num<PHP_INT_MIN){
return $num;
}else{
return intval($num);
}
}

public function readLengthString(int $size): string
Expand Down Expand Up @@ -286,7 +296,7 @@ public function readDouble(): float

public function readTableId(): string
{
return $this->unpackUInt64($this->read(self::UNSIGNED_INT48_LENGTH) . chr(0) . chr(0));
return (string)$this->unpackUInt64($this->read(self::UNSIGNED_INT48_LENGTH) . chr(0) . chr(0));
}

public function isComplete(int $size): bool
Expand Down
2 changes: 1 addition & 1 deletion src/MySQLReplication/Event/RotateEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class RotateEvent extends EventCommon
{
public function makeRotateEventDTO(): RotateDTO
{
$binFilePos = $this->binaryDataReader->readUInt64();
$binFilePos = (string)$this->binaryDataReader->readUInt64();
$binFileName = $this->binaryDataReader->read(
$this->eventInfo->getSizeNoHeader() - $this->getSizeToRemoveByVersion()
);
Expand Down
2 changes: 1 addition & 1 deletion src/MySQLReplication/Event/RowEvent/RowEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public function makeUpdateRowsDTO(): ?UpdateRowsDTO

protected function findTableMap(): ?TableMap
{
$tableId = $this->binaryDataReader->readTableId();
$tableId = (string)$this->binaryDataReader->readTableId();
$this->binaryDataReader->advance(2);

if (in_array(
Expand Down
2 changes: 1 addition & 1 deletion src/MySQLReplication/Event/XidEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ class XidEvent extends EventCommon
{
public function makeXidDTO(): XidDTO
{
return new XidDTO($this->eventInfo, $this->binaryDataReader->readUInt64());
return new XidDTO($this->eventInfo, (string)$this->binaryDataReader->readUInt64());
}
}
20 changes: 10 additions & 10 deletions src/MySQLReplication/Repository/MySQLRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,21 @@ public function isCheckSum(): bool

public function getVersion(): string
{
$r = '';
$versions = $this->getConnection()
->fetchAllAssociative('SHOW VARIABLES LIKE "version%"');

foreach ($versions as $version) {
$r .= $version['Value'];
}
$res = $this->getConnection()
->fetchAssociative('SHOW VARIABLES LIKE "version"');

return $r;
return $res['Value']??"";
}

public function getMasterStatus(): MasterStatusDTO
{
$data = $this->getConnection()
->fetchAssociative('SHOW MASTER STATUS');
if(version_compare($this->getVersion(),"8.4.0")>=0){
$data = $this->getConnection()
->fetchAssociative('SHOW BINARY LOG STATUS');
}else{
$data = $this->getConnection()
->fetchAssociative('SHOW MASTER STATUS');
}
if (empty($data)) {
throw new BinLogException(
MySQLReplicationException::BINLOG_NOT_ENABLED,
Expand Down
20 changes: 6 additions & 14 deletions tests/Unit/Repository/MySQLRepositoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,14 @@ public function testShouldIsCheckSum(): void

public function testShouldGetVersion(): void
{
$expected = [
[
'Value' => 'foo',
],
[
'Value' => 'bar',
],
[
'Value' => '123',
],
];
$expected = [
'Value' => 'version',
];

$this->connection->method('fetchAllAssociative')
->willReturn($expected);
$this->connection->method('fetchAssociative')
->willReturn($expected);

self::assertEquals('foobar123', $this->mySQLRepositoryTest->getVersion());
self::assertEquals('version', $this->mySQLRepositoryTest->getVersion());
}

public function testShouldGetMasterStatus(): void
Expand Down