diff --git a/_old/_proxy/neon_rpc_api_model/neon_rpc_api_worker.py b/_old/_proxy/neon_rpc_api_model/neon_rpc_api_worker.py index 9d182ec8..095673eb 100644 --- a/_old/_proxy/neon_rpc_api_model/neon_rpc_api_worker.py +++ b/_old/_proxy/neon_rpc_api_model/neon_rpc_api_worker.py @@ -81,52 +81,3 @@ def eth_sendTransaction(self, tx: Dict[str, Any]) -> str: tx = self.eth_signTransaction(tx) return self.eth_sendRawTransaction(tx['raw']) - - @staticmethod - def _mp_pool_tx(neon_tx_info: NeonTxInfo) -> Dict[str, Any]: - to_addr = NeonAddress.from_raw(neon_tx_info.to_addr) - if to_addr: - to_addr = to_addr.checksum_address - - return { - 'blockHash': '0x' + '0' * 64, - 'blockNumber': None, - 'transactionIndex': None, - 'from': NeonAddress.from_raw(neon_tx_info.addr).checksum_address, - 'gas': hex(neon_tx_info.gas_limit), - 'gasPrice': hex(neon_tx_info.gas_price), - 'hash': neon_tx_info.sig, - 'input': neon_tx_info.calldata, - 'nonce': hex(neon_tx_info.nonce), - 'to': to_addr, - 'value': hex(neon_tx_info.value), - 'chainId': hex(neon_tx_info.chain_id) if neon_tx_info.has_chain_id else None - } - - def _mp_pool_queue(self, tx_list: List[NeonTxInfo]) -> Dict[str, Any]: - sender_addr = '' - sender_pool: Dict[int, Any] = dict() - sender_pool_dict: Dict[str, Any] = dict() - for tx in tx_list: - if sender_addr != tx.addr and len(sender_addr): - sender_pool_dict[sender_addr] = sender_pool - sender_pool = dict() - - sender_addr = tx.addr - sender_pool[tx.nonce] = self._mp_pool_tx(tx) - - if sender_addr: - sender_pool_dict[sender_addr] = sender_pool - - return sender_pool_dict - - def txpool_content(self) -> Dict[str, Any]: - result_dict: Dict[str, Any] = dict() - - req_id = get_req_id_from_log() - content = self._mempool_client.get_content(req_id) - - result_dict['pending'] = self._mp_pool_queue(content.pending_list) - result_dict['queued'] = self._mp_pool_queue(content.queued_list) - return result_dict - diff --git a/common/config/config.py b/common/config/config.py index 762e37c2..d239dbf9 100644 --- a/common/config/config.py +++ b/common/config/config.py @@ -94,6 +94,8 @@ class Config: # Statistic configuration gather_stat_name: Final[str] = "GATHER_STATISTICS" # Proxy configuration + rpc_private_ip_name: Final[str] = "RPC_PRIVATE_IP" + rpc_private_port_name: Final[str] = "RPC_PRIVATE_PORT" rpc_public_port_name: Final[str] = "RPC_PUBLIC_PORT" rpc_process_cnt_name: Final[str] = "RPC_PROCESS_COUNT" rpc_worker_cnt_name: Final[str] = "RPC_WORKER_COUNT" @@ -432,6 +434,14 @@ def gather_stat(self) -> bool: ######################### # Proxy configuration + @cached_property + def rpc_private_ip(self) -> str: + return os.environ.get(self.rpc_private_ip_name, self.base_service_ip) + + @cached_property + def rpc_private_port(self) -> int: + return self._env_num(self.rpc_private_port_name, self.rpc_public_port + 1, 8000, 25000) + @cached_property def rpc_public_port(self) -> int: return self._env_num(self.rpc_public_port_name, 9090, 8000, 25000) @@ -832,6 +842,8 @@ def to_string(self) -> str: self.gather_stat_name: self.gather_stat, self.debug_cmd_line_name: self.debug_cmd_line, # Proxy configuration + self.rpc_private_ip_name: self.rpc_private_ip, + self.rpc_private_port_name: self.rpc_private_port, self.rpc_public_port_name: self.rpc_public_port, self.rpc_process_cnt_name: self.rpc_process_cnt, self.rpc_worker_cnt_name: self.rpc_worker_cnt, diff --git a/common/ethereum/hash.py b/common/ethereum/hash.py index 9649f52b..a20b6a9c 100644 --- a/common/ethereum/hash.py +++ b/common/ethereum/hash.py @@ -5,7 +5,7 @@ import eth_utils from typing_extensions import Self -from ..utils.cached import cached_method +from ..utils.cached import cached_method, cached_property from ..utils.format import hex_to_bytes, bytes_to_hex from ..utils.pydantic import PlainValidator, PlainSerializer @@ -52,7 +52,7 @@ def from_raw(cls, raw: _RawHash) -> Self: @classmethod def from_not_none(cls, raw: _RawHash) -> Self: if raw is None: - raise ValueError(f"Wrong input: null") + raise ValueError("Wrong input: null") return cls.from_raw(raw) @property @@ -127,6 +127,10 @@ class EthHash32(_BaseHash): HashSize: ClassVar[int] = 32 ZeroHash: ClassVar[str] = "0x" + "00" * HashSize + @cached_property + def ident(self) -> str: + return self.to_bytes()[:4].hex() + def to_string(self, default: str | None = None) -> str | None: return self._to_string() if self._data else default diff --git a/common/http/utils.py b/common/http/utils.py index 056b1150..05c43f2f 100644 --- a/common/http/utils.py +++ b/common/http/utils.py @@ -103,6 +103,9 @@ def set_property_value(self, name: str, value) -> Self: self._prop_name_set.add(name) return self + def get_property_value(self, name: str, default): + return getattr(self, name, default) + @dataclass(frozen=True) class HttpMethod: @@ -173,7 +176,7 @@ def http_validate_method_name(name: str) -> None: def _validate_request_id(value: HttpRequestId) -> HttpRequestId: if (value is None) or isinstance(value, int) or isinstance(value, str): return value - raise ValueError(f"'id' must be a string or integer") + raise ValueError("'id' must be a string or integer") HttpRequestIdField = Annotated[HttpRequestId, PlainValidator(_validate_request_id)] diff --git a/common/neon/account.py b/common/neon/account.py index abfb4e9a..d04d422c 100644 --- a/common/neon/account.py +++ b/common/neon/account.py @@ -3,6 +3,7 @@ import random from typing import Final, Annotated, Union +import eth_account import eth_keys import eth_utils from pydantic.functional_serializers import PlainSerializer @@ -10,6 +11,7 @@ from typing_extensions import Self from ..ethereum.hash import EthAddress +from .transaction_model import NeonTxModel from ..utils.cached import cached_method, cached_property from ..utils.format import bytes_to_hex, hex_to_bytes, hex_to_int @@ -92,11 +94,14 @@ def from_dict(cls, data: _DictAccount | NeonAccount) -> Self: return cls(address=address, chain_id=chain_id, private_key=private_key) @classmethod - def from_private_key(cls, pk_data: str | bytes, chain_id: int) -> Self: - pk_data = hex_to_bytes(pk_data) - if len(pk_data) < 32: - raise ValueError(f"Not enough data for private key: {len(pk_data)}") - return cls.from_raw(eth_keys.keys.PrivateKey(pk_data[:32]), chain_id) + def from_private_key(cls, pk_data: str | bytes | eth_keys.keys.PrivateKey, chain_id: int) -> Self: + if isinstance(pk_data, str): + pk_data = hex_to_bytes(pk_data) + if isinstance(pk_data, bytes): + if len(pk_data) < 32: + raise ValueError(f"Not enough data for private key: {len(pk_data)}") + pk_data = eth_keys.keys.PrivateKey(pk_data[:32]) + return cls.from_raw(pk_data, chain_id) def to_dict(self: NeonAccount) -> _DictAccount: res = dict( @@ -145,6 +150,15 @@ def private_key(self) -> eth_keys.keys.PrivateKey: assert self._private_key return self._private_key + def sign_msg(self, data: bytes) -> eth_keys.keys.Signature: + return self.private_key.sign_msg(data) + + def sign_tx(self, tx: NeonTxModel) -> bytes: + tx_dict = tx.to_eth_dict() + tx_dict["chainId"] = self._chain_id + signed_tx = eth_account.Account.sign_transaction(tx_dict, self.private_key) + return bytes(signed_tx.raw_transaction) + def __str__(self) -> str: return self.to_string() diff --git a/common/neon/transaction_model.py b/common/neon/transaction_model.py index f8e8ab58..d9211a4d 100644 --- a/common/neon/transaction_model.py +++ b/common/neon/transaction_model.py @@ -146,6 +146,16 @@ def to_rlp_tx(self) -> bytes: return tx.to_bytes() + def to_eth_dict(self) -> dict: + return dict( + nonce=self.nonce, + gasPrice=self.gas_price, + gas=self.gas_limit, + to=self.to_address.to_checksum(), + value=self.value, + data=self.call_data.to_string(), + ) + @property def has_chain_id(self) -> bool: return self.chain_id is not None diff --git a/common/neon_rpc/client.py b/common/neon_rpc/client.py index 26533759..7677eeea 100644 --- a/common/neon_rpc/client.py +++ b/common/neon_rpc/client.py @@ -144,7 +144,7 @@ async def get_neon_account(self, account: NeonAccount, block: NeonBlockHdrModel acct_list = await self.get_neon_account_list([account], block) return acct_list[0] - async def get_state_tx_cnt(self, account: NeonAccount, block: NeonBlockHdrModel | None) -> int: + async def get_state_tx_cnt(self, account: NeonAccount, block: NeonBlockHdrModel | None = None) -> int: acct = await self.get_neon_account(account, block) return acct.state_tx_cnt diff --git a/proxy/base/server.py b/proxy/base/intl_server.py similarity index 84% rename from proxy/base/server.py rename to proxy/base/intl_server.py index 4ae3970a..0b4b7179 100644 --- a/proxy/base/server.py +++ b/proxy/base/intl_server.py @@ -11,8 +11,8 @@ from common.utils.process_pool import ProcessPool -class BaseProxyComponent: - def __init__(self, server: BaseProxyServer): +class BaseIntlProxyComponent: + def __init__(self, server: BaseIntlProxyServer): self._server = server @cached_property @@ -32,17 +32,17 @@ def _msg_filter(self) -> LogMsgFilter: return self._server._msg_filter # noqa -class BaseProxyApi(BaseProxyComponent, AppDataApi): - def __init__(self, server: BaseProxyServer) -> None: +class BaseProxyApi(BaseIntlProxyComponent, AppDataApi): + def __init__(self, server: BaseIntlProxyServer) -> None: AppDataApi.__init__(self) - BaseProxyComponent.__init__(self, server) + BaseIntlProxyComponent.__init__(self, server) -class BaseProxyServer(AppDataServer): +class BaseIntlProxyServer(AppDataServer): class _ProcessPool(ProcessPool): - def __init__(self, server: BaseProxyServer) -> None: + def __init__(self, server: BaseIntlProxyServer) -> None: super().__init__() - self._server: BaseProxyServer | None = server + self._server: BaseIntlProxyServer | None = server def _on_process_start(self, idx: int) -> None: self._server._on_process_start(idx) diff --git a/proxy/base/mp_api.py b/proxy/base/mp_api.py index 8fe457d2..59bf0b0d 100644 --- a/proxy/base/mp_api.py +++ b/proxy/base/mp_api.py @@ -45,7 +45,7 @@ def neon_tx_hash(self) -> EthTxHash: @cached_property def tx_id(self) -> str: - return self.neon_tx_hash.to_bytes()[:4].hex() + return self.neon_tx_hash.ident @property def sender(self) -> EthAddress: @@ -113,7 +113,7 @@ def to_string(self) -> str: @cached_property def tx_id(self) -> str: - return self.neon_tx_hash.to_bytes()[:4].hex() + return self.neon_tx_hash.ident @property def process_time_nsec(self) -> int: @@ -166,6 +166,7 @@ def is_empty(self) -> bool: class MpRequest(BaseModel): ctx_id: str + chain_id: int class MpTxCntRequest(BaseModel): @@ -221,5 +222,5 @@ class MpGetTxResp(BaseModel): class MpTxPoolContentResp(BaseModel): - pending_list: tuple[NeonTxModel, ...] - queued_list: tuple[NeonTxModel, ...] + pending_list: list[NeonTxModel] + queued_list: list[NeonTxModel] diff --git a/proxy/base/mp_client.py b/proxy/base/mp_client.py index 613c873f..956538a2 100644 --- a/proxy/base/mp_client.py +++ b/proxy/base/mp_client.py @@ -62,8 +62,8 @@ async def get_tx_by_sender_nonce(self, ctx_id: str, sender: NeonAccount, tx_nonc resp = await self._get_tx_by_sender_nonce(req) return resp.tx - async def get_content(self, ctx_id: str) -> MpTxPoolContentResp: - return await self._get_content(MpRequest(ctx_id=ctx_id)) + async def get_content(self, ctx_id: str, chain_id: int) -> MpTxPoolContentResp: + return await self._get_content(MpRequest(ctx_id=ctx_id, chain_id=chain_id)) @AppDataClient.method(name="getGasPrice") async def _get_gas_price(self) -> MpGasPriceModel: ... diff --git a/proxy/base/op_api.py b/proxy/base/op_api.py index 237347fc..97491306 100644 --- a/proxy/base/op_api.py +++ b/proxy/base/op_api.py @@ -2,7 +2,9 @@ from typing_extensions import Self, ClassVar +from common.ethereum.bin_str import EthBinStrField from common.ethereum.hash import EthAddressField, EthAddress +from common.neon.transaction_model import NeonTxModel from common.solana.pubkey import SolPubKey, SolPubKeyField from common.solana.transaction_model import SolTxModel from common.utils.cached import cached_method @@ -79,6 +81,28 @@ class OpTokenSolAddressModel(BaseModel): token_sol_address: SolPubKeyField +class OpSignEthMsgRequest(BaseModel): + req_id: dict + sender: EthAddressField + data: EthBinStrField + + +class OpSignEthMsgResp(BaseModel): + signed_msg: EthBinStrField + error: str | None = None + + +class OpSignEthTxRequest(BaseModel): + req_id: dict + neon_tx: NeonTxModel + chain_id: int + + +class OpSignEthTxResp(BaseModel): + signed_tx: EthBinStrField + error: str | None = None + + class OpSignSolTxListRequest(BaseModel): req_id: dict owner: SolPubKeyField diff --git a/proxy/base/op_client.py b/proxy/base/op_client.py index 43e7a7dc..ef809eae 100644 --- a/proxy/base/op_client.py +++ b/proxy/base/op_client.py @@ -3,6 +3,9 @@ from typing import Sequence from common.app_data.client import AppDataClient +from common.ethereum.bin_str import EthBinStrField +from common.ethereum.hash import EthAddressField +from common.neon.transaction_model import NeonTxModel from common.solana.pubkey import SolPubKey from common.solana.transaction import SolTx from common.solana.transaction_model import SolTxModel @@ -14,6 +17,10 @@ OpResourceResp, OpTokenSolAddressModel, OpGetTokenSolAddressRequest, + OpSignEthMsgRequest, + OpSignEthMsgResp, + OpSignEthTxRequest, + OpSignEthTxResp, OpSignSolTxListRequest, OpSolTxListResp, OpGetSignerKeyListRequest, @@ -48,6 +55,14 @@ async def get_token_sol_address(self, req_id: dict, owner: SolPubKey, chain_id: resp = await self._get_token_sol_address(req) return resp.token_sol_address + async def sign_eth_msg(self, req_id: dict, sender: EthAddressField, data: EthBinStrField) -> OpSignEthMsgResp: + req = OpSignEthMsgRequest(req_id=req_id, sender=sender, data=data) + return await self._sign_eth_msg(req) + + async def sign_eth_tx(self, req_id: dict, neon_tx: NeonTxModel, chain_id: int) -> OpSignEthTxResp: + req = OpSignEthTxRequest(req_id=req_id, neon_tx=neon_tx, chain_id=chain_id) + return await self._sign_eth_tx(req) + async def sign_sol_tx_list(self, req_id: dict, owner: SolPubKey, tx_list: Sequence[SolTx]) -> tuple[SolTx, ...]: model_list = [SolTxModel.from_raw(tx) for tx in tx_list] req = OpSignSolTxListRequest(req_id=req_id, owner=owner, tx_list=model_list) @@ -78,6 +93,12 @@ async def _free_resource(self, request: OpFreeResourceRequest) -> OpResourceResp @AppDataClient.method(name="getOperatorTokenAddress") async def _get_token_sol_address(self, request: OpGetTokenSolAddressRequest) -> OpTokenSolAddressModel: ... + @AppDataClient.method(name="signEthMessage") + async def _sign_eth_msg(self, request: OpSignEthMsgRequest) -> OpSignEthMsgResp: ... + + @AppDataClient.method(name="signEthTransaction") + async def _sign_eth_tx(self, request: OpSignEthTxRequest) -> OpSignEthTxResp: ... + @AppDataClient.method(name="signSolanaTransactionList") async def _sign_sol_tx_list(self, request: OpSignSolTxListRequest) -> OpSolTxListResp: ... diff --git a/proxy/base/rpc_api.py b/proxy/base/rpc_api.py new file mode 100644 index 00000000..496e8268 --- /dev/null +++ b/proxy/base/rpc_api.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +from typing import ClassVar + +from pydantic import AliasChoices, Field +from typing_extensions import Self + +from common.ethereum.bin_str import EthBinStrField, EthBinStr +from common.ethereum.hash import EthAddressField, EthHash32Field, EthAddress, EthTxHash, EthBlockHashField, \ + EthTxHashField +from common.jsonrpc.api import BaseJsonRpcModel +from common.neon.transaction_meta_model import NeonTxMetaModel +from common.neon.transaction_model import NeonTxModel +from common.neon_rpc.api import EmulNeonCallModel +from common.utils.pydantic import HexUIntField + + +class RpcAccessItemModel(BaseJsonRpcModel): + address: EthAddressField + storageKeys: list[EthHash32Field] + + +class RpcEthTxRequest(BaseJsonRpcModel): + txType: HexUIntField = Field(default=0, validation_alias="type") + fromAddress: EthAddressField = Field( + default=EthAddress.default(), + validation_alias=AliasChoices("from", "fromAddress"), + ) + toAddress: EthAddressField = Field( + default=EthAddress.default(), + validation_alias=AliasChoices("to", "toAddress"), + ) + data: EthBinStrField = Field( + default=EthBinStr.default(), + validation_alias=AliasChoices("data", "input"), + ) + value: HexUIntField = Field(default=0) + nonce: HexUIntField = Field(default=0) + + gas: HexUIntField = Field(default=2**64) + gasPrice: HexUIntField = Field(default=2**64) + maxFeePerGas: HexUIntField = Field(default=2**64) + maxPriorityFeePerGas: HexUIntField = Field(default=2**64) + + accessList: list[RpcAccessItemModel] = Field(default_factory=list) + chainId: HexUIntField = Field(default=0) + + _default: ClassVar[RpcEthTxRequest | None] = None + + @classmethod + def default(cls) -> Self: + if not cls._default: + cls._default = cls( + fromAddress=EthAddress.default(), + toAddress=EthAddress.default(), + data=EthBinStr.default(), + ) + return cls._default + + def to_emulation_call(self, chain_id: int) -> EmulNeonCallModel: + return EmulNeonCallModel( + from_address=self.fromAddress, + to_address=self.toAddress, + value=self.value, + data=self.data.to_bytes(), + gas_limit=self.gas, + gas_price=self.gasPrice, + chain_id=chain_id + ) + + def to_neon_tx(self) -> NeonTxModel: + return NeonTxModel( + tx_type=self.txType, + neon_tx_hash=EthTxHash.default(), + from_address=self.fromAddress, + to_address=self.toAddress, + contract=EthAddress.default(), + nonce=self.nonce, + gas_price=self.gasPrice, + gas_limit=self.gas, + value=self.value, + call_data=self.data, + v=0, + r=0, + s=0, + ) + + +class RpcEthTxResp(BaseJsonRpcModel): + blockHash: EthBlockHashField | None + blockNumber: HexUIntField | None + transactionIndex: HexUIntField | None + txHash: EthTxHashField = Field(serialization_alias="hash") + txType: HexUIntField = Field(serialization_alias="type") + fromAddress: EthAddressField = Field(serialization_alias="from") + nonce: HexUIntField + gasPrice: HexUIntField + gas: HexUIntField + toAddress: EthAddressField = Field(serialization_alias="to") + value: HexUIntField + data: EthBinStrField = Field(serialization_alias="input") + chainId: HexUIntField | None + v: HexUIntField + r: HexUIntField + s: HexUIntField + + @classmethod + def from_raw(cls, meta: NeonTxMetaModel | NeonTxModel) -> Self: + if isinstance(meta, NeonTxMetaModel): + tx = meta.neon_tx + + rcpt = meta.neon_tx_rcpt + blockhash = rcpt.block_hash + slot = rcpt.slot + tx_idx = rcpt.neon_tx_idx + else: + tx = meta + + blockhash = None + slot = None + tx_idx = None + + return cls( + blockHash=blockhash, + blockNumber=slot, + transactionIndex=tx_idx, + txHash=tx.neon_tx_hash, + txType=tx.tx_type, + fromAddress=tx.from_address.to_string(), + nonce=tx.nonce, + gasPrice=tx.gas_price, + gas=tx.gas_limit, + toAddress=tx.to_address, + value=tx.value, + data=tx.call_data, + chainId=tx.chain_id, + v=tx.v, + r=tx.r, + s=tx.s, + ) diff --git a/proxy/rpc/gas_limit_calculator.py b/proxy/base/rpc_gas_limit_calculator.py similarity index 93% rename from proxy/rpc/gas_limit_calculator.py rename to proxy/base/rpc_gas_limit_calculator.py index 5f04b26d..1fe4b61d 100644 --- a/proxy/rpc/gas_limit_calculator.py +++ b/proxy/base/rpc_gas_limit_calculator.py @@ -6,7 +6,7 @@ from common.ethereum.hash import EthTxHash from common.ethereum.transaction import EthTx from common.neon.block import NeonBlockHdrModel -from common.neon.neon_program import NeonProg +from common.neon.neon_program import NeonProg, NeonIxMode from common.neon_rpc.api import EvmConfigModel, EmulNeonCallResp, EmulNeonCallModel from common.solana.account import SolAccountModel from common.solana.alt_program import SolAltProg @@ -17,12 +17,12 @@ from common.solana.signer import SolSigner from common.solana.transaction_legacy import SolLegacyTx from common.utils.cached import cached_property -from .server_abc import NeonProxyComponent +from .rpc_server_abc import BaseRpcServerComponent _LOG = logging.getLogger(__name__) -class NpGasLimitCalculator(NeonProxyComponent): +class RpcNeonGasLimitCalculator(BaseRpcServerComponent): _oz_gas_limit = 30_000 # openzeppelin gas-limit check _min_gas_limit = 25_000 # minimal gas limit for NeonTx: start (10k), execute (10k), finalization (5k) _u64_max = int.from_bytes(bytes([0xFF] * 8), "big") @@ -40,9 +40,9 @@ async def estimate( self, call: EmulNeonCallModel, sol_account_dict: dict[SolPubKey, SolAccountModel], - block: NeonBlockHdrModel = None, + block: NeonBlockHdrModel | None = None, ) -> int: - evm_cfg = await self.get_evm_cfg() + evm_cfg = await self._get_evm_cfg() resp = await self._core_api_client.emulate_neon_call( evm_cfg, call, @@ -119,7 +119,7 @@ def _sol_tx_from_eth_tx(self, eth_tx: EthTx, resp: EmulNeonCallResp) -> SolLegac neon_prog = self._neon_prog neon_prog.init_neon_tx(EthTxHash.from_raw(eth_tx.neon_tx_hash), eth_tx.to_bytes()) neon_prog.init_account_meta_list(resp.sol_account_meta_list) - ix_list.append(neon_prog.make_tx_step_from_data_ix(False, self._cfg.max_emulate_evm_step_cnt, 101)) + ix_list.append(neon_prog.make_tx_step_from_data_ix(NeonIxMode.Default, self._cfg.max_emulate_evm_step_cnt, 101)) sol_tx = SolLegacyTx(name="Estimate", ix_list=tuple(ix_list)) sol_tx.recent_block_hash = SolBlockHash.fake() diff --git a/proxy/base/rpc_server_abc.py b/proxy/base/rpc_server_abc.py new file mode 100644 index 00000000..42bdd6bc --- /dev/null +++ b/proxy/base/rpc_server_abc.py @@ -0,0 +1,302 @@ +from __future__ import annotations + +import abc +import asyncio +import hashlib +import logging +from typing import Callable, ClassVar + +from typing_extensions import Self + +from common.config.config import Config +from common.config.utils import LogMsgFilter +from common.ethereum.errors import EthError +from common.ethereum.hash import EthAddress +from common.http.errors import HttpRouteError +from common.http.utils import HttpRequestCtx +from common.jsonrpc.api import JsonRpcListRequest, JsonRpcListResp, JsonRpcRequest, JsonRpcResp +from common.jsonrpc.server import JsonRpcApi, JsonRpcServer +from common.neon.neon_program import NeonProg +from common.neon_rpc.api import EvmConfigModel +from common.neon_rpc.client import CoreApiClient +from common.solana_rpc.client import SolClient +from common.stat.api import RpcCallData +from common.utils.cached import ttl_cached_method, cached_property +from common.utils.json_logger import logging_context, log_msg +from common.utils.process_pool import ProcessPool +from indexer.db.indexer_db_client import IndexerDbClient +from .mp_api import MpGasPriceModel, MpTokenGasPriceModel +from ..base.mp_client import MempoolClient +from ..stat.client import StatClient + +_LOG = logging.getLogger(__name__) + + +class BaseRpcServerComponent: + def __init__(self, server: BaseRpcServerAbc) -> None: + self._server = server + + @cached_property + def _cfg(self) -> Config: + return self._server._cfg # noqa + + @cached_property + def _core_api_client(self) -> CoreApiClient: + return self._server._core_api_client # noqa + + @cached_property + def _sol_client(self) -> SolClient: + return self._server._sol_client # noqa + + @cached_property + def _mp_client(self) -> MempoolClient: + return self._server._mp_client # noqa + + @cached_property + def _db(self) -> IndexerDbClient: + return self._server._db # noqa + + @cached_property + def _msg_filter(self) -> LogMsgFilter: + return self._server._msg_filter # noqa + + def _is_default_chain_id(self, ctx: HttpRequestCtx) -> bool: + return self._server.is_default_chain_id(ctx) + + def _get_ctx_id(self, ctx: HttpRequestCtx) -> str: + return self._server.get_ctx_id(ctx) + + def _get_chain_id(self, ctx: HttpRequestCtx) -> int: + return self._server.get_chain_id(ctx) + + async def _get_evm_cfg(self) -> EvmConfigModel: + return await self._server.get_evm_cfg() + + async def _get_token_gas_price(self, ctx: HttpRequestCtx) -> tuple[MpGasPriceModel, MpTokenGasPriceModel]: + return await self._server.get_token_gas_price(ctx) + + async def _has_fee_less_tx_permit( + self, + ctx: HttpRequestCtx, + sender: EthAddress, + contract: EthAddress, + tx_nonce: int, + tx_gas_limit: int, + ) -> bool: + return await self._server.has_fee_less_tx_permit(ctx, sender, contract, tx_nonce, tx_gas_limit) + + +class BaseRpcServerAbc(JsonRpcServer, abc.ABC): + _stat_name: ClassVar[str] = "UNKNOWN" + + class _ProcessPool(ProcessPool): + def __init__(self, server: BaseRpcServerAbc) -> None: + super().__init__() + self._server = server + + def _on_process_start(self, idx: int) -> None: + self._server._on_process_start(idx) + + def _on_process_stop(self) -> None: + self._server._on_process_stop() + self._server = None + + def __init__( + self, + cfg: Config, + core_api_client: CoreApiClient, + sol_client: SolClient, + mp_client: MempoolClient, + stat_client: StatClient, + db: IndexerDbClient, + ) -> None: + super().__init__(cfg) + self._idx = -1 + self._core_api_client = core_api_client + self._sol_client = sol_client + self._mp_client = mp_client + self._stat_client = stat_client + self._db = db + self._process_pool = self._ProcessPool(self) + + def start(self) -> None: + self._register_handler_list() + self._process_pool.start() + + def stop(self) -> None: + self._process_pool.stop() + + @staticmethod + def get_ctx_id(ctx: HttpRequestCtx) -> str: + if ctx_id := ctx.get_property_value("ctx_id", None): + return ctx_id + + size = len(ctx.request.body) + raw_value = f"{ctx.ip_addr}:{size}:{ctx.start_time_nsec}" + ctx_id = hashlib.md5(bytes(raw_value, "utf-8")).hexdigest()[:8] + ctx.set_property_value("ctx_id", ctx_id) + return ctx_id + + @staticmethod + def get_chain_id(ctx: HttpRequestCtx) -> int: + chain_id = ctx.get_property_value("chain_id", None) + assert chain_id is not None + return chain_id + + @staticmethod + def is_default_chain_id(ctx: HttpRequestCtx) -> bool: + return ctx.get_property_value("is_default_chain_id", False) + + @ttl_cached_method(ttl_sec=1) + async def get_evm_cfg(self) -> EvmConfigModel: + # forwarding request to mempool allows to limit the number of requests to Solana to maximum 1 time per second + # for details, see the mempool_server::get_evm_cfg() implementation + evm_cfg = await self._mp_client.get_evm_cfg() + NeonProg.init_prog(evm_cfg.treasury_pool_cnt, evm_cfg.treasury_pool_seed, evm_cfg.version) + return evm_cfg + + async def on_request_list(self, ctx: HttpRequestCtx, request: JsonRpcListRequest) -> None: + chain_id = await self._validate_chain_id(ctx) + with logging_context(ctx=self.get_ctx_id(ctx), chain_id=chain_id): + _LOG.info(log_msg("handle BIG request <<< {IP} size={Size}", IP=ctx.ip_addr, Size=len(request.root))) + + def on_response_list(self, ctx: HttpRequestCtx, resp: JsonRpcListResp) -> None: + with logging_context(ctx=self.get_ctx_id(ctx), chain_id=self.get_chain_id(ctx)): + msg = log_msg( + "done BIG request >>> {IP} size={Size} resp_time={TimeMS} msec", + IP=ctx.ip_addr, + Size=len(resp), + TimeMS=ctx.process_time_msec, + ) + _LOG.info(msg) + + stat = RpcCallData(service=self._stat_name, method="BIG", time_nsec=ctx.process_time_nsec, is_error=False) + self._stat_client.commit_rpc_call(stat) + + def on_bad_request(self, ctx: HttpRequestCtx) -> None: + _LOG.warning(log_msg("BAD request from {IP} with size {Size}", IP=ctx.ip_addr, Size=len(ctx.request.body))) + + stat = RpcCallData(service=self._stat_name, method="UNKNOWN", time_nsec=ctx.process_time_nsec, is_error=True) + self._stat_client.commit_rpc_call(stat) + + async def handle_request( + self, + ctx: HttpRequestCtx, + request: JsonRpcRequest, + handler: Callable, + ) -> JsonRpcResp: + chain_id = await self._validate_chain_id(ctx) + + info = dict(IP=ctx.ip_addr, ReqID=request.id, Method=request.method) + with logging_context(ctx=self.get_ctx_id(ctx), chain_id=chain_id): + _LOG.info(log_msg("handle request <<< {IP} req={ReqID} {Method} {Params}", Params=request.params, **info)) + + resp = await handler(ctx, request) + if resp.is_error: + msg = log_msg( + "error on request >>> {IP} req={ReqID} {Method} {Error} resp_time={TimeMS} msec", + Error=resp.error, + **info, + ) + else: + msg = log_msg( + "done request >>> {IP} req={ReqID} {Method} {Result} resp_time={TimeMS} msec", + Result=resp.result, + **info, + ) + _LOG.info(dict(**msg, TimeMS=ctx.process_time_msec)) + + stat = RpcCallData( + service=self._stat_name, + method=request.method, + time_nsec=ctx.process_time_nsec, + is_error=resp.is_error, + ) + self._stat_client.commit_rpc_call(stat) + + return resp + + @ttl_cached_method(ttl_sec=1) + async def get_gas_price(self) -> MpGasPriceModel: + # for details, see the mempool_server::get_gas_price() implementation + gas_price = await self._mp_client.get_gas_price() + if gas_price.is_empty: + raise EthError(message="Failed to calculate gas price. Try again later") + return gas_price + + async def get_token_gas_price(self, ctx: HttpRequestCtx) -> tuple[MpGasPriceModel, MpTokenGasPriceModel]: + gas_price = await self.get_gas_price() + chain_id = self.get_chain_id(ctx) + if not (token_price := gas_price.chain_dict.get(chain_id, None)): + raise HttpRouteError() + return gas_price, token_price + + @abc.abstractmethod + async def has_fee_less_tx_permit( + self, + ctx: HttpRequestCtx, + sender: EthAddress, + contract: EthAddress, + tx_nonce: int, + tx_gas_limit: int, + ) -> bool: + ... + + # protected: + + def _on_process_start(self, idx: int) -> None: + self._idx = idx + super().start() + + def _on_process_stop(self) -> None: + super().stop() + + async def _validate_chain_id(self, ctx: HttpRequestCtx) -> int: + NeonProg.validate_protocol() + if chain_id := ctx.get_property_value("chain_id", None): + return chain_id + return await self._set_chain_id(ctx) + + async def _set_chain_id(self, ctx: HttpRequestCtx) -> int: + evm_cfg = await self.get_evm_cfg() + if not (token_name := ctx.request.path_params.get("token", "").strip().upper()): + chain_id = evm_cfg.default_chain_id + ctx.set_property_value("is_default_chain_id", True) + elif token := evm_cfg.token_dict.get(token_name, None): + chain_id = token.chain_id + ctx.set_property_value("is_default_chain_id", token.is_default) + else: + raise HttpRouteError() + + ctx.set_property_value("chain_id", chain_id) + return chain_id + + def _add_api(self, api: JsonRpcApi) -> Self: + _LOG.info(log_msg(f"adding API {api.name}")) + + for endpoint in self._get_endpoint_list(): + _LOG.info(log_msg(f"adding API {api.name} to endpoint {endpoint}")) + super().add_api(api, endpoint=endpoint) + return self + + @classmethod + @abc.abstractmethod + def _get_endpoint_list(cls) -> list[str]: ... + + async def _on_server_start(self) -> None: + await asyncio.gather( + self._db.start(), + self._stat_client.start(), + self._mp_client.start(), + self._sol_client.start(), + self._core_api_client.start(), + ) + + async def _on_server_stop(self) -> None: + await asyncio.gather( + self._mp_client.stop(), + self._core_api_client.stop(), + self._sol_client.stop(), + self._stat_client.stop(), + self._db.stop(), + ) diff --git a/proxy/rpc/transaction_validator.py b/proxy/base/rpc_transaction_executor.py similarity index 57% rename from proxy/rpc/transaction_validator.py rename to proxy/base/rpc_transaction_executor.py index 315854c1..0bdf0d4e 100644 --- a/proxy/rpc/transaction_validator.py +++ b/proxy/base/rpc_transaction_executor.py @@ -1,22 +1,79 @@ -from __future__ import annotations +import logging -from common.ethereum.errors import EthError, EthNonceTooLowError, EthWrongChainIdError +from common.ethereum.errors import EthError, EthNonceTooLowError, EthNonceTooHighError, EthWrongChainIdError +from common.ethereum.hash import EthTxHashField, EthTxHash from common.http.utils import HttpRequestCtx +from common.jsonrpc.errors import InvalidParamError from common.neon.account import NeonAccount from common.neon.transaction_model import NeonTxModel from common.neon_rpc.api import NeonAccountModel, NeonContractModel -from .server_abc import NeonProxyComponent -from ..base.mp_api import MpTokenGasPriceModel, MpGasPriceModel +from common.utils.json_logger import logging_context +from proxy.base.mp_api import MpTxRespCode, MpTokenGasPriceModel, MpGasPriceModel +from proxy.base.rpc_server_abc import BaseRpcServerComponent +_LOG = logging.getLogger(__name__) -class NpTxValidator(NeonProxyComponent): + +class RpcNeonTxExecutor(BaseRpcServerComponent): _max_u64 = 2**64 - 1 _max_u256 = 2**256 - 1 - async def validate(self, ctx: HttpRequestCtx, neon_tx: NeonTxModel) -> NeonAccountModel: - global_price, token_price = await self.get_token_gas_price(ctx) + async def send_neon_tx(self, ctx: HttpRequestCtx, eth_tx_rlp: bytes) -> EthTxHashField: + try: + neon_tx = NeonTxModel.from_raw(eth_tx_rlp, raise_exception=True) + except EthError: + raise + except (BaseException,): + raise InvalidParamError(message="wrong transaction format") + + tx_id = neon_tx.neon_tx_hash.ident + with logging_context(tx=tx_id): + _LOG.debug("sendEthTransaction %s: %s", neon_tx.neon_tx_hash, neon_tx) + return await self._send_neon_tx_impl(ctx, neon_tx, eth_tx_rlp) + + async def _send_neon_tx_impl(self, ctx: HttpRequestCtx, neon_tx: NeonTxModel, eth_tx_rlp: bytes) -> EthTxHashField: + try: + if await self._is_neon_tx_exist(neon_tx.neon_tx_hash): + return neon_tx.neon_tx_hash + + ctx_id = self._get_ctx_id(ctx) + sender = await self._validate(ctx, neon_tx) + chain_id = sender.chain_id + + resp = await self._mp_client.send_raw_transaction(ctx_id, eth_tx_rlp, chain_id, sender.state_tx_cnt) + + if resp.code in (MpTxRespCode.Success, MpTxRespCode.AlreadyKnown): + return neon_tx.neon_tx_hash + elif resp.code == MpTxRespCode.NonceTooLow: + EthNonceTooLowError.raise_error(neon_tx.nonce, resp.state_tx_cnt, sender=sender.address) + elif resp.code == MpTxRespCode.Underprice: + raise EthError(message="replacement transaction underpriced") + elif resp.code == MpTxRespCode.NonceTooHigh: + raise EthNonceTooHighError.raise_error(neon_tx.nonce, resp.state_tx_cnt, sender=sender.address) + elif resp.code == MpTxRespCode.UnknownChainID: + raise EthWrongChainIdError() + else: + raise EthError(message="unknown error") + + except BaseException as exc: + # raise already exists error + await self._is_neon_tx_exist(neon_tx.neon_tx_hash) + + if not isinstance(exc, EthError): + _LOG.error("unexpected error on sendRawTransaction", exc_info=exc, extra=self._msg_filter) + raise + + async def _is_neon_tx_exist(self, tx_hash: EthTxHash) -> bool: + if tx_meta := await self._db.get_tx_by_neon_tx_hash(tx_hash): + if tx_meta.neon_tx_rcpt.slot <= await self._db.get_finalized_slot(): + raise EthError(message="already known") + return True + return False + + async def _validate(self, ctx: HttpRequestCtx, neon_tx: NeonTxModel) -> NeonAccountModel: + global_price, token_price = await self._get_token_gas_price(ctx) - chain_id = self._get_chain_id(ctx, neon_tx) + chain_id = self._validate_chain_id(ctx, neon_tx) tx_gas_limit = await self._get_tx_gas_limit(neon_tx) sender = NeonAccount.from_raw(neon_tx.from_address, chain_id) @@ -33,11 +90,11 @@ async def validate(self, ctx: HttpRequestCtx, neon_tx: NeonTxModel) -> NeonAccou return neon_acct - def _get_chain_id(self, ctx: HttpRequestCtx, neon_tx: NeonTxModel) -> int: - chain_id = self.get_chain_id(ctx) + def _validate_chain_id(self, ctx: HttpRequestCtx, neon_tx: NeonTxModel) -> int: + chain_id = self._get_chain_id(ctx) tx_chain_id = neon_tx.chain_id if not tx_chain_id: - if not self.is_default_chain_id(ctx): + if not self._is_default_chain_id(ctx): raise EthWrongChainIdError() elif tx_chain_id != chain_id: raise EthWrongChainIdError() @@ -47,7 +104,7 @@ async def _get_tx_gas_limit(self, neon_tx: NeonTxModel) -> int: if neon_tx.has_chain_id or neon_tx.call_data.is_empty: return neon_tx.gas_limit - evm_cfg = await self.get_evm_cfg() + evm_cfg = await self._get_evm_cfg() tx_gas_limit = neon_tx.gas_limit * evm_cfg.gas_limit_multiplier_wo_chain_id return min(self._max_u64, tx_gas_limit) @@ -83,7 +140,7 @@ async def _prevalidate_tx_gas_price( # Fee-less transaction if not neon_tx.gas_price: - has_fee_less_permit = await self.has_fee_less_tx_permit( + has_fee_less_permit = await self._has_fee_less_tx_permit( ctx, neon_tx.from_address, neon_tx.to_address, neon_tx.nonce, neon_tx.gas_limit ) if has_fee_less_permit: diff --git a/proxy/executor/alt_destroyer.py b/proxy/executor/alt_destroyer.py index 3f3b6822..9190f2fa 100644 --- a/proxy/executor/alt_destroyer.py +++ b/proxy/executor/alt_destroyer.py @@ -34,7 +34,7 @@ class _NeonAltInfo: @cached_property def ctx_id(self) -> dict: - tx = self.neon_tx_hash.to_bytes()[:4].hex() + tx = self.neon_tx_hash.ident return dict(alt=self.sol_alt.ctx_id, tx=tx) @cached_property diff --git a/proxy/executor/ex_transaction_api.py b/proxy/executor/ex_transaction_api.py index 50e2479b..74f23d5c 100644 --- a/proxy/executor/ex_transaction_api.py +++ b/proxy/executor/ex_transaction_api.py @@ -13,7 +13,7 @@ from .transaction_executor_ctx import NeonExecTxCtx from ..base.ex_api import ExecTxRequest, ExecTxResp, ExecStuckTxRequest, ExecTxRespCode, NeonAltModel from ..base.mp_api import MpStuckTxModel -from ..base.server import BaseProxyApi +from ..base.intl_server import BaseProxyApi _LOG = logging.getLogger(__name__) diff --git a/proxy/executor/server_abc.py b/proxy/executor/server_abc.py index dc8a4f61..5ec826f6 100644 --- a/proxy/executor/server_abc.py +++ b/proxy/executor/server_abc.py @@ -11,10 +11,10 @@ from ..base.ex_api import EXECUTOR_ENDPOINT from ..base.mp_client import MempoolClient from ..base.op_client import OpResourceClient -from ..base.server import BaseProxyServer, BaseProxyComponent +from ..base.intl_server import BaseIntlProxyServer, BaseIntlProxyComponent -class ExecutorComponent(BaseProxyComponent): +class ExecutorComponent(BaseIntlProxyComponent): def __init__(self, server: ExecutorServerAbc) -> None: super().__init__(server) self._server = server @@ -33,7 +33,7 @@ def __init__(self, server: ExecutorServerAbc) -> None: ExecutorComponent.__init__(self, server) -class ExecutorServerAbc(BaseProxyServer): +class ExecutorServerAbc(BaseIntlProxyServer): def __init__( self, cfg: Config, @@ -46,9 +46,9 @@ def __init__( self._mp_client = mp_client self._op_client = op_client - def _add_api(self, api: ExecutorApi) -> Self: - return self.add_api(api, endpoint=EXECUTOR_ENDPOINT) - @ttl_cached_method(ttl_sec=1) async def get_evm_cfg(self) -> EvmConfigModel: return await self._mp_client.get_evm_cfg() + + def _add_api(self, api: ExecutorApi) -> Self: + return self.add_api(api, endpoint=EXECUTOR_ENDPOINT) diff --git a/proxy/executor/strategy_iterative_holder.py b/proxy/executor/strategy_iterative_holder.py index 676b8ed8..8ee37912 100644 --- a/proxy/executor/strategy_iterative_holder.py +++ b/proxy/executor/strategy_iterative_holder.py @@ -1,6 +1,6 @@ from typing import ClassVar -from common.neon.neon_program import NeonEvmIxCode, NeonIxMode +from common.neon.neon_program import NeonEvmIxCode from common.solana.transaction_legacy import SolLegacyTx from .strategy_base import SolTxCfg from .strategy_iterative import IterativeTxStrategy diff --git a/proxy/executor/strategy_iterative_no_chain_id.py b/proxy/executor/strategy_iterative_no_chain_id.py index 146c93b7..afc12855 100644 --- a/proxy/executor/strategy_iterative_no_chain_id.py +++ b/proxy/executor/strategy_iterative_no_chain_id.py @@ -1,6 +1,6 @@ from typing import ClassVar -from common.neon.neon_program import NeonEvmIxCode, NeonIxMode +from common.neon.neon_program import NeonEvmIxCode from common.solana.transaction_legacy import SolLegacyTx from .strategy_base import SolTxCfg from .strategy_iterative_holder import HolderTxStrategy diff --git a/proxy/executor/strategy_stage_alt.py b/proxy/executor/strategy_stage_alt.py index 7622ae2e..581ebc31 100644 --- a/proxy/executor/strategy_stage_alt.py +++ b/proxy/executor/strategy_stage_alt.py @@ -3,7 +3,7 @@ import logging from typing import Sequence, ClassVar -from common.neon.neon_program import NeonProg, NeonIxMode +from common.neon.neon_program import NeonProg from common.solana.alt_info import SolAltInfo from common.solana.alt_program import SolAltProg from common.solana.errors import SolTxSizeError, SolAltContentError diff --git a/proxy/health_check_proxy.sh b/proxy/health_check_proxy.sh index 99f17d9b..2d54293a 100755 --- a/proxy/health_check_proxy.sh +++ b/proxy/health_check_proxy.sh @@ -2,7 +2,7 @@ HAS_BLOCK=`curl --location --request POST 'http://proxy:9090/solana' \ --header 'Content-Type: application/json' \ ---data-raw '{"jsonrpc":"2.0", "method":"eth_blockNumber", "params":[], "id":1 }' 2> /dev/null | grep -cF '"result"'` +--data-raw '{"jsonrpc":"2.0", "method":"eth_blockNumber", "id":1 }' 2> /dev/null | grep -cF '"result"'` if [[ "$HAS_BLOCK" == "1" ]]; then exit 0 diff --git a/proxy/mempool/alt_loader.py b/proxy/mempool/alt_loader.py index 3dc9b522..42756337 100644 --- a/proxy/mempool/alt_loader.py +++ b/proxy/mempool/alt_loader.py @@ -87,7 +87,7 @@ async def _scan_stuck_alt(self) -> None: # skip tables from other operators continue - tx = neon_tx_hash.to_bytes()[:4].hex() + tx = neon_tx_hash.ident alt = SolAltID(address=addr, owner=owner, recent_slot=0, nonce=0) with logging_context(alt=alt.ctx_id, tx=tx): msg = log_msg( diff --git a/proxy/mempool/mp_transaction_api.py b/proxy/mempool/mp_transaction_api.py index 6debe637..c149a976 100644 --- a/proxy/mempool/mp_transaction_api.py +++ b/proxy/mempool/mp_transaction_api.py @@ -56,4 +56,4 @@ def get_tx_by_sender_nonce(self, request: MpGetTxBySenderNonceRequest) -> MpGetT @MempoolApi.method(name="getMempoolContent") async def _get_content(self, request: MpRequest) -> MpTxPoolContentResp: with logging_context(ctx=request.ctx_id): - return self._tx_executor.get_content() + return self._tx_executor.get_content(request.chain_id) diff --git a/proxy/mempool/server.py b/proxy/mempool/server.py index 971293b2..c9140ad0 100644 --- a/proxy/mempool/server.py +++ b/proxy/mempool/server.py @@ -2,7 +2,6 @@ import asyncio -from common.http.server import HttpSocket from .alt_loader import SolAltLoader from .gas_price_calculator import MpGasPriceCalculator from .mp_evm_config_api import MpEvmCfgApi diff --git a/proxy/mempool/server_abc.py b/proxy/mempool/server_abc.py index 0d5bd111..be1f1b5d 100644 --- a/proxy/mempool/server_abc.py +++ b/proxy/mempool/server_abc.py @@ -15,11 +15,11 @@ from ..base.ex_client import ExecutorClient from ..base.mp_api import MpGasPriceModel, MP_ENDPOINT from ..base.op_client import OpResourceClient -from ..base.server import BaseProxyServer, BaseProxyComponent +from ..base.intl_server import BaseIntlProxyServer, BaseIntlProxyComponent from ..stat.client import StatClient -class MempoolComponent(BaseProxyComponent): +class MempoolComponent(BaseIntlProxyComponent): def __init__(self, server: MempoolServerAbc) -> None: super().__init__(server) self._server = server @@ -47,7 +47,7 @@ def __init__(self, server: MempoolServerAbc) -> None: MempoolComponent.__init__(self, server) -class MempoolServerAbc(BaseProxyServer, abc.ABC): +class MempoolServerAbc(BaseIntlProxyServer, abc.ABC): def __init__( self, cfg: Config, @@ -64,24 +64,6 @@ def __init__( self._stat_client = stat_client self._db = db - async def _on_server_start(self) -> None: - await asyncio.gather( - super()._on_server_start(), - self._db.start(), - self._op_client.start(), - self._exec_client.start(), - self._stat_client.start(), - ) - - async def _on_server_stop(self) -> None: - await asyncio.gather( - super()._on_server_stop(), - self._db.stop(), - self._exec_client.stop(), - self._op_client.stop(), - self._stat_client.stop(), - ) - @ttl_cached_method(ttl_sec=1) async def get_evm_cfg(self) -> EvmConfigModel: # Finally, this method can be called from 2 places: @@ -104,3 +86,21 @@ def get_gas_price(self) -> MpGasPriceModel: ... def _add_api(self, api: MempoolApi) -> Self: return self.add_api(api, endpoint=MP_ENDPOINT) + + async def _on_server_start(self) -> None: + await asyncio.gather( + super()._on_server_start(), + self._db.start(), + self._op_client.start(), + self._exec_client.start(), + self._stat_client.start(), + ) + + async def _on_server_stop(self) -> None: + await asyncio.gather( + super()._on_server_stop(), + self._db.stop(), + self._exec_client.stop(), + self._op_client.stop(), + self._stat_client.stop(), + ) diff --git a/proxy/mempool/transaction_executor.py b/proxy/mempool/transaction_executor.py index 730ffbb1..24a9cf74 100644 --- a/proxy/mempool/transaction_executor.py +++ b/proxy/mempool/transaction_executor.py @@ -96,14 +96,14 @@ def get_tx_by_hash(self, neon_tx_hash: EthTxHash) -> NeonTxModel | None: def get_tx_by_sender_nonce(self, sender: NeonAccount, tx_nonce: int) -> NeonTxModel | None: return self._tx_dict.get_tx_by_sender_nonce(sender, tx_nonce) - def get_content(self) -> MpTxPoolContentResp: + def get_content(self, chain_id: int) -> MpTxPoolContentResp: pending_list = list() queued_list = list() - for tx_schedule in self._tx_schedule_dict.values(): + if tx_schedule := self._tx_schedule_dict.get(chain_id): cont = tx_schedule.get_content() pending_list.extend(cont.pending_list) queued_list.extend(cont.queued_list) - return MpTxPoolContentResp(pending_list=tuple(pending_list), queued_list=tuple(queued_list)) + return MpTxPoolContentResp(pending_list=pending_list, queued_list=queued_list) async def _update_tx_order(self, tx: MpTxModel) -> MpTxResp | None: if not tx.neon_tx.has_chain_id: diff --git a/proxy/mempool/transaction_schedule.py b/proxy/mempool/transaction_schedule.py index 625caa21..2f947d62 100644 --- a/proxy/mempool/transaction_schedule.py +++ b/proxy/mempool/transaction_schedule.py @@ -569,7 +569,7 @@ def get_content(self) -> MpTxPoolContentResp: pending_list.extend(tx_list[:pending_stop_pos]) queued_list.extend(tx_list[pending_stop_pos:]) - return MpTxPoolContentResp(pending_list=tuple(pending_list), queued_list=tuple(queued_list)) + return MpTxPoolContentResp(pending_list=pending_list, queued_list=queued_list) # protected: diff --git a/proxy/neon_proxy_app.py b/proxy/neon_proxy_app.py index 4d9d52bf..411cd633 100644 --- a/proxy/neon_proxy_app.py +++ b/proxy/neon_proxy_app.py @@ -18,6 +18,7 @@ from .executor.server import ExecutorServer from .mempool.server import MempoolServer from .operator_resource.server import OpResourceServer +from .private_rpc.server import PrivateRpcServer from .rpc.server import NeonProxy from .stat.client import StatClient from .stat.server import StatServer @@ -29,12 +30,11 @@ class NeonProxyApp: def __init__(self): Logger.setup() cfg = Config() + self._msg_filter = LogMsgFilter(cfg) _LOG.info("running NeonProxy %s with the cfg: %s", NEON_PROXY_VER, cfg.to_string()) self._recv_sig_num = signal.SIG_DFL - self._msg_filter = LogMsgFilter(cfg) - # Init Solana client sol_client = SolClient(cfg) @@ -85,6 +85,20 @@ def __init__(self): # Init Prometheus stat self._stat_server = StatServer(cfg=cfg) + # Init private RPC API + self._enable_private_rpc_server = cfg.enable_private_api + + if self._enable_private_rpc_server: + self._private_rpc_server = PrivateRpcServer( + cfg=cfg, + core_api_client=core_api_client, + sol_client=sol_client, + mp_client=mp_client, + stat_client=stat_client, + op_client=op_client, + db=db, + ) + # Init external RPC API self._proxy_server = NeonProxy( cfg=cfg, @@ -105,10 +119,16 @@ def start(self) -> int: self._stat_server.start() self._proxy_server.start() - self._register_term_signal_handler() + if self._enable_private_rpc_server: + self._private_rpc_server.start() + + self._register_term_sig_handler() while self._recv_sig_num == signal.SIG_DFL: time.sleep(1) + if self._enable_private_rpc_server: + self._private_rpc_server.stop() + self._proxy_server.stop() self._stat_server.stop() self._mp_server.stop() @@ -121,11 +141,11 @@ def start(self) -> int: _LOG.error("error on NeonProxy run", exc_info=exc, extra=self._msg_filter) return 1 - def _register_term_signal_handler(self) -> None: - def _signal_handler(_sig: int, _frame) -> None: + def _register_term_sig_handler(self) -> None: + def _sig_handler(_sig: int, _frame) -> None: if self._recv_sig_num == signal.SIG_DFL: self._recv_sig_num = _sig for sig in (signal.SIGINT, signal.SIGTERM): _LOG.info("register signal handler %d", sig) - signal.signal(sig, _signal_handler) + signal.signal(sig, _sig_handler) diff --git a/proxy/operator_resource/key_info.py b/proxy/operator_resource/key_info.py index d197171b..eb1accee 100644 --- a/proxy/operator_resource/key_info.py +++ b/proxy/operator_resource/key_info.py @@ -6,6 +6,7 @@ from typing_extensions import Self from common.ethereum.hash import EthAddress +from common.neon.account import NeonAccount from common.neon.neon_program import NeonProg from common.solana.pubkey import SolPubKey from common.solana.signer import SolSigner @@ -37,7 +38,7 @@ def _neon_account_with_seed(base_address: SolPubKey, seed: str) -> SolPubKey: @dataclass class OpSignerInfo: signer: SolSigner - eth_address: EthAddress + neon_account: NeonAccount token_sol_address_dict: dict[int, SolPubKey] free_holder_list: deque[OpHolderInfo] @@ -51,6 +52,10 @@ class OpSignerInfo: def owner(self) -> SolPubKey: return self.signer.pubkey + @property + def eth_address(self) -> EthAddress: + return self.neon_account.eth_address + def pop_free_holder_list(self) -> deque[OpHolderInfo]: holder_list, self.free_holder_list = self.free_holder_list, deque() return holder_list diff --git a/proxy/operator_resource/op_eth_sign_api.py b/proxy/operator_resource/op_eth_sign_api.py new file mode 100644 index 00000000..89903843 --- /dev/null +++ b/proxy/operator_resource/op_eth_sign_api.py @@ -0,0 +1,55 @@ +import logging +from typing import ClassVar + +from common.ethereum.hash import EthAddress +from common.neon.account import NeonAccount +from common.utils.cached import cached_property +from common.utils.json_logger import logging_context +from .resource_manager import OpResourceMng +from .server_abc import OpResourceApi +from ..base.op_api import OpSignEthMsgRequest, OpSignEthMsgResp, OpSignEthTxRequest, OpSignEthTxResp + +_LOG = logging.getLogger(__name__) + + +class OpEthSignApi(OpResourceApi): + name: ClassVar[str] = "OpResource::EthSign" + + @OpResourceApi.method(name="signEthMessage") + async def sign_eth_message(self, request: OpSignEthMsgRequest) -> OpSignEthMsgResp: + try: + with logging_context(**request.req_id): + if not (neon_account := await self._get_neon_account(request.sender, 0)): + return OpSignEthMsgResp(signed_msg=bytes(), error=f"Unknown sender {request.sender}") + + signed_msg = neon_account.sign_msg(request.data.to_bytes()) + return OpSignEthMsgResp(signed_msg=signed_msg.to_bytes()) + except Exception as exc: + _LOG.error("signing message failed", extra=self._msg_filter, exc_info=exc) + return OpSignEthMsgResp(signed_msg=bytes(), error="Error signing message") + + @OpResourceApi.method(name="signEthTransaction") + async def sign_eth_tx(self, request: OpSignEthTxRequest) -> OpSignEthTxResp: + try: + with logging_context(**request.req_id): + sender = request.neon_tx.from_address + if not (neon_account := await self._get_neon_account(sender, request.chain_id)): + return OpSignEthTxResp(signed_tx=bytes(), error=f"Unknown sender {sender}") + + signed_tx = neon_account.sign_tx(request.neon_tx) + return OpSignEthTxResp(signed_tx=signed_tx) + except Exception as exc: + _LOG.error("signing transaction failed", extra=self._msg_filter, exc_info=exc) + return OpSignEthTxResp(signed_tx=bytes(), error="Error signing transaction") + + @cached_property + def _op_resource_mng(self) -> OpResourceMng: + return self._server._op_resource_mng # noqa + + async def _get_neon_account(self, eth_address: EthAddress, chain_id: int) -> NeonAccount | None: + if not (op_signer := self._op_resource_mng.get_signer_by_eth_address(eth_address)): + return None + + if op_signer.neon_account.chain_id != chain_id: + return NeonAccount.from_private_key(op_signer.neon_account.private_key, chain_id) + return op_signer.neon_account diff --git a/proxy/operator_resource/op_signer_key_api.py b/proxy/operator_resource/op_signer_key_api.py deleted file mode 100644 index c8611220..00000000 --- a/proxy/operator_resource/op_signer_key_api.py +++ /dev/null @@ -1,20 +0,0 @@ -from typing import ClassVar - -from common.utils.cached import cached_property -from common.utils.json_logger import logging_context -from .resource_manager import OpResourceMng -from .server_abc import OpResourceApi -from ..base.op_api import OpSignerKeyListResp, OpGetSignerKeyListRequest - - -class OpSignerKeyApi(OpResourceApi): - name: ClassVar[str] = "OpResource::SignerKey" - - @OpResourceApi.method(name="getSignerKeyList") - async def get_signer_key_list(self, request: OpGetSignerKeyListRequest) -> OpSignerKeyListResp: - with logging_context(**request.req_id): - return OpSignerKeyListResp(signer_key_list=list(self._op_resource_mng.get_signer_key_list())) - - @cached_property - def _op_resource_mng(self) -> OpResourceMng: - return self._server._op_resource_mng # noqa diff --git a/proxy/operator_resource/op_sign_transaction_api.py b/proxy/operator_resource/op_sol_sign_api.py similarity index 62% rename from proxy/operator_resource/op_sign_transaction_api.py rename to proxy/operator_resource/op_sol_sign_api.py index bbbb371b..12691397 100644 --- a/proxy/operator_resource/op_sign_transaction_api.py +++ b/proxy/operator_resource/op_sol_sign_api.py @@ -5,11 +5,16 @@ from common.utils.json_logger import logging_context from .resource_manager import OpResourceMng from .server_abc import OpResourceApi -from ..base.op_api import OpSignSolTxListRequest, OpSolTxListResp +from ..base.op_api import OpSignerKeyListResp, OpGetSignerKeyListRequest, OpSignSolTxListRequest, OpSolTxListResp -class OpSignTxApi(OpResourceApi): - name: ClassVar[str] = "OpResource::SignTransaction" +class OpSolSignApi(OpResourceApi): + name: ClassVar[str] = "OpResource::SolSign" + + @OpResourceApi.method(name="getSignerKeyList") + async def get_signer_key_list(self, request: OpGetSignerKeyListRequest) -> OpSignerKeyListResp: + with logging_context(**request.req_id): + return OpSignerKeyListResp(signer_key_list=list(self._op_resource_mng.get_signer_key_list())) @OpResourceApi.method(name="signSolanaTransactionList") async def sign_sol_tx_list(self, request: OpSignSolTxListRequest) -> OpSolTxListResp: diff --git a/proxy/operator_resource/resource_manager.py b/proxy/operator_resource/resource_manager.py index b842c945..2d47e3b9 100644 --- a/proxy/operator_resource/resource_manager.py +++ b/proxy/operator_resource/resource_manager.py @@ -19,7 +19,7 @@ from common.solana.transaction_legacy import SolLegacyTx from common.solana_rpc.transaction_list_sender import SolTxListSender from common.solana_rpc.ws_client import SolWatchTxSession -from common.utils.cached import cached_property, reset_cached_method +from common.utils.cached import cached_property from common.utils.json_logger import log_msg, logging_context from .key_info import OpSignerInfo, OpHolderInfo from .server_abc import OpResourceComponent @@ -217,6 +217,12 @@ def get_eth_address_list(self) -> tuple[OpEthAddressModel, ...]: ) return tuple(generator) + def get_signer_by_eth_address(self, eth_address: EthAddress) -> OpSignerInfo | None: + for op_signer in self._active_signer_dict.values(): + if op_signer.eth_address == eth_address: + return op_signer + return None + async def withdraw(self) -> None: for op_signer in self._active_signer_dict.values(): ix_list: list[SolTxIx] = list() @@ -303,7 +309,7 @@ def _init_op_signer(self, signer: SolSigner) -> OpSignerInfo: return OpSignerInfo( signer=signer, - eth_address=NeonAccount.from_private_key(signer.secret, 0).eth_address, + neon_account=NeonAccount.from_private_key(signer.secret, 0), token_sol_address_dict=dict(), free_holder_list=deque(), used_holder_dict=dict(), diff --git a/proxy/operator_resource/server.py b/proxy/operator_resource/server.py index 16560957..2298fc78 100644 --- a/proxy/operator_resource/server.py +++ b/proxy/operator_resource/server.py @@ -3,8 +3,8 @@ from common.solana.signer import SolSigner from .op_acquire_resource_api import OpAcquireResourceApi from .op_balance_api import OpBalanceApi -from .op_sign_transaction_api import OpSignTxApi -from .op_signer_key_api import OpSignerKeyApi +from .op_eth_sign_api import OpEthSignApi +from .op_sol_sign_api import OpSolSignApi from .resource_manager import OpResourceMng from .secret_manager import OpSecretMng from .server_abc import OpResourceServerAbc @@ -20,10 +20,13 @@ def __init__(self, *args, **kwargs) -> None: self._op_resource_mng = OpResourceMng(self) self._add_api(OpAcquireResourceApi(self)) - self._add_api(OpSignTxApi(self)) - self._add_api(OpSignerKeyApi(self)) + self._add_api(OpEthSignApi(self)) + self._add_api(OpSolSignApi(self)) self._add_api(OpBalanceApi(self)) + async def get_signer_list(self) -> tuple[SolSigner, ...]: + return await self._op_secret_mng.get_signer_list() + async def _on_server_start(self) -> None: await asyncio.gather( super()._on_server_start(), @@ -37,6 +40,3 @@ async def _on_server_stop(self) -> None: self._op_secret_mng.stop(), self._op_resource_mng.stop(), ) - - async def get_signer_list(self) -> tuple[SolSigner, ...]: - return await self._op_secret_mng.get_signer_list() diff --git a/proxy/operator_resource/server_abc.py b/proxy/operator_resource/server_abc.py index 4899faef..1dfe76e4 100644 --- a/proxy/operator_resource/server_abc.py +++ b/proxy/operator_resource/server_abc.py @@ -14,11 +14,11 @@ from common.utils.cached import cached_property from ..base.mp_client import MempoolClient from ..base.op_api import OP_RESOURCE_ENDPOINT -from ..base.server import BaseProxyServer, BaseProxyComponent +from ..base.intl_server import BaseIntlProxyServer, BaseIntlProxyComponent from ..stat.client import StatClient -class OpResourceComponent(BaseProxyComponent): +class OpResourceComponent(BaseIntlProxyComponent): def __init__(self, server: OpResourceServerAbc) -> None: super().__init__(server) self._server = server @@ -34,7 +34,7 @@ def __init__(self, server: OpResourceServerAbc) -> None: OpResourceComponent.__init__(self, server) -class OpResourceServerAbc(BaseProxyServer, abc.ABC): +class OpResourceServerAbc(BaseIntlProxyServer, abc.ABC): def __init__( self, cfg: Config, @@ -47,9 +47,6 @@ def __init__( self._mp_client = mp_client self._stat_client = stat_client - def _add_api(self, api: OpResourceApi) -> Self: - return self.add_api(api, endpoint=OP_RESOURCE_ENDPOINT) - @abc.abstractmethod async def get_signer_list(self) -> tuple[SolSigner, ...]: ... @@ -58,6 +55,9 @@ async def get_evm_cfg(self) -> EvmConfigModel: NeonProg.init_prog(evm_cfg.treasury_pool_cnt, evm_cfg.treasury_pool_seed, evm_cfg.version) return evm_cfg + def _add_api(self, api: OpResourceApi) -> Self: + return self.add_api(api, endpoint=OP_RESOURCE_ENDPOINT) + async def _on_server_start(self) -> None: await super()._on_server_start() await self._stat_client.start() diff --git a/proxy/private_rpc/__init__.py b/proxy/private_rpc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/proxy/private_rpc/pr_eth_account_api.py b/proxy/private_rpc/pr_eth_account_api.py new file mode 100644 index 00000000..8eec96f5 --- /dev/null +++ b/proxy/private_rpc/pr_eth_account_api.py @@ -0,0 +1,14 @@ +from typing import ClassVar + +from common.ethereum.hash import EthAddressField +from common.http.utils import HttpRequestCtx +from .server_abc import PrivateRpcApi + + +class PrEthAccountApi(PrivateRpcApi): + name: ClassVar[str] = "PrivateRpc::EthAccount" + + @PrivateRpcApi.method(name="eth_accounts") + async def eth_accounts(self, ctx: HttpRequestCtx) -> list[EthAddressField]: + eth_address_list = await self._op_client.get_eth_address_list(dict(ctx=self._get_ctx_id(ctx))) + return [a.eth_address for a in eth_address_list] diff --git a/proxy/private_rpc/pr_eth_tx_api.py b/proxy/private_rpc/pr_eth_tx_api.py new file mode 100644 index 00000000..c6bc23fb --- /dev/null +++ b/proxy/private_rpc/pr_eth_tx_api.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from typing import ClassVar + +from common.ethereum.bin_str import EthBinStrField +from common.ethereum.errors import EthError, EthWrongChainIdError +from common.ethereum.hash import EthAddressField, EthTxHashField +from common.http.utils import HttpRequestCtx +from common.jsonrpc.api import BaseJsonRpcModel +from common.jsonrpc.errors import InvalidParamError +from common.neon.account import NeonAccount +from common.neon.transaction_model import NeonTxModel +from common.utils.cached import cached_property +from common.utils.format import hex_to_bytes +from .server_abc import PrivateRpcApi +from ..base.rpc_api import RpcEthTxRequest, RpcEthTxResp +from ..base.rpc_gas_limit_calculator import RpcNeonGasLimitCalculator +from ..base.rpc_transaction_executor import RpcNeonTxExecutor + + +class _RpcSignEthTxResp(BaseJsonRpcModel): + tx: RpcEthTxResp + raw: EthBinStrField + + +class PrEthTxApi(PrivateRpcApi): + name: ClassVar[str] = "PrivateRpc::Transaction" + + @cached_property + def _gas_calculator(self) -> RpcNeonGasLimitCalculator: + return RpcNeonGasLimitCalculator(self._server) + + @cached_property + def _tx_executor(self) -> RpcNeonTxExecutor: + return RpcNeonTxExecutor(self._server) + + @PrivateRpcApi.method(name="eth_sendTransaction") + async def eth_send_tx(self, ctx: HttpRequestCtx, tx: RpcEthTxRequest) -> EthTxHashField: + signed_tx = await self._eth_sign_tx(ctx, tx) + return await self._tx_executor.send_neon_tx(ctx, signed_tx) + + @PrivateRpcApi.method(name="eth_signTransaction") + async def eth_sign_tx(self, ctx: HttpRequestCtx, tx: RpcEthTxRequest) -> _RpcSignEthTxResp: + signed_tx = await self._eth_sign_tx(ctx, tx) + neon_tx = NeonTxModel.from_raw(signed_tx) + return _RpcSignEthTxResp(tx=RpcEthTxResp.from_raw(neon_tx), raw=signed_tx) + + @PrivateRpcApi.method(name="eth_sign") + async def eth_sign(self, ctx: HttpRequestCtx, eth_address: EthAddressField, data: EthBinStrField) -> EthBinStrField: + data = hex_to_bytes(data) + msg = str.encode(f"\x19Ethereum Signed Message:\n{len(data)}") + data + + resp = await self._op_client.sign_eth_msg(dict(ctx_id=self._get_ctx_id(ctx)), eth_address, msg) + if resp.error: + raise EthError(message=resp.error) + + return resp.signed_msg + + async def _eth_sign_tx(self, ctx: HttpRequestCtx, tx: RpcEthTxRequest) -> bytes: + chain_id = self._get_chain_id(ctx) + if tx.chainId and tx.chainId != chain_id: + raise EthWrongChainIdError() + elif tx.fromAddress.is_empty: + raise InvalidParamError(message='no sender in transaction') + + sender_acct = NeonAccount.from_raw(tx.fromAddress, chain_id) + neon_tx = tx.to_neon_tx() + + if not neon_tx.gas_limit: + emul_call = tx.to_emulation_call(chain_id) + gas_limit = await self._gas_calculator.estimate(emul_call, dict()) + object.__setattr__(neon_tx, "gas_limit", gas_limit) + + if not neon_tx.nonce: + nonce = await self._core_api_client.get_state_tx_cnt(sender_acct) + object.__setattr__(neon_tx, "nonce", nonce) + + ctx_id = self._get_ctx_id(ctx) + resp = await self._op_client.sign_eth_tx(dict(ctx=ctx_id), neon_tx, chain_id) + if resp.error: + raise EthError(message=resp.error) + return resp.signed_tx.to_bytes() diff --git a/proxy/private_rpc/pr_mempool_api.py b/proxy/private_rpc/pr_mempool_api.py new file mode 100644 index 00000000..20547e28 --- /dev/null +++ b/proxy/private_rpc/pr_mempool_api.py @@ -0,0 +1,46 @@ +from typing import ClassVar + +from common.ethereum.hash import EthAddressField +from common.http.utils import HttpRequestCtx +from common.jsonrpc.api import BaseJsonRpcModel +from common.neon.transaction_model import NeonTxModel +from .server_abc import PrivateRpcApi +from ..base.rpc_api import RpcEthTxResp + + +class _RpcTxPoolResp(BaseJsonRpcModel): + pending: dict[EthAddressField, dict[int, RpcEthTxResp]] + queued: dict[EthAddressField, dict[int, RpcEthTxResp]] + + +class PrMempoolApi(PrivateRpcApi): + name: ClassVar[str] = "PrivateRpc::Mempool" + + @PrivateRpcApi.method(name="txpool_content") + async def txpool_content(self, ctx: HttpRequestCtx) -> _RpcTxPoolResp: + ctx_id = self._get_ctx_id(ctx) + chain_id = self._get_chain_id(ctx) + txpool_content = await self._mp_client.get_content(ctx_id, chain_id) + return _RpcTxPoolResp( + pending=self._get_queue(txpool_content.pending_list), + queued=self._get_queue(txpool_content.queued_list), + ) + + @staticmethod + def _get_queue(tx_list: list[NeonTxModel]) -> dict[EthAddressField, dict[int, RpcEthTxResp]]: + sender_addr = EthAddressField.default() + sender_pool: dict[int, RpcEthTxResp] = dict() + sender_pool_dict: dict[EthAddressField, dict[int, RpcEthTxResp]] = dict() + for tx in tx_list: + if sender_addr != tx.from_address: + if sender_pool: + sender_pool_dict[sender_addr] = sender_pool + sender_pool = dict() + sender_addr = tx.from_address + + sender_pool[tx.nonce] = RpcEthTxResp.from_raw(tx) + + if not sender_addr.is_empty: + sender_pool_dict[sender_addr] = sender_pool + + return sender_pool_dict diff --git a/proxy/private_rpc/server.py b/proxy/private_rpc/server.py new file mode 100644 index 00000000..fcd34869 --- /dev/null +++ b/proxy/private_rpc/server.py @@ -0,0 +1,24 @@ +from typing import ClassVar + +from .pr_eth_account_api import PrEthAccountApi +from .pr_eth_tx_api import PrEthTxApi +from .pr_mempool_api import PrMempoolApi +from .server_abc import PrivateRpcServerAbc + +_ENDPOINT_LIST = ["/", "/:token"] + + +class PrivateRpcServer(PrivateRpcServerAbc): + _stat_name: ClassVar[str] = "PrivateRpc" + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.listen(host=self._cfg.rpc_private_ip, port=self._cfg.rpc_private_port) + + self._add_api(PrEthAccountApi(self)) + self._add_api(PrEthTxApi(self)) + self._add_api(PrMempoolApi(self)) + + @classmethod + def _get_endpoint_list(cls) -> list[str]: + return _ENDPOINT_LIST diff --git a/proxy/private_rpc/server_abc.py b/proxy/private_rpc/server_abc.py new file mode 100644 index 00000000..118919c7 --- /dev/null +++ b/proxy/private_rpc/server_abc.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import abc +import logging + +from common.config.config import Config +from common.ethereum.hash import EthAddress +from common.http.utils import HttpRequestCtx +from common.jsonrpc.server import JsonRpcApi +from common.neon_rpc.client import CoreApiClient +from common.solana_rpc.client import SolClient +from common.utils.cached import cached_property +from indexer.db.indexer_db_client import IndexerDbClient +from ..base.mp_client import MempoolClient +from ..base.op_client import OpResourceClient +from ..base.rpc_server_abc import BaseRpcServerComponent, BaseRpcServerAbc +from ..stat.client import StatClient + +_LOG = logging.getLogger(__name__) + + +class PrivateRpcComponent(BaseRpcServerComponent): + def __init__(self, server: PrivateRpcServerAbc) -> None: + super().__init__(server) + self._server = server + + @cached_property + def _op_client(self) -> OpResourceClient: + return self._server._op_client # noqa + + +class PrivateRpcApi(PrivateRpcComponent, JsonRpcApi): + def __init__(self, server: PrivateRpcServerAbc) -> None: + JsonRpcApi.__init__(self) + PrivateRpcComponent.__init__(self, server) + + +class PrivateRpcServerAbc(BaseRpcServerAbc, abc.ABC): + def __init__( + self, + cfg: Config, + core_api_client: CoreApiClient, + sol_client: SolClient, + mp_client: MempoolClient, + stat_client: StatClient, + op_client: OpResourceClient, + db: IndexerDbClient, + ) -> None: + super().__init__(cfg, core_api_client, sol_client, mp_client, stat_client, db) + self._op_client = op_client + + async def has_fee_less_tx_permit( + self, + ctx: HttpRequestCtx, + sender: EthAddress, + contract: EthAddress, + tx_nonce: int, + tx_gas_limit: int, + ) -> bool: + return True + + async def _on_server_start(self) -> None: + try: + await super()._on_server_start() + except BaseException as exc: + _LOG.error("error on start private RPC", exc_info=exc, extra=self._msg_filter) + + async def _on_server_stop(self) -> None: + try: + await super()._on_server_stop() + except BaseException as exc: + _LOG.error("error on stop private RPC", exc_info=exc, extra=self._msg_filter) diff --git a/proxy/rpc/api.py b/proxy/rpc/api.py index c8817557..319a7f75 100644 --- a/proxy/rpc/api.py +++ b/proxy/rpc/api.py @@ -2,22 +2,20 @@ from typing import ClassVar -from pydantic import Field, AliasChoices +from pydantic import Field from typing_extensions import Self -from common.ethereum.bin_str import EthBinStrField, EthBinStr +from common.ethereum.bin_str import EthBinStrField from common.ethereum.commit_level import EthCommitField, EthCommit from common.ethereum.hash import ( EthBlockHashField, EthBlockHash, EthAddressField, EthHash32Field, - EthAddress, EthTxHashField, ) from common.jsonrpc.api import BaseJsonRpcModel from common.neon.evm_log_decoder import NeonTxEventModel -from common.neon_rpc.api import EmulNeonCallModel from common.solana.account import SolAccountModel from common.solana.pubkey import SolPubKeyField from common.solana.signature import SolTxSigField @@ -95,60 +93,6 @@ def model_post_init(self, _ctx) -> None: raise ValueError(f"{type(self).__name__} can't be null") -class RpcAccessItemModel(BaseJsonRpcModel): - address: EthAddressField - storageKeys: list[EthHash32Field] - - -class RpcCallRequest(BaseJsonRpcModel): - type: HexUIntField = Field(default=0) - fromAddress: EthAddressField = Field( - default=EthAddress.default(), - validation_alias=AliasChoices("from", "fromAddress"), - ) - toAddress: EthAddressField = Field( - default=EthAddress.default(), - validation_alias=AliasChoices("to", "toAddress"), - ) - data: EthBinStrField = Field( - default=EthBinStr.default(), - validation_alias=AliasChoices("data", "input"), - ) - value: HexUIntField = Field(default=0) - nonce: HexUIntField = Field(default=0) - - gas: HexUIntField = Field(default=2**64) - gasPrice: HexUIntField = Field(default=2**64) - maxFeePerGas: HexUIntField = Field(default=2**64) - maxPriorityFeePerGas: HexUIntField = Field(default=2**64) - - accessList: list[RpcAccessItemModel] = Field(default_factory=list) - chainId: HexUIntField = Field(default=0) - - _default: ClassVar[RpcCallRequest | None] = None - - @classmethod - def default(cls) -> Self: - if not cls._default: - cls._default = cls( - fromAddress=EthAddress.default(), - toAddress=EthAddress.default(), - data=EthBinStr.default(), - ) - return cls._default - - def to_emulation_call(self, chain_id: int) -> EmulNeonCallModel: - return EmulNeonCallModel( - from_address=self.fromAddress, - to_address=self.toAddress, - value=self.value, - data=self.data.to_bytes(), - gas_limit=self.gas, - gas_price=self.gasPrice, - chain_id=chain_id - ) - - class RpcNeonCallRequest(BaseJsonRpcModel): sol_account_dict: dict[SolPubKeyField, SolAccountModel] = Field( default_factory=dict, @@ -173,7 +117,6 @@ class RpcEthTxEventModel(BaseJsonRpcModel): transactionHash: EthTxHashField transactionIndex: HexUIntField logIndex: HexUIntField | None - # transactionLogIndex: HexUIntField | None removed: bool = False @@ -192,7 +135,6 @@ def _to_dict(event: NeonTxEventModel) -> dict: transactionHash=event.neon_tx_hash, transactionIndex=event.neon_tx_idx, logIndex=event.block_log_idx, - # transactionLogIndex=event.neon_tx_log_idx, ) diff --git a/proxy/rpc/np_account_api.py b/proxy/rpc/np_account_api.py index 259bf867..412c0d5e 100644 --- a/proxy/rpc/np_account_api.py +++ b/proxy/rpc/np_account_api.py @@ -55,12 +55,12 @@ async def get_tx_cnt( block_tag: RpcBlockRequest, ) -> HexUIntField: block = await self.get_block_by_tag(block_tag) - chain_id = self.get_chain_id(ctx) + chain_id = self._get_chain_id(ctx) acct = NeonAccount.from_raw(address, chain_id) mp_tx_nonce: int | None = None if block.commit == EthCommit.Pending: - mp_tx_nonce = await self._mp_client.get_pending_tx_cnt(self.get_ctx_id(ctx), acct) + mp_tx_nonce = await self._mp_client.get_pending_tx_cnt(self._get_ctx_id(ctx), acct) _LOG.debug("pending tx count for %s is %s", acct, mp_tx_nonce) tx_cnt = await self._core_api_client.get_state_tx_cnt(acct, block) @@ -73,13 +73,13 @@ async def get_balance( address: EthNotNoneAddressField, block_tag: RpcBlockRequest = RpcBlockRequest.latest(), ) -> HexUIntField: - chain_id = self.get_chain_id(ctx) + chain_id = self._get_chain_id(ctx) block = await self.get_block_by_tag(block_tag) acct = await self._core_api_client.get_neon_account(NeonAccount.from_raw(address, chain_id), block) # custom case for Metamask: allow fee-less txs from accounts without balance if not acct.balance: - if await self.has_fee_less_tx_permit(ctx, address, EthAddress.default(), acct.state_tx_cnt, 0): + if await self._has_fee_less_tx_permit(ctx, address, EthAddress.default(), acct.state_tx_cnt, 0): return 1 return acct.balance @@ -92,7 +92,7 @@ async def get_code( block_tag: RpcBlockRequest, ) -> EthBinStrField: block = await self.get_block_by_tag(block_tag) - chain_id = self.get_chain_id(ctx) + chain_id = self._get_chain_id(ctx) neon_acct = NeonAccount.from_raw(address, chain_id) resp = await self._core_api_client.get_neon_contract(neon_acct, block) return resp.code @@ -115,7 +115,7 @@ async def get_neon_account( block_tag: RpcBlockRequest, ) -> _NeonRpcAccountResp: block = await self.get_block_by_tag(block_tag) - chain_id = self.get_chain_id(ctx) + chain_id = self._get_chain_id(ctx) acct = NeonAccount.from_raw(address, chain_id) resp = await self._core_api_client.get_neon_account(acct, block) diff --git a/proxy/rpc/np_block_transaction_api.py b/proxy/rpc/np_block_transaction_api.py index 640d51a0..e241c59d 100644 --- a/proxy/rpc/np_block_transaction_api.py +++ b/proxy/rpc/np_block_transaction_api.py @@ -27,13 +27,13 @@ from common.neon.neon_program import NeonEvmIxCode from common.neon.transaction_decoder import SolNeonAltTxIxModel, SolNeonTxIxMetaModel from common.neon.transaction_meta_model import NeonTxMetaModel -from common.neon.transaction_model import NeonTxModel from common.solana.commit_level import SolCommit from common.solana.pubkey import SolPubKeyField, SolPubKey from common.solana.signature import SolTxSigField, SolTxSig, SolTxSigSlotInfo from common.utils.pydantic import HexUIntField, HexUInt256Field, HexUInt8Field, Base58Field from .api import RpcBlockRequest, RpcEthTxEventModel, RpcNeonTxEventModel from .server_abc import NeonProxyApi +from ..base.rpc_api import RpcEthTxResp _LOG = logging.getLogger(__name__) @@ -57,60 +57,6 @@ def from_raw(cls, tag: str | _RpcNeonTxReceiptDetail) -> Self: _RpcNeonTxReceiptDetailField = Annotated[_RpcNeonTxReceiptDetail, PlainValidator(_RpcNeonTxReceiptDetail.from_raw)] -class _RpcTxResp(BaseJsonRpcModel): - blockHash: EthBlockHashField | None - blockNumber: HexUIntField | None - transactionIndex: HexUIntField | None - txHash: EthTxHashField = Field(serialization_alias="hash") - txType: HexUIntField = Field(serialization_alias="type") - fromAddress: EthAddressField = Field(serialization_alias="from") - nonce: HexUIntField - gasPrice: HexUIntField - gas: HexUIntField - toAddress: EthAddressField = Field(serialization_alias="to") - value: HexUIntField - data: EthBinStrField = Field(serialization_alias="input") - chainId: HexUIntField | None - v: HexUIntField - r: HexUIntField - s: HexUIntField - - @classmethod - def from_raw(cls, meta: NeonTxMetaModel | NeonTxModel) -> Self: - if isinstance(meta, NeonTxMetaModel): - tx = meta.neon_tx - - rcpt = meta.neon_tx_rcpt - blockhash = rcpt.block_hash - slot = rcpt.slot - tx_idx = rcpt.neon_tx_idx - else: - tx = meta - - blockhash = None - slot = None - tx_idx = None - - return cls( - blockHash=blockhash, - blockNumber=slot, - transactionIndex=tx_idx, - txHash=tx.neon_tx_hash, - txType=tx.tx_type, - fromAddress=tx.from_address.to_string(), - nonce=tx.nonce, - gasPrice=tx.gas_price, - gas=tx.gas_limit, - toAddress=tx.to_address, - value=tx.value, - data=tx.call_data, - chainId=tx.chain_id, - v=tx.v, - r=tx.r, - s=tx.s, - ) - - class _RpcEthTxReceiptResp(BaseJsonRpcModel): transactionHash: EthTxHashField transactionIndex: HexUIntField @@ -386,7 +332,7 @@ class _RpcBlockResp(BaseJsonRpcModel): number: HexUIntField parentHash: EthBlockHashField timestamp: HexUIntField - transactions: tuple[_RpcTxResp | EthTxHashField, ...] + transactions: tuple[RpcEthTxResp | EthTxHashField, ...] _fake_hash: Final[EthHash32Field] = "0x" + "00" * 31 + "01" _empty_root: Final[EthHash32Field] = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421" @@ -398,7 +344,7 @@ def from_raw(cls, block: NeonBlockHdrModel, tx_list: tuple[NeonTxMetaModel, ...] total_gas_used = 0 log_bloom = 0 - rpc_tx_list: list[_RpcTxResp | EthTxHashField] = list() + rpc_tx_list: list[RpcEthTxResp | EthTxHashField] = list() if not is_pending: block_hash = block.block_hash @@ -407,7 +353,7 @@ def from_raw(cls, block: NeonBlockHdrModel, tx_list: tuple[NeonTxMetaModel, ...] for tx in tx_list: total_gas_used = max(tx.neon_tx_rcpt.sum_gas_used, total_gas_used) log_bloom |= tx.neon_tx_rcpt.log_bloom - rpc_tx_list.append(_RpcTxResp.from_raw(tx) if full else tx.neon_tx_hash) + rpc_tx_list.append(RpcEthTxResp.from_raw(tx) if full else tx.neon_tx_hash) else: block_hash = None miner = None @@ -444,12 +390,12 @@ class NpBlockTxApi(NeonProxyApi): name: ClassVar[str] = "NeonRPC::BlockTransaction" @NeonProxyApi.method(name="eth_getTransactionByHash") - async def get_tx_by_hash(self, ctx: HttpRequestCtx, transaction_hash: EthTxHashField) -> _RpcTxResp | None: + async def get_tx_by_hash(self, ctx: HttpRequestCtx, transaction_hash: EthTxHashField) -> RpcEthTxResp | None: tx_hash = transaction_hash if not (meta := await self._db.get_tx_by_neon_tx_hash(tx_hash)): - if not (meta := await self._mp_client.get_tx_by_hash(self.get_ctx_id(ctx), tx_hash)): + if not (meta := await self._mp_client.get_tx_by_hash(self._get_ctx_id(ctx), tx_hash)): return None - return _RpcTxResp.from_raw(meta) + return RpcEthTxResp.from_raw(meta) @NeonProxyApi.method(name="neon_getTransactionBySenderNonce") async def get_tx_by_sender_nonce( @@ -457,13 +403,13 @@ async def get_tx_by_sender_nonce( ctx: HttpRequestCtx, sender: EthNotNoneAddressField, nonce: HexUIntField, - ) -> _RpcTxResp | None: - neon_acct = NeonAccount.from_raw(sender, self.get_chain_id(ctx)) - inc_no_chain_id = True if self.is_default_chain_id(ctx) else False + ) -> RpcEthTxResp | None: + neon_acct = NeonAccount.from_raw(sender, self._get_chain_id(ctx)) + inc_no_chain_id = True if self._is_default_chain_id(ctx) else False if not (meta := await self._db.get_tx_by_sender_nonce(neon_acct, nonce, inc_no_chain_id)): - if not (meta := await self._mp_client.get_tx_by_sender_nonce(self.get_ctx_id(ctx), neon_acct, nonce)): + if not (meta := await self._mp_client.get_tx_by_sender_nonce(self._get_ctx_id(ctx), neon_acct, nonce)): return None - return _RpcTxResp.from_raw(meta) + return RpcEthTxResp.from_raw(meta) @NeonProxyApi.method(name="eth_getTransactionReceipt") async def get_tx_receipt(self, transaction_hash: EthTxHashField) -> _RpcEthTxReceiptResp | None: @@ -473,22 +419,22 @@ async def get_tx_receipt(self, transaction_hash: EthTxHashField) -> _RpcEthTxRec return _RpcEthTxReceiptResp.from_raw(neon_tx_meta) @NeonProxyApi.method(name="eth_getTransactionByBlockNumberAndIndex") - async def get_tx_by_block_number_idx(self, block_tag: RpcBlockRequest, index: HexUIntField) -> _RpcTxResp | None: + async def get_tx_by_block_number_idx(self, block_tag: RpcBlockRequest, index: HexUIntField) -> RpcEthTxResp | None: block = await self.get_block_by_tag(block_tag) if block.is_empty: return None elif not (neon_tx_meta := await self._db.get_tx_by_slot_tx_idx(block.slot, index)): return None - return _RpcTxResp.from_raw(neon_tx_meta) + return RpcEthTxResp.from_raw(neon_tx_meta) @NeonProxyApi.method(name="eth_getTransactionByBlockHashAndIndex") - async def get_tx_by_block_hash_idx(self, block_hash: EthBlockHashField, index: HexUIntField) -> _RpcTxResp | None: + async def get_tx_by_block_hash_idx(self, block_hash: EthBlockHashField, index: HexUIntField) -> RpcEthTxResp | None: block = await self._db.get_block_by_hash(block_hash) if block.is_empty: return None elif not (neon_tx_meta := await self._db.get_tx_by_slot_tx_idx(block.slot, index)): return None - return _RpcTxResp.from_raw(neon_tx_meta) + return RpcEthTxResp.from_raw(neon_tx_meta) @NeonProxyApi.method(name="eth_getBlockByNumber") async def get_block_by_number(self, block_tag: RpcBlockRequest, full: bool) -> _RpcBlockResp | None: diff --git a/proxy/rpc/np_call_api.py b/proxy/rpc/np_call_api.py index a2f883e5..249ec7c6 100644 --- a/proxy/rpc/np_call_api.py +++ b/proxy/rpc/np_call_api.py @@ -13,10 +13,12 @@ from common.neon.transaction_model import NeonTxModel from common.neon_rpc.api import EmulAccountMetaModel, EmulNeonCallResp, EmulNeonCallModel from common.solana.pubkey import SolPubKeyField +from common.utils.cached import cached_property from common.utils.pydantic import HexUIntField, RootModel -from .api import RpcCallRequest, RpcBlockRequest, RpcNeonCallRequest -from .gas_limit_calculator import NpGasLimitCalculator +from .api import RpcBlockRequest, RpcNeonCallRequest from .server_abc import NeonProxyApi +from ..base.rpc_gas_limit_calculator import RpcNeonGasLimitCalculator +from ..base.rpc_api import RpcEthTxRequest class _RpcEthAccountModel(BaseJsonRpcModel): @@ -92,25 +94,25 @@ def from_raw(cls, raw: _RpcEmulatorResp | EmulNeonCallResp | None) -> Self | Non class NpCallApi(NeonProxyApi): name: ClassVar[str] = "NeonRPC::CallAndEmulate" - @property - def _gas_limit_calc(self) -> NpGasLimitCalculator: - return self._server._gas_limit_calc # noqa + @cached_property + def _gas_limit_calc(self) -> RpcNeonGasLimitCalculator: + return RpcNeonGasLimitCalculator(self._server) @NeonProxyApi.method(name="eth_call") async def eth_call( self, ctx: HttpRequestCtx, - call: RpcCallRequest, + call: RpcEthTxRequest, block_tag: RpcBlockRequest = RpcBlockRequest.latest(), object_state: _RpcEthStateRequest = _RpcEthStateRequest.default(), ) -> EthBinStrField: - chain_id = self.get_chain_id(ctx) + chain_id = self._get_chain_id(ctx) if call.chainId and call.chainId != chain_id: raise EthWrongChainIdError() _ = object_state block = await self.get_block_by_tag(block_tag) - evm_cfg = await self.get_evm_cfg() + evm_cfg = await self._get_evm_cfg() resp = await self._core_api_client.emulate_neon_call( evm_cfg, call.to_emulation_call(chain_id), @@ -123,10 +125,10 @@ async def eth_call( async def estimate_gas( self, ctx: HttpRequestCtx, - call: RpcCallRequest, + call: RpcEthTxRequest, block_tag: RpcBlockRequest = RpcBlockRequest.latest(), ) -> HexUIntField: - chain_id = self.get_chain_id(ctx) + chain_id = self._get_chain_id(ctx) if call.chainId and call.chainId != chain_id: raise EthWrongChainIdError() @@ -137,11 +139,11 @@ async def estimate_gas( async def neon_estimate_gas( self, ctx: HttpRequestCtx, - call: RpcCallRequest, + call: RpcEthTxRequest, neon_call: RpcNeonCallRequest = RpcNeonCallRequest.default(), block_tag: RpcBlockRequest = RpcBlockRequest.latest(), ) -> HexUIntField: - chain_id = self.get_chain_id(ctx) + chain_id = self._get_chain_id(ctx) if call.chainId and call.chainId != chain_id: raise EthWrongChainIdError() @@ -157,8 +159,8 @@ async def neon_emulate( block_tag: RpcBlockRequest = RpcBlockRequest.latest(), ) -> _RpcEmulatorResp: """Executes emulator with given transaction""" - evm_cfg = await self.get_evm_cfg() - chain_id = self.get_chain_id(ctx) + evm_cfg = await self._get_evm_cfg() + chain_id = self._get_chain_id(ctx) block = await self.get_block_by_tag(block_tag) neon_tx = NeonTxModel.from_raw(raw_signed_tx.to_bytes()) diff --git a/proxy/rpc/np_gas_price.py b/proxy/rpc/np_gas_price.py index 5d0dcb8f..3d027fd6 100644 --- a/proxy/rpc/np_gas_price.py +++ b/proxy/rpc/np_gas_price.py @@ -109,7 +109,7 @@ class NpGasPriceApi(NeonProxyApi): @NeonProxyApi.method(name="eth_gasPrice") async def get_eth_gas_price(self, ctx: HttpRequestCtx) -> HexUIntField: - _, token_gas_price = await self.get_token_gas_price(ctx) + _, token_gas_price = await self._get_token_gas_price(ctx) return token_gas_price.suggested_gas_price @NeonProxyApi.method(name="neon_gasPrice") @@ -118,7 +118,7 @@ async def get_neon_gas_price( ctx: HttpRequestCtx, call: _RpcGasCallRequest = _RpcGasCallRequest.default(), ) -> _RpcGasPriceModel: - gas_price, token_gas_price = await self.get_token_gas_price(ctx) + gas_price, token_gas_price = await self._get_token_gas_price(ctx) if call.fromAddress.is_empty: return _RpcDefaultGasPriceModel.from_raw(gas_price, token_gas_price) @@ -131,7 +131,7 @@ async def get_neon_gas_price( tx_gas_limit = call.gas or 0 - if await self.has_fee_less_tx_permit(ctx, call.fromAddress, call.toAddress, tx_nonce, tx_gas_limit): + if await self._has_fee_less_tx_permit(ctx, call.fromAddress, call.toAddress, tx_nonce, tx_gas_limit): return _RpcDefaultGasPriceModel.from_raw(gas_price, token_gas_price, def_gas_price=0) return _RpcDefaultGasPriceModel.from_raw(gas_price, token_gas_price) diff --git a/proxy/rpc/np_send_transaction_api.py b/proxy/rpc/np_send_transaction_api.py index e2f4efc3..79ca6101 100644 --- a/proxy/rpc/np_send_transaction_api.py +++ b/proxy/rpc/np_send_transaction_api.py @@ -2,15 +2,11 @@ from typing import ClassVar from common.ethereum.bin_str import EthBinStrField -from common.ethereum.errors import EthError, EthNonceTooLowError, EthNonceTooHighError, EthWrongChainIdError -from common.ethereum.hash import EthTxHashField, EthTxHash +from common.ethereum.hash import EthTxHashField from common.http.utils import HttpRequestCtx -from common.jsonrpc.errors import InvalidParamError -from common.neon.transaction_model import NeonTxModel -from common.utils.json_logger import logging_context +from common.utils.cached import cached_property from .server_abc import NeonProxyApi -from .transaction_validator import NpTxValidator -from ..base.mp_api import MpTxRespCode +from ..base.rpc_transaction_executor import RpcNeonTxExecutor _LOG = logging.getLogger(__name__) @@ -18,57 +14,10 @@ class NpExecTxApi(NeonProxyApi): name: ClassVar[str] = "NeonRPC::ExecuteTransaction" - @property - def _tx_validator(self) -> NpTxValidator: - return self._server._tx_validator # noqa + @cached_property + def _tx_executor(self) -> RpcNeonTxExecutor: + return RpcNeonTxExecutor(self._server) @NeonProxyApi.method(name="eth_sendRawTransaction") async def send_raw_tx(self, ctx: HttpRequestCtx, raw_tx: EthBinStrField) -> EthTxHashField: - try: - eth_tx_rlp = raw_tx.to_bytes() - neon_tx = NeonTxModel.from_raw(eth_tx_rlp, raise_exception=True) - except EthError: - raise - except (BaseException,): - raise InvalidParamError(message="wrong transaction format") - - tx_id = neon_tx.neon_tx_hash.to_bytes()[:4].hex() - with logging_context(tx=tx_id): - _LOG.debug("sendRawTransaction %s: %s", neon_tx.neon_tx_hash, neon_tx) - # validate that tx was executed 2 times (second after sending to mempool) - if await self._is_neon_tx_exist(neon_tx.neon_tx_hash): - return neon_tx.neon_tx_hash - - try: - ctx_id = self.get_ctx_id(ctx) - acct = await self._tx_validator.validate(ctx, neon_tx) - resp = await self._mp_client.send_raw_transaction(ctx_id, eth_tx_rlp, acct.chain_id, acct.state_tx_cnt) - - if resp.code in (MpTxRespCode.Success, MpTxRespCode.AlreadyKnown): - return neon_tx.neon_tx_hash - elif resp.code == MpTxRespCode.NonceTooLow: - # revalidate that tx was finalized - if await self._is_neon_tx_exist(neon_tx.neon_tx_hash): - return neon_tx.neon_tx_hash - - EthNonceTooLowError.raise_error(neon_tx.nonce, resp.state_tx_cnt, sender=acct.address) - elif resp.code == MpTxRespCode.Underprice: - raise EthError(message="replacement transaction underpriced") - elif resp.code == MpTxRespCode.NonceTooHigh: - raise EthNonceTooHighError.raise_error(neon_tx.nonce, resp.state_tx_cnt, sender=acct.address) - elif resp.code == MpTxRespCode.UnknownChainID: - raise EthWrongChainIdError() - else: - raise EthError(message="unknown error") - - except BaseException as exc: - if not isinstance(exc, EthError): - _LOG.error("unexpected error on eth_sendRawTransaction", exc_info=exc, extra=self._msg_filter) - raise - - async def _is_neon_tx_exist(self, tx_hash: EthTxHash) -> bool: - if tx_meta := await self._db.get_tx_by_neon_tx_hash(tx_hash): - if tx_meta.neon_tx_rcpt.slot <= await self._db.get_finalized_slot(): - raise EthError(message="already known") - return True - return False + return await self._tx_executor.send_neon_tx(ctx, raw_tx.to_bytes()) diff --git a/proxy/rpc/np_transaction_logs_api.py b/proxy/rpc/np_transaction_logs_api.py index c4eca1b5..1aa17430 100644 --- a/proxy/rpc/np_transaction_logs_api.py +++ b/proxy/rpc/np_transaction_logs_api.py @@ -19,10 +19,7 @@ class _RpcLogListRequest(BaseJsonRpcModel): address: EthAddressField | list[EthAddressField] = Field(EthAddress.default()) fromBlock: RpcBlockRequest | None = None toBlock: RpcBlockRequest | None = None - topicList: list[EthHash32Field | list[EthHash32Field]] = Field( - default_factory=list, - validation_alias="topics", - ) + topicList: list[EthHash32Field | list[EthHash32Field]] = Field(default_factory=list, validation_alias="topics") @cached_property def address_list(self) -> tuple[EthAddress, ...]: diff --git a/proxy/rpc/np_version_api.py b/proxy/rpc/np_version_api.py index 9b229b24..98f94981 100644 --- a/proxy/rpc/np_version_api.py +++ b/proxy/rpc/np_version_api.py @@ -1,6 +1,7 @@ from __future__ import annotations from typing import ClassVar + from eth_hash.auto import keccak from common.config.constants import NEON_PROXY_PKG_VER @@ -52,7 +53,7 @@ async def neon_core_api_version(self) -> str: @NeonProxyApi.method(name=["neon_evmVersion", "web3_clientVersion", "neon_evm_version"]) async def neon_evm_version(self) -> str: - evm_cfg = await self.get_evm_cfg() + evm_cfg = await self._get_evm_cfg() return evm_cfg.package_version @NeonProxyApi.method(name="neon_proxyVersion") @@ -79,7 +80,7 @@ async def neon_versions(self) -> _RpcVersionResp: @NeonProxyApi.method(name="eth_chainId") def get_eth_chain_id(self, ctx: HttpRequestCtx) -> HexUIntField: - return self.get_chain_id(ctx) + return self._get_chain_id(ctx) @NeonProxyApi.method(name="net_version") async def get_net_version(self) -> str: @@ -88,7 +89,7 @@ async def get_net_version(self) -> str: @NeonProxyApi.method(name="neon_getEvmParams") async def get_neon_evm_param(self) -> _RpcNeonEvmParamResp: - evm_cfg = await self.get_evm_cfg() + evm_cfg = await self._get_evm_cfg() def _get_int_param(_name: str) -> int | None: if value := evm_cfg.evm_param_dict.get(_name, None): diff --git a/proxy/rpc/server.py b/proxy/rpc/server.py index 30660185..2f0a537a 100644 --- a/proxy/rpc/server.py +++ b/proxy/rpc/server.py @@ -1,8 +1,8 @@ from __future__ import annotations import logging +from typing import ClassVar -from .gas_limit_calculator import NpGasLimitCalculator from .np_account_api import NpAccountApi from .np_block_transaction_api import NpBlockTxApi from .np_call_api import NpCallApi @@ -12,21 +12,20 @@ from .np_transaction_logs_api import NpTxLogsApi from .np_version_api import NpVersionApi from .server_abc import NeonProxyAbc -from .transaction_validator import NpTxValidator +_ENDPOINT_LIST = ["/solana", "/solana/:token", "/", "/:token"] _LOG = logging.getLogger(__name__) class NeonProxy(NeonProxyAbc): + _stat_name: ClassVar[str] = "PublicRpc" + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.listen(host="0.0.0.0", port=self._cfg.rpc_public_port) self.set_worker_cnt(self._cfg.rpc_worker_cnt) self._process_pool.set_process_cnt(self._cfg.rpc_process_cnt) - self._gas_limit_calc = NpGasLimitCalculator(self) - self._tx_validator = NpTxValidator(self) - self._add_api(NpVersionApi(self)) self._add_api(NpBlockTxApi(self)) self._add_api(NpGasPriceApi(self)) @@ -38,4 +37,6 @@ def __init__(self, *args, **kwargs) -> None: if self._cfg.enable_send_tx_api: self._add_api(NpExecTxApi(self)) - + @classmethod + def _get_endpoint_list(cls) -> list[str]: + return _ENDPOINT_LIST diff --git a/proxy/rpc/server_abc.py b/proxy/rpc/server_abc.py index aa143d81..4788899c 100644 --- a/proxy/rpc/server_abc.py +++ b/proxy/rpc/server_abc.py @@ -1,110 +1,52 @@ from __future__ import annotations -import asyncio -import hashlib +import abc import logging -from typing import Callable, Final, ClassVar +from typing import Final import base58 -from typing_extensions import Self from common.config.config import Config from common.config.constants import ( MAINNET_PROGRAM_ID, - MAINNET_GENESIS_HASH, MAINNET_GENESIS_TIME, + MAINNET_GENESIS_HASH, DEVNET_PROGRAM_ID, - DEVNET_GENESIS_TIME, DEVNET_GENESIS_HASH, + DEVNET_GENESIS_TIME, UNKNOWN_GENESIS_HASH, ) -from common.config.utils import LogMsgFilter from common.ethereum.commit_level import EthCommit from common.ethereum.errors import EthError from common.ethereum.hash import EthAddress, EthBlockHash from common.http.errors import HttpRouteError from common.http.utils import HttpRequestCtx -from common.jsonrpc.api import JsonRpcListRequest, JsonRpcListResp, JsonRpcRequest, JsonRpcResp -from common.jsonrpc.server import JsonRpcApi, JsonRpcServer +from common.jsonrpc.server import JsonRpcApi from common.neon.block import NeonBlockHdrModel from common.neon.neon_program import NeonProg -from common.neon_rpc.api import EvmConfigModel from common.neon_rpc.client import CoreApiClient from common.solana.commit_level import SolCommit from common.solana_rpc.client import SolClient -from common.stat.api import RpcCallData -from common.utils.cached import cached_property, ttl_cached_method -from common.utils.json_logger import logging_context, log_msg -from common.utils.process_pool import ProcessPool +from common.utils.cached import ttl_cached_method from gas_tank.db.gas_less_accounts_db import GasLessAccountDb from indexer.db.indexer_db_client import IndexerDbClient from .api import RpcBlockRequest from ..base.mp_api import MpGasPriceModel, MpTokenGasPriceModel from ..base.mp_client import MempoolClient +from ..base.rpc_server_abc import BaseRpcServerAbc, BaseRpcServerComponent from ..stat.client import StatClient -_ENDPOINT_LIST = ("/solana", "/solana/:token", "/", "/:token") _LOG = logging.getLogger(__name__) -class NeonProxyComponent: +class NeonProxyComponent(BaseRpcServerComponent): def __init__(self, server: NeonProxyAbc) -> None: - super().__init__() + super().__init__(server) self._server = server - @cached_property - def _cfg(self) -> Config: - return self._server._cfg # noqa - - @cached_property - def _core_api_client(self) -> CoreApiClient: - return self._server._core_api_client # noqa - - @cached_property - def _sol_client(self) -> SolClient: - return self._server._sol_client # noqa - - @cached_property - def _mp_client(self) -> MempoolClient: - return self._server._mp_client # noqa - - @cached_property - def _db(self) -> IndexerDbClient: - return self._server._db # noqa - - @cached_property - def _msg_filter(self) -> LogMsgFilter: - return self._server._msg_filter # noqa - - @staticmethod - def get_ctx_id(ctx: HttpRequestCtx) -> str: - ctx_id = getattr(ctx, "ctx_id", None) - assert ctx_id is not None - return ctx_id - - @staticmethod - def get_chain_id(ctx: HttpRequestCtx) -> int: - chain_id = getattr(ctx, "chain_id", None) - assert chain_id is not None - return chain_id - - @staticmethod - def is_default_chain_id(ctx: HttpRequestCtx) -> bool: - return getattr(ctx, "is_default_chain_id", False) - - async def get_evm_cfg(self) -> EvmConfigModel: - return await self._server.get_evm_cfg() - async def get_gas_price(self) -> MpGasPriceModel: return await self._server.get_gas_price() - async def get_token_gas_price(self, ctx: HttpRequestCtx) -> tuple[MpGasPriceModel, MpTokenGasPriceModel]: - gas_price = await self.get_gas_price() - token_price = gas_price.chain_dict.get(getattr(ctx, "chain_id"), None) - if token_price is None: - raise HttpRouteError() - return gas_price, token_price - async def get_block_by_tag(self, block_tag: RpcBlockRequest) -> NeonBlockHdrModel: if block_tag.is_block_hash: block = await self._db.get_block_by_hash(block_tag.block_hash) @@ -133,19 +75,6 @@ async def get_block_by_tag(self, block_tag: RpcBlockRequest) -> NeonBlockHdrMode return block - async def has_fee_less_tx_permit( - self, - ctx: HttpRequestCtx, - sender: EthAddress, - contract: EthAddress, - tx_nonce: int, - tx_gas_limit: int, - ) -> bool: - if not self.is_default_chain_id(ctx): - return False - gas_tank = self._server._gas_tank # noqa - return await gas_tank.has_fee_less_tx_permit(sender, contract, tx_nonce, tx_gas_limit) - class NeonProxyApi(NeonProxyComponent, JsonRpcApi): def __init__(self, server: NeonProxyAbc) -> None: @@ -153,21 +82,7 @@ def __init__(self, server: NeonProxyAbc) -> None: NeonProxyComponent.__init__(self, server) -class NeonProxyAbc(JsonRpcServer): - _stat_name: ClassVar[str] = "PublicRpc" - - class _ProcessPool(ProcessPool): - def __init__(self, server: NeonProxyAbc) -> None: - super().__init__() - self._server = server - - def _on_process_start(self, idx: int) -> None: - self._server._on_process_start(idx) - - def _on_process_stop(self) -> None: - self._server._on_process_stop() - self._server = None - +class NeonProxyAbc(BaseRpcServerAbc, abc.ABC): def __init__( self, cfg: Config, @@ -178,160 +93,25 @@ def __init__( db: IndexerDbClient, gas_tank: GasLessAccountDb, ) -> None: - super().__init__(cfg) - - self._idx = -1 - self._core_api_client = core_api_client - self._sol_client = sol_client - self._mp_client = mp_client - self._stat_client = stat_client - self._db = db + super().__init__(cfg, core_api_client, sol_client, mp_client, stat_client, db) self._gas_tank = gas_tank self._genesis_block: NeonBlockHdrModel | None = None - self._process_pool = self._ProcessPool(self) - - async def _on_server_start(self) -> None: - try: - if not self._idx: - self._db.enable_debug_query() - - await asyncio.gather( - self._db.start(), - self._stat_client.start(), - self._gas_tank.start(), - self._mp_client.start(), - self._sol_client.start(), - self._core_api_client.start(), - ) - await self._init_genesis_block() - except BaseException as exc: - _LOG.error("error on start public RPC", exc_info=exc, extra=self._msg_filter) - - async def _on_server_stop(self) -> None: - await asyncio.gather( - self._gas_tank.stop(), - self._mp_client.stop(), - self._core_api_client.stop(), - self._sol_client.stop(), - self._db.stop(), - ) @property def genesis_block(self) -> NeonBlockHdrModel: return self._genesis_block - def _add_api(self, api: NeonProxyApi) -> Self: - for endpoint in _ENDPOINT_LIST: - super().add_api(api, endpoint=endpoint) - return self - - @ttl_cached_method(ttl_sec=1) - async def get_evm_cfg(self) -> EvmConfigModel: - # forwarding request to mempool allows to limit the number of requests to Solana to maximum 1 time per second - # for details, see the mempool_server::get_evm_cfg() implementation - evm_cfg = await self._mp_client.get_evm_cfg() - NeonProg.init_prog(evm_cfg.treasury_pool_cnt, evm_cfg.treasury_pool_seed, evm_cfg.version) - return evm_cfg - - @ttl_cached_method(ttl_sec=1) - async def get_gas_price(self) -> MpGasPriceModel: - # for details, see the mempool_server::get_gas_price() implementation - gas_price = await self._mp_client.get_gas_price() - if gas_price.is_empty: - raise EthError(message="Failed to calculate gas price. Try again later") - return gas_price - - @staticmethod - def get_ctx_id(ctx: HttpRequestCtx) -> str: - if ctx_id := getattr(ctx, "ctx_id", None): - return ctx_id - - size = len(ctx.request.body) - raw_value = f"{ctx.ip_addr}:{size}:{ctx.start_time_nsec}" - ctx_id = hashlib.md5(bytes(raw_value, "utf-8")).hexdigest()[:8] - ctx.set_property_value("ctx_id", ctx_id) - return ctx_id - - async def _validate_chain_id(self, ctx: HttpRequestCtx) -> None: - NeonProg.validate_protocol() - - if not getattr(ctx, "chain_id", None): - await self._set_chain_id(ctx) - - async def _set_chain_id(self, ctx: HttpRequestCtx) -> int: - evm_cfg = await self.get_evm_cfg() - if not (token_name := ctx.request.path_params.get("token", "").strip().upper()): - chain_id = evm_cfg.default_chain_id - ctx.set_property_value("is_default_chain_id", True) - elif token := evm_cfg.token_dict.get(token_name, None): - chain_id = token.chain_id - ctx.set_property_value("is_default_chain_id", token.is_default) - else: - raise HttpRouteError() - - ctx.set_property_value("chain_id", chain_id) - return chain_id - - async def on_request_list(self, ctx: HttpRequestCtx, request: JsonRpcListRequest) -> None: - await self._validate_chain_id(ctx) - with logging_context(ctx=self.get_ctx_id(ctx)): - _LOG.info(log_msg("handle BIG request <<< {IP} size={Size}", IP=ctx.ip_addr, Size=len(request.root))) - - def on_response_list(self, ctx: HttpRequestCtx, resp: JsonRpcListResp) -> None: - with logging_context(ctx=self.get_ctx_id(ctx)): - msg = log_msg( - "done BIG request >>> {IP} size={Size} resp_time={TimeMS} msec", - IP=ctx.ip_addr, - Size=len(resp), - TimeMS=ctx.process_time_msec, - ) - _LOG.info(msg) - - stat = RpcCallData(service=self._stat_name, method="BIG", time_nsec=ctx.process_time_nsec, is_error=False) - self._stat_client.commit_rpc_call(stat) - - def on_bad_request(self, ctx: HttpRequestCtx) -> None: - _LOG.warning(log_msg("BAD request from {IP} with size {Size}", IP=ctx.ip_addr, Size=len(ctx.request.body))) - - stat = RpcCallData(service=self._stat_name, method="UNKNOWN", time_nsec=ctx.process_time_nsec, is_error=True) - self._stat_client.commit_rpc_call(stat) - - async def handle_request( + async def has_fee_less_tx_permit( self, ctx: HttpRequestCtx, - request: JsonRpcRequest, - handler: Callable, - ) -> JsonRpcResp: - await self._validate_chain_id(ctx) - - info = dict(IP=ctx.ip_addr, ReqID=request.id, Method=request.method) - with logging_context(ctx=self.get_ctx_id(ctx)): - _LOG.info(log_msg("handle request <<< {IP} req={ReqID} {Method} {Params}", Params=request.params, **info)) - - resp = await handler(ctx, request) - if resp.is_error: - msg = log_msg( - "error on request >>> {IP} req={ReqID} {Method} {Error} resp_time={TimeMS} msec", - Error=resp.error, - **info, - ) - else: - msg = log_msg( - "done request >>> {IP} req={ReqID} {Method} {Result} resp_time={TimeMS} msec", - Result=resp.result, - **info, - ) - _LOG.info(dict(**msg, TimeMS=ctx.process_time_msec)) - - stat = RpcCallData( - service=self._stat_name, - method=request.method, - time_nsec=ctx.process_time_nsec, - is_error=resp.is_error, - ) - self._stat_client.commit_rpc_call(stat) - - return resp + sender: EthAddress, + contract: EthAddress, + tx_nonce: int, + tx_gas_limit: int, + ) -> bool: + if not self.is_default_chain_id(ctx): + return False + return await self._gas_tank.has_fee_less_tx_permit(sender, contract, tx_nonce, tx_gas_limit) async def _init_genesis_block(self) -> None: parent_hash: Final[EthBlockHash] = EthBlockHash.from_raw(b"\0" * 32) @@ -363,16 +143,17 @@ async def _init_genesis_block(self) -> None: if not self._idx: _LOG.debug("genesis hash %s, genesis time %s", block_hash, block_time) - def start(self) -> None: - self._register_handler_list() - self._process_pool.start() - - def stop(self) -> None: - self._process_pool.stop() - - def _on_process_start(self, idx: int) -> None: - self._idx = idx - super().start() + async def _on_server_start(self) -> None: + try: + await super()._on_server_start() + await self._gas_tank.start() + await self._init_genesis_block() + except BaseException as exc: + _LOG.error("error on start public RPC", exc_info=exc, extra=self._msg_filter) - def _on_process_stop(self) -> None: - super().stop() + async def _on_server_stop(self) -> None: + try: + await self._gas_tank.stop() + await super()._on_server_stop() + except BaseException as exc: + _LOG.error("error on stop public RPC", exc_info=exc, extra=self._msg_filter) diff --git a/proxy/stat/data.py b/proxy/stat/data.py index d845834a..757f9838 100644 --- a/proxy/stat/data.py +++ b/proxy/stat/data.py @@ -1,7 +1,5 @@ from __future__ import annotations -from decimal import Decimal - from common.neon.account import NeonAccountField from common.neon_rpc.api import TokenModel from common.solana.pubkey import SolPubKeyField