[WIP] transition to anyio #302

Open
wants to merge 36 commits into
base: main

@smurfix smurfix commented May 22, 2024

As per #226 here's a draft of an anyio-ified aiomqtt. It abuses paho-mqtt 2.0 by way of subclassing its Client class.

No more threads. No more issues with locking. Trio compatibility.

Tests pass (one failure on asyncio), but there's still a lot to do: checking robustness, handling reconnections, getting typing back up to standard, finishing the paho callback API v2 transition, supporting paho-mqtt 2.1, general clean-up, supporting more MQTTv5 features, and whatnot.

I need this code in order to replace my moat-mqtt package (an hbmqtt port to anyio that shows its age). No guarantees, but I plan to keep poking at this.

smurfix added 15 commits May 21, 2024 09:39
This commit subclasses the paho MQTT client code (somewhat intrusively),
replacing its threading with anyio-style tasks.
Recursion is bad for you. More to the point it's not necessary.

Also, several common non-matches can be checked for up front.
Pre-split the topic so the comparison code doesn't need to run `split`
each time it's called.
Drop the trailing hash from the split wildcard topic.
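
For readers following along, here is a minimal sketch of what a non-recursive matcher over a pre-split filter could look like. It is not the code from this PR; the function names `split_filter` and `matches` are made up for illustration, and the wildcard rules follow the MQTT specification as I understand it.

```python
def split_filter(topic_filter: str) -> tuple[list[str], bool]:
    """Split a subscription filter once; report whether it ended in '#'."""
    levels = topic_filter.split("/")
    multi = levels[-1] == "#"
    if multi:
        levels = levels[:-1]  # drop the trailing hash, remember it as a flag
    return levels, multi


def matches(levels: list[str], multi: bool, topic: str) -> bool:
    """Compare a pre-split filter against a concrete topic, without recursion."""
    parts = topic.split("/")
    # Common non-matches that can be rejected before the per-level loop.
    if len(parts) < len(levels):
        return False
    if not multi and len(parts) != len(levels):
        return False
    # '$'-prefixed topics never match a filter that starts with a wildcard.
    if topic.startswith("$") and (not levels or levels[0] == "+"):
        return False
    # zip() stops at the end of the filter; extra topic levels are only
    # possible when the filter ended in '#'.
    for want, have in zip(levels, parts):
        if want != "+" and want != have:
            return False
    return True
```

A caller would run `split_filter` once per subscription and keep the result, so that `matches` only has to split the incoming topic.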

This further simplifies the code and prepares for the next bit.
Subscriptions are arranged into a tree that's traversed
when a message is dispatched.
No sense in doing this in the comparator
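
To illustrate the tree idea, the following is a rough sketch under my own assumptions, not the data structure used in this PR. `Node` and `SubscriptionTree` are hypothetical names, handlers are plain strings to keep it short, and the special handling of `$`-prefixed topics is left out.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    children: dict[str, "Node"] = field(default_factory=dict)
    here: list[str] = field(default_factory=list)   # filters ending exactly at this level
    multi: list[str] = field(default_factory=list)  # filters ending in '#' below this level


class SubscriptionTree:
    def __init__(self) -> None:
        self.root = Node()

    def add(self, topic_filter: str, handler: str) -> None:
        levels = topic_filter.split("/")
        multi = levels[-1] == "#"
        if multi:
            levels.pop()  # the trailing hash becomes a flag on the parent node
        node = self.root
        for level in levels:
            node = node.children.setdefault(level, Node())
        (node.multi if multi else node.here).append(handler)

    def lookup(self, topic: str) -> list[str]:
        parts = topic.split("/")
        found: list[str] = []
        stack = [(self.root, 0)]  # explicit stack instead of recursion
        while stack:
            node, depth = stack.pop()
            found.extend(node.multi)  # '#' matches this level and everything below
            if depth == len(parts):
                found.extend(node.here)
                continue
            # Follow the literal child and the single-level wildcard, if present.
            for key in (parts[depth], "+"):
                child = node.children.get(key)
                if child is not None:
                    stack.append((child, depth + 1))
        return found
```

Dispatching a message then costs roughly one dictionary lookup per topic level instead of one comparison per subscription.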

return ("asyncio", {"policy": WindowsSelectorEventLoopPolicy()})
return ("asyncio", {})
return ("trio", {})

This means that the test suite will only run against Trio now. The proper fix is to delete the fixture completely.

Author

That's temporary until the next anyio release and/or I can figure out the reason for the test failure.

Author

Removed.

smurfix added 14 commits May 27, 2024 11:36
Use `contextlib.aclosing` to cleanly stop the iterator
instead, synchronize via task_status.started()
If the server is busy it might delay forwarding messages
until after it sees the unsubscription.
anyio 4.4 warns when that doesn't happen
This change adds subscriptions with separate message queues.

Does not yet handle duplicate subscriptions.
needs just a bool return, not a counter
This test doesn't yet support subscribing to a topic multiple times.

TODO.
This change skips walking the topic tree, assuming server support.
@smurfix
Author

smurfix commented May 27, 2024

Now includes a subscription method that lets you do "local" message processing:

async with Client("test.mosquitto.org") as client:
    async with client.subscription("foo/bar") as msgs:
        async for m in msgs:
            await process(m)
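
To make the mechanics concrete, here is a broker-free sketch of how a per-subscription queue behind an async context manager might work. This is an assumption-laden illustration and not the implementation in this PR: `Dispatcher` and `deliver` are hypothetical names, topics are matched by exact string only, and the real SUBSCRIBE/UNSUBSCRIBE packets are reduced to comments.

```python
import asyncio
from contextlib import asynccontextmanager


class Dispatcher:
    """Hypothetical stand-in for the client's incoming-message side."""

    def __init__(self) -> None:
        self._queues: dict[str, list[asyncio.Queue]] = {}

    @asynccontextmanager
    async def subscription(self, topic: str):
        queue: asyncio.Queue = asyncio.Queue()
        # Register the queue first. A real client would send SUBSCRIBE only
        # after this point, so no early message can slip past the queue.
        self._queues.setdefault(topic, []).append(queue)
        try:
            yield queue
        finally:
            # A real client would send UNSUBSCRIBE here.
            self._queues[topic].remove(queue)

    def deliver(self, topic: str, payload: bytes) -> int:
        """Hand a message to every queue registered for the topic."""
        queues = self._queues.get(topic, [])
        for queue in queues:
            queue.put_nowait(payload)
        return len(queues)


async def main() -> tuple[bytes, int]:
    dispatcher = Dispatcher()
    async with dispatcher.subscription("foo/bar") as msgs:
        dispatcher.deliver("foo/bar", b"1")
        dispatcher.deliver("other", b"x")  # no queue registered, dropped
        first = await msgs.get()
    late = dispatcher.deliver("foo/bar", b"2")  # subscription already closed
    return first, late


if __name__ == "__main__":
    print(asyncio.run(main()))
```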

@empicano
Collaborator

empicano commented May 28, 2024

Hi there, very cool to see so much work on this! 👍

This seems to be the go-ahead for our yearly discussion on this subject 😄 For reference, we had a PR in the past (#152) to switch to anyio that we ultimately decided to abandon. In discussion #44, people were reluctant about a switch to anyio, especially if the changes would impact the interface.

All that being said, we focus a lot on keeping aiomqtt as small and maintainable as possible. I can see an internal switch to anyio if it works towards that goal. In the best case, this would be non-breaking. The trio support would be a nice extra, but wouldn't make the switch worth it to me if it increases maintenance cost. I am also still on the fence about whether it's worth adding another dependency if we get e.g. task groups "for free" once we drop Python <3.11.

@frederikaalund, @JonathanPlasse, what are your thoughts on this? 🙂


Now includes a subscription method that lets you do "local" message processing

We had something similar to this in the past with unfiltered_messages and filtered_messages. People got confused about how it works, among other things by the fact that it creates multiple queues, so we decided to change it to what we have now.

import anyio
from anyio import create_memory_object_stream

from outcome import Error, Value

I didn't see outcome in the dependencies. I also don't think it's worth the trouble to implement a wrapper on top of memory object streams.

Author

Right. I'll fix the dependency.

The wrapper is intended for those who want to use their own asyncio.Queue-based implementation (e.g. LIFO, or something priority-based). If the maintainers decide to drop backwards compatibility and use anyio's object streams instead, I'd be happy to remove it.

@agronholm

Is it absolutely necessary to depend on the paho-mqtt client? Can't we instead create a sans-io implementation of the MQTT v5 / v3.1 protocols?

@smurfix
Author

smurfix commented May 28, 2024

@agronholm Paho-MQTT at least has a working MQTTv5 implementation and complete data classes for all the fiddly bits; reimplementing all that is not exactly trivial, thus the straightest way to working threads-free async-only code was to anyio-ify it by way of a subclass. I'm the first person to admit that this subclass is way more intrusive than any sane programmer should be comfortable with, but you got to start somewhere.

I do plan to build a reasonable sans-io MQTT core eventually. Ideally I'd then be able to teach the current paho.mqtt.Client class to use that (should be a net reduction in code size …). Assuming that its maintainers are amenable to this, which I'm a bit skeptical about, but we'll see. In the meantime I've submitted a somewhat less intrusive clean-up patch to them.

eclipse/paho.mqtt.python#845

@smurfix
Author

smurfix commented May 28, 2024

@empicano asyncio taskgroups may have the same name, but they don't have the same features as anyio taskgroups. For instance, they don't have cancel scopes; you need to cancel the task the group runs in, which is not the same thing. There are other differences which @agronholm is way more qualified than I am to educate people about. ;-)

As to changing the interface, my patch manages not to do that. Admittedly it does this by way of somewhat-invasively subclassing the Paho client but frankly the Paho code could do with a bit of improvement. :-/

As far as "filtered_messages", IMHO it's far more confusing, or rather detrimental to modularity, to require a central "async with client.messages()" loop to do the message dispatching. You can't do a "this task deals with temperatures and that task deals with humidity" pattern that way. It also requires more CPU.

IMHO it's a question of designing the right interface. The old async with filtered_messages() pattern required the caller to do the subscribe/loop/unsubscribe dance instead of having the context manager handle it, which I can't see a good reason for. It also risked losing messages, because the filter was attached after subscribing instead of beforehand.

NB 9c6a7f4 documents the new async with client.subscription() interface. Personally I'd much rather use my example code than the equivalent global-queue example above it (which is more code and less modular, esp. when you add queue-full-error handling).

@agronholm

@agronholm Paho-MQTT at least has a working MQTTv5 implementation and complete data classes for all the fiddly bits; reimplementing all that is not exactly trivial, thus the straightest way to working threads-free async-only code was to anyio-ify it by way of a subclass. I'm the first person to admit that this subclass is way more intrusive than any sane programmer should be comfortable with, but you got to start somewhere.

I do plan to build a reasonable sans-io MQTT core eventually. Ideally I'd then be able to teach the current paho.mqtt.Client class to use that (should be a net reduction in code size …). Assuming that its maintainers are amenable to this, which I'm a bit skeptical about, but we'll see. In the meantime I've submitted a somewhat less intrusive clean-up patch to them.

eclipse/paho.mqtt.python#845

Fair enough.

turns out that it obfuscates more than it helps, at least here
@empicano
Collaborator

empicano commented Jun 2, 2024

I hope Frederik will still comment here; he can frankly review this better than I can. I'll say that it'd be pretty neat to get rid of the threads and clean up the code. I'm just concerned about maintainability when adding this much code, as we're already stretched thin as is. I agree that a sans-io "backend" would be really cool to see in aiomqtt 👍


On the interface topic:

You can't do a "this task deals with temperatures and that task deals with humidity" pattern that way.

Maybe I misunderstand you here, I'd say this example in the documentation does that, or not? The flexibility of this approach was in fact one of the main reasons I pushed for the change.

Generally, I want this library to serve the most basic use case (which I think is a single task and queue) as well as possible, with flexibility to use it for as much as possible after that. I'd like to avoid having multiple queues by default, because when we still had filtered_messages we heard multiple times in issues and discussions that this led to confusion and problems.

Apart from that, I'd expect a fixed number of tasks working a single priority queue to fit better in most use cases where we need to process messages concurrently, e.g. because the number of messages coming over different topics is often skewed. What do you think about that?
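
A minimal stdlib sketch of that "fixed number of workers on one priority queue" pattern, independent of aiomqtt. The topics, the priorities, and the tuple layout `(priority, sequence, topic)` are assumptions made up for the illustration; the sequence number only serves as a tie-breaker so equal priorities stay in arrival order.

```python
import asyncio


async def worker(queue: asyncio.PriorityQueue, done: list[str]) -> None:
    while True:
        _priority, _seq, topic = await queue.get()
        try:
            done.append(topic)  # stand-in for real message processing
        finally:
            queue.task_done()


async def main() -> list[str]:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    done: list[str] = []
    incoming = [(2, "humidity/a"), (1, "alarm/fire"), (2, "humidity/b")]
    # Lower number means higher priority.
    for seq, (priority, topic) in enumerate(incoming):
        queue.put_nowait((priority, seq, topic))
    # A fixed pool of two workers shares the single queue.
    workers = [asyncio.create_task(worker(queue, done)) for _ in range(2)]
    await queue.join()  # wait until every queued item was processed
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In a real client the loop over `incoming` would be replaced by whatever feeds messages into the queue, and a busy topic could not starve the others as long as its priority is not the highest.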

IMHO it's a question of designing the right interface. The old async with filtered_messages() pattern required the caller to do the subscribe/loop/unsubscribe dance instead of having the context manager handle it, which I can't see a good reason for. It also risked losing messages, because the filter was attached after subscribing instead of beforehand.

In my opinion, subscriptions and routing should be separate. I admit that context managers for subscriptions are elegant, but most of the time we either don't need to unsubscribe (clean session) or don't want to unsubscribe (persistent session) before disconnection. I want this library to support as many use cases as possible, and dynamic un/subscriptions are part of that. We can still have those via task cancellation with the proposed changes, but I'd like to find a simpler solution.

Let me propose an alternative idea to improve the message handling interface:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async for message in client.messages:
        await client.route(message)

Where we can process messages concurrently e.g. like this:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async def work(client):
    async for message in client._messages():
        await client.route(message)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(work(client))
        tg.create_task(work(client))

(.messages is currently broken, for now we use ._messages() to be able to use it multiple times.)

I opened #304 with a proof of concept. I'd be glad to hear what you think!

@smurfix
Author

smurfix commented Jun 2, 2024

Well, the downside of separating subscription and routing is, as your example demonstrates, that the router needs to match every message to all your wildcard patterns. My code uses the fact that the server already did that work.

As to sans-io: in an ideal world that'd be part of Paho and thus not add to aiomqtt's maintenance burden. We'll see.
