Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Franciszekjob/1498 3 websockets #1515

Draft
wants to merge 14 commits into
base: franciszekjob/1498-2-get-compiled-casm
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions starknet_py/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"starknet_py.tests.e2e.fixtures.contracts_v1",
"starknet_py.tests.e2e.fixtures.misc",
"starknet_py.tests.e2e.fixtures.devnet",
"starknet_py.tests.e2e.fixtures.devnet_ws",
"starknet_py.tests.e2e.fixtures.constants",
"starknet_py.tests.e2e.client.fixtures.transactions",
"starknet_py.tests.e2e.client.fixtures.prepare_network",
Expand Down
244 changes: 244 additions & 0 deletions starknet_py/net/full_node_ws_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
from typing import Any, Callable, Dict, List, Optional, Union, cast

from starknet_py.net.client_models import BlockHeader, EmittedEvent, Hash, Tag
from starknet_py.net.client_utils import _clear_none_values
from starknet_py.net.http_client import RpcHttpClient
from starknet_py.net.schemas.rpc.ws import (
EventsNotificationSchema,
NewHeadsNotificationSchema,
PendingTransactionsNotificationSchema,
ReorgNotificationSchema,
SubscribeResponseSchema,
TransactionStatusNotificationSchema,
UnsubscribeResponseSchema,
)
from starknet_py.net.ws_client import RpcWSClient
from starknet_py.net.ws_full_node_client_models import (
EventsNotification,
NewHeadsNotification,
NewTransactionStatus,
PendingTransactionsNotification,
ReorgNotification,
SubscribeResponse,
Transaction,
TransactionStatusNotification,
UnsubscribeResponse,
)

BlockId = Union[int, Hash, Tag]
HandlerNotification = Union[
NewHeadsNotification,
EventsNotification,
TransactionStatusNotification,
PendingTransactionsNotification,
ReorgNotification,
]
Handler = Callable[[HandlerNotification], Any]


class FullNodeWSClient:
"""
Starknet WebSocket client for RPC API.
"""

def __init__(self, node_url: str):
"""
:param node_url: URL of the node providing the WebSocket API.
"""
self.node_url: str = node_url
self._rpc_ws_client: RpcWSClient = RpcWSClient(node_url)
self._subscriptions: Dict[int, Handler] = {}

async def connect(self):
"""
Establishes the WebSocket connection.
"""
await self._rpc_ws_client.connect()

async def disconnect(self):
"""
Closes the WebSocket connection.
"""
await self._rpc_ws_client.disconnect()

async def _subscribe(
self,
handler: Callable[[Any], Any],
method: str,
params: Optional[Dict[str, Any]] = None,
) -> int:
data = await self._rpc_ws_client.send(method, params)
response = cast(
SubscribeResponse,
SubscribeResponseSchema().load(data),
)

self._subscriptions[response.subscription_id] = handler

return response.subscription_id

async def listen(self):
"""
Listens for incoming WebSocket messages.
"""
await self._rpc_ws_client.listen(self._handle_received_message)

def _handle_received_message(self, message: Dict):
if "params" not in message:
# TODO(#1498): Possibly move `handle_rpc_error` from `RpcHttpClient` to separate function
RpcHttpClient.handle_rpc_error(message)

subscription_id = message["params"]["subscription_id"]

if subscription_id not in self._subscriptions:
return

handler = self._subscriptions[subscription_id]
method = message["method"]

if method == "starknet_subscriptionNewHeads":
notification = cast(
NewHeadsNotification,
NewHeadsNotificationSchema().load(message["params"]),
)
handler(notification)

elif method == "starknet_subscriptionEvents":
notification = cast(
EventsNotification,
EventsNotificationSchema().load(message["params"]),
)
handler(notification)

elif method == "starknet_subscriptionTransactionStatus":
notification = cast(
TransactionStatusNotification,
TransactionStatusNotificationSchema().load(message["params"]),
)
handler(notification)

elif method == "starknet_subscriptionPendingTransactions":
notification = cast(
PendingTransactionsNotification,
PendingTransactionsNotificationSchema().load(message["params"]),
)
handler(notification)

elif method == "starknet_subscriptionReorg":
notification = cast(
ReorgNotification,
ReorgNotificationSchema().load(message["params"]),
)
handler(notification)

async def subscribe_new_heads(
self,
handler: Callable[[BlockHeader], Any],
block: Optional[BlockId],
) -> int:
"""
Creates a WebSocket stream which will fire events for new block headers.

:param handler: The function to call when a new block header is received.
:param block: The block to get notifications from, default is latest, limited to 1024 blocks back.
:return: The subscription ID.
"""
params = {"block": block} if block else {}
subscription_id = await self._subscribe(
handler, "starknet_subscribeNewHeads", params
)

return subscription_id

async def subscribe_events(
self,
handler: Callable[[EmittedEvent], Any],
from_address: Optional[int] = None,
keys: Optional[List[List[int]]] = None,
block: Optional[BlockId] = None,
) -> int:
"""
Creates a WebSocket stream which will fire events for new Starknet events with applied filters.

:param handler: The function to call when a new event is received.
:param from_address: Address which emitted the event.
:param keys: The keys to filter events by.
:param block: The block to get notifications from, default is latest, limited to 1024 blocks back.
:return: The subscription ID.
"""
params = {"from_address": from_address, "keys": keys, "block": block}
subscription_id = await self._subscribe(
handler, "starknet_subscribeEvents", params
)

return subscription_id

