Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

include ClientSideHeartBeat #322

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pymysql
import struct
import time

from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
from pymysql.cursors import DictCursor
Expand Down Expand Up @@ -138,7 +139,7 @@ def __init__(self, connection_settings, server_id,
report_slave=None, slave_uuid=None,
pymysql_wrapper=None,
fail_on_table_metadata_unavailable=False,
slave_heartbeat=None):
slave_heartbeat=None,clientside_heartbeat=None):
"""
Attributes:
ctl_connection_settings: Connection settings for cluster holding
Expand Down Expand Up @@ -172,6 +173,8 @@ def __init__(self, connection_settings, server_id,
many event to skip in binlog). See
MASTER_HEARTBEAT_PERIOD in mysql documentation
for semantics
clientside_heartbeat: (seconds) Create a client side event when
reading stream to avoid stalled iterations.
"""

self.__connection_settings = connection_settings
Expand Down Expand Up @@ -213,6 +216,7 @@ def __init__(self, connection_settings, server_id,
self.report_slave = ReportSlave(report_slave)
self.slave_uuid = slave_uuid
self.slave_heartbeat = slave_heartbeat
self.clientside_heartbeat = clientside_heartbeat

if pymysql_wrapper:
self.pymysql_wrapper = pymysql_wrapper
Expand Down Expand Up @@ -416,7 +420,11 @@ def __connect_to_stream(self):
self.__connected_stream = True

def fetchone(self):
last_time = time.time()
while True:
if self.clientside_heartbeat is not None:
if time.time() - last_time > self.clientside_heartbeat:
return ClientSideHeartBeat()
if not self.__connected_stream:
self.__connect_to_stream()

Expand Down