Skip to content

Commit

Permalink
refactor: miscellaneous minor refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Nov 7, 2024
1 parent aedf046 commit 09f1d09
Show file tree
Hide file tree
Showing 29 changed files with 204 additions and 105 deletions.
2 changes: 1 addition & 1 deletion hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ def _get_or_create_rocksdb_storage(self) -> RocksDBStorage:
if self._rocksdb_storage is not None:
return self._rocksdb_storage

kwargs = {}
kwargs: dict[str, Any] = {}
if self._rocksdb_cache_capacity is not None:
kwargs = dict(cache_capacity=self._rocksdb_cache_capacity)

Expand Down
35 changes: 26 additions & 9 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from hathor.cli.run_node_args import RunNodeArgs
from hathor.cli.side_dag import SideDagArgs
from hathor.cli.util import LoggingOptions, LoggingOutput
from hathor.consensus import ConsensusAlgorithm
from hathor.daa import DifficultyAdjustmentAlgorithm
from hathor.event import EventManager
Expand Down Expand Up @@ -51,8 +52,6 @@

logger = get_logger()

DEFAULT_CACHE_SIZE: int = 100000


class SyncChoice(Enum):
V1_DEFAULT = auto() # v1 enabled, v2 disabled but can be enabled in runtime
Expand All @@ -66,9 +65,10 @@ class CliBuilder:
TODO Refactor to use Builder. It could even be ported to a Builder.from_args classmethod.
"""
def __init__(self, args: RunNodeArgs) -> None:
def __init__(self, args: RunNodeArgs, logging_args: tuple[LoggingOutput, LoggingOptions, bool]) -> None:
self.log = logger.new()
self._args = args
self._logging_args = logging_args

def check_or_raise(self, condition: bool, message: str) -> None:
"""Will exit printing `message` if `condition` is False."""
Expand Down Expand Up @@ -183,10 +183,14 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
self.log.warn('using --cache-interval with --memory-storage has no effect')

if not self._args.disable_cache and not self._args.memory_storage:
tx_storage = TransactionCacheStorage(tx_storage, reactor, indexes=indexes, settings=settings)
tx_storage.capacity = self._args.cache_size if self._args.cache_size is not None else DEFAULT_CACHE_SIZE
if self._args.cache_interval:
tx_storage.interval = self._args.cache_interval
tx_storage = TransactionCacheStorage(
reactor=reactor,
settings=settings,
store=tx_storage,
indexes=indexes,
capacity=self._args.cache_size,
interval=self._args.cache_interval,
)
self.log.info('with cache', capacity=tx_storage.capacity, interval=tx_storage.interval)

self.tx_storage = tx_storage
Expand Down Expand Up @@ -332,11 +336,24 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
log_vertex_bytes=self._args.log_vertex_bytes,
)

whitelist_only = False
use_ssl = True

if self._args.x_multiprocess_p2p:
self.check_or_raise(
self._args.x_remove_sync_v1,
'multiprocess support for P2P is only available if sync-v1 is removed (use --x-remove-sync-v1)'
)

self.check_or_raise(
(
not self._args.memory_storage
and bool(self._args.data)
and not self._args.memory_indexes
and not self._args.disable_cache
),
'multiprocess support for P2P is only available if rocksdb is used, with cache and rocksdb indexes'
)
raise NotImplementedError('Multiprocess support for P2P is not yet implemented.')

p2p_dependencies = P2PDependencies(
Expand All @@ -346,15 +363,15 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
tx_storage=tx_storage,
vertex_handler=vertex_handler,
verification_service=verification_service,
whitelist_only=False,
whitelist_only=whitelist_only,
capabilities=capabilities,
)

p2p_manager = ConnectionsManager(
dependencies=p2p_dependencies,
my_peer=peer,
pubsub=pubsub,
ssl=True,
ssl=use_ssl,
rng=Random(),
)

Expand Down
7 changes: 5 additions & 2 deletions hathor/cli/db_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import io
import struct
from argparse import ArgumentParser, FileType
Expand All @@ -20,6 +22,7 @@
from hathor.cli.run_node import RunNode

if TYPE_CHECKING:
from hathor.cli.util import LoggingOptions, LoggingOutput
from hathor.transaction import BaseTransaction

MAGIC_HEADER = b'HathDB'
Expand Down Expand Up @@ -139,5 +142,5 @@ def run(self) -> None:
self.log.info('exported', tx_count=tx_count, block_count=block_count)


def main():
DbExport().run()
def main(*, logging_args: tuple[LoggingOutput, LoggingOptions, bool]) -> None:
DbExport(logging_args=logging_args).run()
7 changes: 5 additions & 2 deletions hathor/cli/db_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import io
import struct
import sys
Expand All @@ -21,6 +23,7 @@
from hathor.cli.run_node import RunNode

if TYPE_CHECKING:
from hathor.cli.util import LoggingOptions, LoggingOutput
from hathor.transaction import BaseTransaction


Expand Down Expand Up @@ -94,5 +97,5 @@ def _import_txs(self) -> Iterator['BaseTransaction']:
yield tx


def main():
DbImport().run()
def main(*, logging_args: tuple[LoggingOutput, LoggingOptions, bool]) -> None:
DbImport(logging_args=logging_args).run()
10 changes: 8 additions & 2 deletions hathor/cli/load_from_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import sys
from argparse import ArgumentParser, FileType
from typing import TYPE_CHECKING

from twisted.internet.defer import Deferred
from twisted.internet.task import deferLater

from hathor.cli.run_node import RunNode

if TYPE_CHECKING:
from hathor.cli.util import LoggingOptions, LoggingOutput


class LoadFromLogs(RunNode):
@classmethod
Expand Down Expand Up @@ -54,5 +60,5 @@ async def _load_from_logs(self) -> None:
self.reactor.fireSystemEvent('shutdown')


def main():
LoadFromLogs().run()
def main(*, logging_args: tuple[LoggingOutput, LoggingOptions, bool]) -> None:
LoadFromLogs(logging_args=logging_args).run()
6 changes: 5 additions & 1 deletion hathor/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ def execute_from_command_line(self):
output = process_logging_output(sys.argv)
options = process_logging_options(sys.argv)
setup_logging(logging_output=output, logging_options=options, capture_stdout=capture_stdout)
module.main()
try:
module.main(logging_args=(output, options, capture_stdout))
except TypeError:
# TODO: Temporary workaround for calling main() functions that do not take `logging_args`.
module.main()
else:
module.main(capture_stdout=capture_stdout)

Expand Down
5 changes: 3 additions & 2 deletions hathor/cli/quick_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from hathor.cli.run_node import RunNode

if TYPE_CHECKING:
from hathor.cli.util import LoggingOptions, LoggingOutput
from hathor.transaction import Vertex

logger = get_logger()
Expand Down Expand Up @@ -100,5 +101,5 @@ def run(self) -> None:
super().run()


def main():
QuickTest().run()
def main(*, logging_args: tuple[LoggingOutput, LoggingOptions, bool]) -> None:
QuickTest(logging_args=logging_args).run()
15 changes: 11 additions & 4 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import os
import sys
import tempfile
Expand All @@ -27,6 +29,7 @@

if TYPE_CHECKING:
from hathor.cli.run_node_args import RunNodeArgs
from hathor.cli.util import LoggingOptions, LoggingOutput
from hathor.sysctl.runner import SysctlRunner


Expand Down Expand Up @@ -192,7 +195,7 @@ def prepare(self, *, register_resources: bool = True) -> None:

from hathor.builder import CliBuilder, ResourcesBuilder
from hathor.exception import BuilderError
builder = CliBuilder(self._args)
builder = CliBuilder(self._args, self.logging_args)
try:
self.manager = builder.create_manager(reactor)
except BuilderError as err:
Expand Down Expand Up @@ -467,11 +470,15 @@ def check_python_version(self) -> None:
'',
]))

def __init__(self, *, argv=None):
def __init__(self, *, logging_args, argv=None):
from hathor.conf import NANO_TESTNET_SETTINGS_FILEPATH, TESTNET_SETTINGS_FILEPATH
from hathor.conf.get_settings import get_global_settings
self.log = logger.new()

# TODO: We should correctly type the method definition so this comment wouldn't be necessary,
# but it breaks the linter in multiple other places. Will do it later.
self.logging_args = logging_args # tuple[LoggingOutput, LoggingOptions, bool]

if argv is None:
import sys
argv = sys.argv[1:]
Expand Down Expand Up @@ -549,5 +556,5 @@ def run(self) -> None:
self.reactor.run()


def main():
RunNode().run()
def main(*, logging_args: tuple[LoggingOutput, LoggingOptions, bool]) -> None:
RunNode(logging_args=logging_args).run()
11 changes: 8 additions & 3 deletions hathor/cli/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from argparse import Namespace
from typing import Any, Callable
from typing import TYPE_CHECKING, Any, Callable

from hathor.cli.run_node import RunNode

if TYPE_CHECKING:
from hathor.cli.util import LoggingOptions, LoggingOutput


def get_ipython(extra_args: list[Any], imported_objects: dict[str, Any]) -> Callable[[], None]:
from IPython import start_ipython
Expand Down Expand Up @@ -65,5 +70,5 @@ def run(self) -> None:
self.shell()


def main():
Shell().run()
def main(*, logging_args: tuple[LoggingOutput, LoggingOptions, bool]) -> None:
Shell(logging_args=logging_args).run()
2 changes: 1 addition & 1 deletion hathor/cli/side_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def _run_node(args: _RunNodeArgs) -> None:
extra_log_info=_get_extra_log_info(args.name)
)
logger.info(f'initializing node "{args.name}"')
node = args.runner(argv=args.argv)
node = args.runner(argv=args.argv, logging_args=(args.logging_output, log_options, args.capture_stdout))
except KeyboardInterrupt:
logger.warn(f'{args.name} node interrupted by user')
return
Expand Down
12 changes: 10 additions & 2 deletions hathor/multiprocess/connect_on_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import os
import pickle
import signal
import sys
from pathlib import Path
Expand All @@ -27,6 +28,7 @@
from twisted.protocols.tls import BufferingTLSTransport
from twisted.python.failure import Failure

from hathor.cli.util import LoggingOptions, LoggingOutput
from hathor.multiprocess.subprocess_runner import SubprocessSpawnArgs
from hathor.multiprocess.utils import log_connection_closed
from hathor.p2p.peer_endpoint import PeerAddress
Expand All @@ -47,6 +49,7 @@ class ConnectOnSubprocessFactory(ServerFactory, Generic[T]):
__slots__ = (
'reactor',
'_main_file',
'_serialized_logging_args',
'_custom_args',
'_built_protocol_callback'
)
Expand All @@ -56,6 +59,7 @@ def __init__(
*,
reactor: ReactorProtocol,
main_file: Path,
logging_args: tuple[LoggingOutput, LoggingOptions, bool],
custom_args: T,
built_protocol_callback: Callable[[PeerAddress], None] | None,
) -> None:
Expand All @@ -74,6 +78,7 @@ def build_my_factory(reactor: ReactorProtocol) -> Factory:
"""
self.reactor = reactor
self._main_file = main_file
self._serialized_logging_args = pickle.dumps(logging_args).hex()
self._custom_args = custom_args
self._built_protocol_callback = built_protocol_callback

