Skip to content

Commit

Permalink
asn1 over soup
Browse files Browse the repository at this point in the history
  • Loading branch information
SamDanielThangarajan committed Jan 27, 2025
1 parent 22924d7 commit 78657e2
Show file tree
Hide file tree
Showing 23 changed files with 1,455 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ classifiers = [
dependencies = [
'attrs>=23.1',
'chevron>=0.14.0',
'click>=8.1'
'click>=8.1',
'asn1>=2.6',
'asn1tools==0.164.0'
]
dynamic = ["version"]

Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
attrs>=23.1
chevron>=0.14.0
click>=8.1
asn1>=2.6
asn1tools==0.164.0
pytest>=7.4
pytest-cov>=4.1
pytest-asyncio>=0.23
49 changes: 49 additions & 0 deletions src/nasdaq_protocols/asn1_app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import Callable
from nasdaq_protocols import soup

from .soup_session import (
OnAns1CloseCoro,
OnAsn1MessageCoro,
Ans1SoupSessionId,
Asn1SoupClientSession
)
from .codegen import generate_soup_app


__all__ = [
'OnAns1CloseCoro',
'OnAsn1MessageCoro',
'Ans1SoupSessionId',
'generate_soup_app',
'connect_async_soup'
]


async def connect_async_soup(remote: tuple[str, int], user: str, passwd: str, session_id, # pylint: disable=R0913
session_factory: Callable[[soup.SoupClientSession], Asn1SoupClientSession],
sequence: int = 0,
client_heartbeat_interval: int = 10,
server_heartbeat_interval: int = 10):
"""
Connect to the Soup-Asn1 server.
:param remote: tuple of host and port
:param user: Username to login
:param passwd: Password to login
:param session_id: Name of the session to join [Default=''] .
:param session_factory: Factory to create a ClientSession.
:param sequence: The sequence number. [Default=1]
:param client_heartbeat_interval: seconds between client heartbeats.
:param server_heartbeat_interval: seconds between server heartbeats.
:return: ClientSession
"""

# Create a soup session
soup_session = await soup.connect_async(
remote, user, passwd, session_id, sequence=sequence,
client_heartbeat_interval=client_heartbeat_interval,
server_heartbeat_interval=server_heartbeat_interval
)

# Create an asn1-soup-session (with the soup session created above)
return session_factory(soup_session)
32 changes: 32 additions & 0 deletions src/nasdaq_protocols/asn1_app/codegen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import click

from nasdaq_protocols.common import Ans1Generator


__all__ = [
'generate_soup_app'
]


@click.command()
@click.option('--asn1-files-dir', type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True))
@click.option('--app-name', type=click.STRING)
@click.option('--pdu-name', type=click.STRING)
@click.option('--prefix', type=click.STRING, default='')
@click.option('--op-dir', type=click.Path(exists=True, writable=True))
@click.option('--package-name', type=click.STRING)
@click.option('--init-file/--no-init-file', show_default=True, default=True)
def generate_soup_app(asn1_files_dir, app_name, pdu_name, prefix, op_dir, package_name, init_file):
generator = Ans1Generator(
asn1_files_dir,
app_name,
pdu_name,
op_dir,
template='ans1_soup_app.mustache',
prefix=prefix,
package_name=package_name,
generate_init_file=init_file
)
generator.generate()


99 changes: 99 additions & 0 deletions src/nasdaq_protocols/asn1_app/soup_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import asyncio
from typing import Callable, Type, Awaitable, ClassVar

import attrs
from nasdaq_protocols.common import DispatchableMessageQueue, logable
from nasdaq_protocols import soup
from nasdaq_protocols.common import Asn1Message


__all__ = [
'OnAsn1MessageCoro',
'OnAns1CloseCoro',
'Ans1SoupSessionId',
'Asn1SoupClientSession'
]
OnAsn1MessageCoro = Callable[[Type[Asn1Message]], Awaitable[None]]
OnAns1CloseCoro = Callable[[], Awaitable[None]]


@attrs.define(auto_attribs=True)
class Ans1SoupSessionId:
soup_session_id: soup.SoupSessionId = None

def __str__(self):
if self.soup_session_id:
return f'asn1-{self.soup_session_id}'
return 'asn1-nosoup'


@attrs.define(auto_attribs=True)
@logable
class Asn1SoupClientSession:
""" Soup client session that can read messages from soup server.
Currently, only one pdu is supported.
"""
Asn1Message: ClassVar[Asn1Message]
soup_session: soup.SoupClientSession
on_msg_coro: OnAsn1MessageCoro = None
on_close_coro: OnAns1CloseCoro = None
closed: bool = False
_session_id: Ans1SoupSessionId = None
_close_event: asyncio.Event = None
_message_queue: DispatchableMessageQueue = None

def __init_subclass__(cls, **kwargs):
if 'asn1_message' not in kwargs:
raise AttributeError("Missing 'asn1_message' attribute")
cls.Asn1Message = kwargs['asn1_message']

def __attrs_post_init__(self):
self._session_id = Ans1SoupSessionId(self.soup_session.session_id)
self._message_queue = DispatchableMessageQueue(self._session_id, self.on_msg_coro)
self.soup_session.set_handlers(on_msg_coro=self._on_soup_message, on_close_coro=self._on_soup_close)
self.soup_session.start_dispatching()

async def receive_message(self):
"""
Asynchronously receive a message from the itch session.
This method blocks until a message is received by the session.
"""
return await self._message_queue.get()

async def close(self):
"""
Asynchronously close the itch session.
"""
if self._close_event:
self.log.debug('%s> closing in progress..', self._session_id)
return
self._close_event = asyncio.Event()
self.soup_session.initiate_close()
await self._close_event.wait()
self.log.debug('%s> closed.', self._session_id)

async def _on_soup_message(self, message: soup.SoupMessage):
if isinstance(message, soup.SequencedData):
self.log.debug('%s> incoming sequenced bytes_', self._session_id)
decoded = self.decode(message.data)
await self._message_queue.put(decoded[1])
# await self._message_queue.put(
# self.decode(message.data)[1]
# )

async def _on_soup_close(self):
await self._message_queue.stop()
if self.on_close_coro is not None:
await self.on_close_coro()
if self._close_event:
self._close_event.set()
self.closed = True

@classmethod
def decode(cls, bytes_: bytes):
"""
Decode the given bytes into an asn1 message.
"""
return cls.Asn1Message.from_bytes(bytes_)
1 change: 1 addition & 0 deletions src/nasdaq_protocols/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from .message_queue import *
from .session import *
from .message import *
from .asn1 import *
3 changes: 3 additions & 0 deletions src/nasdaq_protocols/common/asn1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .types import *
from .structures import *
from .codegen import *
Loading

0 comments on commit 78657e2

Please sign in to comment.