Made QueryEvent decode options class variables #296

Open · wants to merge 1 commit into master

Conversation

joy13975

QueryEvent decodes the packet using utf-8, which is fine except it's hardcoded and there's no option to set the errors argument for decode().

In a project I'm working on, we need cp932 plus the errors='ignore' argument. With this change, both can simply be set as class variables on QueryEvent right after importing it.

@baloo
Collaborator

baloo commented Jul 14, 2019

That's an elegant solution to a problem I've been complaining about for way too long :D So thank you!

Would you mind sharing how you use this? Are you subclassing, or just injecting QueryEvent.charset as a global variable?

Re-pushed because I forgot to add self. to charset and on_errors.
Also added keyword arguments to make it more explicit.
@joy13975
Author

joy13975 commented Jul 15, 2019

Sorry about the re-pushes. It was a bit late, so I missed the self. on the arguments and the style was sloppy.

@baloo Just set the proper charset/on_errors right after import. An example that I've tested is as follows:

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import QueryEvent

# Override the decode defaults on the class before the stream is created
QueryEvent.charset = 'cp932'
QueryEvent.on_errors = 'ignore'

stream = BinLogStreamReader(
    connection_settings={'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': 'nopasswd'},
    blocking=True,
    server_id=100)
for e in stream:
    e.dump()

For those not sure why this is needed, try feeding a

create table test_table (some_column text comment 'コラムおおおおお') engine=innodb;

encoded in cp932 (e.g. converted with iconv -t cp932) into the server, then running the above test script with the charset and on_errors settings commented out. You should get a

UnicodeDecodeError: 'utf8' codec can't decode byte 0x83 in position 47: invalid start byte
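
For a MySQL-free illustration of the same failure, here is a minimal sketch (not part of the patch) of what the hardcoded utf-8 decode does to cp932 bytes:

# The comment text from the create table statement above, as cp932 bytes
comment = 'コラムおおおおお'.encode('cp932')

comment.decode('cp932')                    # works
comment.decode('utf-8', errors='ignore')   # no exception, but undecodable bytes are silently dropped
comment.decode('utf-8')                    # raises UnicodeDecodeError: invalid start byte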

@baloo
Collaborator

baloo commented Jul 15, 2019

On second thought, I would prefer to not call decode at all, break the API, and store and return the raw bytestring to the user of this.

This change as you propose it would make it impossible to have a different charset per schema, and I would sincerely prefer to push that responsibility to the consumer of the API.
Just wondering, have you checked the binlog recently to see whether the charset is specified in the transaction in the replication stream, or something like that? That's what I would expect, but I haven't checked.

@noplay any thoughts?

@julien-duponchelle
Owner

julien-duponchelle commented Jul 15, 2019 via email

@baloo
Collaborator

baloo commented Jul 16, 2019

I got this parsed:

diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py
index 1ee7655e5ca9a..385b7dbae0c6a 100644
--- a/pymysqlreplication/binlogstream.py
+++ b/pymysqlreplication/binlogstream.py
@@ -8,7 +8,7 @@ from pymysql.cursors import DictCursor
 from pymysql.util import int2byte

 from .packet import BinLogPacketWrapper
-from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
+from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, START_EVENT_V3, FORMAT_DESCRIPTION_EVENT
 from .gtid import GtidSet
 from .event import (
     QueryEvent, RotateEvent, FormatDescriptionEvent,
@@ -219,6 +219,8 @@ class BinLogStreamReader(object):
         else:
             self.pymysql_wrapper = pymysql.connect

+        self.binlog_version = None
+
     def close(self):
         if self.__connected_stream:
             self._stream_connection.close()
@@ -452,9 +454,14 @@ class BinLogStreamReader(object):
                                                self.__only_schemas,
                                                self.__ignored_schemas,
                                                self.__freeze_schema,
-                                               self.__fail_on_table_metadata_unavailable)
-
-            if binlog_event.event_type == ROTATE_EVENT:
+                                               self.__fail_on_table_metadata_unavailable,
+                                               self.binlog_version)
+
+            if binlog_event.event_type == FORMAT_DESCRIPTION_EVENT:
+                self.binlog_version = binlog_event.event.version
+            elif binlog_event.event_type == START_EVENT_V3:
+                self.binlog_version = binlog_event.event.version
+            elif binlog_event.event_type == ROTATE_EVENT:
                 self.log_pos = binlog_event.event.position
                 self.log_file = binlog_event.event.next_binlog
                 # Table Id in binlog are NOT persistent in MySQL - they are in-memory identifiers
diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py
index 97ff58172cb55..4e90faf95f3d7 100644
--- a/pymysqlreplication/event.py
+++ b/pymysqlreplication/event.py
@@ -3,6 +3,12 @@
 import binascii
 import struct
 import datetime
+import warnings
+
+try:
+    from io import BytesIO
+except ImportError:
+    from cStringIO import StringIO as BytesIO

 from pymysql.util import byte2int, int2byte

@@ -98,8 +104,38 @@ class RotateEvent(BinLogEvent):
         print()


+# FormatDescriptionEvent and StartEventV3 are used to determine binlog version, see:
+# https://dev.mysql.com/doc/internals/en/determining-the-binlog-version.html
 class FormatDescriptionEvent(BinLogEvent):
-    pass
+    def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
+        super(FormatDescriptionEvent, self).__init__(from_packet, event_size, table_map,
+                                                     ctl_connection, **kwargs)
+        self.version = struct.unpack('<H', self.packet.read(2))[0]
+
+        # Note: the implementation is incomplete, if you need more, please submit
+        # a pull request
+        # see https://dev.mysql.com/doc/internals/en/format-description-event.html
+
+    def dump(self):
+        print("=== %s ===" % (self.__class__.__name__))
+        print("Version: %d" % self.version)
+        print()
+
+
+class StartEventV3(BinLogEvent):
+    def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
+        super(StartEventV3, self).__init__(from_packet, event_size, table_map,
+                                           ctl_connection, **kwargs)
+
+        self.version = struct.unpack('<H', self.packet.read(2))[0]
+        # Note: the implementation is incomplete, if you need more, please submit
+        # a pull request
+        # https://dev.mysql.com/doc/internals/en/start-event-v3.html
+
+    def dump(self):
+        print("=== %s ===" % (self.__class__.__name__))
+        print("Version: %d" % self.version)
+        print()


 class StopEvent(BinLogEvent):
@@ -166,15 +202,27 @@ class QueryEvent(BinLogEvent):
         self.execution_time = self.packet.read_uint32()
         self.schema_length = byte2int(self.packet.read(1))
         self.error_code = self.packet.read_uint16()
-        self.status_vars_length = self.packet.read_uint16()
+        if from_packet.binlog_version >= 4:
+            self.status_vars_length = self.packet.read_uint16()
+        else:
+            self.status_vars_length = 0

         # Payload
-        self.status_vars = self.packet.read(self.status_vars_length)
+        if self.status_vars_length > 0:
+            data = self.packet.read(self.status_vars_length)
+            print('data', data)
+            self.status_vars = QueryEvent._parse_status_vars(data)
+        else:
+            self.status_vars = {}
         self.schema = self.packet.read(self.schema_length)
         self.packet.advance(1)

         self.query = self.packet.read(event_size - 13 - self.status_vars_length
                                       - self.schema_length - 1).decode("utf-8")
+        print(self.dump())
+        print(self.status_vars)
+        raise "foo"
+
         #string[EOF]    query

     def _dump(self):
@@ -184,6 +232,121 @@ class QueryEvent(BinLogEvent):
         print("Query: %s" % (self.query))


+    @staticmethod
+    def _parse_status_vars(data):
+        # https://dev.mysql.com/doc/internals/en/query-event.html
+        # 0x00 Q_FLAGS2_CODE 4
+        # 0x01 Q_SQL_MODE_CODE 8
+        # 0x02 Q_CATALOG 1 + n + 1
+        # 0x03 Q_AUTO_INCREMENT 2 + 2
+        # 0x04 Q_CHARSET_CODE 2 + 2 + 2
+        # 0x05 Q_TIME_ZONE_CODE 1 + n
+        # 0x06 Q_CATALOG_NZ_CODE 1 + n
+        # 0x07 Q_LC_TIME_NAMES_CODE 2
+        # 0x08 Q_CHARSET_DATABASE_CODE 2
+        # 0x09 Q_TABLE_MAP_FOR_UPDATE_CODE 8
+        # 0x0a Q_MASTER_DATA_WRITTEN_CODE 4
+        # 0x0b Q_INVOKERS 1 + n + 1 + n
+        # 0x0c Q_UPDATED_DB_NAMES 1 + n*nul-term-string
+        # 0x0d Q_MICROSECONDS 3
+        data = BytesIO(data)   # TODO: fixup python2 support
+        status_vars = {}
+        while len(data.getvalue()[data.tell():]) > 0:
+            code = struct.unpack('<B', data.read(1))[0]
+            print('code', code)
+            if code == 0x00:
+                # 0x00 Q_FLAGS2_CODE 4
+                value = struct.unpack('<I', data.read(4))[0]
+                status_vars['flags2'] = value
+            elif code == 0x01:
+                # 0x01 Q_SQL_MODE_CODE 8
+                value = struct.unpack('<Q', data.read(8))[0]
+                status_vars['sql_mode'] = value
+            elif code == 0x02:
+                # 0x02 Q_CATALOG 1 + n + 1
+                length = struct.unpack('<B', data.read(1))[0]
+                value = struct.unpack('%ds' % length, data.read(length))[0]
+                data.read(1)
+                status_vars['catalog'] = value
+            elif code == 0x03:
+                # 0x03 Q_AUTO_INCREMENT 2 + 2
+                incr = struct.unpack('<H', data.read(2))[0]
+                offset = struct.unpack('<H', data.read(2))[0]
+                status_vars['auto_increment'] = (incr, offset)
+            elif code == 0x04:
+                # 0x04 Q_CHARSET_CODE 2 + 2 + 2
+                charset = data.read(6)
+                set_client = struct.unpack('<H', charset[0:2])[0]
+                connection = struct.unpack('<H', charset[2:4])[0]
+                server = struct.unpack('<H', charset[4:])[0]
+                status_vars['charset_code'] = {
+                    'set_client': set_client,
+                    'connection': connection,
+                    'server': server,
+                }
+            elif code == 0x05:
+                # 0x05 Q_TIME_ZONE_CODE 1 + n
+                length = struct.unpack('<B', data.read(1))[0]
+                value = struct.unpack('%ds' % length, data.read(length))[0]
+                status_vars['time_zone_code'] = value
+            elif code == 0x06:
+                # 0x06 Q_CATALOG_NZ_CODE 1 + n
+                length = struct.unpack('<B', data.read(1))[0]
+                value = struct.unpack('%ds' % length, data.read(length))[0]
+                status_vars['catalog_nz_code'] = value
+            elif code == 0x07:
+                # 0x07 Q_LC_TIME_NAMES_CODE 2
+                value = struct.unpack('<H', data.read(2))[0]
+                status_vars['time_names_code'] = value
+            elif code == 0x08:
+                # 0x08 Q_CHARSET_DATABASE_CODE 2
+                value = struct.unpack('<H', data.read(2))[0]
+                status_vars['charset_database_code'] = value
+            elif code == 0x09:
+                # 0x09 Q_TABLE_MAP_FOR_UPDATE_CODE 8
+                value = struct.unpack('<Q', data.read(8))[0]
+                status_vars['table_map_for_update_code'] = value
+            elif code == 0x0a:
+                # 0x0a Q_MASTER_DATA_WRITTEN_CODE 4
+                value = struct.unpack('<I', data.read(4))[0]
+                status_vars['master_data_written_code'] = value
+            elif code == 0x0b:
+                # 0x0b Q_INVOKERS 1 + n + 1 + n
+                length = struct.unpack('<B', data.read(1))[0]
+                username = struct.unpack('%ds' % length, data.read(length))[0]
+                length = struct.unpack('<B', data.read(1))[0]
+                hostname = struct.unpack('%ds' % length, data.read(length))[0]
+                status_vars['invokers'] = {
+                    'username': username,
+                    'hostname': hostname,
+                }
+            elif code == 0x0c:
+                # 0x0c Q_UPDATED_DB_NAMES 1 + n*nul-term-string
+                length = struct.unpack('<B', data.read(1))[0]
+                out = []
+                for _ in range(0, length):
+                    def read_string(inp):
+                        out = BytesIO()
+                        while True:
+                            c = inp.read(1)
+                            if c == b'\0':
+                                return out.getvalue()
+                            out.write(c)
+                    out.append(read_string(data))
+                status_vars['updated_db_names'] = out
+            elif code == 0x0d:
+                # 0x0d Q_MICROSECONDS 3
+                high, low = struct.unpack('<BH', data.read(3))
+                status_vars['microseconds'] = (high << 16) + low
+            else:
+                # Do not make an infinite loop :)
+                # TODO: should send a warning
+                warnings.warn("status var %x is not handled, please report to"
+                              "https://github.com/noplay/python-mysql-replication/issues/new" % code)  # noqa
+                _ = data.read()
+        return status_vars
+
+
 class BeginLoadQueryEvent(BinLogEvent):
     """

diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py
index 936dc8c5b39d5..aedf58ef6e65d 100644
--- a/pymysqlreplication/packet.py
+++ b/pymysqlreplication/packet.py
@@ -68,6 +68,7 @@ class BinLogPacketWrapper(object):
         constants.INTVAR_EVENT: event.IntvarEvent,
         constants.GTID_LOG_EVENT: event.GtidEvent,
         constants.STOP_EVENT: event.StopEvent,
+        constants.START_EVENT_V3: event.StartEventV3,
         constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
         constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
         constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
@@ -92,7 +93,8 @@ class BinLogPacketWrapper(object):
                  only_schemas,
                  ignored_schemas,
                  freeze_schema,
-                 fail_on_table_metadata_unavailable):
+                 fail_on_table_metadata_unavailable,
+                 binlog_version):
         # -1 because we ignore the ok byte
         self.read_bytes = 0
         # Used when we want to override a value in the data buffer
@@ -100,6 +102,7 @@ class BinLogPacketWrapper(object):

         self.packet = from_packet
         self.charset = ctl_connection.charset
+        self.binlog_version = binlog_version

         # OK value
         # timestamp

The patch was straightforward enough, BUT:

@joy13975
Author

joy13975 commented Jul 16, 2019

@baloo You’re right. This is only a temporary solution that at least allows my project to move forward, since all of our schemas have the same encoding.

What about directly querying the server for charset names?

SELECT default_character_set_name FROM information_schema.SCHEMATA 
WHERE schema_name = "example_schema";
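
For illustration, a rough sketch of that lookup with pymysql (this assumes a schema's data actually matches its declared default charset, which, as noted below, is not guaranteed):

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='nopasswd')
with conn.cursor() as cur:
    cur.execute(
        "SELECT default_character_set_name FROM information_schema.SCHEMATA "
        "WHERE schema_name = %s", ('example_schema',))
    row = cur.fetchone()
# Note: MySQL charset names don't always map 1:1 to Python codec names (e.g. utf8mb4)
default_charset = row[0] if row else 'utf8'   # e.g. 'cp932'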

@chadmayo

@baloo I am dealing with a similar issue. When a query inserting binary data gets parsed, the reader fails with the same exception @joy13975 reported above.

Is it possible to make a change to allow the errors argument to be specified? This could be passed in as an argument when instantiating BinLogStreamReader, or set as a class variable on QueryEvent as suggested by @joy13975. I am happy to submit a pull request; please let me know your thoughts.

@baloo
Collaborator

baloo commented Oct 10, 2019

@joy13975 Well, no, that does not work; you can't query the server to get this, AFAICT.

@cmayo117 I would prefer to go the route of calling decode with the charset values parsed from status_vars. But I do not have time to handle this myself; if you'd like to throw in a PR, that would be great.
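
For what the consumer side of that might look like, a sketch (assuming pymysql's charset_by_id helper, and that the set_client id parsed from Q_CHARSET_CODE above identifies the charset the statement text was written in):

from pymysql.charset import charset_by_id

def decode_query(raw_query, status_vars):
    # Sketch only: pick the codec from the Q_CHARSET_CODE status var instead of
    # hardcoding utf-8; fall back to the current behaviour when it's absent.
    charset_id = status_vars.get('charset_code', {}).get('set_client')
    if charset_id is None:
        return raw_query.decode('utf-8')
    return raw_query.decode(charset_by_id(charset_id).encoding)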

@chadmayo

@baloo Unfortunately I don't think that will solve our issue as it isn't a problem of using the wrong charset. Some queries in our case have embedded binary data which cannot be decoded easily. Instead, it would be helpful to have the option of just ignoring decoding errors altogether.
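
For the record, a small sketch of that failure mode (illustration only): a statement carrying raw binary bytes will generally not be valid utf-8, so the useful knob here is the error handling rather than the charset:

# Binary payload embedded directly in the statement text
query = b"INSERT INTO t (data) VALUES ('" + bytes([0x83, 0x00, 0xc1, 0xff]) + b"')"

query.decode('utf-8', errors='ignore')   # succeeds; the undecodable bytes are dropped
query.decode('utf-8')                    # raises UnicodeDecodeError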
