Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve message routing #304

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

Improve message routing #304

wants to merge 1 commit into from

Conversation

empicano
Copy link
Collaborator

@empicano empicano commented Jun 2, 2024

This proposes a simpler way to filter messages and structure message handling.

Unsorted points that I considered:

  • This lets us split message handling into multiple files via multiple routers
  • This lets us use the client inside a handler function, to e.g. publish a message back in a request/response fashion
  • We can dynamically subscribe and unsubscribe
  • The values of wildcards (+/#) of the topic filter are automatically available as *args in the handler function
  • We still only have a single message queue (easier for newcomers, concurrency could be implemented as shown below, optionally with priority queue)
  • We can still pass a non-default queue to the client to prioritize the handling of certain messages
  • We still have flexibility to not use the routers, but handle the messages directly. Routers are a natural development, once the application gets too complex we can iteratively add them
  • We are still flexible enough to process messages concurrently in an individual way
  • It's a non-breaking change

The interface looks like this:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async for message in client.messages:
        await client.route(message)

Where we can process messages concurrently e.g. like this:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async def work(client):
    async for message in client._messages():
        await client.route(message)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(work(client))
        tg.create_task(work(client))

(.messages is currently broken, for now we use ._messages() to be able to use it multiple times.)

Glad to hear feedback on this 🙂

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant