Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support the thread id filter for binlog #77

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions binlog2sql/binlog2sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class Binlog2sql(object):

def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None,
def __init__(self, connection_settings, thread_id=0, start_file=None, start_pos=None, end_file=None, end_pos=None,
start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False,
flashback=False, stop_never=False, back_interval=1.0, only_dml=True, sql_type=None):
"""
Expand All @@ -23,6 +23,7 @@ def __init__(self, connection_settings, start_file=None, start_pos=None, end_fil
raise ValueError('Lack of parameter: start_file')

self.conn_setting = connection_settings
self.thread_id = thread_id
self.start_file = start_file
self.start_pos = start_pos if start_pos else 4 # use binlog v4
self.end_file = end_file if end_file else start_file
Expand Down Expand Up @@ -98,13 +99,15 @@ def process_binlog(self):

if isinstance(binlog_event, QueryEvent) and not self.only_dml:
sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event,
flashback=self.flashback, no_pk=self.no_pk)
flashback=self.flashback, no_pk=self.no_pk,
thread_id=self.thread_id)
if sql:
print(sql)
elif is_dml_event(binlog_event) and event_type(binlog_event) in self.sql_type:
for row in binlog_event.rows:
sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk,
row=row, flashback=self.flashback, e_start_pos=e_start_pos)
row=row, flashback=self.flashback, e_start_pos=e_start_pos,
thread_id=self.thread_id)
if self.flashback:
f_tmp.write(sql + '\n')
else:
Expand Down Expand Up @@ -142,7 +145,8 @@ def __del__(self):
if __name__ == '__main__':
args = command_line_args(sys.argv[1:])
conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'}
binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos,
binlog2sql = Binlog2sql(connection_settings=conn_setting, thread_id=args.thread_id,
start_file=args.start_file, start_pos=args.start_pos,
end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time,
stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables,
no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never,
Expand Down
19 changes: 13 additions & 6 deletions binlog2sql/binlog2sql_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
UpdateRowsEvent,
DeleteRowsEvent,
)

BEGIN_QUERY_EVENT = None

if sys.version > '3':
PY3PLUS = True
Expand Down Expand Up @@ -65,6 +65,7 @@ def parse_args():
connect_setting.add_argument('-P', '--port', dest='port', type=int,
help='MySQL port to use', default=3306)
interval = parser.add_argument_group('interval filter')
interval.add_argument('--thread-id', dest='thread_id', type=int, help='binlog event thread_id', default=0)
interval.add_argument('--start-file', dest='start_file', type=str, help='Start binlog file to be parsed')
interval.add_argument('--start-position', '--start-pos', dest='start_pos', type=int,
help='Start position of the --start-file', default=4)
Expand Down Expand Up @@ -164,20 +165,26 @@ def event_type(event):
return t


def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=None, flashback=False, no_pk=False):
def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=None,
flashback=False, no_pk=False, thread_id=0):
if flashback and no_pk:
raise ValueError('only one of flashback or no_pk can be True')
if not (isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent)
or isinstance(binlog_event, DeleteRowsEvent) or isinstance(binlog_event, QueryEvent)):
raise ValueError('binlog_event must be WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent or QueryEvent')

sql = ''
global BEGIN_QUERY_EVENT
if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN':
BEGIN_QUERY_EVENT = binlog_event
if isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) \
or isinstance(binlog_event, DeleteRowsEvent):
pattern = generate_sql_pattern(binlog_event, row=row, flashback=flashback, no_pk=no_pk)
sql = cursor.mogrify(pattern['template'], pattern['values'])
time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
sql += ' #start %s end %s time %s' % (e_start_pos, binlog_event.packet.log_pos, time)
# Support filter by the thread_id
if (thread_id == 0) or (thread_id and BEGIN_QUERY_EVENT.slave_proxy_id == thread_id):
pattern = generate_sql_pattern(binlog_event, row=row, flashback=flashback, no_pk=no_pk)
sql = cursor.mogrify(pattern['template'], pattern['values'])
time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
sql += ' #start %s end %s time %s' % (e_start_pos, binlog_event.packet.log_pos, time)
elif flashback is False and isinstance(binlog_event, QueryEvent) and binlog_event.query != 'BEGIN' \
and binlog_event.query != 'COMMIT':
if binlog_event.schema:
Expand Down