BinLogStreamReader is not resuming from last position (log_file, log_pos not working). #618

Open
zygias opened this issue Jun 27, 2024 · 1 comment


zygias commented Jun 27, 2024

Hello, I'm facing an issue where BinLogStreamReader does not resume from the last saved position.
I have hardcoded position 1146, but the process still handles the event at position 875 as well. It should resume the log stream after position 1146.
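Until the root cause is found, one stopgap is a client-side guard that drops events ending at or before the saved checkpoint. This is a minimal sketch under two assumptions: binlog file names end in a numeric suffix (as `mysqld-bin.000003` does), and `stream.log_pos` holds each event's end offset (which matches the saved positions 875 and 1146 in the process logs). The helper names `binlog_key` and `already_processed` are hypothetical and not part of python-mysql-replication.

```python
import re


def binlog_key(log_file: str, log_pos: int) -> tuple[int, int]:
    """Order binlog coordinates by (file sequence number, byte offset).

    Assumes names end in a numeric suffix, e.g. "mysqld-bin.000003".
    """
    match = re.search(r"\.(\d+)$", log_file)
    if match is None:
        raise ValueError(f"unexpected binlog file name: {log_file!r}")
    return int(match.group(1)), log_pos


def already_processed(saved_file, saved_pos, event_file, event_end_pos) -> bool:
    """True if an event ending at (event_file, event_end_pos) is at or before
    the saved checkpoint, i.e. it was handled in an earlier run."""
    if saved_file is None or saved_pos is None:
        return False  # no checkpoint yet: process everything
    return binlog_key(event_file, event_end_pos) <= binlog_key(saved_file, saved_pos)


# Usage with the checkpoint from this report.
print(already_processed("mysqld-bin.000003", 1146, "mysqld-bin.000003", 875))  # True
print(already_processed("mysqld-bin.000003", 1146, "mysqld-bin.000004", 120))  # False
```

Inside the event loop this would run first, e.g. `if already_processed(saved_file, saved_pos, stream.log_file, stream.log_pos): continue`. It only hides the symptom: the server still sends the skipped events.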

```python
stream = BinLogStreamReader(
    connection_settings=config,
    server_id=1,
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    is_mariadb=True,
    blocking=True,
    resume_stream=True,
    # log_file=log_file,
    # log_pos=log_pos,
    log_file="mysqld-bin.000003",
    log_pos=1146,
)
```

MariaDB version: 10.3.39
Package version: mysql-replication==0.45.1 (compatible with MariaDB 10.3.39)
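One angle worth checking (an assumption, not confirmed in this thread): with `is_mariadb=True`, the library may position the dump by MariaDB GTID through its `auto_position` argument rather than by `log_file`/`log_pos`. If so, a checkpoint would need to store the last GTID instead of a file offset. The pure-Python helpers below are hypothetical (not part of python-mysql-replication) and only build the comma-separated, one-GTID-per-domain string that MariaDB uses for `gtid_slave_pos`.

```python
def advance_gtid_state(state: dict[int, str], gtid: str) -> dict[int, str]:
    """Return a copy of `state` with `gtid` ("domain-server-sequence") recorded
    as the latest GTID for its replication domain. Hypothetical helper."""
    domain, server, seq = (int(part) for part in gtid.split("-"))
    updated = dict(state)
    updated[domain] = f"{domain}-{server}-{seq}"
    return updated


def format_gtid_state(state: dict[int, str]) -> str:
    """Join per-domain GTIDs into a comma-separated position string, sorted by domain."""
    return ",".join(state[domain] for domain in sorted(state))


# Usage: replaying GTIDs in binlog order keeps only the newest one per domain.
state: dict[int, str] = {}
for gtid in ["0-1-5", "0-1-6", "1-2-3"]:
    state = advance_gtid_state(state, gtid)
print(format_gtid_state(state))  # 0-1-6,1-2-3
```

The GTID values would presumably come from `MariadbGtidEvent.gtid` (with `MariadbGtidEvent` added to `only_events`), and the saved string would be passed as `auto_position=...`; both names should be verified against the 0.45.1 source before relying on them.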

my.cnf:

```ini
[mysqld]
log-bin
binlog-format=ROW
server-id=1
binlog_row_image=FULL
```

Process logs:

```text
INFO:root:Resuming from mysqld-bin.000003:1146
INFO:root:Starting BinLogStreamReader with log_file=mysqld-bin.000003 and log_pos=1146
DEBUG:root:BinLogStreamReader initialized with log_file=mysqld-bin.000003 and log_pos=1146
INFO:root: -> Insert into mydatabase.example_table: {'id': 1, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 34, 17), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:875
INFO:root: -> Insert into mydatabase.example_table: {'id': 2, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 56, 39), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:1146
```
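Reading the log: each `Saved logs` value appears to be the end offset of the event just processed, so both inserts end at or before 1146 and would be expected to be skipped after resuming. A small hypothetical helper (not part of the library) that flags such replays in this log format:

```python
import re

RESUME_RE = re.compile(r"Resuming from (\S+):(\d+)")
SAVED_RE = re.compile(r"Saved logs: (\S+):(\d+)")


def replayed_positions(log_text: str) -> list[int]:
    """List saved end positions in the resume file that are not past the resume point."""
    resume = RESUME_RE.search(log_text)
    if resume is None:
        return []
    resume_file, resume_pos = resume.group(1), int(resume.group(2))
    return [
        int(pos)
        for log_file, pos in SAVED_RE.findall(log_text)
        if log_file == resume_file and int(pos) <= resume_pos
    ]


LOG = """\
INFO:root:Resuming from mysqld-bin.000003:1146
INFO:root:Saved logs: mysqld-bin.000003:875
INFO:root:Saved logs: mysqld-bin.000003:1146
"""
print(replayed_positions(LOG))  # [875, 1146]
```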

Python code:

```python
import time
# mysql-replication=="0.45.1"
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
from pymysqlreplication.event import RotateEvent
import logging

# Database connection parameters
config = {
    'host': 'localhost',
    'port': 3307,
    'user': 'root',
    'password': 'rootpassword',
    'database': 'mydatabase'
}

# Setup logging
logging.basicConfig(level=logging.DEBUG)


def save_binlog_position(log_file, log_pos):
    with open("binlog_position.txt", "w") as f:
        f.write(f"{log_file},{log_pos}")
        logging.info(f"Saved logs: {log_file}:{log_pos}")


def load_binlog_position():
    try:
        with open("binlog_position.txt", "r") as f:
            log_file, log_pos = f.read().strip().split(',')
            logging.info(f"Resuming from {log_file}:{log_pos}")
            return log_file, int(log_pos)
    except FileNotFoundError:
        logging.info("Starting from beginning")
        return None, None


def get_values_from_logs(stream):
    for event in stream:
        table_name = event.table
        if isinstance(event, WriteRowsEvent):
            for row in event.rows:
                row['values'].update({"_deleted": False})
                logging.info(f" -> Insert into {event.schema}.{event.table}: {row['values']}")
        elif isinstance(event, UpdateRowsEvent):
            for row in event.rows:
                row['after_values'].update({"_deleted": False})
                logging.info(
                    f" -> Update {event.schema}.{event.table}: {row['before_values']} -> {row['after_values']}")
        elif isinstance(event, DeleteRowsEvent):
            for row in event.rows:
                row['values'].update({"_deleted": True})
                logging.info(f" -> Delete from {event.schema}.{event.table}: {row['values']}")
        log_file, log_pos = stream.log_file, stream.log_pos
        save_binlog_position(log_file, log_pos)


# Function to parse binary log events
def parse_binlog_events():
    log_file, log_pos = load_binlog_position()

    logging.info(f"Starting BinLogStreamReader with log_file={log_file} and log_pos={log_pos}")

    stream = BinLogStreamReader(
        connection_settings=config,
        server_id=1,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        is_mariadb=True,
        blocking=True,
        resume_stream=True,
        # log_file=log_file,
        # log_pos=log_pos,
        log_file="mysqld-bin.000003",
        log_pos=1146,
    )

    logging.debug(f"BinLogStreamReader initialized with log_file={stream.log_file} and log_pos={stream.log_pos}")
    get_values_from_logs(stream)
    stream.close()


if __name__ == "__main__":
    parse_binlog_events()
```
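Separately from the resume problem, `save_binlog_position` rewrites `binlog_position.txt` in place after every event, so a crash during the write could leave an empty or partial checkpoint. A sketch of an atomic variant (the name `save_binlog_position_atomic` is hypothetical; it keeps the same `file,pos` format) that writes to a temporary file and swaps it in with `os.replace`:

```python
import os
import tempfile


def save_binlog_position_atomic(log_file: str, log_pos: int,
                                path: str = "binlog_position.txt") -> None:
    """Persist "file,pos" so readers see either the old or the new checkpoint."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must live in the same directory (same filesystem) for os.replace.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".binlog_pos_")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(f"{log_file},{log_pos}")
            f.flush()
            os.fsync(f.fileno())  # flush the bytes to disk before the swap
        os.replace(tmp_path, path)  # atomic rename on POSIX
    except BaseException:
        os.unlink(tmp_path)  # do not leave stray temp files behind
        raise
```

`load_binlog_position` can read the result unchanged, since the on-disk format is the same.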

Docker-compose:

```yaml
version: '3'

services:
  mariadb:
    image: mariadb:10.3.39
    ports:
      - "3307:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: mydatabase
      MYSQL_USER: myuser
      MYSQL_PASSWORD: mypassword
    volumes:
      - ./mariadb/my.cnf:/etc/mysql/conf.d/my.cnf
```

Could anyone help? Thank you in advance!

@dongwook-chan
Collaborator

@zygias
Thanks for reporting.
Could you please provide the binary logs or the queries you ran, so that I can reproduce the issue?
