From 8fed1ed5b078f15ad9c8405b5cc4f0dc912a6392 Mon Sep 17 00:00:00 2001 From: woods-chen Date: Thu, 2 Dec 2021 00:55:03 +0000 Subject: [PATCH] skip the first 2 events after reconnected * sleep 5s after disconnected accidently and skip the first 2 events after reconnected * some cleanup and compacting works --- pymysqlreplication/binlogstream.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index a69b5e5e..15bcbcf0 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -2,6 +2,7 @@ import pymysql import struct +import time from distutils.version import LooseVersion from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE @@ -30,6 +31,7 @@ # 2006 MySQL server has gone away MYSQL_EXPECTED_ERROR_CODES = [2013, 2006] +__PYMYSQL_VERSION_LT_06 = pymysql.__version__ < LooseVersion("0.6") class ReportSlave(object): @@ -208,11 +210,6 @@ def __init__(self, connection_settings, server_id, each time the client is disconnected and then auto-reconnected to the mysql server (OperationalError 2006/2013) if resume_stream is False. so it's suggested to set resume_stream to True. - - an additional RotateEvent and FormatDescriptionEvent will be - fetched each time the client is disconnected and then auto- - reconnected to the server. (no matter resume_stream is True - or False) """ self.__connection_settings = connection_settings @@ -263,7 +260,7 @@ def __init__(self, connection_settings, server_id, self.pymysql_wrapper = pymysql.connect def close(self): - if getattr(self, '_stream_connection', None) and self._stream_connection.open: + if self.__connected_stream: self._stream_connection.close() if getattr(self, '_ctl_connection', None): # break reference cycle between stream reader and underlying @@ -304,7 +301,7 @@ def _register_slave(self): packet = self.report_slave.encoded(self.__server_id) - if pymysql.__version__ < LooseVersion("0.6"): + if __PYMYSQL_VERSION_LT_06: self._stream_connection.wfile.write(packet) self._stream_connection.wfile.flush() self._stream_connection.read_packet() @@ -334,7 +331,7 @@ def __connect_to_stream(self, force_reconnect=False): # server_id (4) -- server id of this slave # log_file (string.EOF) -- filename of the binlog on the master self._stream_connection = self.pymysql_wrapper(**self.__connection_settings) - if pymysql.__version__ < LooseVersion("0.6"): + if __PYMYSQL_VERSION_LT_06: self._stream_connection._read_packet = self._stream_connection.read_packet self.__use_checksum = self.__checksum_enabled() @@ -467,7 +464,7 @@ def __connect_to_stream(self, force_reconnect=False): # encoded_data prelude += gtid_set.encoded() - if pymysql.__version__ < LooseVersion("0.6"): + if __PYMYSQL_VERSION_LT_06: self._stream_connection.wfile.write(prelude) self._stream_connection.wfile.flush() else: @@ -493,7 +490,10 @@ def __fetchone(self): except pymysql.OperationalError as error: code, message = error.args if code in MYSQL_EXPECTED_ERROR_CODES: + time.sleep(5) self.__connect_to_stream(force_reconnect=True) + # skip the first 2 events (RotateEvent and FormatDescriptionEvent) + _ = self.__fetchone(), self.__fetchone() continue raise