Skip to content

Commit

Permalink
skip the first 2 events after reconnected
Browse files Browse the repository at this point in the history
* sleep 5s after disconnected accidently and skip the first 2
  events after reconnected
* some cleanup and compacting works
  • Loading branch information
woods-chen committed Dec 2, 2021
1 parent b615725 commit cbe2234
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pymysql
import struct
import time
from distutils.version import LooseVersion

from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
Expand Down Expand Up @@ -30,6 +31,7 @@
# 2006 MySQL server has gone away
MYSQL_EXPECTED_ERROR_CODES = [2013, 2006]

PYMYSQL_VERSION_LT_06 = pymysql.__version__ < LooseVersion("0.6")

class ReportSlave(object):

Expand Down Expand Up @@ -208,11 +210,6 @@ def __init__(self, connection_settings, server_id,
each time the client is disconnected and then auto-reconnected
to the mysql server (OperationalError 2006/2013) if resume_stream
is False. so it's suggested to set resume_stream to True.
an additional RotateEvent and FormatDescriptionEvent will be
fetched each time the client is disconnected and then auto-
reconnected to the server. (no matter resume_stream is True
or False)
"""

self.__connection_settings = connection_settings
Expand Down Expand Up @@ -263,7 +260,7 @@ def __init__(self, connection_settings, server_id,
self.pymysql_wrapper = pymysql.connect

def close(self):
if getattr(self, '_stream_connection', None) and self._stream_connection.open:
if self.__connected_stream:
self._stream_connection.close()
if getattr(self, '_ctl_connection', None):
# break reference cycle between stream reader and underlying
Expand Down Expand Up @@ -304,7 +301,7 @@ def _register_slave(self):

packet = self.report_slave.encoded(self.__server_id)

if pymysql.__version__ < LooseVersion("0.6"):
if PYMYSQL_VERSION_LT_06:
self._stream_connection.wfile.write(packet)
self._stream_connection.wfile.flush()
self._stream_connection.read_packet()
Expand Down Expand Up @@ -334,7 +331,7 @@ def __connect_to_stream(self, force_reconnect=False):
# server_id (4) -- server id of this slave
# log_file (string.EOF) -- filename of the binlog on the master
self._stream_connection = self.pymysql_wrapper(**self.__connection_settings)
if pymysql.__version__ < LooseVersion("0.6"):
if PYMYSQL_VERSION_LT_06:
self._stream_connection._read_packet = self._stream_connection.read_packet

self.__use_checksum = self.__checksum_enabled()
Expand Down Expand Up @@ -467,7 +464,7 @@ def __connect_to_stream(self, force_reconnect=False):
# encoded_data
prelude += gtid_set.encoded()

if pymysql.__version__ < LooseVersion("0.6"):
if PYMYSQL_VERSION_LT_06:
self._stream_connection.wfile.write(prelude)
self._stream_connection.wfile.flush()
else:
Expand All @@ -493,7 +490,10 @@ def __fetchone(self):
except pymysql.OperationalError as error:
code, message = error.args
if code in MYSQL_EXPECTED_ERROR_CODES:
time.sleep(5)
self.__connect_to_stream(force_reconnect=True)
# skip the first 2 events (RotateEvent and FormatDescriptionEvent)
_ = self.__fetchone(), self.__fetchone()
continue
raise

Expand Down

0 comments on commit cbe2234

Please sign in to comment.