Skip to content

Commit

Permalink
Add block subscribe/unsubscribe websocket methods (#418)
Browse files Browse the repository at this point in the history
Co-authored-by: ruiqic <[email protected]>
Co-authored-by: Rui Qi Chen <[email protected]>
  • Loading branch information
3 people authored Apr 17, 2024
1 parent 53adfce commit 1153048
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 2 deletions.
55 changes: 54 additions & 1 deletion src/solana/rpc/websocket_api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""This module contains code for interacting with the RPC Websocket endpoint."""

import itertools
from typing import Any, Dict, List, Optional, Sequence, Union, cast

from solders.account_decoder import UiDataSliceConfig
from solders.transaction_status import TransactionDetails
from solders.pubkey import Pubkey
from solders.rpc.config import (
RpcAccountInfoConfig,
Expand All @@ -11,6 +13,9 @@
RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
RpcTransactionLogsFilterMentions,
RpcBlockSubscribeConfig,
RpcBlockSubscribeFilter,
RpcBlockSubscribeFilterMentions,
)
from solders.rpc.filter import Memcmp
from solders.rpc.requests import (
Expand All @@ -19,6 +24,8 @@
Body,
LogsSubscribe,
LogsUnsubscribe,
BlockSubscribe,
BlockUnsubscribe,
ProgramSubscribe,
ProgramUnsubscribe,
RootSubscribe,
Expand All @@ -45,7 +52,7 @@

from solana.rpc import types
from solana.rpc.commitment import Commitment
from solana.rpc.core import _ACCOUNT_ENCODING_TO_SOLDERS, _COMMITMENT_TO_SOLDERS
from solana.rpc.core import _ACCOUNT_ENCODING_TO_SOLDERS, _COMMITMENT_TO_SOLDERS, _TX_ENCODING_TO_SOLDERS


class SubscriptionError(Exception):
Expand Down Expand Up @@ -176,6 +183,52 @@ async def logs_unsubscribe(
await self.send_data(req)
del self.subscriptions[subscription]

async def block_subscribe(
self,
filter_: Union[RpcBlockSubscribeFilter, RpcBlockSubscribeFilterMentions] = RpcBlockSubscribeFilter.All,
commitment: Optional[Commitment] = None,
encoding: Optional[str] = None,
transaction_details: TransactionDetails = None,
show_rewards: Optional[bool] = None,
max_supported_transaction_version: Optional[int] = None,
) -> None:
"""Subscribe to blocks.
Args:
filter_: filter criteria for the blocks.
commitment: The commitment level to use.
encoding: Encoding to use.
transaction_details: level of transaction detail to return.
show_rewards: whether to populate the rewards array.
max_supported_transaction_version: the max transaction version to return in responses.
"""
req_id = self.increment_counter_and_get_id()
commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
encoding_to_use = None if encoding is None else _TX_ENCODING_TO_SOLDERS[encoding]
config = RpcBlockSubscribeConfig(
commitment=commitment_to_use,
encoding=encoding_to_use,
transaction_details=transaction_details,
show_rewards=show_rewards,
max_supported_transaction_version=max_supported_transaction_version,
)
req = BlockSubscribe(filter_, config, req_id)
await self.send_data(req)

async def block_unsubscribe(
self,
subscription: int,
) -> None:
"""Unsubscribe from blocks.
Args:
subscription: ID of subscription to cancel.
"""
req_id = self.increment_counter_and_get_id()
req = BlockUnsubscribe(subscription, req_id)
await self.send_data(req)
del self.subscriptions[subscription]

async def program_subscribe( # pylint: disable=too-many-arguments
self,
program_id: Pubkey,
Expand Down
2 changes: 1 addition & 1 deletion tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ services:
- "9900:9900"
environment:
[
SOLANA_RUN_SH_VALIDATOR_ARGS=--rpc-pubsub-enable-vote-subscription
SOLANA_RUN_SH_VALIDATOR_ARGS=--rpc-pubsub-enable-vote-subscription --rpc-pubsub-enable-block-subscription
]
25 changes: 25 additions & 0 deletions tests/integration/test_websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from solders.rpc.responses import (
AccountNotification,
LogsNotification,
BlockNotification,
ProgramNotification,
RootNotification,
SignatureNotification,
Expand Down Expand Up @@ -103,6 +104,18 @@ async def logs_subscribed_mentions_filter(
await websocket.logs_unsubscribe(subscription_id)


@pytest.fixture
async def block_subscribed(websocket: SolanaWsClientProtocol) -> AsyncGenerator[None, None]:
"""Setup block subscription."""
await websocket.block_subscribe()
first_resp = await websocket.recv()
msg = first_resp[0]
assert isinstance(msg, SubscriptionResult)
subscription_id = msg.result
yield
await websocket.block_unsubscribe(subscription_id)


@pytest.fixture
async def program_subscribed(
websocket: SolanaWsClientProtocol, test_http_client_async: AsyncClient
Expand Down Expand Up @@ -250,6 +263,18 @@ async def test_logs_subscribe_mentions_filter(
assert msg.result.value.logs[0] == "Program 11111111111111111111111111111111 invoke [1]"


@pytest.mark.integration
async def test_block_subscribe(
websocket: SolanaWsClientProtocol,
block_subscribed: None,
):
"""Test block subscription."""
main_resp = await websocket.recv()
msg = main_resp[0]
assert isinstance(msg, BlockNotification)
assert msg.result.value.slot >= 0


@pytest.mark.integration
async def test_program_subscribe(
test_http_client_async: AsyncClient,
Expand Down

0 comments on commit 1153048

Please sign in to comment.