Skip to content

Commit

Permalink
Migrate amqp library (#194)
Browse files Browse the repository at this point in the history
* migrate amqp library

* few tests/modifications

* formatting

* removing uamqp library

* fixing issue on data encoding

* fixing tests/ isort and black checker

* removing useless files

* fixing examples

* reverting utils.py

* fixing doc
  • Loading branch information
DanielePalaia authored May 28, 2024
1 parent 65b7a22 commit 3e24496
Show file tree
Hide file tree
Showing 27 changed files with 2,810 additions and 85 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ The client supports two codecs to store the messages to the server:
By default you should use `AMQP 1.0` codec:
```python
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
)
```

Expand Down Expand Up @@ -287,7 +287,7 @@ This one:
```python
for i in range(1_000_000):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
)
# send is asynchronous
await producer.send(stream=STREAM, message=amqp_message)
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/basic_producers/producer_send.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async def publish():

for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
)
# send is asynchronous
await producer.send(stream=STREAM, message=amqp_message)
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/basic_producers/producer_send_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def publish():
messages = []
for i in range(BATCH):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
)
messages.append(amqp_message)
# send_batch is synchronous. will wait till termination
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/basic_producers/producer_send_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async def publish():
# sending a thousand of messages in AMQP format
for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
)
# send is synchronous. It will also wait synchronously for the confirmation to arrive from the server
# it is really very slow and send() + callback for asynchronous confirmation should be used instead.
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/filtering/producer_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def publish():
"region": "New York",
}
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
application_properties=application_properties,
)
# send is asynchronous
Expand All @@ -43,7 +43,7 @@ async def publish():
"region": "California",
}
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
application_properties=application_properties,
)
# send is asynchronous
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/filtering/super_stream_producer_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def publish():
for i in range(MESSAGES):
application_properties = {"region": "New York", "id": "{}".format(i)}
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
application_properties=application_properties,
)
# send is asynchronous
Expand All @@ -51,7 +51,7 @@ async def publish():
for i in range(MESSAGES):
application_properties = {"region": "California", "id": "{}".format(i)}
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
application_properties=application_properties,
)
# send is asynchronous
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def publish():
messages = []
for i in range(BATCH):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
)
messages.append(amqp_message)
# send_batch is synchronous. will wait till termination
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def publish():
# sending a million of messages in AMQP format
for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
)
# send is asynchronous - also confirmation is taken asynchronously by _on_publish_confirm_client callback
# you can specify different callbacks for different messages.
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/reliable_client/BestPracticesClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async def publish(rabbitmq_configuration: dict):
return

amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
application_properties={"id": "{}".format(i)},
)
# send is asynchronous
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def publish():
# run slowly several messages in order to test with sac
for i in range(1000000):
amqp_message = AMQPMessage(
body="message_:{}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
properties=uamqp.message.MessageProperties(message_id=i),
)
await producer.send(message=amqp_message)
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/sub_entry_batch/producer_sub_entry_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async def publish():
messages = []
for i in range(BATCH):
amqp_message = AMQPMessage(
body="a:{}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
)
messages.append(amqp_message)

Expand Down
2 changes: 1 addition & 1 deletion docs/examples/super_stream/super_stream_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def publish():
start_time = time.perf_counter()
for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
application_properties={"id": "{}".format(i)},
)
await super_stream_producer.send(amqp_message)
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/super_stream/super_stream_producer_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def publish():
# Sending a million messages
for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
application_properties={"id": "{}".format(i)},
)
await super_stream_producer.send(amqp_message)
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/tls/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def publish():
messages = []
for i in range(BATCH):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
body=bytes("hello: {}".format(i), "utf-8"),
)
messages.append(amqp_message)
# send_batch is synchronous. will wait till termination
Expand Down
37 changes: 2 additions & 35 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "rstream"
version = "0.19.1"
version = "0.20.0"
description = "A python client for RabbitMQ Streams"
authors = ["George Fortunatov <[email protected]>", "Daniele Palaia <[email protected]>"]
readme = "README.md"
Expand All @@ -10,7 +10,6 @@ license = "MIT"

[tool.poetry.dependencies]
python = "^3.9"
uamqp = "^1.6.3"
requests = "^2.31.0"
mmh3 = "^4.0.0"

Expand All @@ -24,11 +23,13 @@ pytest-asyncio = "^0.15.1"
black = "^23.12.1"
requests = "^2.31.0"
mmh3 = "^4.0.0"
typing_extensions ="^4.11.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.0"
requests = "^2.31.0"
types-requests = "^2.31.0.20240406"
typing_extensions ="^4.11.0"

[tool.black]
line-length = 110
Expand Down
4 changes: 4 additions & 0 deletions rstream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
del metadata

from .amqp import AMQPMessage, amqp_decoder # noqa: E402
from ._pyamqp.message import Properties # noqa: E402
from ._pyamqp.message import Header # noqa: E402
from .compression import CompressionType # noqa: E402
from .constants import ( # noqa: E402
ConsumerOffsetSpecification,
Expand Down Expand Up @@ -56,6 +58,8 @@
"Consumer",
"RawMessage",
"Producer",
"Properties",
"Header",
"OffsetType",
"ConsumerOffsetSpecification",
"ConfirmationStatus",
Expand Down
Loading

0 comments on commit 3e24496

Please sign in to comment.