Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python: Move most of the tests to run under a single sandbox #96

Merged
merged 1 commit into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/dazl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def _get_version() -> str:


from .damlsdk.sandbox import sandbox # noqa
from .client import run, simple_client, Network, SimplePartyClient, AIOPartyClient # noqa
from .client import run, simple_client, async_network, Network, SimplePartyClient, AIOPartyClient # noqa
from .model.core import ContractId, ContractData, DazlError, Party # noqa
from .model.writing import create, exercise, exercise_by_key, create_and_exercise, \
Command, CreateCommand, ExerciseCommand, ExerciseByKeyCommand, CreateAndExerciseCommand # noqa
Expand Down
3 changes: 2 additions & 1 deletion python/dazl/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"""

from . import config
from .api import simple_client, Network, PartyClient, AIOPartyClient, SimplePartyClient
from .api import async_network, simple_client, Network, PartyClient, AIOPartyClient, \
SimplePartyClient
from .bots import Bot, BotCollection, BotEntry
from .runner import run
from ._base_model import ExitCode, LedgerRun, CREATE_IF_MISSING, NONE_IF_MISSING, \
Expand Down
47 changes: 31 additions & 16 deletions python/dazl/client/_network_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,34 @@ def resolved_config(self) -> 'NetworkConfig':
def resolved_anonymous_config(self) -> 'AnonymousNetworkConfig':
return AnonymousNetworkConfig.parse_kwargs(**self._config)

def freeze(self):
"""
Freeze configuration, and assume the current run loop can be used to schedule dazl
coroutines. Once this method is called, ``aio_run()`` must be called instead of ``start()``
in order to run dazl.
"""
with self._lock:
self.invoker.set_context_as_current()
config = self.resolved_config()

# From this point on, we're assuming we're on an asyncio event loop so locking is no longer
# required
if self._pool is None:
from ..protocols import LedgerConnectionOptions

# If the connect timeout is non-positive, assume the user intended for there to not be
# a timeout at all
connect_timeout = to_timedelta(config.connect_timeout)
if connect_timeout <= timedelta(0):
connect_timeout = None

options = LedgerConnectionOptions(connect_timeout=connect_timeout)

self._pool = pool = AutodetectLedgerNetwork(self.invoker, options)
self._pool_init.set_result(pool)

return config

async def aio_run(self, *coroutines) -> None:
"""
Coroutine where all network activity is scheduled from.
Expand All @@ -96,11 +124,8 @@ async def aio_run(self, *coroutines) -> None:
of the additional coroutines are also finished.
"""
from ..metrics.instrumenters import AioLoopPerfMonitor
from ..protocols import LedgerConnectionOptions

with self._lock:
self.invoker.set_context_as_current()
config = self.resolved_config()
config = self.freeze()

site = None
with AioLoopPerfMonitor(self._metrics.loop_responsiveness):
Expand All @@ -118,19 +143,8 @@ async def aio_run(self, *coroutines) -> None:
LOG.info('No server_port configuration was specified, so metrics and other stats '
'will not be served.')

# If the connect timeout is non-positive, assume the user intended for there to not be
# a timeout at all
connect_timeout = to_timedelta(config.connect_timeout)
if connect_timeout <= timedelta(0):
connect_timeout = None

options = LedgerConnectionOptions(connect_timeout=connect_timeout)

self._pool = pool = AutodetectLedgerNetwork(self.invoker, options)
self._pool_init.set_result(pool)

try:
runner = _NetworkRunner(pool, self, coroutines)
runner = _NetworkRunner(self._pool, self, coroutines)
await runner.run()
finally:
await self._pool.close()
Expand Down Expand Up @@ -323,6 +337,7 @@ async def upload_package(self, contents: bytes, timeout: 'TimeDeltaConvertible')
:param timeout: Length of time before giving up.
"""
pool = await self._pool_init
await self.connect_anonymous()
package_ids = await self.invoker.run_in_executor(lambda: get_dar_package_ids(contents))
await pool.upload_package(contents)
await self.ensure_package_ids(package_ids, timeout)
Expand Down
106 changes: 100 additions & 6 deletions python/dazl/client/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# for a more concise representation of the various flavors of the API. The unit test
# ``test_api_consistency.py`` verifies that these implementations are generally in sync with each
# other the way that the documentation says they are.
from asyncio import get_event_loop
from asyncio import get_event_loop, ensure_future
from contextlib import contextmanager, ExitStack
from datetime import datetime
from functools import wraps
Expand All @@ -35,7 +35,7 @@
from ..damlsdk.sandbox import sandbox
from ..metrics import MetricEvents
from ..model.core import ContractId, ContractData, ContractsState, ContractMatch, \
ContractContextualData, ContractContextualDataCollection, Party
ContractContextualData, ContractContextualDataCollection, Party, Dar
from ..model.ledger import LedgerMetadata
from ..model.reading import InitEvent, ReadyEvent, ContractCreateEvent, ContractExercisedEvent, \
ContractArchiveEvent, TransactionStartEvent, TransactionEndEvent, PackagesAddedEvent, EventKey
Expand All @@ -49,7 +49,7 @@
fluentize
from ._network_client_impl import _NetworkImpl
from ._party_client_impl import _PartyClientImpl