Expand All @@ -86,6 +91,7 @@ def buildProtocol(self, addr: IAddress) -> Protocol | None:
reactor=self.reactor,
main_file=self._main_file,
addr=peer_addr,
logging_args=self._serialized_logging_args,
custom_args=self._custom_args,
)

Expand All @@ -97,20 +103,22 @@ class _ConnectOnSubprocessProtocol(Protocol, Generic[T]):
specified in a `main_file` defined in its factory, above.
"""

__slots__ = ('log', 'reactor', '_main_file', '_addr', '_custom_args')
__slots__ = ('log', 'reactor', '_main_file', '_addr', '_logging_args', '_custom_args')

def __init__(
self,
*,
reactor: ReactorProtocol,
main_file: Path,
addr: PeerAddress,
logging_args: str,
custom_args: T,
) -> None:
self.log = logger.new(addr=addr)
self.reactor = reactor
self._main_file = main_file
self._addr = addr
self._logging_args = logging_args
self._custom_args = custom_args

def makeConnection(self, transport: ITransport) -> None:
Expand Down Expand Up @@ -143,7 +151,7 @@ def makeConnection(self, transport: ITransport) -> None:
subprocess_transport = self.reactor.spawnProcess(
processProtocol=_SubprocessProtocol(addr=self._addr),
executable=sys.executable,
args=[sys.executable, main_file_path, subprocess_args.json_dumpb().hex()],
args=[sys.executable, main_file_path, self._logging_args, subprocess_args.json_dumpb().hex()],
env=os.environ,
path=os.getcwd(),
childFDs={1: 1, 2: 2, fileno: fileno},
Expand Down
Loading

0 comments on commit 09f1d09

Please sign in to comment.