async def subscribe_transaction_status(
self,
handler: Callable[[NewTransactionStatus], Any],
transaction_hash: int,
block: Optional[BlockId] = None,
) -> int:
"""
Creates a WebSocket stream which will fire events when a transaction status is updated.

:param handler: The function to call when a new transaction status is received.
:param transaction_hash: The transaction hash to fetch status updates for.
:param block: The block to get notifications from, default is latest, limited to 1024 blocks back.
:return: The subscription ID.
"""
params = {"transaction_hash": transaction_hash, "block": block}
params = _clear_none_values(params)
subscription_id = await self._subscribe(
handler, "starknet_subscribeTransactionStatus", params
)

return subscription_id

async def subscribe_pending_transactions(
self,
handler: Callable[[Union[int, Transaction]], Any],
transaction_details: Optional[bool],
sender_address: Optional[List[int]],
) -> int:
"""
Creates a WebSocket stream which will fire events when a new pending transaction is added.
While there is no mempool, this notifies of transactions in the pending block.

:param handler: The function to call when a new pending transaction is received.
:param transaction_details: Whether to include transaction details in the notification.
If false, only hash is returned.
:param sender_address: The sender address to filter transactions by.
:return: The subscription ID.
"""
params = {
"transaction_details": transaction_details,
"sender_address": sender_address,
}
subscription_id = await self._subscribe(
handler, "starknet_subscribePendingTransactions", params
)

return subscription_id

async def unsubscribe(self, subscription_id: int) -> bool:
"""
Close a previously opened WebSocket stream, with the corresponding subscription id.

:param subscription_id: ID of the subscription to close.
:return: True if the unsubscription was successful, False otherwise.
"""
if subscription_id not in self._subscriptions:
return False

params = {"subscription_id": subscription_id}
res = await self._rpc_ws_client.send("starknet_unsubscribe", params)

unsubscribe_response = cast(
UnsubscribeResponse, UnsubscribeResponseSchema().load(res)
)

if unsubscribe_response:
del self._subscriptions[subscription_id]

return unsubscribe_response.result
103 changes: 103 additions & 0 deletions starknet_py/net/schemas/rpc/ws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from marshmallow import Schema, fields, post_load

from starknet_py.net.schemas.common import Felt
from starknet_py.net.schemas.rpc.block import BlockHeaderSchema
from starknet_py.net.schemas.rpc.event import EmittedEventSchema
from starknet_py.net.ws_full_node_client_models import (
EventsNotification,
NewHeadsNotification,
NewTransactionStatus,
PendingTransactionsNotification,
ReorgData,
ReorgNotification,
SubscribeResponse,
TransactionStatusNotification,
UnsubscribeResponse,
)


class SubscribeResponseSchema(Schema):
subscription_id = fields.Integer(data_key="subscription_id", required=True)

@post_load
def make_dataclass(self, data, **kwargs) -> SubscribeResponse:
return SubscribeResponse(**data)


class NewHeadsNotificationSchema(Schema):
subscription_id = fields.Integer(data_key="subscription_id", required=True)
result = fields.Nested(BlockHeaderSchema(), data_key="result", required=True)

@post_load
def make_dataclass(self, data, **kwargs) -> NewHeadsNotification:
return NewHeadsNotification(**data)


class EventsNotificationSchema(Schema):
subscription_id = fields.Integer(data_key="subscription_id", required=True)
result = fields.Nested(EmittedEventSchema(), data_key="result", required=True)

@post_load
def make_dataclass(self, data, **kwargs) -> EventsNotification:
return EventsNotification(**data)


class NewTransactionStatusSchema(Schema):
transaction_hash = Felt(data_key="transaction_hash", required=True)
status = fields.Dict(data_key="status", required=True)

@post_load
def make_dataclass(self, data, **kwargs) -> NewTransactionStatus:
return NewTransactionStatus(**data)


class TransactionStatusNotificationSchema(Schema):
subscription_id = fields.Integer(data_key="subscription_id", required=True)
result = fields.Nested(
NewTransactionStatusSchema(), data_key="result", required=True
)

@post_load
def make_dataclass(self, data, **kwargs) -> TransactionStatusNotification:
return TransactionStatusNotification(**data)


class PendingTransactionsNotificationSchema(Schema):
subscription_id = fields.Integer(data_key="subscription_id", required=True)
result = fields.Dict(data_key="result", required=True)

@post_load
def make_dataclass(self, data, **kwargs) -> PendingTransactionsNotification:
return PendingTransactionsNotification(**data)


class UnsubscribeResponseSchema(Schema):
result = fields.Boolean(data_key="result", required=True)

@post_load
def make_dataclass(self, data, **kwargs) -> UnsubscribeResponse:
return UnsubscribeResponse(**data)


class ReorgDataSchema(Schema):
starting_block_hash = Felt(data_key="starting_block_hash", required=True)
starting_block_number = fields.Integer(
data_key="starting_block_number", required=True, validate=lambda x: x >= 0
)
ending_block_hash = Felt(data_key="ending_block_hash", required=True)
ending_block_number = fields.Integer(
data_key="ending_block_number", required=True, validate=lambda x: x >= 0
)

@post_load
def make_dataclass(self, data, **kwargs) -> ReorgData:
return ReorgData(**data)


class ReorgNotificationSchema(Schema):
subscription_id = fields.Integer(data_key="subscription_id", required=True)
result = fields.Nested(ReorgDataSchema(), data_key="result", required=True)

@post_load
def make_dataclass(self, data, **kwargs) -> ReorgNotification:
return ReorgNotification(**data)
Loading
Loading