Skip to content

Commit

Permalink
deploy: 0176acc
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] committed Jul 1, 2024
1 parent b563509 commit a3ad61f
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 130 deletions.
Binary file modified .doctrees/developer-interface.doctree
Binary file not shown.
Binary file modified .doctrees/environment.pickle
Binary file not shown.
Binary file modified .doctrees/index.doctree
Binary file not shown.
Binary file modified .doctrees/reconnection.doctree
Binary file not shown.
Binary file modified .doctrees/subscribing-to-a-topic.doctree
Binary file not shown.
5 changes: 3 additions & 2 deletions _sources/reconnection.md.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@

Network connections are inherently unstable and can fail at any time. Especially for long-running applications, this can be a challenge. To make our application resilient against connection failures, we can wrap the code in a `try`/`except`-block, listen for `MqttError`s, and reconnect like so:

<!-- The `Client` context is designed to be [reusable (but not reentrant)](https://docs.python.org/3/library/contextlib.html#reusable-context-managers). -->
The `Client` context is designed to be [reusable (but not reentrant)](https://docs.python.org/3/library/contextlib.html#reusable-context-managers).

```python
import asyncio
import aiomqtt


async def main():
client = aiomqtt.Client("test.mosquitto.org")
interval = 5 # Seconds
while True:
try:
async with aiomqtt.Client("test.mosquitto.org") as client:
async with client:
await client.subscribe("humidity/#")
async for message in client.messages:
print(message.payload)
Expand Down
69 changes: 8 additions & 61 deletions _sources/subscribing-to-a-topic.md.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,26 @@ By default, the size of the queue is unlimited. You can set a limit through the

Messages are queued internally and returned sequentially from `Client.messages`. If a message takes a long time to handle, it blocks the handling of other messages.

You can handle messages concurrently by using an `asyncio.TaskGroup` like so:
You can handle messages concurrently by using multiple worker tasks like so:

```python
import asyncio
import aiomqtt


async def handle(message):
await asyncio.sleep(5) # Simulate some I/O-bound work
print(message.payload)
async def work(client):
async for message in client.messages:
await asyncio.sleep(5) # Simulate some I/O-bound work
print(message.payload)


async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
# Use a task group to manage and await all tasks
# Use a task group to manage and await all worker tasks
async with asyncio.TaskGroup() as tg:
async for message in client.messages:
tg.create_task(handle(message)) # Spawn new coroutine
for _ in range(2): # You can use more than two workers here
tg.create_task(work(client))


asyncio.run(main())
Expand All @@ -133,60 +134,6 @@ asyncio.run(main())
Coroutines only make sense if your message handling is I/O-bound. If it's CPU-bound, you should spawn multiple processes instead.
```

## Multiple queues

The code snippet above handles each message in a new coroutine. Sometimes, we want to handle messages from different topics concurrently, but sequentially inside a single topic.

The idea here is to implement a "distributor" that sorts incoming messages into multiple asyncio queues. Each queue is then processed by a different coroutine. Let's see how this works for our temperature and humidity messages:

```python
import asyncio
import aiomqtt


async def temperature_consumer():
while True:
message = await temperature_queue.get()
print(f"[temperature/#] {message.payload}")


async def humidity_consumer():
while True:
message = await humidity_queue.get()
print(f"[humidity/#] {message.payload}")


temperature_queue = asyncio.Queue()
humidity_queue = asyncio.Queue()


async def distributor(client):
# Sort messages into the appropriate queues
async for message in client.messages:
if message.topic.matches("temperature/#"):
temperature_queue.put_nowait(message)
elif message.topic.matches("humidity/#"):
humidity_queue.put_nowait(message)


async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
await client.subscribe("humidity/#")
# Use a task group to manage and await all tasks
async with asyncio.TaskGroup() as tg:
tg.create_task(distributor(client))
tg.create_task(temperature_consumer())
tg.create_task(humidity_consumer())


asyncio.run(main())
```

```{tip}
You can use [different queue types](#the-message-queue) for these queues to e.g. handle temperature in FIFO and humidity in LIFO order.
```

## Listening without blocking

When you run the minimal example for subscribing and listening for messages, you'll notice that the program doesn't finish. Waiting for messages through the `Client.messages()` generator blocks the execution of everything that comes afterward.
Expand Down
8 changes: 4 additions & 4 deletions index.html
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@
<article role="main">
<section id="the-idiomatic-asyncio-mqtt-client">
<h1>The idiomatic asyncio MQTT client 🙌<a class="headerlink" href="#the-idiomatic-asyncio-mqtt-client" title="Permalink to this heading">#</a></h1>
<p><code class="docutils literal notranslate"><span class="pre">aiomqtt</span></code> is a lightweight and idiomatic MQTT client:</p>
<p>Write code like this:</p>
<p><strong>Publish</strong></p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">async</span> <span class="k">with</span> <span class="n">aiomqtt</span><span class="o">.</span><span class="n">Client</span><span class="p">(</span><span class="s2">&quot;test.mosquitto.org&quot;</span><span class="p">)</span> <span class="k">as</span> <span class="n">client</span><span class="p">:</span>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">async</span> <span class="k">with</span> <span class="n">Client</span><span class="p">(</span><span class="s2">&quot;test.mosquitto.org&quot;</span><span class="p">)</span> <span class="k">as</span> <span class="n">client</span><span class="p">:</span>
<span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&quot;temperature/outside&quot;</span><span class="p">,</span> <span class="n">payload</span><span class="o">=</span><span class="mf">28.4</span><span class="p">)</span>
</pre></div>
</div>
<p><strong>Subscribe</strong></p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">async</span> <span class="k">with</span> <span class="n">aiomqtt</span><span class="o">.</span><span class="n">Client</span><span class="p">(</span><span class="s2">&quot;test.mosquitto.org&quot;</span><span class="p">)</span> <span class="k">as</span> <span class="n">client</span><span class="p">:</span>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">async</span> <span class="k">with</span> <span class="n">Client</span><span class="p">(</span><span class="s2">&quot;test.mosquitto.org&quot;</span><span class="p">)</span> <span class="k">as</span> <span class="n">client</span><span class="p">:</span>
<span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&quot;temperature/#&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">client</span><span class="o">.</span><span class="n">messages</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">payload</span><span class="p">)</span>
Expand Down Expand Up @@ -272,7 +272,7 @@ <h2>License<a class="headerlink" href="#license" title="Permalink to this headin
</section>
<section id="contributing">
<h2>Contributing<a class="headerlink" href="#contributing" title="Permalink to this heading">#</a></h2>
<p>We’re very happy about contributions to aiomqtt! You can get started by reading <a class="reference external" href="https://github.com/sbtinstruments/aiomqtt/blob/main/CONTRIBUTING.md">CONTRIBUTING.md</a>.</p>
<p>We’re happy about contributions to aiomqtt! 🎉 You can get started by reading <a class="reference external" href="https://github.com/sbtinstruments/aiomqtt/blob/main/CONTRIBUTING.md">CONTRIBUTING.md</a>.</p>
</section>
<section id="versioning">
<h2>Versioning<a class="headerlink" href="#versioning" title="Permalink to this heading">#</a></h2>
Expand Down
5 changes: 3 additions & 2 deletions reconnection.html
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,17 @@
<section id="reconnection">
<h1>Reconnection<a class="headerlink" href="#reconnection" title="Permalink to this heading">#</a></h1>
<p>Network connections are inherently unstable and can fail at any time. Especially for long-running applications, this can be a challenge. To make our application resilient against connection failures, we can wrap the code in a <code class="docutils literal notranslate"><span class="pre">try</span></code>/<code class="docutils literal notranslate"><span class="pre">except</span></code>-block, listen for <code class="docutils literal notranslate"><span class="pre">MqttError</span></code>s, and reconnect like so:</p>
<!-- The `Client` context is designed to be [reusable (but not reentrant)](https://docs.python.org/3/library/contextlib.html#reusable-context-managers). -->
<p>The <code class="docutils literal notranslate"><span class="pre">Client</span></code> context is designed to be <a class="reference external" href="https://docs.python.org/3/library/contextlib.html#reusable-context-managers">reusable (but not reentrant)</a>.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">asyncio</span>
<span class="kn">import</span> <span class="nn">aiomqtt</span>


<span class="k">async</span> <span class="k">def</span> <span class="nf">main</span><span class="p">():</span>
<span class="n">client</span> <span class="o">=</span> <span class="n">aiomqtt</span><span class="o">.</span><span class="n">Client</span><span class="p">(</span><span class="s2">&quot;test.mosquitto.org&quot;</span><span class="p">)</span>
<span class="n">interval</span> <span class="o">=</span> <span class="mi">5</span> <span class="c1"># Seconds</span>
<span class="k">while</span> <span class="kc">True</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">async</span> <span class="k">with</span> <span class="n">aiomqtt</span><span class="o">.</span><span class="n">Client</span><span class="p">(</span><span class="s2">&quot;test.mosquitto.org&quot;</span><span class="p">)</span> <span class="k">as</span> <span class="n">client</span><span class="p">:</span>
<span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="p">:</span>
<span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&quot;humidity/#&quot;</span><span class="p">)</span>
<span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">client</span><span class="o">.</span><span class="n">messages</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">payload</span><span class="p">)</span>
Expand Down
Loading

0 comments on commit a3ad61f

Please sign in to comment.