Hello,
I hope it's OK to open this as a discussion issue. I would like to ask whether event processing could be extended freely, or at least conveniently — for example, by registering new event types without breaking the original structure.

I ran into this while researching TDSQL, a distributed database from China. Its lower layer is based on the MySQL binlog event structure, but it adds new event types. To use this project with TDSQL, you have to extend the existing classes through inheritance and then adjust them. However, `__map_events` is a double-underscore attribute, so it is name-mangled and effectively private: a subclass cannot simply override it. There are ways to reach it through Python's name-mangling mechanism, but that breaks encapsulation and is not graceful. So in practice the only option is to copy the code and re-apply the additions every time the library version is upgraded.
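The name-mangling behaviour described above can be reproduced in a few lines. This is an illustrative sketch only — the class names and event codes are made up, not the library's real ones:

```python
class BasePacketWrapper:
    # A double-underscore attribute is name-mangled to
    # _BasePacketWrapper__event_map, so a subclass cannot override it
    # simply by redefining __event_map.
    __event_map = {0x02: "QueryEvent"}

    def lookup(self, event_type):
        # Resolves to self._BasePacketWrapper__event_map at compile time.
        return self.__event_map.get(event_type)


class ExtendedPacketWrapper(BasePacketWrapper):
    # This defines _ExtendedPacketWrapper__event_map instead of replacing
    # the base class's map; lookup() never consults it.
    __event_map = {0x02: "QueryEvent", 0xA0: "TDSQLGtidEvent"}


wrapper = ExtendedPacketWrapper()
print(wrapper.lookup(0xA0))  # None - the subclass map is ignored

# The mangled name is still reachable, but as noted above this
# breaks encapsulation and is not graceful:
BasePacketWrapper._BasePacketWrapper__event_map[0xA0] = "TDSQLGtidEvent"
print(wrapper.lookup(0xA0))  # TDSQLGtidEvent
```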
    class BinLogStreamReaderV2(object):
        """Connect to replication stream and read event"""

        report_slave = None

        def __init__(self, connection_settings, server_id,
                     ctl_connection_settings=None, resume_stream=False,
                     blocking=False, only_events=None, log_file=None, log_pos=None,
                     filter_non_implemented_events=True,
                     ignored_events=None, auto_position=None,
                     only_tables=None, ignored_tables=None,
                     only_schemas=None, ignored_schemas=None,
                     freeze_schema=False, skip_to_timestamp=None,
                     report_slave=None, slave_uuid=None,
                     pymysql_wrapper=None,
                     fail_on_table_metadata_unavailable=False,
                     slave_heartbeat=None, map_events=None):
            ...

        def fetchone(self):
            while True:
                if not self.__connected_stream:
                    self.__connect_to_stream()
                if not self.__connected_ctl:
                    self.__connect_to_ctl()
                try:
                    if pymysql.__version__ < "0.6":
                        pkt = self._stream_connection.read_packet()
                    else:
                        pkt = self._stream_connection._read_packet()
                except pymysql.OperationalError as error:
                    code, message = error.args
                    if code in MYSQL_EXPECTED_ERROR_CODES:
                        self._stream_connection.close()
                        self.__connected_stream = False
                        continue
                    raise

                if pkt.is_eof_packet():
                    self.close()
                    return None

                if not pkt.is_ok_packet():
                    continue

                binlog_event = BinLogPacketWrapperV2(pkt, self.table_map,
                                                     self._ctl_connection,
                                                     self.__use_checksum,
                                                     self.__allowed_events_in_packet,
                                                     self.__only_tables,
                                                     self.__ignored_tables,
                                                     self.__only_schemas,
                                                     self.__ignored_schemas,
                                                     self.__freeze_schema,
                                                     self.__fail_on_table_metadata_unavailable,
                                                     self.map_events)
                if binlog_event.event_type == ROTATE_EVENT:
                    self.log_pos = binlog_event.event.position
                    self.log_file = binlog_event.event.next_binlog
                    # Table ID in binlog are NOT persistent in MySQL - they are
                    # in-memory identifiers. That means that when MySQL master
                    # restarts, it will reuse the same table id for different
                    # tables, which will cause errors for us since our in-memory
                    # map will try to decode row data with the wrong table schema.
                    # The fix is to rely on the fact that MySQL will also rotate
                    # to a new binlog file every time it restarts. That means every
                    # rotation we see *could* be a sign of restart and so
                    # potentially invalidates all our cached table id to schema
                    # mappings. This means we have to load them all again for each
                    # logfile, which is potentially wasted effort, but we can't
                    # really do much better without being broken in the restart case.
                    self.table_map = {}
                elif binlog_event.log_pos:
                    self.log_pos = binlog_event.log_pos

                # This check must not occur before clearing the ``table_map`` as a
                # result of a RotateEvent.
                #
                # The first RotateEvent in a binlog file has a timestamp of
                # zero. If the server has moved to a new log and not written a
                # timestamped RotateEvent at the end of the previous log, the
                # RotateEvent at the beginning of the new log will be ignored
                # if the caller provided a positive ``skip_to_timestamp``
                # value. This will result in the ``table_map`` becoming
                # corrupt.
                #
                # https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html
                # From the MySQL Internals Manual:
                #
                #   ROTATE_EVENT is generated locally and written to the binary
                #   log on the master. It is written to the relay log on the
                #   slave when FLUSH LOGS occurs, and when receiving a
                #   ROTATE_EVENT from the master. In the latter case, there
                #   will be two rotate events in total originating on different
                #   servers.
                #
                #   There are conditions under which the terminating
                #   log-rotation event does not occur. For example, the server
                #   might crash.
                if self.skip_to_timestamp and \
                        binlog_event.timestamp < self.skip_to_timestamp:
                    continue

                if binlog_event.event_type == TABLE_MAP_EVENT and \
                        binlog_event.event is not None:
                    self.table_map[binlog_event.event.table_id] = \
                        binlog_event.event.get_table()

                # event is none if we have filtered it on packet level
                # we filter also not allowed events
                if binlog_event.event is None or \
                        (binlog_event.event.__class__ not in self.__allowed_events):
                    continue

                return binlog_event.event
packet.py
    class BinLogPacketWrapperV2(object):
        """
        Bin Log Packet Wrapper. It uses an existing packet object, and wraps
        around it, exposing useful variables while still providing access to
        the original packet object's variables and methods.
        """

        __event_map = {
            # event
            constants.QUERY_EVENT: event.QueryEvent,
            constants.ROTATE_EVENT: event.RotateEvent,
            constants.FORMAT_DESCRIPTION_EVENT: event.FormatDescriptionEvent,
            constants.XID_EVENT: event.XidEvent,
            constants.INTVAR_EVENT: event.IntvarEvent,
            constants.GTID_LOG_EVENT: event.GtidEvent,
            constants.STOP_EVENT: event.StopEvent,
            constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
            constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
            constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
            # row_event
            constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
            constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,
            constants.DELETE_ROWS_EVENT_V1: row_event.DeleteRowsEvent,
            constants.UPDATE_ROWS_EVENT_V2: row_event.UpdateRowsEvent,
            constants.WRITE_ROWS_EVENT_V2: row_event.WriteRowsEvent,
            constants.DELETE_ROWS_EVENT_V2: row_event.DeleteRowsEvent,
            constants.TABLE_MAP_EVENT: row_event.TableMapEvent,
            # 5.6 GTID enabled replication events
            constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
            constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,
            # ADD TDSQL GTID EVENTS
            TDSQL_GLOBAL_GTID_EVENT: event_exp.TDSQLGtidEvent,
        }

        def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
                     allowed_events,
                     only_tables,
                     ignored_tables,
                     only_schemas,
                     ignored_schemas,
                     freeze_schema,
                     fail_on_table_metadata_unavailable,
                     map_events):
            if map_events:
                self.__event_map.update(map_events)
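One subtlety worth noting about the `update` call above: because `__event_map` is a class-level dict, updating it through `self` mutates the shared class attribute, so the extra entries leak into every wrapper instance created afterwards, not just the one that received `map_events`. A minimal illustration (class name and event codes are made up):

```python
class Wrapper:
    # Class-level dict shared by all instances.
    __event_map = {0x02: "QueryEvent"}

    def __init__(self, map_events=None):
        if map_events:
            # This mutates the class dict in place, not a per-instance copy.
            self.__event_map.update(map_events)

    def known(self):
        return dict(self.__event_map)


a = Wrapper(map_events={0xA0: "TDSQLGtidEvent"})
b = Wrapper()  # no extra events passed, but...
print(b.known())  # {2: 'QueryEvent', 160: 'TDSQLGtidEvent'}
```

If per-instance maps are wanted instead, `__init__` could copy first, e.g. `self._event_map = {**self.__event_map, **(map_events or {})}`.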
Sorry for the late response. It's indeed an interesting problem. I wonder if we could allow inheriting from the base classes to extend them in user code, with a setting to replace the base class with the new one.
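One possible shape for that suggestion — a hypothetical sketch, not the library's actual interface: expose the event map through an overridable class attribute, and let a setting select which packet-wrapper class the stream reader uses.

```python
class BinLogPacketWrapper:
    # Class-level and intentionally overridable (no name mangling).
    EVENT_MAP = {0x02: "QueryEvent", 0x0F: "FormatDescriptionEvent"}

    @classmethod
    def event_map(cls):
        # Subclasses extend EVENT_MAP instead of reaching into a
        # name-mangled private attribute.
        return cls.EVENT_MAP


class TDSQLPacketWrapper(BinLogPacketWrapper):
    # Copy-and-extend so the base class map is left untouched.
    EVENT_MAP = {**BinLogPacketWrapper.EVENT_MAP, 0xA0: "TDSQLGtidEvent"}


class BinLogStreamReader:
    # The "setting to replace the base class" from the comment above.
    def __init__(self, packet_wrapper_cls=BinLogPacketWrapper):
        self.packet_wrapper_cls = packet_wrapper_cls


reader = BinLogStreamReader(packet_wrapper_cls=TDSQLPacketWrapper)
print(0xA0 in reader.packet_wrapper_cls.event_map())  # True
print(0xA0 in BinLogPacketWrapper.event_map())        # False
```

This keeps the default behaviour intact while letting user code register new event types without copying the library.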
    ├── binlogstream_expand.py
    ├── constants_expand
    │   ├── BINLOG.py
    │   └── __init__.py
    ├── event_expand
    │   ├── event.py
    │   └── __init__.py
    ├── main.py
    ├── packet_expand
    │   ├── packet.py
    │   └── __init__.py
    ├── requirements.txt
    └── venv
main.py
binlogstream_expand.py
__init__ adds a map_events parameter:

    map_events = {TDSQL_GLOBAL_GTID_EVENT: TDSQLGtidEvent}
packet.py