from ..util.tools import as_list

DEFAULT_TIMEOUT_SECONDS = 30

Expand Down Expand Up @@ -116,6 +116,35 @@ def simple_client(url: 'Optional[str]' = None, party: 'Union[None, str, Party]'
network.join()


# This class is intended to be used as a function.
# noinspection PyPep8Naming
class async_network:
"""
Create a :class:`Network` and ensure that it has the given set of DARs loaded.
"""
def __init__(
self,
url: 'Optional[str]' = None,
dars: 'Optional[Union[Dar, Collection[Dar]]]' = None):
LOG.debug('async_network.__init__')
self.network = Network()
if url:
self.network.set_config(url=url)
self.dars = as_list(dars) # type: List[Dar]

async def __aenter__(self):
LOG.debug('async_network.__aenter__')
for dar in self.dars:
await self.network.aio_global().ensure_dar(dar)
return self.network

async def __aexit__(self, exc_type, exc_val, exc_tb):
LOG.debug('async_network.__aexit__')
fut = self.network.shutdown()
if fut is not None:
await fut


class Network:
"""
Manages network connection/scheduling logic on behalf of one or more :class:`PartyClient`
Expand All @@ -124,6 +153,7 @@ class Network:

def __init__(self, metrics: 'Optional[MetricEvents]' = None):
self._impl = _NetworkImpl(metrics)
self._main_fut = None

def set_config(
self,
Expand Down Expand Up @@ -151,8 +181,13 @@ def simple_global(self) -> 'SimpleGlobalClient':
def aio_global(self) -> 'AIOGlobalClient':
"""
Return a :class:`GlobalClient` that works on an asyncio event loop.

Note that once this object can only be accessed from the asyncio event loop it is intended
to be used on.
"""
return self._impl.global_impl(AIOGlobalClient)
client = self._impl.global_impl(AIOGlobalClient)
self._impl.freeze()
return client

def simple_party(self, party: 'Union[str, Party]') -> 'SimplePartyClient':
"""
Expand All @@ -163,6 +198,13 @@ def simple_party(self, party: 'Union[str, Party]') -> 'SimplePartyClient':
"""
return self._impl.party_impl(Party(party), SimplePartyClient)

def simple_new_party(self) -> 'SimplePartyClient':
"""
Return a :class:`PartyClient` that exposes thread-safe, synchronous (blocking) methods for
communicating with a ledger. Callbacks are dispatched to background threads.
"""
return self.simple_party(str(uuid4()))

def aio_party(self, party: 'Union[str, Party]') -> 'AIOPartyClient':
"""
Return a :class:`PartyClient` that works on an asyncio event loop.
Expand All @@ -171,6 +213,13 @@ def aio_party(self, party: 'Union[str, Party]') -> 'AIOPartyClient':
"""
return self._impl.party_impl(Party(party), AIOPartyClient)

def aio_new_party(self) -> 'AIOPartyClient':
"""
Return a :class:`PartyClient` for a random party that works on an asyncio event loop.
This will never return the same object twice.
"""
return self.aio_party(str(uuid4()))

def party_bots(
self,
party: 'Union[str, Party]',
Expand Down Expand Up @@ -203,14 +252,18 @@ def start_in_background(
self._impl.invoker.install_signal_handlers()
return self._impl.start(daemon)

def shutdown(self) -> None:
def shutdown(self) -> 'Optional[Awaitable[None]]':
"""
Gracefully shut down all network connections and notify all clients that they are about to
be terminated.

The current thread does NOT block.

:return: ``None`` unless ``start()`` was called, in which case the coroutine that
corresponds to dazl's "main" is returned.
"""
return self._impl.shutdown()
self._impl.shutdown()
return self._main_fut

def join(self, timeout: 'Optional[float]' = None) -> None:
"""
Expand All @@ -225,6 +278,12 @@ def join(self, timeout: 'Optional[float]' = None) -> None:

# <editor-fold desc="asyncio-based scheduling API">

def start(self) -> None:
"""
Start the coroutine that spawns callbacks for listeners on event streams.
"""
self._main_fut = ensure_future(self.aio_run(keep_open=False))

def run_until_complete(
self, *coroutines: 'Awaitable[None]',
install_signal_handlers: 'Optional[bool]' = None) \
Expand Down Expand Up @@ -292,6 +351,12 @@ def parties(self) -> 'Collection[Party]':
def bots(self) -> 'Collection[Bot]':
return self._impl.bots()

def __enter__(self):
pass

def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()


class GlobalClient:
"""
Expand Down Expand Up @@ -914,6 +979,20 @@ def set_time(self, new_datetime: datetime) -> 'Awaitable[None]':
"""
return self._impl.set_time(new_datetime)

async def ensure_dar(
self,
contents: 'Union[str, Path, bytes, BinaryIO]',
timeout: 'TimeDeltaConvertible' = DEFAULT_TIMEOUT_SECONDS) -> None:
"""
Validate that the ledger has the packages specified by the given contents (as a byte array).
Throw an exception if the specified DARs do not exist within the specified timeout.

:param contents: The DAR or DALF to ensure.
:param timeout: The maximum length of time to wait before giving up.
"""
raw_bytes = get_bytes(contents)
return await self._impl.parent.upload_package(raw_bytes, timeout)

def ready(self) -> 'Awaitable[None]':
"""
Block until the ledger client has caught up to the current head and is ready to send
Expand Down Expand Up @@ -1461,6 +1540,21 @@ def get_time(self) -> datetime:
def set_time(self, new_datetime: datetime) -> None:
return self._impl.invoker.run_in_loop(lambda: self._impl.set_time(new_datetime))

def ensure_dar(
self,
contents: 'Union[str, Path, bytes, BinaryIO]',
timeout: 'TimeDeltaConvertible' = DEFAULT_TIMEOUT_SECONDS) -> None:
"""
Validate that the ledger has the packages specified by the given contents (as a byte array).
Throw an exception if the specified DARs do not exist within the specified timeout.

:param contents: The DAR or DALF to ensure.
:param timeout: The maximum length of time to wait before giving up.
"""
raw_bytes = get_bytes(contents)
return self._impl.invoker.run_in_loop(
lambda: self._impl.parent.upload_package(raw_bytes, timeout))

def ready(self) -> None:
"""
Block until the underlying infrastructure has connected to all necessary services.
Expand Down
8 changes: 7 additions & 1 deletion python/dazl/model/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
:members:
"""
import warnings
from pathlib import Path

from dataclasses import dataclass
from typing import Any, Callable, Collection, Dict, NewType, Optional, Tuple, TypeVar, \
from typing import Any, BinaryIO, Callable, Collection, Dict, NewType, Optional, Tuple, TypeVar, \
Union, TYPE_CHECKING
from datetime import datetime

Expand Down Expand Up @@ -156,6 +158,10 @@ class ContractContextualData:
active: bool


# Wherever the API expects a DAR, we can take a file path, `bytes`, or a byte buffer.
Dar = Union[bytes, str, Path, BinaryIO]


@dataclass(frozen=True)
class SourceLocation:
file_name: str
Expand Down
3 changes: 2 additions & 1 deletion python/dazl/scheduler/_invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def set_context_as_current(self) -> None:
a default executor if one has not yet been set.
"""
self.loop = get_event_loop()
self.executor = ThreadPoolExecutor()
if self.executor is None:
self.executor = ThreadPoolExecutor()

def run_in_loop(self, func, timeout: float = 30.0):
"""
Expand Down
23 changes: 22 additions & 1 deletion python/dazl/util/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
This module contains miscellaneous utility methods that don't really fit anywhere else.
"""

from typing import Generator, Tuple, TypeVar, Iterable
from typing import Collection, Generator, Iterable, List, Tuple, TypeVar, Union

T = TypeVar('T')

Expand Down Expand Up @@ -49,3 +49,24 @@ def flatten(obj):
for sublist in obj:
ret.extend(sublist)
return ret


def as_list(obj: 'Union[None, T, Collection[Union[None, T]]]') -> 'List[T]':
"""
Convert an object that is either nothing, a single object, or a collection, to a list of type
of that object.
"""
from collections.abc import Iterable

if obj is None:
return []
elif isinstance(obj, str):
# strings are iterable, but it's almost never intended to be used in place of a list; if
# we're given a string, then assume what is wanted is a single list containing the full
# string instead of a list containing every character as an individual item
return [obj]
elif isinstance(obj, Iterable):
return [o for o in obj if o is not None]
else:
# assume we're dealing with a single object of the requested type
return [obj]
2 changes: 2 additions & 0 deletions python/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
14 changes: 14 additions & 0 deletions python/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import logging
import pytest


@pytest.fixture(scope="session")
def sandbox() -> str:
from dazl import sandbox as test_sandbox
with test_sandbox([]) as proc:
logging.info('Shared test sandbox started at %s', proc.url)
yield proc.url
logging.info('The tests are done. Shutting down the sandbox...')
Loading