Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of the asyncio consumer #228

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

Nictec
Copy link

@Nictec Nictec commented Sep 20, 2024

Hi,
I implemented the asyncio consumer in a similar way as the producer. All missing async functions provided by the C++ library are now implemented in the pybind11 classes and I use futures in python to convert the functions with callbacks to async functions. The reason I started with this implementation is because I need the async consumer in a project with FastAPI myself.
Tests could be a little bit patchy, for that reason i would appreciate help if i missed something

Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the binaries from the git commits

@BewareMyPower
Copy link
Contributor

Traceback (most recent call last):
  File "asyncio_test.py", line 27, in <module>
    from pulsar.asyncio import (
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pulsar/asyncio.py", line 165, in <module>
    class Consumer:
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pulsar/asyncio.py", line 216, in Consumer
    async def seek(self, position: tuple[int, int, int, int] | pulsar.MessageId):
TypeError: 'type' object is not subscriptable

Could you also make the tests pass for Python 3.8? Though Python 3.8 will reach EOL on the next month (2024-10)

@Nictec
Copy link
Author

Nictec commented Sep 30, 2024

I will try to fix the issues this week.

@Nictec
Copy link
Author

Nictec commented Oct 18, 2024

I found another issue with the schema system in my asyncio implementation, i will fix this too before the next commit.

@Nictec Nictec requested a review from BewareMyPower October 18, 2024 15:06
@Nictec Nictec removed their assignment Oct 18, 2024
@BewareMyPower
Copy link
Contributor

Let me fix the broken CI first

@BewareMyPower
Copy link
Contributor

Could you rebase to master to resolve the conflicts and have the CI fixed?

@BewareMyPower BewareMyPower added this to the 3.6.0 milestone Nov 4, 2024
develop/install_manifest.txt Outdated Show resolved Hide resolved
src/client.cc Outdated Show resolved Hide resolved
src/consumer.cc Outdated Show resolved Hide resolved
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

Please remove these binaries.

@merlimat
Copy link
Contributor

merlimat commented Dec 2, 2024

@BewareMyPower I removed the binary files

pulsar/asyncio.py Outdated Show resolved Hide resolved
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you wrap an xxxAsync API from pulsar-client-cpp, the correct way is:

// release the GIL because xxxAsync does not access any Python objects
py::gil_scoped_release release;
// Pass the callback directly because pybind11 acquires the GIL automatically when the callback is executed
xxx.xxxAsync(yyy, callback);

tests/asyncio_test.py Outdated Show resolved Hide resolved
src/client.cc Outdated Show resolved Hide resolved
src/consumer.cc Outdated Show resolved Hide resolved
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also fix the wrapper in consumer.cc


namespace py = pybind11;

void Consumer_unsubscribe(Consumer& consumer) {
waitForAsyncResult([&consumer](ResultCallback callback) { consumer.unsubscribeAsync(callback); });
}

void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) {
consumer.unsubscribeAsync([callback] (Result result) {
py::gil_scoped_acquire acquire;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This GIL acquire is not necessary

Message Consumer_receive(Consumer& consumer) {
return waitForAsyncValue<Message>([&](ReceiveCallback callback) { consumer.receiveAsync(callback); });
}

void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
py::gil_scoped_acquire acquire;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should release the GIL rather than acquire the GIL

@BewareMyPower BewareMyPower modified the milestones: 3.6.0, 3.7.0 Jan 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants