Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Feature: asyncio support for BinLogStreamReader #593

Open
gongyisheng opened this issue Dec 6, 2023 · 11 comments
Open

New Feature: asyncio support for BinLogStreamReader #593

gongyisheng opened this issue Dec 6, 2023 · 11 comments

Comments

@gongyisheng
Copy link
Contributor

gongyisheng commented Dec 6, 2023

Hi python-mysql-replication community, I want to request asyncio support for BinLogStreamReader.

My use case here is to use BinLogStreamReader to receive binlog message and forward it to a message queue and a consumer will process it. Here's an example of my code:

def binlog_subscribe():
        stream = BinLogStreamReader(
                connection_settings=MYSQL_SETTINGS, 
                server_id=2, 
                only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
                blocking=True,
                enable_logging=False,
                only_schemas=set(["test"]),
                only_tables=set(["binlog_test"]))
        
        for binlogevent in stream:
                for row in binlogevent.rows:
                        yield row
 
async def forward_message():
        while True:
                try:
                        message = await asyncio.wait_for(memory_queue.get(), timeout=XXX)
                        await message_queue.push(message)
                except asyncio.TimeoutError:
                        logging.debug("Memory queue is empty")

async def main():
        forward_task = asyncio.create_task(forward_message())
        for message in binlog_subscribe():
                await memory_queue.push(row)

if __name__ == "__main__":
        asyncio.run()

Due to the fact that BinLogStreamReader __iter__ method is blocking, it blocks the asyncio event loop and causes some side effects, eg.

  1. blocking background health check with message queue server under low qps
  2. limit cpu usage under high qps.
  3. some of the message in the memory queue is not sent out in time

And I see the examples like https://github.com/julien-duponchelle/python-mysql-replication/blob/main/examples/mysql_to_kafka.py, it's a common use case to have a forward service to listen binlog stream and forward it to somewhere else.

Currently the package does not support asyncio. I think asyncio is a fit for this package because reading binlog is I/O process and asyncio allows parallel processing for multiple I/O process. There's also open source 3p lib that supports asyncio like aiomysql, aiokafka so I think there're use cases to have asyncio support. For order guarantee, as long as we continue to use iterator interface it will not cause message disorder issues.

Let me know your idea!

@sean-k1
Copy link
Collaborator

sean-k1 commented Dec 9, 2023

@gongyisheng
I have a question
If you asynchronize the sending of the Kafka broker data, it looks like its not possible to guarantee the order of the messages.
Is there a reason to do this?

@gongyisheng
Copy link
Contributor Author

gongyisheng commented Dec 11, 2023

Hi @sean-k1 , To meet guarantee of order of messages, we need to use a memory queue (I used asyncio.Queue here) to store message temporarily. Another coroutine reads data from memory queue and send data to kafka. In this case, the memory queue can help order the messages. I can help add an example to show how to run async code safely.

                              |---> Memory Queue 1 ---> Coroutine 1
MySQL---->BinlogStreamReader--|---> Memory Queue 2 ---> Coroutine 2
                              |---> Memory Queue 3 ---> Coroutine 3

The biggest reason to do this is to make this package compatible with asyncio. As asyncio gain popularity after python 3.8, we have more 3p libs and workload that uses asyncio to run I/O processes concurrently to make full use of cpu resource. http requests, redis client, database client, message queue client, s3... a lot of popular libs that has I/O operations have asyncio support.

Here's the benefits I see:

  • My case for asyncio support is that I want to distribute one binlog stream to multiple queue and let different consumer to consume at different speed. The database is write-heavy and I don't want to add too many replicas. I have to make "send to multiple queue" process async otherwise it will limit the binlog reading QPS.
  • I also see there'll be cases to have more "binlog-native" applications if we support it. One of the reason that we use message queue is that producer and consumer process message at different speed caused by sync/async. However, we can think about moving consumer code to binlog producer with asyncio support, such as monitoring / audit apps.

Thanks again, let me know if you have more questions! We can also have a survey to see if there's more ppl want asyncio support.

@dongwook-chan
Copy link
Collaborator

dongwook-chan commented Dec 12, 2023

@gongyisheng
Thank you for submitting the issue. I do agree that incorporating asyncio will maximize CPU utilization and speed up the whole streaming process if any sort of message queues are involved.

However the change you suggested - especially in your code - isn't technically within the scope of MySQL replication client. Are you suggesting that we should go beyond existing implementation and extend our responsibility?

@gongyisheng
Copy link
Contributor Author

Hi @dongwook-chan, thank you for your feedback. I think providing an async class of BinLogStreamReader is inside the responsibility of mysql replication client.

@dongwook-chan
Copy link
Collaborator

@gongyisheng
Sure! If doing so benefits our community, I'm more than glad to implememt the async class.

Could you clarify which part of BinLogStreamReader should be async? The code you provided will work even without incorporating async to BinLogStreaReader.

@gongyisheng
Copy link
Contributor Author

gongyisheng commented Dec 12, 2023

@dongwook-chan Yeah the code I provide can work but it doesn't work in a smooth way because it's not async. The event loop may be blocked in the middle because BinLogStreamReader.__iter__ is not async

I suggest that we can make BinLogStreamReader.__iter__ async.

@dongwook-chan
Copy link
Collaborator

dongwook-chan commented Dec 12, 2023

@gongyisheng
Oh I get it now. I didn't think about incorporating asyncio to I/O from/to MySQL because the blocking in my use case is quite minimal. This is because in my case, MySQL is quite busy and constantly sending events to python-mysql-replication. But I guess it really depends on how users utilize the package. I'll convince other contributors look into this issue.

I have a few questions that I want to ask just because I want to know how the community uses the package and how they can benefit from asyncio.

  1. Do you have any other async operation in your application other than ones regarding the message queue?
  2. What is the number of events per second you receive from the connected MySQL?

@gongyisheng
Copy link
Contributor Author

gongyisheng commented Dec 25, 2023

Hi @dongwook-chan, sorry I was a bit buzy this month, coming to reply

  1. Currently I have one more async operations other than sending to message queue. It's saving cutoff (file, position) to s3. It's not frequent (one call per minute).
  2. Event QPS varies a lot. Sometimes it's near 0 and sometimes it's 50-100.

Merry Christmas!

@julien-duponchelle
Copy link
Owner

Not sure if you are aware but a fork exists:
https://github.com/jettify/aiomysql_replication

I'm big fan of asyncio and I can see the benefits if you have if you need with a slow operation like S3 in the same process.

@gongyisheng
Copy link
Contributor Author

@julien-duponchelle Yes I'm a big fan of asyncio too.
I took a look at the code of async fork, it's a bit old. It supports asyncio through @asyncio.coroutine and yield, which is depracated after python3.8. We need to use async def and await instead. Good reference but need a bit more to change!
I can help with this feature during my spare time.

@dongwook-chan
Copy link
Collaborator

@julien-duponchelle
Thank you for the great reference!

@gongyisheng
Since you have the specific/professional use case, your help will be invaluable to the project. I would also like to be involved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants