Skip to content

Commit

Permalink
ampq over websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
aloschilov committed Jun 18, 2024
1 parent a1285d4 commit 43d8428
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 15 deletions.
10 changes: 2 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,8 @@ RABBITMQ_CONTAINER_NAME:=aiormq_rabbitmq
RABBITMQ_IMAGE:=mosquito/aiormq-rabbitmq

rabbitmq:
docker kill $(RABBITMQ_CONTAINER_NAME) || true
docker run --pull=always --rm -d \
--name $(RABBITMQ_CONTAINER_NAME) \
-p 5671:5671 \
-p 5672:5672 \
-p 15671:15671 \
-p 15672:15672 \
$(RABBITMQ_IMAGE)
docker compose down
docker compose up -d

upload:
poetry publish --build --skip-existing
Expand Down
13 changes: 9 additions & 4 deletions aiormq/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from .tools import Countdown, censor_url

from .websocket_transport import open_websocket_connection

# noinspection PyUnresolvedReferences
try:
Expand Down Expand Up @@ -452,10 +453,14 @@ async def connect(

log.debug("Connecting to: %s", self)
try:
reader, writer = await asyncio.open_connection(
self.url.host, self.url.port, ssl=ssl_context,
**self.__create_connection_kwargs,
)

if self.url.scheme == "ws" or self.url.scheme == "wss":
reader, writer = await open_websocket_connection(self.url)
else:
reader, writer = await asyncio.open_connection(
self.url.host, self.url.port, ssl=ssl_context,
**self.__create_connection_kwargs,
)

frame_receiver = FrameReceiver(reader)
except OSError as e:
Expand Down
116 changes: 116 additions & 0 deletions aiormq/websocket_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from asyncio import streams, transports, get_event_loop, events
import asyncio

import aiohttp
from yarl import URL

import logging

logging.basicConfig(level=logging.DEBUG)

# Create a custom logger
logger = logging.getLogger(__name__)

# Create handlers
c_handler = logging.StreamHandler()
c_handler.setLevel(logging.WARNING)

# Create formatters and add it to handlers
c_format = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
c_handler.setFormatter(c_format)

# Add handlers to the logger
logger.addHandler(c_handler)



class WebsocketTransport(transports.Transport):

def __init__(self, loop, protocol, url, extra=None):

if extra is None:
extra = {}
self._extra = extra

self.url = url

self._loop = loop
self._protocol = protocol
self._closing = False # Set when close() or write_eof() called.
self._paused = False

self.task = self._loop.create_task(self.main_loop())

self.write_queue = asyncio.Queue()
self.read_queue = asyncio.Queue()
self.ws = None

async def sender(self, ws):
while True:
data = await self.write_queue.get()
await ws.send_bytes(data)

async def receiver(self, ws):
async for msg in ws:
if msg.type == aiohttp.WSMsgType.BINARY:
self._protocol.data_received(msg.data)
elif msg.type == aiohttp.WSMsgType.ERROR:
break

self._protocol.eof_received()

async def main_loop(self):
async with aiohttp.ClientSession() as session:
async with session.ws_connect(self.url) as ws:
self.ws = ws
await asyncio.gather(
self.sender(ws),
self.receiver(ws)
)

def get_protocol(self):
return self._protocol

def set_protocol(self, protocol):
return self._protocol

def is_closing(self):
return self._closing

def write(self, data):
self.write_queue.put_nowait(data)

def is_reading(self):
return not self._paused and not self._closing

def resume_reading(self):
if self._closing or not self._paused:
return
self._paused = False
if self._loop.get_debug():
logger.debug("%r resumes reading", self)

def close(self) -> None:
self._closing = True
self.task.cancel()
self._protocol._closed.set_result(None)


async def create_websocket_connection(protocol_factory, url, *args, **kwargs):
loop = get_event_loop()
protocol = protocol_factory()
transport = WebsocketTransport(loop, protocol, url, **kwargs)
return transport, protocol


_DEFAULT_LIMIT = 2 ** 16 # 64 KiB


async def open_websocket_connection(url: URL, limit=_DEFAULT_LIMIT, *args, **kwargs):
loop = events.get_running_loop()
reader = streams.StreamReader(limit=limit, loop=loop)
protocol = streams.StreamReaderProtocol(reader, loop=loop)
factory = lambda: protocol
transport, _ = await create_websocket_connection(factory, url, *args, **kwargs)
writer = streams.StreamWriter(transport, protocol, reader, loop)
return reader, writer
48 changes: 48 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Docker Compose description of the combined application.
#
# 'docker-compose up' will run this.

# This section describes the various containers (services).
services:

rabbitmq:
# There is a prebuilt RabbitMQ image; see
# https://hub.docker.com/_/rabbitmq/ for details.
# This variant is built on Alpine Linux (it's smaller) and includes
# the management UI.
image: 'mosquito/aiormq-rabbitmq'

# These ports are exposed on the host; 'hostport:containerport'.
# You could connect to this server from outside with the *host's*
# DNS name or IP address and port 5672 (the left-hand side of the
# colon).
ports:
# The standard AMQP protocol port
- '5671:5671'
- '5672:5672'
# HTTP management UI
- '15671:15671'
- '15672:15672'


# Run this container on a private network for this application.
# This is necessary for magic Docker DNS to work: other containers
# also running on this network will see a host name "rabbitmq"
# (the name of this section) and the internal port 5672, even though
# that's not explicitly published above.
networks:
- network

websocket-tcp-relay:
image: 'cloudamqp/websocket-tcp-relay:latest'
ports:
# ws socket port
- '15670:15670'
command: ["/usr/bin/websocket-tcp-relay", "--bind=0.0.0.0","--upstream=tcp://rabbitmq:5672"]
networks:
- network

networks:
# Declare our private network. We must declare one for the magic
# Docker DNS to work, but otherwise its default settings are fine.
network: {}
Loading

0 comments on commit 43d8428

Please sign in to comment.