diff --git a/.github/dependabot.yml b/.github/dependabot.yml deleted file mode 100644 index 52f8d889..00000000 --- a/.github/dependabot.yml +++ /dev/null @@ -1,25 +0,0 @@ -version: 2 -updates: - - package-ecosystem: pip - directory: "/" - schedule: - interval: weekly - day: monday - reviewers: - - "uxio0" - - - package-ecosystem: docker - directory: "/docker/web" - schedule: - interval: weekly - day: monday - reviewers: - - "uxio0" - - - package-ecosystem: github-actions - directory: "/" - schedule: - interval: weekly - day: monday - reviewers: - - "uxio0" diff --git a/README.md b/README.md index e4902c85..b5b5bea9 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ a transaction that is pending to be sent to the blockchain. ## Index of contents -- [Docs](https://docs.gnosis-safe.io/backend/service-architecture) +- [Docs](https://docs.safe.global/safe-core-api/service-architecture) - [Deploying the service](https://github.com/safe-global/safe-infrastructure) ## Setup for development @@ -178,16 +178,16 @@ are deleted and indexing is restarted to the last `confirmed` block. ### If I add my chain to [safe-eth-py](https://github.com/safe-global/safe-eth-py/blob/master/gnosis/safe/addresses.py) will you support it? No, for a chain to be supported we need to set up a dedicated infra for that network -and [have a proper RPC](https://docs.safe.global/learn/infrastructure/rpc-requirements) +and [have a proper RPC](https://docs.safe.global/safe-core-api/rpc-requirements) ### How can I interact with the service? Aside from using standard HTTP requests: -- [Safe API Kit](https://github.com/safe-global/safe-core-sdk/tree/main/packages/safe-service-client) +- [Safe{Core} API Kit](https://github.com/safe-global/safe-core-sdk/tree/main/packages/api-kit) - [Safe-eth-py](https://github.com/safe-global/safe-eth-py) - [Safe CLI](https://github.com/5afe/safe-cli): It has a `tx-service` mode to gather offchain signatures. ### What chains do you officially support? -https://docs.safe.global/learn/safe-core/safe-core-api/available-services +https://docs.safe.global/safe-core-api/available-services ### What does the `banned` field in the `SafeContract` model mean? The `banned` field in the `SafeContract` model is used to prevent indexing of certain Safes that have an unsupported `MasterCopy` or unverified proxies that have issues during indexing. This field does not remove the banned Safe and indexing can be resumed once the issue has been resolved.
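For the "standard HTTP requests" option mentioned in the FAQ above, a minimal sketch. The host below is the public mainnet deployment used purely as an example, and the Safe address is a placeholder; the endpoint is the service's own `/api/v1/safes/{address}/multisig-transactions/` route, whose responses are camelCased (see `djangorestframework-camel-case` in requirements.txt):

```python
import requests

# Example host and placeholder address -- adjust to your own deployment
BASE_URL = "https://safe-transaction-mainnet.safe.global"
SAFE_ADDRESS = "0x..."  # checksummed Safe address

# List the latest multisig transactions for a Safe (paginated DRF endpoint)
response = requests.get(
    f"{BASE_URL}/api/v1/safes/{SAFE_ADDRESS}/multisig-transactions/",
    params={"limit": 10},
    timeout=10,
)
response.raise_for_status()
for tx in response.json()["results"]:
    print(tx["safeTxHash"], tx["nonce"], tx["isExecuted"])
```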
diff --git a/config/gunicorn.py b/config/gunicorn.py new file mode 100644 index 00000000..247ed43e --- /dev/null +++ b/config/gunicorn.py @@ -0,0 +1,8 @@ +""" +Store gunicorn variables in this file, so they can be read by Django +""" +import os + +gunicorn_request_timeout = os.environ.get("WEB_WORKER_TIMEOUT", 60) +gunicorn_worker_connections = os.environ.get("WEB_WORKER_CONNECTIONS", 1000) +gunicorn_workers = os.environ.get("WEB_CONCURRENCY", 2) diff --git a/config/settings/base.py b/config/settings/base.py index 4a316517..39e8a976 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -7,6 +7,12 @@ import environ from corsheaders.defaults import default_headers as default_cors_headers +from ..gunicorn import ( + gunicorn_request_timeout, + gunicorn_worker_connections, + gunicorn_workers, +) + ROOT_DIR = Path(__file__).resolve(strict=True).parent.parent.parent APPS_DIR = ROOT_DIR / "safe_transaction_service" @@ -47,6 +53,11 @@ # Enable analytics endpoints ENABLE_ANALYTICS = env("ENABLE_ANALYTICS", default=False) +# GUNICORN +GUNICORN_REQUEST_TIMEOUT = gunicorn_request_timeout +GUNICORN_WORKER_CONNECTIONS = gunicorn_worker_connections +GUNICORN_WORKERS = gunicorn_workers + # DATABASES # ------------------------------------------------------------------------------ # https://docs.djangoproject.com/en/dev/ref/settings/#databases @@ -212,8 +223,18 @@ CELERY_BROKER_URL = env("CELERY_BROKER_URL", default="django://") # https://docs.celeryproject.org/en/stable/userguide/optimizing.html#broker-connection-pools # https://docs.celeryq.dev/en/latest/userguide/optimizing.html#broker-connection-pools -CELERY_BROKER_POOL_LIMIT = env( - "CELERY_BROKER_POOL_LIMIT", default=env("CELERYD_CONCURRENCY", default=1000) +# Configured to 0 due to connection issues https://github.com/celery/celery/issues/4355 +CELERY_BROKER_POOL_LIMIT = env.int("CELERY_BROKER_POOL_LIMIT", default=0) +# https://docs.celeryq.dev/en/stable/userguide/configuration.html#broker-heartbeat +CELERY_BROKER_HEARTBEAT = env.int("CELERY_BROKER_HEARTBEAT", default=0) + +# https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-broker_connection_max_retries +CELERY_BROKER_CONNECTION_MAX_RETRIES = env.int( + "CELERY_BROKER_CONNECTION_MAX_RETRIES", default=0 +) +# https://docs.celeryq.dev/en/stable/userguide/configuration.html#broker-channel-error-retry +CELERY_BROKER_CHANNEL_ERROR_RETRY = env.bool( + "CELERY_BROKER_CHANNEL_ERROR_RETRY", default=True ) # http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-result_backend CELERY_RESULT_BACKEND = env("CELERY_RESULT_BACKEND", default="redis://") @@ -234,6 +255,7 @@ CELERY_TASK_QUEUE_MAX_PRIORITY = 10 # https://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-transport-options CELERY_BROKER_TRANSPORT_OPTIONS = {} + # https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-task_routes CELERY_ROUTES = ( [ @@ -453,6 +475,9 @@ ETH_EVENTS_UPDATED_BLOCK_BEHIND = env.int( "ETH_EVENTS_UPDATED_BLOCK_BEHIND", default=24 * 60 * 60 // 15 ) # Number of blocks to consider an address 'almost updated'. 
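A side note on the new `config/gunicorn.py` above: `os.environ.get()` returns a `str` whenever the variable is set, so the values are `int` only when the defaults apply. Gunicorn itself validates and coerces its settings, but if the mirrored Django settings (`GUNICORN_REQUEST_TIMEOUT` and friends) are consumed anywhere else, an explicit cast keeps the types stable. A defensive sketch of the same module (an alternative, not what the PR ships):

```python
import os

# Cast explicitly so the values are always ints: os.environ.get() returns a
# str for any variable that is set, and the int default otherwise.
gunicorn_request_timeout = int(os.environ.get("WEB_WORKER_TIMEOUT", 60))
gunicorn_worker_connections = int(os.environ.get("WEB_WORKER_CONNECTIONS", 1000))
gunicorn_workers = int(os.environ.get("WEB_CONCURRENCY", 2))
```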
+ETH_REORG_BLOCKS_BATCH = env.int( + "ETH_REORG_BLOCKS_BATCH", default=250 +) # Number of blocks to be checked in the same batch for reorgs ETH_REORG_BLOCKS = env.int( "ETH_REORG_BLOCKS", default=200 if ETH_L2_NETWORK else 10 ) # Number of blocks from the current block number needed to consider a block valid/stable @@ -502,6 +527,11 @@ EVENTS_QUEUE_ASYNC_CONNECTION = env("EVENTS_QUEUE_ASYNC_CONNECTION", default=False) EVENTS_QUEUE_EXCHANGE_NAME = env("EVENTS_QUEUE_EXCHANGE_NAME", default="amq.fanout") +# Cache +CACHE_ALL_TXS_VIEW = env.int( + "CACHE_ALL_TXS_VIEW", default=10 * 60 +) # 10 minutes. 0 is disabled + # AWS S3 https://github.com/etianen/django-s3-storage # ------------------------------------------------------------------------------ # AWS_QUERYSTRING_AUTH = False # Remove query parameter authentication from generated URLs diff --git a/docker/web/celery/scheduler/run.sh b/docker/web/celery/scheduler/run.sh index dcc85339..cbb5a472 100755 --- a/docker/web/celery/scheduler/run.sh +++ b/docker/web/celery/scheduler/run.sh @@ -13,4 +13,6 @@ fi sleep 10 echo "==> $(date +%H:%M:%S) ==> Running Celery beat <==" -exec celery -C -A config.celery_app beat -S django_celery_beat.schedulers:DatabaseScheduler --loglevel $log_level +exec celery -C -A config.celery_app beat \ + -S django_celery_beat.schedulers:DatabaseScheduler \ + --loglevel $log_level \ No newline at end of file diff --git a/docker/web/celery/worker/run.sh b/docker/web/celery/worker/run.sh index 714d67be..5e5cffe8 100755 --- a/docker/web/celery/worker/run.sh +++ b/docker/web/celery/worker/run.sh @@ -33,4 +33,5 @@ exec celery -C -A config.celery_app worker \ --concurrency=${TASK_CONCURRENCY} \ --max-memory-per-child=${MAX_MEMORY_PER_CHILD} \ --max-tasks-per-child=${MAX_TASKS_PER_CHILD} \ - -Q "$WORKER_QUEUES" + --without-heartbeat --without-gossip \ + --without-mingle -Q "$WORKER_QUEUES" diff --git a/gunicorn.conf.py b/gunicorn.conf.py index bc2b2e05..583e227b 100644 --- a/gunicorn.conf.py +++ b/gunicorn.conf.py @@ -1,3 +1,9 @@ +from config.gunicorn import ( + gunicorn_request_timeout, + gunicorn_worker_connections, + gunicorn_workers, +) + access_logfile = "-" error_logfile = "-" max_requests = 20_000 # Restart a worker after it has processed a given number of requests (for memory leaks) @@ -10,19 +16,9 @@ log_level = "info" logger_class = "safe_transaction_service.utils.loggers.CustomGunicornLogger" preload_app = False # Load application code before the worker processes are forked (problems with gevent patching) -timeout = ( - 60 # Worker will be restarted if it doesn't answer in more than configured seconds -) -worker_class = "gevent" -worker_connections = 2000 - - -def post_fork(server, worker): - try: - from psycogreen.gevent import patch_psycopg +# For timeout to work with gevent, a custom GeventWorker needs to be used +timeout = gunicorn_request_timeout - worker.log.info("Making Psycopg2 Green") - patch_psycopg() - worker.log.info("Made Psycopg2 Green") - except ImportError: - worker.log.info("Psycopg2 not patched") +worker_class = "gunicorn_custom_workers.MyGeventWorker" # "gevent" +worker_connections = gunicorn_worker_connections +workers = gunicorn_workers diff --git a/gunicorn_custom_workers.py b/gunicorn_custom_workers.py new file mode 100644 index 00000000..11d3c529 --- /dev/null +++ b/gunicorn_custom_workers.py @@ -0,0 +1,21 @@ +import gevent +from gunicorn.workers.ggevent import GeventWorker +from psycogreen.gevent import patch_psycopg + + +class MyGeventWorker(GeventWorker): + def patch_psycopg2(self): + 
patch_psycopg() + self.log.info("Patched Psycopg2 for gevent") + + def patch(self): + super().patch() + self.log.info("Patched all for gevent") + self.patch_psycopg2() + + def handle_request(self, listener_name, req, sock, addr): + try: + with gevent.Timeout(self.cfg.timeout): + super().handle_request(listener_name, req, sock, addr) + except gevent.Timeout: + self.log.error("TimeoutError on %s", req.path) diff --git a/requirements-test.txt b/requirements-test.txt index 66100af6..65fb2f8a 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,13 +1,13 @@ -r requirements.txt -coverage==7.2.7 -django-stubs==4.2.1 +coverage==7.3.1 +django-stubs==4.2.4 django-test-migrations==1.3.0 -factory-boy==3.2.1 -faker==18.10.1 -mypy==1.0.1 -pytest==7.4.0 +factory-boy==3.3.0 +faker==19.6.1 +mypy==1.5.1 +pytest==7.4.2 pytest-celery==0.0.0 pytest-django==4.5.2 -pytest-env==0.8.2 -pytest-rerunfailures==11.1.2 +pytest-env==1.0.1 +pytest-rerunfailures==12.0 pytest-sugar==0.9.7 diff --git a/requirements.txt b/requirements.txt index 05b5b3de..814abcc2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,37 +1,37 @@ -boto3==1.26.151 +boto3==1.28.44 cachetools==5.3.1 -celery==5.2.7 -django==4.2.2 -django-cache-memoize==0.1.10 +celery==5.3.4 +django==4.2.4 +django-cache-memoize==0.2.0 django-celery-beat==2.5.0 -django-cors-headers==4.0.0 +django-cors-headers==4.2.0 django-db-geventpool==4.0.1 django-debug-toolbar django-debug-toolbar-force -django-environ==0.10.0 +django-environ==0.11.2 django-extensions==3.2.3 -django-filter==23.2 +django-filter==23.3 django-imagekit==4.1.0 django-model-utils==4.3.1 -django-redis==5.2.0 +django-redis==5.3.0 django-s3-storage==0.14.0 -django-timezone-field==5.1 +django-timezone-field==6.0.1 djangorestframework==3.14.0 djangorestframework-camel-case==1.4.2 docutils==0.20.1 -drf-yasg[validation]==1.21.5 +drf-yasg[validation]==1.21.7 firebase-admin==6.2.0 -flower==1.2.0 -gunicorn[gevent]==20.1.0 -hexbytes==0.2.3 +flower==2.0.1 +gunicorn[gevent]==21.2.0 +hexbytes==0.3.1 hiredis==2.2.3 packaging>=21.0 pika==1.3.2 -pillow==9.5.0 +pillow==10.0.1 psycogreen==1.0.2 -psycopg2==2.9.6 -redis==4.5.5 +psycopg2==2.9.7 +redis==5.0.0 requests==2.31.0 git+https://github.com/protofire/safe-eth-py.git@rsk#egg=safe-eth-py -#safe-eth-py[django]==5.5.0 -web3==6.5.0 +#safe-eth-py[django]==5.8.0 +web3==6.9.0 diff --git a/safe_transaction_service/__init__.py b/safe_transaction_service/__init__.py index a4513dbd..4e9c2a21 100644 --- a/safe_transaction_service/__init__.py +++ b/safe_transaction_service/__init__.py @@ -1,4 +1,4 @@ -__version__ = "4.21.2" +__version__ = "4.26.0" __version_info__ = tuple( int(num) if num.isdigit() else num for num in __version__.replace("-", ".", 1).split(".") diff --git a/safe_transaction_service/contracts/tx_decoder.py b/safe_transaction_service/contracts/tx_decoder.py index 779460d2..0b7cebc7 100644 --- a/safe_transaction_service/contracts/tx_decoder.py +++ b/safe_transaction_service/contracts/tx_decoder.py @@ -119,6 +119,14 @@ class MultisendDecoded(TypedDict): @cache def get_db_tx_decoder() -> "DbTxDecoder": + """ + :return: Tx decoder with every ABI in the database loaded and indexed by function opcode + .. note:: + Be careful when calling this function in a concurrent way, as if the cache is not generated yet it will compute + the ``DbTxDecoder`` multiple times, and depending on the number of Contracts in the database it could + take a long time.
+ """ + def _get_db_tx_decoder() -> "DbTxDecoder": return DbTxDecoder() diff --git a/safe_transaction_service/history/apps.py b/safe_transaction_service/history/apps.py index 73d67256..cd00efb0 100644 --- a/safe_transaction_service/history/apps.py +++ b/safe_transaction_service/history/apps.py @@ -1,3 +1,5 @@ +import sys + from django.apps import AppConfig @@ -7,3 +9,14 @@ class HistoryConfig(AppConfig): def ready(self): from . import signals # noqa + + for argument in sys.argv: + if "gunicorn" in argument: # pragma: no cover + # Just run this on production + # TODO Find a better way + from safe_transaction_service.contracts.tx_decoder import ( + get_db_tx_decoder, + ) + + get_db_tx_decoder() # Build tx decoder cache + break diff --git a/safe_transaction_service/history/helpers.py b/safe_transaction_service/history/helpers.py index 770f8b1a..a1709722 100644 --- a/safe_transaction_service/history/helpers.py +++ b/safe_transaction_service/history/helpers.py @@ -62,7 +62,7 @@ def is_valid_unique_transfer_id(unique_transfer_id: str) -> bool: :return: ``True`` for a valid ``unique_transfer_id``, ``False`` otherwise """ token_transfer_id_pattern = r"^(e)([a-fA-F0-9]{64})(\d+)" - internal_transfer_id_pattern = r"^(i)([a-fA-F0-9]{64})(\d+)(,\d+)*" + internal_transfer_id_pattern = r"^(i)([a-fA-F0-9]{64})(\d*)(,\d+)*" return bool( re.fullmatch(token_transfer_id_pattern, unique_transfer_id) diff --git a/safe_transaction_service/history/indexers/erc20_events_indexer.py b/safe_transaction_service/history/indexers/erc20_events_indexer.py index c93f590c..be6606b9 100644 --- a/safe_transaction_service/history/indexers/erc20_events_indexer.py +++ b/safe_transaction_service/history/indexers/erc20_events_indexer.py @@ -27,13 +27,15 @@ class Erc20EventsIndexerProvider: def __new__(cls): if not hasattr(cls, "instance"): - from django.conf import settings - - cls.instance = Erc20EventsIndexer( - EthereumClient(settings.ETHEREUM_NODE_URL) - ) + cls.instance = cls.get_new_instance() return cls.instance + @classmethod + def get_new_instance(cls) -> "Erc20EventsIndexer": + from django.conf import settings + + return Erc20EventsIndexer(EthereumClient(settings.ETHEREUM_NODE_URL)) + @classmethod def del_singleton(cls): if hasattr(cls, "instance"): diff --git a/safe_transaction_service/history/indexers/ethereum_indexer.py b/safe_transaction_service/history/indexers/ethereum_indexer.py index 72349ad0..583860e7 100644 --- a/safe_transaction_service/history/indexers/ethereum_indexer.py +++ b/safe_transaction_service/history/indexers/ethereum_indexer.py @@ -7,6 +7,7 @@ from django.db.models import Min, QuerySet from celery.exceptions import SoftTimeLimitExceeded +from requests import Timeout from gnosis.eth import EthereumClient @@ -393,7 +394,13 @@ def process_addresses( to_block_number, current_block_number=current_block_number, ) - except (FindRelevantElementsException, SoftTimeLimitExceeded) as e: + processed_elements = self.process_elements(elements) + except ( + FindRelevantElementsException, + SoftTimeLimitExceeded, + Timeout, + ValueError, + ) as e: self.block_process_limit = 1 # Set back to the very minimum logger.info( "%s: block_process_limit set back to %d", @@ -402,8 +409,6 @@ def process_addresses( ) raise e - processed_elements = self.process_elements(elements) - if not self.update_monitored_addresses( addresses, from_block_number, to_block_number ): diff --git a/safe_transaction_service/history/indexers/internal_tx_indexer.py b/safe_transaction_service/history/indexers/internal_tx_indexer.py index 
dabd4ef7..f75f79a6 100644 --- a/safe_transaction_service/history/indexers/internal_tx_indexer.py +++ b/safe_transaction_service/history/indexers/internal_tx_indexer.py @@ -27,17 +27,21 @@ class InternalTxIndexerProvider: def __new__(cls): if not hasattr(cls, "instance"): - from django.conf import settings + cls.instance = cls.get_new_instance() + return cls.instance - if settings.ETH_INTERNAL_NO_FILTER: - instance_class = InternalTxIndexerWithTraceBlock - else: - instance_class = InternalTxIndexer + @classmethod + def get_new_instance(cls) -> "InternalTxIndexer": + from django.conf import settings - cls.instance = instance_class( - EthereumClient(settings.ETHEREUM_TRACING_NODE_URL), - ) - return cls.instance + if settings.ETH_INTERNAL_NO_FILTER: + instance_class = InternalTxIndexerWithTraceBlock + else: + instance_class = InternalTxIndexer + + return instance_class( + EthereumClient(settings.ETHEREUM_TRACING_NODE_URL), + ) @classmethod def del_singleton(cls): diff --git a/safe_transaction_service/history/indexers/proxy_factory_indexer.py b/safe_transaction_service/history/indexers/proxy_factory_indexer.py index 17f04fdb..3446c34d 100644 --- a/safe_transaction_service/history/indexers/proxy_factory_indexer.py +++ b/safe_transaction_service/history/indexers/proxy_factory_indexer.py @@ -21,14 +21,16 @@ class ProxyFactoryIndexerProvider: def __new__(cls): if not hasattr(cls, "instance"): - from django.conf import settings - - cls.instance = ProxyFactoryIndexer( - EthereumClient(settings.ETHEREUM_NODE_URL) - ) + cls.instance = cls.get_new_instance() return cls.instance + @classmethod + def get_new_instance(cls) -> "ProxyFactoryIndexer": + from django.conf import settings + + return ProxyFactoryIndexer(EthereumClient(settings.ETHEREUM_NODE_URL)) + @classmethod def del_singleton(cls): if hasattr(cls, "instance"): diff --git a/safe_transaction_service/history/indexers/safe_events_indexer.py b/safe_transaction_service/history/indexers/safe_events_indexer.py index 20f482c3..d51731e5 100644 --- a/safe_transaction_service/history/indexers/safe_events_indexer.py +++ b/safe_transaction_service/history/indexers/safe_events_indexer.py @@ -31,11 +31,15 @@ class SafeEventsIndexerProvider: def __new__(cls): if not hasattr(cls, "instance"): - from django.conf import settings - - cls.instance = SafeEventsIndexer(EthereumClient(settings.ETHEREUM_NODE_URL)) + cls.instance = cls.get_new_instance() return cls.instance + @classmethod + def get_new_instance(cls) -> "SafeEventsIndexer": + from django.conf import settings + + return SafeEventsIndexer(EthereumClient(settings.ETHEREUM_NODE_URL)) + @classmethod def del_singleton(cls): if hasattr(cls, "instance"): @@ -227,7 +231,7 @@ def _process_decoded_element( trace_address=trace_address, error=None, ) - child_internal_tx = None # For Ether transfers + child_internal_tx: Optional[InternalTx] = None # For Ether transfers internal_tx_decoded = InternalTxDecoded( internal_tx=internal_tx, function_name="", diff --git a/safe_transaction_service/history/migrations/0051_ethereum_address_field_v2_update.py b/safe_transaction_service/history/migrations/0051_ethereum_address_field_v2_update.py index 02260cbe..8854bc92 100644 --- a/safe_transaction_service/history/migrations/0051_ethereum_address_field_v2_update.py +++ b/safe_transaction_service/history/migrations/0051_ethereum_address_field_v2_update.py @@ -1,10 +1,7 @@ # Generated by Django 3.2.9 on 2021-12-01 15:07 -import django.contrib.postgres.fields from django.db import migrations -import gnosis.eth.django.models - class 
Migration(migrations.Migration): diff --git a/safe_transaction_service/history/migrations/0074_internaltx_history_internal_transfer_idx.py b/safe_transaction_service/history/migrations/0074_internaltx_history_internal_transfer_idx.py new file mode 100644 index 00000000..e3cb1a48 --- /dev/null +++ b/safe_transaction_service/history/migrations/0074_internaltx_history_internal_transfer_idx.py @@ -0,0 +1,21 @@ +# Generated by Django 4.2.3 on 2023-08-08 09:59 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("history", "0073_safe_apps_links"), + ] + + operations = [ + migrations.AddIndex( + model_name="internaltx", + index=models.Index( + condition=models.Q(("call_type", 0), ("value__gt", 0)), + fields=["to", "timestamp"], + include=("ethereum_tx_id", "block_number"), + name="history_internal_transfer_idx", + ), + ), + ] diff --git a/safe_transaction_service/history/models.py b/safe_transaction_service/history/models.py index 73d53d41..3ff15a5f 100644 --- a/safe_transaction_service/history/models.py +++ b/safe_transaction_service/history/models.py @@ -272,16 +272,20 @@ def oldest_than(self, seconds: int): timestamp__lte=timezone.now() - datetime.timedelta(seconds=seconds) ).order_by("-timestamp") - def not_confirmed(self, to_block_number: Optional[int] = None): + def not_confirmed(self): """ :return: Blocks not yet confirmed """ queryset = self.filter(confirmed=False) - if to_block_number is not None: - queryset = queryset.filter(number__lte=to_block_number) return queryset + def since_block(self, block_number: int): + return self.filter(number__gte=block_number) + + def until_block(self, block_number: int): + return self.filter(number__lte=block_number) + class EthereumBlock(models.Model): objects = EthereumBlockManager.from_queryset(EthereumBlockQuerySet)() @@ -962,6 +966,13 @@ class Meta: ), Index(fields=["_from", "timestamp"]), Index(fields=["to", "timestamp"]), + # Speed up getting ether transfers in all-transactions and ether transfer count + Index( + name="history_internal_transfer_idx", + fields=["to", "timestamp"], + include=["ethereum_tx_id", "block_number"], + condition=Q(call_type=0) & Q(value__gt=0), + ), ] def __str__(self): @@ -1306,7 +1317,10 @@ def with_confirmations_required(self): :return: queryset with `confirmations_required: int` field """ threshold_safe_status_query = ( - SafeStatus.objects.filter(internal_tx__ethereum_tx=OuterRef("ethereum_tx")) + SafeStatus.objects.filter( + address=OuterRef("safe"), + internal_tx__ethereum_tx=OuterRef("ethereum_tx"), + ) .sorted_reverse_by_mined() .values("threshold") ) @@ -1603,6 +1617,70 @@ class SafeContractManager(models.Manager): def get_banned_safes(self) -> QuerySet[ChecksumAddress]: return self.filter(banned=True).values_list("address", flat=True) + def get_count_relevant_txs_for_safe(self, address: ChecksumAddress) -> int: + """ + This method searches multiple tables and counts every tx or event for a Safe. + It will return the same or a higher value compared to counting ``get_all_tx_identifiers``, + as that method will group some transactions (for example, 3 ERC20 transfers can be grouped in a ``MultisigTransaction``, + so it will be ``1`` element for ``get_all_tx_identifiers`` but ``4`` for this function). + + This query should be pretty fast, and it's meant to be used for invalidating caches.
+ + :param address: + :return: number of relevant txs for a Safe + """ + + query = """ + SELECT SUM(count_all) + FROM ( + -- Get multisig transactions + SELECT COUNT(*) AS count_all + FROM "history_multisigtransaction" + WHERE "history_multisigtransaction"."safe" = %s + UNION ALL + -- Get confirmations + SELECT COUNT(*) + FROM "history_multisigtransaction" + JOIN "history_multisigconfirmation" ON "history_multisigtransaction"."safe_tx_hash" = "history_multisigconfirmation"."multisig_transaction_id" + WHERE "history_multisigtransaction"."safe" = %s + UNION ALL + -- Get ERC20 Transfers + SELECT COUNT(*) + FROM "history_erc20transfer" + WHERE ( + "history_erc20transfer"."to" = %s + OR "history_erc20transfer"."_from" = %s + ) + UNION ALL + -- Get ERC721 Transfers + SELECT COUNT(*) + FROM "history_erc721transfer" + WHERE ( + "history_erc721transfer"."to" = %s + OR "history_erc721transfer"."_from" = %s + ) + UNION ALL + -- Get Ether Transfers + SELECT COUNT(*) + FROM "history_internaltx" + WHERE ( + "history_internaltx"."call_type" = 0 + AND "history_internaltx"."to" = %s + AND "history_internaltx"."value" > 0 + ) + UNION ALL + -- Get Module Transactions + SELECT COUNT(*) + FROM "history_moduletransaction" + WHERE "history_moduletransaction"."safe" = %s + ) subquery + """ + + with connection.cursor() as cursor: + hex_address = HexBytes(address) + cursor.execute(query, [hex_address] * 8) + return cursor.fetchone()[0] + class SafeContract(models.Model): objects = SafeContractManager() @@ -1927,6 +2005,8 @@ class WebHookType(Enum): MODULE_TRANSACTION = 7 OUTGOING_ETHER = 8 OUTGOING_TOKEN = 9 + MESSAGE_CREATED = 10 + MESSAGE_CONFIRMATION = 11 class WebHookQuerySet(models.QuerySet): diff --git a/safe_transaction_service/history/serializers.py b/safe_transaction_service/history/serializers.py index 7133944d..7c88d237 100644 --- a/safe_transaction_service/history/serializers.py +++ b/safe_transaction_service/history/serializers.py @@ -474,7 +474,10 @@ class SafeModuleTransactionResponseSerializer(GnosisBaseModelSerializer): transaction_hash = serializers.SerializerMethodField() block_number = serializers.SerializerMethodField() is_successful = serializers.SerializerMethodField() - module_transaction_id = serializers.SerializerMethodField() + module_transaction_id = serializers.SerializerMethodField( + help_text="Internally calculated parameter to uniquely identify a moduleTransaction \n" + "`ModuleTransactionId = i+tx_hash+trace_address`" + ) class Meta: model = ModuleTransaction @@ -736,7 +739,11 @@ class TransferResponseSerializer(serializers.Serializer): value = serializers.CharField(allow_null=True, source="_value") token_id = serializers.CharField(allow_null=True, source="_token_id") token_address = EthereumAddressField(allow_null=True, default=None) - transfer_id = serializers.SerializerMethodField() + transfer_id = serializers.SerializerMethodField( + help_text="Internally calculated parameter to uniquely identify a transfer \n" + "Token transfers are calculated as `transferId = e+tx_hash+log_index` \n" + "Ether transfers are calculated as `transferId = i+tx_hash+trace_address`" + ) def get_fields(self): result = super().get_fields() diff --git a/safe_transaction_service/history/services/index_service.py b/safe_transaction_service/history/services/index_service.py index 80ebaba1..31d2119d 100644 --- a/safe_transaction_service/history/services/index_service.py +++ b/safe_transaction_service/history/services/index_service.py @@ -375,7 +375,7 @@ def _reindex( addresses: Optional[ChecksumAddress] = 
None, ) -> int: """ - :param provider: + :param indexer: A new instance must be provided; using the singleton one can break indexing :param from_block_number: :param to_block_number: :param block_process_limit: @@ -384,14 +384,9 @@ :param addresses: :return: """ assert (not to_block_number) or to_block_number > from_block_number - ignore_addresses_on_log_filter = ( - indexer.IGNORE_ADDRESSES_ON_LOG_FILTER - if hasattr(indexer, "IGNORE_ADDRESSES_ON_LOG_FILTER") - else None - ) - if addresses: # Just process addresses provided + # No issues on modifying the indexer as we should be provided with a new instance indexer.IGNORE_ADDRESSES_ON_LOG_FILTER = False else: addresses = list( @@ -429,10 +424,8 @@ ) element_number += len(elements) - logger.info("End reindexing addresses %s", addresses) + logger.info("End reindexing addresses %s", addresses_str) - # We changed attributes on the indexer, so better restore it - indexer.IGNORE_ADDRESSES_ON_LOG_FILTER = ignore_addresses_on_log_filter return element_number def reindex_master_copies( @@ -457,10 +450,10 @@ from ..indexers import InternalTxIndexerProvider, SafeEventsIndexerProvider indexer = ( - SafeEventsIndexerProvider + SafeEventsIndexerProvider.get_new_instance() if self.eth_l2_network - else InternalTxIndexerProvider - )() + else InternalTxIndexerProvider.get_new_instance() + ) return self._reindex( indexer, @@ -490,7 +483,7 @@ from ..indexers import Erc20EventsIndexerProvider - indexer = Erc20EventsIndexerProvider() + indexer = Erc20EventsIndexerProvider.get_new_instance() return self._reindex( indexer, from_block_number, diff --git a/safe_transaction_service/history/services/reorg_service.py b/safe_transaction_service/history/services/reorg_service.py index bd993bc0..225883ff 100644 --- a/safe_transaction_service/history/services/reorg_service.py +++ b/safe_transaction_service/history/services/reorg_service.py @@ -1,6 +1,7 @@ import logging from typing import Callable, List, Optional +from django.core.paginator import Paginator from django.db import transaction from hexbytes import HexBytes @@ -24,7 +25,9 @@ def __new__(cls): from django.conf import settings cls.instance = ReorgService( - EthereumClientProvider(), settings.ETH_REORG_BLOCKS + EthereumClientProvider(), + settings.ETH_REORG_BLOCKS, + settings.ETH_REORG_BLOCKS_BATCH, ) return cls.instance @@ -40,6 +43,7 @@ def __init__( self, ethereum_client: EthereumClient, eth_reorg_blocks: int, + eth_reorg_blocks_batch: int, eth_reorg_rewind_blocks: Optional[int] = 10, ): """ @@ -50,6 +54,7 @@ """ self.ethereum_client = ethereum_client self.eth_reorg_blocks = eth_reorg_blocks + self.eth_reorg_blocks_batch = eth_reorg_blocks_batch self.eth_reorg_rewind_blocks = eth_reorg_rewind_blocks # List with functions for database models to recover from reorgs @@ -77,37 +82,52 @@ def check_reorgs(self) -> Optional[int]: """ :return: Number of the oldest block with reorg detected.
`None` if not reorg found """ + first_not_confirmed_block = ( + EthereumBlock.objects.not_confirmed().order_by("number").first() + ) + if not first_not_confirmed_block: + return None current_block_number = self.ethereum_client.current_block_number - to_block = current_block_number - self.eth_reorg_blocks - for database_block in ( - EthereumBlock.objects.not_confirmed(to_block_number=to_block) + confirmation_block = current_block_number - self.eth_reorg_blocks + queryset = ( + EthereumBlock.objects.since_block(first_not_confirmed_block.number) .only("number", "block_hash", "confirmed") .order_by("number") - .iterator() - ): - blockchain_block, blockchain_next_block = self.ethereum_client.get_blocks( - [database_block.number, database_block.number + 1], - full_transactions=False, + ) + paginator = Paginator(queryset, per_page=self.eth_reorg_blocks_batch) + for page_number in paginator.page_range: + current_page = paginator.get_page(page_number) + database_blocks = [] + block_numbers = [] + for block in current_page.object_list: + database_blocks.append(block) + block_numbers.append(block.number) + blockchain_blocks = self.ethereum_client.get_blocks( + block_numbers, full_transactions=False ) - if ( - HexBytes(blockchain_block["hash"]) - == HexBytes(blockchain_next_block["parentHash"]) - == HexBytes(database_block.block_hash) + + for database_block, blockchain_block in zip( + database_blocks, blockchain_blocks ): - logger.debug( - "Block with number=%d and hash=%s is matching blockchain one, setting as confirmed", - database_block.number, - HexBytes(blockchain_block["hash"]).hex(), - ) - database_block.set_confirmed() - else: - logger.warning( - "Block with number=%d and hash=%s is not matching blockchain hash=%s, reorg found", - database_block.number, - HexBytes(database_block.block_hash).hex(), - HexBytes(blockchain_block["hash"]).hex(), - ) - return database_block.number + if HexBytes(blockchain_block["hash"]) == HexBytes( + database_block.block_hash + ): + # Check all the blocks but only mark safe ones as confirmed + if database_block.number <= confirmation_block: + logger.debug( + "Block with number=%d and hash=%s is matching blockchain one, setting as confirmed", + database_block.number, + HexBytes(blockchain_block["hash"]).hex(), + ) + database_block.set_confirmed() + else: + logger.warning( + "Block with number=%d and hash=%s is not matching blockchain hash=%s, reorg found", + database_block.number, + HexBytes(database_block.block_hash).hex(), + HexBytes(blockchain_block["hash"]).hex(), + ) + return database_block.number @transaction.atomic def reset_all_to_block(self, block_number: int) -> int: diff --git a/safe_transaction_service/history/services/safe_service.py b/safe_transaction_service/history/services/safe_service.py index 763eb173..1d469896 100644 --- a/safe_transaction_service/history/services/safe_service.py +++ b/safe_transaction_service/history/services/safe_service.py @@ -81,7 +81,12 @@ def __init__( self.cpk_proxy_factory_contract = get_cpk_factory_contract(dummy_w3) def get_safe_creation_info(self, safe_address: str) -> Optional[SafeCreationInfo]: + """ + :param safe_address: + :return: SafeCreation info for the provided ``safe_address`` + """ try: + # Get first the actual creation transaction for the safe creation_internal_tx = ( InternalTx.objects.filter( ethereum_tx__status=1 # Ignore Internal Transactions for failed Transactions @@ -93,6 +98,8 @@ def get_safe_creation_info(self, safe_address: str) -> Optional[SafeCreationInfo created_time = 
creation_ethereum_tx.block.timestamp + # Get the parent trace for the creation + # For L2s, `ProxyCreation` event is used to emulate the trace parent_internal_tx = self._get_parent_internal_tx(creation_internal_tx) creator = (parent_internal_tx or creation_ethereum_tx)._from @@ -100,16 +107,17 @@ def get_safe_creation_info(self, safe_address: str) -> Optional[SafeCreationInfo master_copy: Optional[str] = None setup_data: Optional[bytes] = None - data = ( - bytes(parent_internal_tx.data) - if parent_internal_tx - else bytes(creation_ethereum_tx.data) - ) - result = self._decode_proxy_factory(data) or self._decode_cpk_proxy_factory( - data - ) - if result: - master_copy, setup_data = result + data_tx = parent_internal_tx if parent_internal_tx else creation_ethereum_tx + + # A regular ether transfer could trigger a Safe deployment, so it's not guaranteed that there will be + # ``data`` for the transaction + if data_tx.data: + data = bytes(data_tx.data) + result = self._decode_proxy_factory( + data + ) or self._decode_cpk_proxy_factory(data) + if result: + master_copy, setup_data = result if not (master_copy and setup_data): if setup_internal_tx := self._get_next_internal_tx( creation_internal_tx @@ -177,6 +185,14 @@ def get_safe_info_from_db(self, safe_address: ChecksumAddress) -> SafeInfo: def _decode_proxy_factory( self, data: Union[bytes, str] ) -> Optional[Tuple[str, bytes]]: + """ + Decode contract creation function for Safe ProxyFactory deployments + + :param data: + :return: Tuple with the `master_copy` and `setup_data`, `None` if it cannot be decoded + """ + if not data: + return None try: _, data_decoded = self.proxy_factory_contract.decode_function_input(data) master_copy = ( @@ -199,6 +215,17 @@ def _decode_proxy_factory( def _decode_cpk_proxy_factory( self, data: Union[bytes, str] ) -> Optional[Tuple[str, bytes]]: + """ + Decode contract creation function for Safe Contract Proxy Kit Safe deployments (function is different + from the regular ProxyFactory) + + More info: https://github.com/5afe/contract-proxy-kit + + :param data: + :return: Tuple with the `master_copy` and `setup_data`, `None` if it cannot be decoded + """ + if not data: + return None try: _, data_decoded = self.cpk_proxy_factory_contract.decode_function_input( data diff --git a/safe_transaction_service/history/services/transaction_service.py b/safe_transaction_service/history/services/transaction_service.py index 68a98b5b..7946c74f 100644 --- a/safe_transaction_service/history/services/transaction_service.py +++ b/safe_transaction_service/history/services/transaction_service.py @@ -1,4 +1,3 @@ -import copyreg import logging import pickle from collections import defaultdict @@ -8,7 +7,7 @@ from django.db.models import Case, Exists, F, OuterRef, QuerySet, Subquery, Value, When from django.utils import timezone -from eth_typing import HexStr +from eth_typing import ChecksumAddress, HexStr from redis import Redis from gnosis.eth import EthereumClient, EthereumClientProvider @@ -25,6 +24,7 @@ InternalTx, ModuleTransaction, MultisigTransaction, + SafeContract, TransferDict, ) from ..serializers import ( @@ -36,6 +36,9 @@ logger = logging.getLogger(__name__) +AnySafeTransaction = EthereumTx | MultisigTransaction | ModuleTransaction + + class TransactionServiceException(Exception): pass @@ -56,8 +59,6 @@ class TransactionService: def __init__(self, ethereum_client: EthereumClient, redis: Redis): self.ethereum_client = ethereum_client self.redis = redis - # Encode memoryview for redis - copyreg.pickle(memoryview, lambda 
val: (memoryview, (bytes(val),))) # Cache methods --------------------------------- def get_cache_key(self, safe_address: str, tx_id: str): @@ -65,7 +66,7 @@ def get_txs_from_cache( self, safe_address: str, ids_to_search: Sequence[str] - ) -> List[Union[EthereumTx, MultisigTransaction, ModuleTransaction]]: + ) -> List[AnySafeTransaction]: keys_to_search = [ self.get_cache_key(safe_address, id_to_search) for id_to_search in ids_to_search ] @@ -78,13 +79,11 @@ def store_txs_in_cache( self, safe_address: str, - ids_with_txs: Tuple[ - str, List[Union[EthereumTx, MultisigTransaction, ModuleTransaction]] - ], + ids_with_txs: Tuple[str, List[AnySafeTransaction]], ): """ Store executed transactions older than 10 minutes, using `ethereum_tx_hash` as key (for - MultisigTransaction it will be `SafeTxHash`) and expire then in one hour + MultisigTransaction it will be `SafeTxHash`) and expire them in one hour :param safe_address: :param ids_with_txs: @@ -108,6 +107,21 @@ # End of cache methods ---------------------------- + def get_count_relevant_txs_for_safe(self, safe_address: ChecksumAddress) -> int: + """ + This method searches multiple tables and counts every tx or event for a Safe. + It will return the same or a higher value compared to counting ``get_all_tx_identifiers``, + as that method will group some transactions (for example, 3 ERC20 transfers can be grouped in a ``MultisigTransaction``, + so it will be ``1`` element for ``get_all_tx_identifiers`` but ``4`` for this function). + + This query should be pretty fast, and it's meant to be used for invalidating caches. + + :param safe_address: + :return: number of relevant txs for a Safe + """ + + return SafeContract.objects.get_count_relevant_txs_for_safe(safe_address) + def get_all_tx_identifiers( self, safe_address: str, @@ -119,7 +133,7 @@ Build a queryset with identifiers (`safeTxHash` or `txHash`) for every tx for a Safe for paginated filtering. In the case of Multisig Transactions, as some of them are not mined, we use the `safeTxHash`. Criteria for building this list: - - Return only multisig txs with `nonce < current Safe Nonce` + - Return ``SafeTxHash`` for every MultisigTx (even not executed) - The endpoint should only show incoming transactions that have been mined - The transactions should be sorted by execution date. If an outgoing transaction doesn't have an execution date the execution date of the transaction with the same nonce that has been executed should be taken. @@ -134,7 +148,13 @@ sent by a delegate or indexed).
With `False` all txs are returned :return: List with tx hashes sorted by date (newest first) """ - + logger.debug( + "Safe=%s Getting all tx identifiers executed=%s queued=%s trusted=%s", + safe_address, + executed, + queued, + trusted, + ) # If tx is not mined, get the execution date of a tx mined with the same nonce case = Case( When( @@ -197,7 +217,6 @@ def get_all_tx_identifiers( "block", "safe_nonce", ) - .distinct() .order_by("-execution_date") ) @@ -282,7 +301,7 @@ def get_all_tx_identifiers( def get_all_txs_from_identifiers( self, safe_address: str, ids_to_search: Sequence[str] - ) -> List[Union[EthereumTx, MultisigTransaction, ModuleTransaction]]: + ) -> List[AnySafeTransaction]: """ Now that we know how to paginate, we retrieve the real transactions @@ -290,49 +309,79 @@ def get_all_txs_from_identifiers( :param ids_to_search: `SafeTxHash` for MultisigTransactions, `txHash` for other transactions :return: """ - cached_txs = { - id_to_search: cached_tx - for id_to_search, cached_tx in zip( + + logger.debug( + "Safe=%s Getting %d txs from identifiers", safe_address, len(ids_to_search) + ) + ids_with_cached_txs = { + id_to_search: cached_txs + for id_to_search, cached_txs in zip( ids_to_search, self.get_txs_from_cache(safe_address, ids_to_search), ) - if cached_tx + if cached_txs } - id_not_cached = [ + logger.debug( + "Safe=%s Got %d cached txs from identifiers", + safe_address, + len(ids_with_cached_txs), + ) + ids_not_cached = [ hash_to_search for hash_to_search in ids_to_search - if hash_to_search not in cached_txs + if hash_to_search not in ids_with_cached_txs ] - id_with_multisig_txs: Dict[HexStr, List[MultisigTransaction]] = { + logger.debug( + "Safe=%s %d not cached txs from identifiers", + safe_address, + len(ids_not_cached), + ) + ids_with_multisig_txs: Dict[HexStr, List[MultisigTransaction]] = { multisig_tx.safe_tx_hash: [multisig_tx] for multisig_tx in MultisigTransaction.objects.filter( - safe=safe_address, safe_tx_hash__in=id_not_cached + safe=safe_address, safe_tx_hash__in=ids_not_cached ) .with_confirmations_required() .prefetch_related("confirmations") .select_related("ethereum_tx__block") .order_by("-nonce", "-created") } + logger.debug( + "Safe=%s Got %d Multisig txs from identifiers", + safe_address, + len(ids_with_multisig_txs), + ) - id_with_module_txs: Dict[HexStr, List[ModuleTransaction]] = {} + ids_with_module_txs: Dict[HexStr, List[ModuleTransaction]] = {} for module_tx in ModuleTransaction.objects.filter( - safe=safe_address, internal_tx__ethereum_tx__in=id_not_cached + safe=safe_address, internal_tx__ethereum_tx__in=ids_not_cached ).select_related("internal_tx"): - id_with_module_txs.setdefault( + ids_with_module_txs.setdefault( module_tx.internal_tx.ethereum_tx_id, [] ).append(module_tx) + logger.debug( + "Safe=%s Got %d Module txs from identifiers", + safe_address, + len(ids_with_module_txs), + ) - id_with_plain_ethereum_txs: Dict[HexStr, List[EthereumTx]] = { + ids_with_plain_ethereum_txs: Dict[HexStr, List[EthereumTx]] = { ethereum_tx.tx_hash: [ethereum_tx] for ethereum_tx in EthereumTx.objects.filter( - tx_hash__in=id_not_cached + tx_hash__in=ids_not_cached ).select_related("block") } + logger.debug( + "Safe=%s Got %d Plain Ethereum txs from identifiers", + safe_address, + len(ids_with_plain_ethereum_txs), + ) - # We also need the in/out transfers for the MultisigTxs - all_ids = id_not_cached + [ + # We also need the in/out transfers for the MultisigTxs, we add the MultisigTx Ethereum Tx hashes + # to not cached ids + all_ids = ids_not_cached + [ 
multisig_tx.ethereum_tx_id - for multisig_txs in id_with_multisig_txs.values() + for multisig_txs in ids_with_multisig_txs.values() for multisig_tx in multisig_txs ] @@ -358,6 +407,10 @@ def get_all_txs_from_identifiers( for transfer in transfers: transfer_dict[transfer["transaction_hash"]].append(transfer) + logger.debug( + "Safe=%s Got %d Transfers from identifiers", safe_address, len(transfers) + ) + # Add available information about the token on database for the transfers tokens = { token.address: token @@ -369,28 +422,34 @@ def get_all_txs_from_identifiers( } ) } + logger.debug( + "Safe=%s Got %d tokens for transfers from database", + safe_address, + len(tokens), + ) + for transfer in transfers: transfer["token"] = tokens.get(transfer["token_address"]) # Build the list def get_the_transactions( transaction_id: str, - ) -> List[Union[MultisigTransaction, ModuleTransaction, EthereumTx]]: + ) -> List[MultisigTransaction | ModuleTransaction | EthereumTx]: """ - :param transaction_id: - :return: Transactions for the transaction id + :param transaction_id: SafeTxHash (in case of a ``MultisigTransaction``) or Ethereum ``TxHash`` for the rest + :return: Transactions for the transaction id, with transfers appended """ - if result := cached_txs.get(transaction_id): + if result := ids_with_cached_txs.get(transaction_id): return result result: Optional[Union[MultisigTransaction, ModuleTransaction, EthereumTx]] - if result := id_with_multisig_txs.get(transaction_id): + if result := ids_with_multisig_txs.get(transaction_id): for multisig_tx in result: # Populate transfers multisig_tx.transfers = transfer_dict[multisig_tx.ethereum_tx_id] return result - if result := id_with_module_txs.get(transaction_id): + if result := ids_with_module_txs.get(transaction_id): for module_tx in result: # Populate transfers module_tx.transfers = transfer_dict[ @@ -398,7 +457,7 @@ def get_the_transactions( ] return result - if result := id_with_plain_ethereum_txs.get(transaction_id): + if result := ids_with_plain_ethereum_txs.get(transaction_id): # If no Multisig or Module tx found, fallback to simple tx for ethereum_tx in result: # Populate transfers @@ -411,18 +470,27 @@ def get_the_transactions( "Tx not found, problem merging all transactions together" ) + logger.debug( + "Safe=%s Got all transactions from tx identifiers. Storing in cache", + safe_address, + ) ids_with_txs = [ (id_to_search, get_the_transactions(id_to_search)) for id_to_search in ids_to_search ] self.store_txs_in_cache(safe_address, ids_with_txs) + logger.debug( + "Safe=%s Got all transactions from tx identifiers. 
Stored in cache", + safe_address, + ) return list( dict.fromkeys(tx for (_, txs) in ids_with_txs for tx in txs) ) # Sorted already by execution_date def serialize_all_txs( - self, models: List[Union[EthereumTx, MultisigTransaction, ModuleTransaction]] + self, models: List[AnySafeTransaction] ) -> List[Dict[str, Any]]: + logger.debug("Serializing all transactions") results = [] for model in models: model_type = type(model) @@ -437,4 +505,6 @@ def serialize_all_txs( serialized = serializer(model) # serialized.is_valid(raise_exception=True) results.append(serialized.data) + + logger.debug("Serialized all transactions") return results diff --git a/safe_transaction_service/history/services/webhooks.py b/safe_transaction_service/history/services/webhooks.py index f43550a8..49022168 100644 --- a/safe_transaction_service/history/services/webhooks.py +++ b/safe_transaction_service/history/services/webhooks.py @@ -18,6 +18,10 @@ TokenTransfer, WebHookType, ) +from safe_transaction_service.safe_messages.models import ( + SafeMessage, + SafeMessageConfirmation, +) from safe_transaction_service.utils.ethereum import get_chain_id @@ -104,6 +108,22 @@ def build_webhook_payload( "txHash": HexBytes(instance.internal_tx.ethereum_tx_id).hex(), } ] + elif sender == SafeMessage: + payloads = [ + { + "address": instance.safe, + "type": WebHookType.MESSAGE_CREATED.name, + "messageHash": HexBytes(instance.message_hash).hex(), + } + ] + elif sender == SafeMessageConfirmation: + payloads = [ + { + "address": instance.safe_message.safe, # This could make a db call + "type": WebHookType.MESSAGE_CONFIRMATION.name, + "messageHash": HexBytes(instance.safe_message.message_hash).hex(), + } + ] # Add chainId to every payload for payload in payloads: diff --git a/safe_transaction_service/history/tasks.py b/safe_transaction_service/history/tasks.py index c6106fa3..cd6f5aaa 100644 --- a/safe_transaction_service/history/tasks.py +++ b/safe_transaction_service/history/tasks.py @@ -452,6 +452,9 @@ def process_decoded_internal_txs_for_safe_task( # Check if a new decoded tx appeared before other already processed (due to a reindex) if InternalTxDecoded.objects.out_of_order_for_safe(safe_address): + logger.error( + "Found out of order transactions for Safe=%s", safe_address + ) tx_processor.clear_cache(safe_address) index_service.reprocess_addresses([safe_address]) diff --git a/safe_transaction_service/history/tests/test_models.py b/safe_transaction_service/history/tests/test_models.py index 38c1fd7e..e815c19e 100644 --- a/safe_transaction_service/history/tests/test_models.py +++ b/safe_transaction_service/history/tests/test_models.py @@ -1391,7 +1391,9 @@ def test_with_confirmations_required(self): ) # SafeStatus not matching the EthereumTx - safe_status = SafeStatusFactory(nonce=1, threshold=8) + safe_status = SafeStatusFactory( + address=multisig_transaction.safe, nonce=1, threshold=8 + ) self.assertIsNone( MultisigTransaction.objects.with_confirmations_required() .first() diff --git a/safe_transaction_service/history/tests/test_safe_service.py b/safe_transaction_service/history/tests/test_safe_service.py index 39ee709a..d3bcb521 100644 --- a/safe_transaction_service/history/tests/test_safe_service.py +++ b/safe_transaction_service/history/tests/test_safe_service.py @@ -51,9 +51,11 @@ def test_get_safe_creation_info_with_tracing(self): ) self.assertIsInstance(safe_creation_info, SafeCreationInfo) - def test_get_safe_creation_info_without_tracing(self): + def 
test_get_safe_creation_info_without_tracing_but_with_proxy_factory(self): """ Tracing is not used, so traces must be fetched from DB if possible. L2 indexer "emulates" creation traces + if ``ProxyCreation`` event is detected (ProxyFactory used) + :return: """ random_address = Account.create().address @@ -80,6 +82,29 @@ def test_get_safe_creation_info_without_tracing(self): self.assertEqual(safe_creation.master_copy, setup_trace.to) self.assertEqual(bytes(safe_creation.setup_data), b"1234") + def test_get_safe_creation_info_without_tracing_nor_proxy_factory(self): + """ + Tracing is not used, so traces must be fetched from DB if possible. L2 indexer cannot "emulate" creation traces + as ProxyFactory was not used + + :return: + """ + + random_address = Account.create().address + creation_trace = InternalTxFactory( + contract_address=random_address, + ethereum_tx__status=1, + trace_address="0", + ethereum_tx__data=None, + ) + + # Setup can be done by a transfer to a contract, no need to have data + safe_creation = self.safe_service.get_safe_creation_info(random_address) + self.assertEqual(safe_creation.creator, creation_trace.ethereum_tx._from) + self.assertEqual(safe_creation.factory_address, creation_trace._from) + self.assertIsNone(safe_creation.master_copy) + self.assertIsNone(safe_creation.setup_data) + @mock.patch.object( TracingManager, "trace_transaction", return_value=creation_internal_txs ) diff --git a/safe_transaction_service/history/tests/test_signals.py b/safe_transaction_service/history/tests/test_signals.py index 3f8d4468..2c90826f 100644 --- a/safe_transaction_service/history/tests/test_signals.py +++ b/safe_transaction_service/history/tests/test_signals.py @@ -7,10 +7,16 @@ import factory from gnosis.eth import EthereumNetwork +from gnosis.safe.tests.safe_test_case import SafeTestCaseMixin from safe_transaction_service.events.tasks import send_event_to_queue_task from safe_transaction_service.notifications.tasks import send_notification_task +from ...safe_messages.models import SafeMessage, SafeMessageConfirmation +from ...safe_messages.tests.factories import ( + SafeMessageConfirmationFactory, + SafeMessageFactory, +) from ..models import ( ERC20Transfer, InternalTx, @@ -28,7 +34,7 @@ ) -class TestSignals(TestCase): +class TestSignals(SafeTestCaseMixin, TestCase): @factory.django.mute_signals(post_save) def test_build_webhook_payload(self): self.assertEqual( @@ -77,6 +83,23 @@ def test_build_webhook_payload(self): self.assertEqual(payload["type"], WebHookType.PENDING_MULTISIG_TRANSACTION.name) self.assertEqual(payload["chainId"], str(EthereumNetwork.GANACHE.value)) + safe_address = self.deploy_test_safe().address + safe_message = SafeMessageFactory(safe=safe_address) + payload = build_webhook_payload(SafeMessage, safe_message)[0] + self.assertEqual(payload["type"], WebHookType.MESSAGE_CREATED.name) + self.assertEqual(payload["address"], safe_address) + self.assertEqual(payload["messageHash"], safe_message.message_hash) + self.assertEqual(payload["chainId"], str(EthereumNetwork.GANACHE.value)) + + payload = build_webhook_payload( + SafeMessageConfirmation, + SafeMessageConfirmationFactory(safe_message=safe_message), + )[0] + self.assertEqual(payload["type"], WebHookType.MESSAGE_CONFIRMATION.name) + self.assertEqual(payload["address"], safe_address) + self.assertEqual(payload["messageHash"], safe_message.message_hash) + self.assertEqual(payload["chainId"], str(EthereumNetwork.GANACHE.value)) + @factory.django.mute_signals(post_save) @mock.patch.object(send_webhook_task, 
"apply_async") @mock.patch.object(send_notification_task, "apply_async") diff --git a/safe_transaction_service/history/tests/test_transaction_service.py b/safe_transaction_service/history/tests/test_transaction_service.py index 7cb3ebd9..86744a72 100644 --- a/safe_transaction_service/history/tests/test_transaction_service.py +++ b/safe_transaction_service/history/tests/test_transaction_service.py @@ -5,15 +5,22 @@ from eth_account import Account -from ..models import EthereumTx, ModuleTransaction, MultisigTransaction +from ..models import ( + EthereumTx, + EthereumTxCallType, + ModuleTransaction, + MultisigTransaction, +) from ..services.transaction_service import ( TransactionService, TransactionServiceProvider, ) from .factories import ( ERC20TransferFactory, + ERC721TransferFactory, InternalTxFactory, ModuleTransactionFactory, + MultisigConfirmationFactory, MultisigTransactionFactory, ) @@ -29,6 +36,57 @@ def tearDown(self): super().tearDown() self.transaction_service.redis.flushall() + def test_get_count_relevant_txs_for_safe(self): + transaction_service: TransactionService = self.transaction_service + safe_address = Account.create().address + + self.assertEqual( + transaction_service.get_count_relevant_txs_for_safe(safe_address), 0 + ) + + MultisigTransactionFactory(safe=safe_address) + self.assertEqual( + transaction_service.get_count_relevant_txs_for_safe(safe_address), 1 + ) + + multisig_transaction = MultisigTransactionFactory(safe=safe_address) + MultisigConfirmationFactory(multisig_transaction=multisig_transaction) + MultisigConfirmationFactory(multisig_transaction=multisig_transaction) + # Not related MultisigConfirmation should not show + MultisigConfirmationFactory() + ERC20TransferFactory(to=safe_address) + ERC20TransferFactory(_from=safe_address) + ERC721TransferFactory(to=safe_address) + ERC721TransferFactory(_from=safe_address) + ModuleTransactionFactory(safe=safe_address) + InternalTxFactory( + value=5, call_type=EthereumTxCallType.CALL.value, to=safe_address + ) + + self.assertEqual( + transaction_service.get_count_relevant_txs_for_safe(safe_address), 10 + ) + + # InternalTxs without value are not returned + InternalTxFactory( + value=0, call_type=EthereumTxCallType.CALL.value, to=safe_address + ) + + # InternalTxs without proper type are not returned + InternalTxFactory( + value=5, call_type=EthereumTxCallType.DELEGATE_CALL.value, to=safe_address + ) + + self.assertEqual( + transaction_service.get_count_relevant_txs_for_safe(safe_address), 10 + ) + + # A different Safe must be empty + safe_address_2 = Account.create().address + self.assertEqual( + transaction_service.get_count_relevant_txs_for_safe(safe_address_2), 0 + ) + def test_get_all_tx_identifiers(self): transaction_service: TransactionService = self.transaction_service safe_address = Account.create().address diff --git a/safe_transaction_service/history/tests/test_views.py b/safe_transaction_service/history/tests/test_views.py index 45c4c8f2..ea19d08a 100644 --- a/safe_transaction_service/history/tests/test_views.py +++ b/safe_transaction_service/history/tests/test_views.py @@ -1,5 +1,7 @@ +import datetime import json import logging +import pickle from dataclasses import asdict from unittest import mock from unittest.mock import MagicMock, PropertyMock @@ -32,6 +34,7 @@ from safe_transaction_service.tokens.services.price_service import PriceService from safe_transaction_service.tokens.tests.factories import TokenFactory +from ...utils.redis import get_redis from ..helpers import DelegateSignatureHelper from 
..models import ( IndexingStatus, @@ -47,6 +50,7 @@ from .factories import ( ERC20TransferFactory, ERC721TransferFactory, + EthereumBlockFactory, EthereumTxFactory, InternalTxFactory, ModuleTransactionFactory, @@ -338,6 +342,104 @@ def test_all_transactions_executed(self): self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["count"], 1) + def test_all_transactions_ordering(self): + safe_address = Account.create().address + block_2_days_ago = EthereumBlockFactory( + timestamp=timezone.now() - datetime.timedelta(days=2) + ) + ethereum_tx_2_days_ago = EthereumTxFactory(block=block_2_days_ago) + # Older transaction + MultisigTransactionFactory( + safe=safe_address, ethereum_tx=ethereum_tx_2_days_ago + ) + # Newer transactions + MultisigTransactionFactory(safe=safe_address) + MultisigTransactionFactory(safe=safe_address) + # Nonce is not allowed as a sorting parameter + response = self.client.get( + reverse("v1:history:all-transactions", args=(safe_address,)) + "?ordering=nonce" + ) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + response = self.client.get( + reverse("v1:history:all-transactions", args=(safe_address,)) + "?trusted=False&ordering=execution_date" + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 3) + first_result = response.data["results"][0] + self.assertEqual( + first_result["transaction_hash"], ethereum_tx_2_days_ago.tx_hash + ) + response = self.client.get( + reverse("v1:history:all-transactions", args=(safe_address,)) + "?trusted=False&ordering=-execution_date" + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 3) + last_result = response.data["results"][2] + self.assertEqual( + last_result["transaction_hash"], ethereum_tx_2_days_ago.tx_hash + ) + + def test_all_transactions_cache(self): + safe_address = "0x54f3c8e4Bf7bFDFF39B36d1FAE4e5ceBdD93C6A9" + # Older transaction + factory_transactions = [ + MultisigTransactionFactory(safe=safe_address), + MultisigTransactionFactory(safe=safe_address), + ] + # all-txs:{safe}:{executed}{queued}{trusted}:{limit}:{offset}:{ordering}:{relevant_elements} + cache_key = "all-txs:0x54f3c8e4Bf7bFDFF39B36d1FAE4e5ceBdD93C6A9:100:10:0:execution_date:2" + redis = get_redis() + redis.delete(cache_key) + cache_result = redis.get(cache_key) + # Should be empty at the beginning + self.assertIsNone(cache_result) + + response = self.client.get( + reverse("v1:history:all-transactions", args=(safe_address,)) + "?executed=True&queued=False&trusted=False&ordering=execution_date" + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 2) + + cache_result = redis.get(cache_key) + # Should be stored in redis cache + self.assertIsNotNone(cache_result) + # Cache should contain the expected values + cache_values, cache_count = pickle.loads(cache_result) + self.assertEqual(cache_count, 2) + for cache_value, factory_transaction in zip(cache_values, factory_transactions): + self.assertEqual( + cache_value["safe_tx_hash"], factory_transaction.safe_tx_hash + ) + self.assertEqual(cache_value["created"], factory_transaction.created) + self.assertEqual( + cache_value["execution_date"], factory_transaction.execution_date + ) + self.assertEqual( + cache_value["block"], factory_transaction.ethereum_tx.block_id + ) + self.assertEqual(cache_value["safe_nonce"], factory_transaction.nonce) + # Modify cache to empty list + redis.set(cache_key,
+ response = self.client.get( + reverse("v1:history:all-transactions", args=(safe_address,)) + + "?executed=True&queued=False&trusted=False&ordering=execution_date" + ) + # Response should be returned from cache + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 0) + + # Cache should be invalidated because there is a new transaction + MultisigTransactionFactory(safe=safe_address) + response = self.client.get( + reverse("v1:history:all-transactions", args=(safe_address,)) + + "?executed=True&queued=False&trusted=False&ordering=execution_date" + ) + self.assertEqual(response.data["count"], 3) + def test_all_transactions_wrong_transfer_type_view(self): # No token in database, so we must trust the event safe_address = Account.create().address @@ -2559,16 +2661,6 @@ def test_get_transfer_view(self): ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - # test internal_tx transfer_id empty trace_address - transfer_id = ( - "ief060441f0101ab83d62066b962f97e3a582686e0720157407c965c5946c2f7a" - ) - response = self.client.get( - reverse("v1:history:transfer", args=(transfer_id,)), - format="json", - ) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - # test invalid erc20 transfer_id empty log_index transfer_id = ( "e27e15ba8dea473d98c80a6b45d372c0f3c6f8c184177044c935c37eb419d7216" ) @@ -2633,6 +2725,39 @@ def test_get_transfer_view(self): } self.assertEqual(response.json(), expected_result) + # test internal_tx transfer_id empty trace_address + ethereum_tx_hash = ( + "0x12bafc5ee165d825201a24418e00bef6039bb06f6d09420ab1c5f7b4098c0809" + ) + ethereum_tx = EthereumTxFactory(tx_hash=ethereum_tx_hash) + internal_tx_empty_trace_address = InternalTxFactory( + ethereum_tx=ethereum_tx, to=safe_address, trace_address="" + ) + transfer_id_empty_trace_address = ( + "i12bafc5ee165d825201a24418e00bef6039bb06f6d09420ab1c5f7b4098c0809" + ) + response = self.client.get( + reverse("v1:history:transfer", args=(transfer_id_empty_trace_address,)), + format="json", + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + expected_result = { + "type": TransferType.ETHER_TRANSFER.name, + "executionDate": internal_tx_empty_trace_address.ethereum_tx.block.timestamp.isoformat().replace( + "+00:00", "Z" + ), + "blockNumber": internal_tx_empty_trace_address.ethereum_tx.block_id, + "transferId": transfer_id_empty_trace_address, + "transactionHash": internal_tx_empty_trace_address.ethereum_tx_id, + "to": safe_address, + "value": str(internal_tx_empty_trace_address.value), + "tokenId": None, + "tokenAddress": None, + "from": internal_tx_empty_trace_address._from, + "tokenInfo": None, + } + self.assertEqual(response.json(), expected_result) + # Test filtering ERC20 transfer by transfer_id erc20_tx_hash = ( "0x406754000f0432d3b5e6d8341597ec3c5338239f8d311de9061fbc959f443d59" ) @@ -3044,8 +3169,8 @@ def test_safe_info_view(self): """ SafeMasterCopy.objects.get_version_for_address.cache_clear() - def test_master_copies_view(self): - response = self.client.get(reverse("v1:history:master-copies")) + def _test_singletons_view(self, url: str): + response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data, []) @@ -3055,7 +3180,7 @@ initial_block_number=deployed_block_number, tx_block_number=last_indexed_block_number, ) - response = self.client.get(reverse("v1:history:master-copies")) + response = 
self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) expected_master_copy = [ { @@ -3070,7 +3195,7 @@ def test_master_copies_view(self): self.assertCountEqual(response.data, expected_master_copy) safe_master_copy = SafeMasterCopyFactory(l2=True) - response = self.client.get(reverse("v1:history:master-copies")) + response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) expected_l2_master_copy = [ { @@ -3088,10 +3213,18 @@ def test_master_copies_view(self): ) with self.settings(ETH_L2_NETWORK=True): - response = self.client.get(reverse("v1:history:master-copies")) + response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertCountEqual(response.data, expected_l2_master_copy) + def test_singletons_view(self): + url = reverse("v1:history:singletons") + return self._test_singletons_view(url) + + def test_master_copies_view(self): + url = reverse("v1:history:master-copies") + return self._test_singletons_view(url) + def test_modules_view(self): invalid_address = "0x2A" response = self.client.get( diff --git a/safe_transaction_service/history/urls.py b/safe_transaction_service/history/urls.py index b5627dd9..e95ab111 100644 --- a/safe_transaction_service/history/urls.py +++ b/safe_transaction_service/history/urls.py @@ -18,7 +18,8 @@ ), path( "about/master-copies/", views.MasterCopiesView.as_view(), name="master-copies" - ), + ), # Deprecated + path("about/singletons/", views.SingletonsView.as_view(), name="singletons"), path( "about/indexing/", views.IndexingView.as_view(), diff --git a/safe_transaction_service/history/views.py b/safe_transaction_service/history/views.py index 11b118d2..947e898b 100644 --- a/safe_transaction_service/history/views.py +++ b/safe_transaction_service/history/views.py @@ -1,6 +1,7 @@ import hashlib import logging -from typing import Any, Dict, Tuple +import pickle +from typing import Any, Dict, Optional, Tuple from django.conf import settings from django.utils.decorators import method_decorator @@ -9,7 +10,7 @@ import django_filters from drf_yasg import openapi from drf_yasg.utils import swagger_auto_schema -from eth_typing import HexStr +from eth_typing import ChecksumAddress, HexStr from rest_framework import status from rest_framework.filters import OrderingFilter from rest_framework.generics import ( @@ -33,6 +34,7 @@ from safe_transaction_service.utils.ethereum import get_chain_id from safe_transaction_service.utils.utils import parse_boolean_query_param +from ..utils.redis import get_redis from . 
import filters, pagination, serializers from .helpers import add_tokens_to_transfers, is_valid_unique_transfer_id from .models import ( @@ -48,6 +50,7 @@ SafeMasterCopy, TransferDict, ) +from .pagination import ListPagination from .serializers import get_data_decoded_from_data from .services import ( BalanceServiceProvider, @@ -67,7 +70,7 @@ class AboutView(APIView): renderer_classes = (JSONRenderer,) - @method_decorator(cache_page(60 * 60)) # 1 hour + @method_decorator(cache_page(5 * 60)) # 5 minutes def get(self, request, format=None): content = { "name": "Safe Transaction Service", @@ -164,7 +167,7 @@ def get(self, request): return Response(status=status.HTTP_200_OK, data=serializer.data) -class MasterCopiesView(ListAPIView): +class SingletonsView(ListAPIView): serializer_class = serializers.MasterCopyResponseSerializer pagination_class = None @@ -172,12 +175,25 @@ def get_queryset(self): return SafeMasterCopy.objects.relevant() +class MasterCopiesView(SingletonsView): + @swagger_auto_schema( + deprecated=True, + operation_description="Use `singletons` instead of `master-copies`", + responses={200: "Ok"}, + ) + def get(self, *args, **kwargs): + return super().get(*args, **kwargs) + + class AllTransactionsListView(ListAPIView): filter_backends = ( django_filters.rest_framework.DjangoFilterBackend, OrderingFilter, ) - ordering_fields = ["execution_date", "safe_nonce", "block", "created"] + ordering_fields = ["execution_date"] + allowed_ordering_fields = ordering_fields + [ + f"-{ordering_field}" for ordering_field in ordering_fields + ] pagination_class = pagination.SmallPagination serializer_class = ( serializers.AllTransactionsSchemaSerializer @@ -231,27 +247,165 @@ def get_parameters(self) -> Tuple[bool, bool, bool]: ) return executed, queued, trusted - def list(self, request, *args, **kwargs): + def get_ordering_parameter(self) -> Optional[str]: + return self.request.query_params.get(OrderingFilter.ordering_param) + + def get_page_tx_identifiers( + self, + safe: ChecksumAddress, + executed: bool, + queued: bool, + trusted: bool, + ordering: Optional[str], + limit: int, + offset: int, + ) -> Optional[Response]: + """ + This query will merge txs and events and will return the important + identifiers (``safeTxHash`` or ``txHash``) filtered + + :param safe: + :param executed: + :param queued: + :param trusted: + :param ordering: + :param limit: + :param offset: + :return: Return tx identifiers paginated + """ transaction_service = TransactionServiceProvider() - safe = self.kwargs["address"] - executed, queued, trusted = self.get_parameters() + + logger.debug( + "%s: Getting all tx identifiers for Safe=%s executed=%s queued=%s trusted=%s ordering=%s limit=%d offset=%d", + self.__class__.__name__, + safe, + executed, + queued, + trusted, + ordering, + limit, + offset, + ) queryset = self.filter_queryset( transaction_service.get_all_tx_identifiers( safe, executed=executed, queued=queued, trusted=trusted ) ) page = self.paginate_queryset(queryset) + logger.debug( + "%s: Got all tx identifiers for Safe=%s executed=%s queued=%s trusted=%s ordering=%s limit=%d offset=%d", + self.__class__.__name__, + safe, + executed, + queued, + trusted, + ordering, + limit, + offset, + ) + + return page + + def get_cached_page_tx_identifiers( + self, + safe: ChecksumAddress, + executed: bool, + queued: bool, + trusted: bool, + ordering: Optional[str], + limit: int, + offset: int, + ) -> Optional[Response]: + """ + Cache for tx identifiers. 
A quick ``SQL COUNT`` in all the transactions/events + tables will determine whether the cache for the provided values is still valid + + :param safe: + :param executed: + :param queued: + :param trusted: + :param ordering: + :param limit: + :param offset: + :return: + """ + transaction_service = TransactionServiceProvider() + cache_timeout = settings.CACHE_ALL_TXS_VIEW + redis = get_redis() + + # Get all relevant elements for a Safe to be cached + relevant_elements = transaction_service.get_count_relevant_txs_for_safe(safe) + cache_key = f"all-txs:{safe}:{int(executed)}{int(queued)}{int(trusted)}:{limit}:{offset}:{ordering}:{relevant_elements}" + lock_key = f"locks:{cache_key}" + + if not cache_timeout: + # Cache disabled + return self.get_page_tx_identifiers( + safe, executed, queued, trusted, ordering, limit, offset + ) + + with redis.lock( + lock_key, + timeout=settings.GUNICORN_REQUEST_TIMEOUT, # This prevents a service restart from leaving a lock held forever + ): + if result := redis.get(cache_key): + # Count needs to be retrieved to set up the paginator + page, count = pickle.loads(result) + # Setting the paginator like this is not very elegant and needs to be tested really well + self.paginator.count = count + self.paginator.limit = limit + self.paginator.offset = offset + self.paginator.request = self.request + return page + page = self.get_page_tx_identifiers( + safe, executed, queued, trusted, ordering, limit, offset + ) + redis.set( + cache_key, pickle.dumps((page, self.paginator.count)), ex=cache_timeout + ) + + return page + + def list(self, request, *args, **kwargs): + transaction_service = TransactionServiceProvider() + safe = self.kwargs["address"] + executed, queued, trusted = self.get_parameters() + ordering = self.get_ordering_parameter() + # Trick to get limit and offset + list_pagination = ListPagination(self.request) + limit, offset = list_pagination.limit, list_pagination.offset - if not page: + tx_identifiers_page = self.get_cached_page_tx_identifiers( + safe, executed, queued, trusted, ordering, limit, offset + ) + if not tx_identifiers_page: return self.get_paginated_response([]) # Tx identifiers are retrieved using `safe_tx_hash` attribute name due to how Django # handles `UNION` of all the Transaction models using the first model attribute name - all_tx_identifiers = [element["safe_tx_hash"] for element in page] + all_tx_identifiers = [ + element["safe_tx_hash"] for element in tx_identifiers_page + ] all_txs = transaction_service.get_all_txs_from_identifiers( safe, all_tx_identifiers ) + logger.debug( + "%s: Got all txs from identifiers for Safe=%s executed=%s queued=%s trusted=%s", + self.__class__.__name__, + safe, + executed, + queued, + trusted, + ) all_txs_serialized = transaction_service.serialize_all_txs(all_txs) + logger.debug( + "%s: All txs from identifiers for Safe=%s executed=%s queued=%s trusted=%s were serialized", + self.__class__.__name__, + safe, + executed, + queued, + trusted, + ) return self.get_paginated_response(all_txs_serialized) @swagger_auto_schema( @@ -275,7 +429,7 @@ def get(self, request, *args, **kwargs): by a delegate). If you need that behaviour to be disabled set the query parameter `trusted=False` - Module Transactions for a Safe. `tx_type=MODULE_TRANSACTION` - Incoming Transfers of Ether/ERC20 Tokens/ERC721 Tokens. 
`tx_type=ETHEREUM_TRANSACTION` - Ordering_fields: ["execution_date", "safe_nonce", "block", "created"] eg: `created` or `-created` + Ordering_fields: ["execution_date"] eg: `execution_date` or `-execution_date` """ address = kwargs["address"] if not fast_is_checksum_address(address): @@ -287,6 +441,16 @@ def get(self, request, *args, **kwargs): "arguments": [address], }, ) + ordering = self.get_ordering_parameter() + if ordering and ordering not in self.allowed_ordering_fields: + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={ + "code": 1, + "message": "Ordering field is not valid, only `execution_date` is allowed", + "arguments": [ordering], + }, + ) response = super().get(request, *args, **kwargs) response.setdefault( diff --git a/safe_transaction_service/notifications/serializers.py b/safe_transaction_service/notifications/serializers.py index 61f67c2a..23c0adca 100644 --- a/safe_transaction_service/notifications/serializers.py +++ b/safe_transaction_service/notifications/serializers.py @@ -1,5 +1,5 @@ import time -from typing import Any, Dict, Sequence, Set +from typing import Any, Dict, List, Sequence, Set, Tuple from uuid import uuid4 from django.db import IntegrityError, transaction @@ -88,53 +88,83 @@ def get_valid_owners( return valid_owners + def process_parsed_signatures( + self, + safe_owners: Sequence[ChecksumAddress], + signatures: Sequence[bytes], + hash_to_sign: bytes, + ) -> Tuple[List[ChecksumAddress], List[ChecksumAddress]]: + """ + :param safe_owners: Current owners of the Safe + :param signatures: List of signatures for registration + :param hash_to_sign: Raw hash or EIP191 encoded registration authorization hash is accepted + :return: A tuple with ``accepted owners to register`` and ``not accepted owners`` + """ + owners_to_register = [] # Owners to register for notifications + owners_to_not_register = [] # Owners of the Safe not present in the signature + for signature in signatures: + parsed_signatures = SafeSignature.parse_signature(signature, hash_to_sign) + if not parsed_signatures: + raise ValidationError("Signature cannot be parsed") + for safe_signature in parsed_signatures: + if ( + safe_signature.signature_type != SafeSignatureType.EOA + or not safe_signature.is_valid() + ): + raise ValidationError( + "An externally owned account signature was expected" + ) + owner = safe_signature.owner + if owner in (owners_to_register + owners_to_not_register): + raise ValidationError(f"Signature for owner={owner} is duplicated") + + if owner in safe_owners: + owners_to_register.append(owner) + else: + owners_to_not_register.append(owner) + # raise ValidationError(f'Owner={owner} is not an owner of any of the safes={data["safes"]}. 
' + # f'Expected hash to sign {hash_to_sign.hex()}') + return owners_to_register, owners_to_not_register + def validate(self, attrs: Dict[str, Any]): attrs = super().validate(attrs) - signature_owners = [] - owners_without_safe = [] signatures = attrs.get("signatures") or [] safe_addresses = attrs["safes"] + owners_to_register, owners_to_not_register = [], [] if signatures: - valid_owners = self.get_valid_owners(safe_addresses) - for signature in signatures: - hash_to_sign = calculate_device_registration_hash( - attrs["timestamp"], - attrs["uuid"], - attrs["cloud_messaging_token"], - attrs["safes"], - ) - parsed_signatures = SafeSignature.parse_signature( - signature, hash_to_sign + safe_owners = self.get_valid_owners(safe_addresses) + # Allow 2 valid hashes, raw hash and EIP191 one + hash_raw_to_sign = calculate_device_registration_hash( + attrs["timestamp"], + attrs["uuid"], + attrs["cloud_messaging_token"], + attrs["safes"], + ) + hash_eip191_to_sign = calculate_device_registration_hash( + attrs["timestamp"], + attrs["uuid"], + attrs["cloud_messaging_token"], + attrs["safes"], + eip191=True, + ) + for hash_to_sign in (hash_raw_to_sign, hash_eip191_to_sign): + # We will check the 2 accepted hashes, EIP191 and raw one. If we find valid owners, stop + ( + owners_to_register, + owners_to_not_register, + ) = self.process_parsed_signatures( + safe_owners, signatures, hash_to_sign ) - if not parsed_signatures: - raise ValidationError("Signature cannot be parsed") - for safe_signature in parsed_signatures: - if ( - safe_signature.signature_type != SafeSignatureType.EOA - or not safe_signature.is_valid() - ): - raise ValidationError( - "An externally owned account signature was expected" - ) - owner = safe_signature.owner - if owner in (signature_owners + owners_without_safe): - raise ValidationError( - f"Signature for owner={owner} is duplicated" - ) - - if owner not in valid_owners: - owners_without_safe.append(owner) - # raise ValidationError(f'Owner={owner} is not an owner of any of the safes={data["safes"]}. 
' - # f'Expected hash to sign {hash_to_sign.hex()}') - else: - signature_owners.append(owner) - if len(signatures) > len(signature_owners + owners_without_safe): + if owners_to_register: + break + + if len(signatures) > len(owners_to_register + owners_to_not_register): raise ValidationError( "Number of owners detected is less than the number of signatures" ) - attrs["owners_registered"] = signature_owners - attrs["owners_not_registered"] = owners_without_safe + attrs["owners_registered"] = owners_to_register + attrs["owners_not_registered"] = owners_to_not_register return attrs @transaction.atomic diff --git a/safe_transaction_service/notifications/tests/test_views.py b/safe_transaction_service/notifications/tests/test_views.py index 123c120a..4d055186 100644 --- a/safe_transaction_service/notifications/tests/test_views.py +++ b/safe_transaction_service/notifications/tests/test_views.py @@ -5,6 +5,7 @@ from django.urls import reverse from eth_account import Account +from eth_account.messages import encode_defunct from rest_framework import status from rest_framework.test import APITestCase @@ -134,7 +135,7 @@ def test_notifications_devices_create_with_signatures_view(self): signatures = [owner_account.signHash(hash_to_sign)["signature"].hex()] data = { "uuid": unique_id, - "safes": [safe_address], + "safes": safes, "cloudMessagingToken": cloud_messaging_token, "buildNumber": 0, "bundle": "company.package.app", @@ -147,7 +148,6 @@ reverse("v1:notifications:devices"), format="json", data=data ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn( f"Could not get Safe {safe_address} owners from blockchain, check contract exists on network", response.data["non_field_errors"][0], @@ -201,6 +201,49 @@ [owner_account.address, owner_account_2.address], ) + def test_notifications_devices_create_with_signatures_eip191_view(self): + safe_address = Account.create().address + safe_contract = SafeContractFactory(address=safe_address) + owner_account = Account.create() + owner_account_2 = Account.create() + + self.assertEqual(FirebaseDevice.objects.count(), 0) + unique_id = uuid.uuid4() + timestamp = int(time.time()) + cloud_messaging_token = "A" * 163 + safes = [safe_address] + hash_to_sign = calculate_device_registration_hash( + timestamp, unique_id, cloud_messaging_token, safes + ) + message_to_sign = encode_defunct(hash_to_sign) + signatures = [owner_account.sign_message(message_to_sign)["signature"].hex()] + data = { + "uuid": unique_id, + "safes": safes, + "cloudMessagingToken": cloud_messaging_token, + "buildNumber": 0, + "bundle": "company.package.app", + "deviceType": "WEB", + "version": "2.0.1", + "timestamp": timestamp, + "signatures": signatures, + } + + with mock.patch( + "safe_transaction_service.notifications.serializers.get_safe_owners", + return_value=[owner_account.address], + ): + response = self.client.post( + reverse("v1:notifications:devices"), format="json", data=data + ) + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.data["uuid"], str(unique_id)) + self.assertEqual(FirebaseDevice.objects.count(), 1) + self.assertEqual(FirebaseDeviceOwner.objects.count(), 1) + self.assertEqual( + FirebaseDeviceOwner.objects.first().owner, owner_account.address + ) + def 
test_notifications_devices_create_with_delegates_signatures_view(self): delegate = Account.create() safe_contract_delegate = SafeContractDelegateFactory(delegate=delegate.address) diff --git a/safe_transaction_service/notifications/utils.py b/safe_transaction_service/notifications/utils.py index 4b179399..c64b43d2 100644 --- a/safe_transaction_service/notifications/utils.py +++ b/safe_transaction_service/notifications/utils.py @@ -5,6 +5,7 @@ from django.conf import settings +from eth_account.messages import defunct_hash_message from hexbytes import HexBytes from gnosis.eth.utils import fast_keccak @@ -67,9 +68,13 @@ def calculate_device_registration_hash( cloud_messaging_token: str, safes: Sequence[str], prefix: str = "gnosis-safe", + eip191: bool = False, ) -> HexBytes: safes_to_str = "".join(sorted(safes)) str_to_sign = ( f"{prefix}{timestamp}{identifier}{cloud_messaging_token}{safes_to_str}" ) - return fast_keccak(str_to_sign.encode()) + raw_hash = fast_keccak(str_to_sign.encode()) + if eip191: + return defunct_hash_message(raw_hash) + return raw_hash
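With the `eip191` flag above, the same registration payload yields two acceptable hashes: the raw keccak hash and its EIP-191 ("personal_sign") counterpart. A minimal client-side sketch of producing either accepted signature, assuming `eth_account` and the `fast_keccak` helper shown above (all values are placeholders; the subscript access mirrors the patterns used in the tests in this diff):

import time
from uuid import uuid4

from eth_account import Account
from eth_account.messages import encode_defunct
from gnosis.eth.utils import fast_keccak

owner = Account.create()  # placeholder owner key
timestamp = int(time.time())
identifier = uuid4()
cloud_messaging_token = "A" * 163  # placeholder FCM token
safes_to_str = "".join(sorted(["0x..."]))  # placeholder Safe address list

str_to_sign = f"gnosis-safe{timestamp}{identifier}{cloud_messaging_token}{safes_to_str}"
raw_hash = fast_keccak(str_to_sign.encode())

# Option 1: sign the raw keccak hash directly
signature_raw = owner.signHash(raw_hash)["signature"].hex()

# Option 2: EIP-191 signature over the same hash; also accepted, because the
# serializer additionally tries defunct_hash_message(raw_hash)
signature_eip191 = owner.sign_message(encode_defunct(raw_hash))["signature"].hex()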
diff --git a/safe_transaction_service/safe_messages/admin.py b/safe_transaction_service/safe_messages/admin.py index 0c50cf15..8ca928dc 100644 --- a/safe_transaction_service/safe_messages/admin.py +++ b/safe_transaction_service/safe_messages/admin.py @@ -11,7 +11,7 @@ class SafeMessageAdmin(BinarySearchAdmin): list_display = ("safe", "message_hash", "proposed_by", "message") ordering = ["-created"] readonly_fields = ("message_hash",) - search_fields = ["safe", "message_hash", "proposed_by", "message"] + search_fields = ["=safe", "=message_hash", "=proposed_by", "message"] @admin.register(SafeMessageConfirmation) @@ -26,7 +26,7 @@ class SafeMessageConfirmationAdmin(BinarySearchAdmin): list_select_related = ("safe_message",) ordering = ["-created"] search_fields = [ - "safe_message__safe", - "owner", + "=safe_message__safe", + "=owner", "safe_message__description", ] diff --git a/safe_transaction_service/safe_messages/models.py b/safe_transaction_service/safe_messages/models.py index baa8293c..84f0dfc5 100644 --- a/safe_transaction_service/safe_messages/models.py +++ b/safe_transaction_service/safe_messages/models.py @@ -33,11 +33,7 @@ def __str__(self): message = message_str[:message_size] if len(message_str) > message_size: message += "..." - message_hash = ( - self.message_hash.hex() - if isinstance(self.message_hash, bytes) - else self.message_hash - ) + message_hash = HexBytes(self.message_hash).hex() return f"Safe Message {message_hash} - {message}" def build_signature(self) -> bytes: diff --git a/safe_transaction_service/safe_messages/signals.py b/safe_transaction_service/safe_messages/signals.py index eea436a3..6248debf 100644 --- a/safe_transaction_service/safe_messages/signals.py +++ b/safe_transaction_service/safe_messages/signals.py @@ -1,3 +1,55 @@ import logging +from typing import Type, Union + +from django.db.models import Model +from django.db.models.signals import post_save +from django.dispatch import receiver + +from safe_transaction_service.events.tasks import send_event_to_queue_task +from safe_transaction_service.history.services.webhooks import build_webhook_payload +from safe_transaction_service.history.tasks import send_webhook_task +from safe_transaction_service.safe_messages.models import ( + SafeMessage, + SafeMessageConfirmation, +) logger = logging.getLogger(__name__) + + +@receiver(post_save, sender=SafeMessage, dispatch_uid="safe_message.process_webhook") +@receiver( + post_save, + sender=SafeMessageConfirmation, + dispatch_uid="safe_message_confirmation.process_webhook", +) +def process_webhook( + sender: Type[Model], + instance: Union[ + SafeMessage, + SafeMessageConfirmation, + ], + created: bool, + **kwargs, +) -> None: + logger.debug("Start building payloads for created=%s object=%s", created, instance) + payloads = build_webhook_payload(sender, instance) + logger.debug( + "End building payloads %s for created=%s object=%s", payloads, created, instance + ) + for payload in payloads: + if address := payload.get("address"): + logger.debug( + "Triggering send_webhook and send_notification tasks for created=%s object=%s", + created, + instance, + ) + send_webhook_task.apply_async( + args=(address, payload), priority=2 # Almost the lowest priority + ) + send_event_to_queue_task.delay(payload) + else: + logger.debug( + "Notification will not be sent for created=%s object=%s", + created, + instance, + )
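For reference, the payloads that `build_webhook_payload` produces for these models, and that the receiver above fans out, have the shape asserted in the new tests below; an illustrative sketch of the contract (field names taken from the tests, values are placeholders):

message_payload = {
    "address": "0x...",         # Safe address the message belongs to
    "type": "MESSAGE_CREATED",  # WebHookType name; confirmations use MESSAGE_CONFIRMATION
    "messageHash": "0x...",     # SafeMessage.message_hash
    "chainId": "1337",          # chain id as a string (GANACHE in the tests)
}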
diff --git a/safe_transaction_service/safe_messages/tests/test_signals.py b/safe_transaction_service/safe_messages/tests/test_signals.py new file mode 100644 index 00000000..f4351f94 --- /dev/null +++ b/safe_transaction_service/safe_messages/tests/test_signals.py @@ -0,0 +1,95 @@ +from unittest import mock + +from django.db.models.signals import post_save +from django.test import TestCase + +import factory + +from gnosis.eth import EthereumNetwork +from gnosis.safe.tests.safe_test_case import SafeTestCaseMixin + +from safe_transaction_service.events.tasks import send_event_to_queue_task +from safe_transaction_service.history.models import WebHookType +from safe_transaction_service.history.tasks import send_webhook_task +from safe_transaction_service.safe_messages.models import ( + SafeMessage, + SafeMessageConfirmation, +) +from safe_transaction_service.safe_messages.signals import process_webhook +from safe_transaction_service.safe_messages.tests.factories import ( + SafeMessageConfirmationFactory, + SafeMessageFactory, +) + + +class TestSafeMessageSignals(SafeTestCaseMixin, TestCase): + @factory.django.mute_signals(post_save) + @mock.patch.object(send_webhook_task, "apply_async") + @mock.patch.object(send_event_to_queue_task, "delay") + def test_process_webhook( + self, + send_event_to_queue_task_mock, + webhook_task_mock, + ): + safe_address = self.deploy_test_safe().address + safe_message = SafeMessageFactory(safe=safe_address) + process_webhook(SafeMessage, safe_message, True) + message_created_payload = { + "address": safe_address, + "type": WebHookType.MESSAGE_CREATED.name, + "messageHash": safe_message.message_hash, + "chainId": str(EthereumNetwork.GANACHE.value), + } + webhook_task_mock.assert_called_with( + args=(safe_address, message_created_payload), priority=2 + ) + send_event_to_queue_task_mock.assert_called_with(message_created_payload) + + message_confirmation_payload = { + "address": safe_address, + "type": WebHookType.MESSAGE_CONFIRMATION.name, + "messageHash": safe_message.message_hash, + "chainId": str(EthereumNetwork.GANACHE.value), + } + safe_message_confirmation = SafeMessageConfirmationFactory( + safe_message=safe_message + ) + process_webhook(SafeMessageConfirmation, safe_message_confirmation, True) + webhook_task_mock.assert_called_with( + args=(safe_address, message_confirmation_payload), priority=2 + ) + send_event_to_queue_task_mock.assert_called_with(message_confirmation_payload) + + @mock.patch.object(send_webhook_task, "apply_async") + @mock.patch.object(send_event_to_queue_task, "delay") + def test_signals_are_correctly_fired( + self, + send_event_to_queue_task_mock, + webhook_task_mock, + ): + safe_address = self.deploy_test_safe().address + # Creating a message should fire a signal and webhooks should be sent + safe_message = SafeMessageFactory(safe=safe_address) + message_created_payload = { + "address": safe_address, + "type": WebHookType.MESSAGE_CREATED.name, + "messageHash": safe_message.message_hash, + "chainId": str(EthereumNetwork.GANACHE.value), + } + webhook_task_mock.assert_called_with( + args=(safe_address, message_created_payload), priority=2 + ) + send_event_to_queue_task_mock.assert_called_with(message_created_payload) + + message_confirmation_payload = { + "address": safe_address, + "type": WebHookType.MESSAGE_CONFIRMATION.name, + "messageHash": safe_message.message_hash, + "chainId": str(EthereumNetwork.GANACHE.value), + } + # Creating a confirmation should fire a signal and webhooks should be sent + SafeMessageConfirmationFactory(safe_message=safe_message) + webhook_task_mock.assert_called_with( + args=(safe_address, message_confirmation_payload), priority=2 + ) + send_event_to_queue_task_mock.assert_called_with(message_confirmation_payload) diff --git a/safe_transaction_service/safe_messages/views.py b/safe_transaction_service/safe_messages/views.py index fa1139b7..91a0205b 100644 --- a/safe_transaction_service/safe_messages/views.py +++ b/safe_transaction_service/safe_messages/views.py @@ -95,6 +95,14 @@ def get(self, request, address, *args, **kwargs): responses={201: "Created"}, ) def post(self, request, address, *args, **kwargs): + """ + Create a new signed message for a Safe. Message can be: + - A ``string``, so ``EIP191`` will be used to get the hash. + - An ``EIP712`` ``object``. + + Hash will be calculated from the provided ``message``. Sending a raw ``hash`` will not be accepted, + the service needs to derive it itself. 
+ """ if not fast_is_checksum_address(address): return Response( status=status.HTTP_422_UNPROCESSABLE_ENTITY, diff --git a/safe_transaction_service/tokens/clients/coingecko_client.py b/safe_transaction_service/tokens/clients/coingecko_client.py index c2ee2dc4..fa631073 100644 --- a/safe_transaction_service/tokens/clients/coingecko_client.py +++ b/safe_transaction_service/tokens/clients/coingecko_client.py @@ -33,6 +33,7 @@ class CoingeckoClient(BaseHTTPClient): EthereumNetwork.POLYGON: "polygon-pos", EthereumNetwork.POLYGON_ZKEVM: "polygon-zkevm", EthereumNetwork.CELO_MAINNET: "celo", + EthereumNetwork.METER_MAINNET: "meter", } base_url = "https://api.coingecko.com/" @@ -146,6 +147,9 @@ def get_kcs_usd_price(self) -> float: def get_metis_usd_price(self) -> float: return self.get_price("metis-token") - + def get_btc_usd_price(self) -> float: return self.get_price("bitcoin") + + def get_mtr_usd_price(self) -> float: + return self.get_price("meter-stable") diff --git a/safe_transaction_service/tokens/clients/kucoin_client.py b/safe_transaction_service/tokens/clients/kucoin_client.py index 80fed71c..b4aa4a46 100644 --- a/safe_transaction_service/tokens/clients/kucoin_client.py +++ b/safe_transaction_service/tokens/clients/kucoin_client.py @@ -80,3 +80,10 @@ def get_xdc_usd_price(self) -> float: :raises: CannotGetPrice """ return self._get_price("XDC-USDT") + + def get_ftm_usd_price(self) -> float: + """ + :return: current USD price for FTM Token + :raises: CannotGetPrice + """ + return self._get_price("FTM-USDT") diff --git a/safe_transaction_service/tokens/migrations/0010_tokenlist.py b/safe_transaction_service/tokens/migrations/0010_tokenlist.py index 66426ac1..be7dd604 100644 --- a/safe_transaction_service/tokens/migrations/0010_tokenlist.py +++ b/safe_transaction_service/tokens/migrations/0010_tokenlist.py @@ -1,6 +1,4 @@ # Generated by Django 4.1.3 on 2023-01-24 12:31 -from typing import Dict, Optional, Tuple - from django.db import migrations, models from gnosis.eth import EthereumNetwork diff --git a/safe_transaction_service/tokens/services/price_service.py b/safe_transaction_service/tokens/services/price_service.py index abb0c05d..26efe033 100644 --- a/safe_transaction_service/tokens/services/price_service.py +++ b/safe_transaction_service/tokens/services/price_service.py @@ -191,6 +191,9 @@ def get_cronos_usd_price(self) -> float: def get_xdc_usd_price(self) -> float: return self.kucoin_client.get_xdc_usd_price() + def get_ftm_usd_price(self) -> float: + return self.kucoin_client.get_ftm_usd_price() + def get_kcs_usd_price(self) -> float: try: return self.kucoin_client.get_kcs_usd_price() @@ -200,6 +203,12 @@ def get_kcs_usd_price(self) -> float: def get_btc_usd_price(self) -> float: return self.coingecko_client.get_btc_usd_price() + def get_btc_usd_price(self) -> float: + return self.coingecko_client.get_btc_usd_price() + + def get_mtr_usd_price(self) -> float: + return self.coingecko_client.get_mtr_usd_price() + @cachedmethod(cache=operator.attrgetter("cache_ether_usd_price")) @cache_memoize(60 * 30, prefix="balances-get_ether_usd_price") # 30 minutes def get_ether_usd_price(self) -> float: @@ -293,8 +302,18 @@ def get_native_coin_usd_price(self) -> float: ): return self.get_xdc_usd_price() elif self.ethereum_network in ( - EthereumNetwork.RSK_TESTNET, + EthereumNetwork.METER_MAINNET, + EthereumNetwork.METER_TESTNET, + ): + return self.coingecko_client.get_mtr_usd_price() + elif self.ethereum_network in ( + EthereumNetwork.FANTOM_OPERA, + EthereumNetwork.FANTOM_TESTNET, + ): + 
return self.get_ftm_usd_price() + elif self.ethereum_network in ( EthereumNetwork.RSK_MAINNET, + EthereumNetwork.RSK_TESTNET, ): return self.get_btc_usd_price() else: @@ -358,7 +377,7 @@ def get_token_eth_value(self, token_address: ChecksumAddress) -> float: def get_token_usd_price(self, token_address: ChecksumAddress) -> float: """ :param token_address: - :return: usd value for a given `token_address` using Curve, if not use Coingecko as last resource + :return: usd value for a given `token_address` using Coingecko """ if self.coingecko_client.supports_network(self.ethereum_network): try: @@ -457,7 +476,8 @@ def get_token_eth_price_from_oracles(self, token_address: ChecksumAddress) -> fl """ return ( self.get_token_eth_value(token_address) - or self.get_token_usd_price(token_address) / self.get_ether_usd_price() + or self.get_token_usd_price(token_address) + / self.get_native_coin_usd_price() ) def get_token_eth_price_from_composed_oracles( diff --git a/safe_transaction_service/tokens/tests/migrations/test_migrations.py b/safe_transaction_service/tokens/tests/migrations/test_migrations.py index 93205bf3..dd6eab6d 100644 --- a/safe_transaction_service/tokens/tests/migrations/test_migrations.py +++ b/safe_transaction_service/tokens/tests/migrations/test_migrations.py @@ -1,29 +1,28 @@ -import json +import importlib from unittest import mock from unittest.mock import MagicMock from django.test import TestCase -from django.utils import timezone from django_test_migrations.migrator import Migrator -from eth_account import Account -from web3 import Web3 from gnosis.eth import EthereumNetwork +# https://github.com/python/cpython/issues/100950 +token_list_migration = importlib.import_module( + "safe_transaction_service.tokens.migrations.0010_tokenlist" +) + class TestMigrations(TestCase): def setUp(self) -> None: self.migrator = Migrator(database="default") @mock.patch( - "safe_transaction_service.tokens.migrations.0010_tokenlist.get_ethereum_network", + f"{__name__}.token_list_migration.get_ethereum_network", return_value=EthereumNetwork.MAINNET, ) def test_migration_forward_0010(self, get_ethereum_network_mock: MagicMock): - """ - Add - """ old_state = self.migrator.apply_initial_migration( ("tokens", "0009_token_token_spam_idx") ) @@ -37,7 +36,7 @@ def test_migration_forward_0010(self, get_ethereum_network_mock: MagicMock): self.assertEqual(token_list.description, "Coingecko") @mock.patch( - "safe_transaction_service.tokens.migrations.0010_tokenlist.get_ethereum_network", + f"{__name__}.token_list_migration.get_ethereum_network", return_value=EthereumNetwork.AIOZ_NETWORK, ) def test_migration_forward_0010_network_without_data( diff --git a/safe_transaction_service/tokens/tests/test_price_service.py b/safe_transaction_service/tokens/tests/test_price_service.py index bc0379ec..8e4c519e 100644 --- a/safe_transaction_service/tokens/tests/test_price_service.py +++ b/safe_transaction_service/tokens/tests/test_price_service.py @@ -162,6 +162,16 @@ def test_get_native_coin_usd_price(self): price_service.cache_native_coin_usd_price.clear() self.assertEqual(price_service.get_xdc_usd_price(), 7.7) + # Meter + with mock.patch.object(CoingeckoClient, "get_mtr_usd_price", return_value=8.0): + price_service.ethereum_network = EthereumNetwork.METER_MAINNET + price_service.cache_native_coin_usd_price.clear() + self.assertEqual(price_service.get_mtr_usd_price(), 8.0) + + price_service.ethereum_network = EthereumNetwork.METER_TESTNET + price_service.cache_native_coin_usd_price.clear() + 
self.assertEqual(price_service.get_mtr_usd_price(), 8.0) + @mock.patch.object(CoingeckoClient, "get_bnb_usd_price", return_value=3.0) @mock.patch.object(KucoinClient, "get_bnb_usd_price", return_value=5.0) def test_get_binance_usd_price( @@ -274,3 +284,22 @@ def test_get_token_eth_price_from_composed_oracles( curve_price = "0xe7ce624c00381b4b7abb03e633fb4acac4537dd6" eth_price = price_service.get_token_eth_price_from_composed_oracles(curve_price) self.assertEqual(eth_price, 1.0) + + def test_get_token_eth_price_from_oracles(self): + mainnet_node = just_test_if_mainnet_node() + price_service = PriceService(EthereumClient(mainnet_node), self.redis) + gno_token_address = "0x6810e776880C02933D47DB1b9fc05908e5386b96" + token_eth_value = price_service.get_token_eth_price_from_oracles( + gno_token_address + ) + self.assertIsInstance(token_eth_value, float) + self.assertGreater(token_eth_value, 0) + with mock.patch.object( + PriceService, "get_token_eth_value", autospec=True, return_value=0 + ): + token_eth_value_from_coingecko = ( + price_service.get_token_eth_price_from_oracles(gno_token_address) + ) + self.assertAlmostEqual( + token_eth_value, token_eth_value_from_coingecko, delta=0.1 + ) diff --git a/safe_transaction_service/utils/redis.py b/safe_transaction_service/utils/redis.py index 83693432..33269709 100644 --- a/safe_transaction_service/utils/redis.py +++ b/safe_transaction_service/utils/redis.py @@ -1,10 +1,19 @@ +import copyreg +import logging from functools import cache from django.conf import settings from redis import Redis +logger = logging.getLogger(__name__) + @cache def get_redis() -> Redis: + logger.info("Opening connection to Redis") + + # Encode memoryview for redis when using pickle + copyreg.pickle(memoryview, lambda val: (memoryview, (bytes(val),))) + return Redis.from_url(settings.REDIS_URL) diff --git a/safe_transaction_service/utils/tasks.py b/safe_transaction_service/utils/tasks.py index dc425858..df88d1ea 100644 --- a/safe_transaction_service/utils/tasks.py +++ b/safe_transaction_service/utils/tasks.py @@ -56,9 +56,8 @@ def shutdown_worker(): def only_one_running_task( task: CeleryTask, lock_name_suffix: Optional[str] = None, - blocking_timeout: int = 1, lock_timeout: Optional[int] = LOCK_TIMEOUT, - gevent: bool = True, + gevent_enabled: bool = True, ): """ Ensures only one task is running at the same time, using `task` name as a unique key :param task: CeleryTask :param lock_name_suffix: A suffix for the lock name, in the case that the same task can be run at the same time when it has different arguments - :param blocking_timeout: Waiting blocking timeout, it should be as small as possible to the worker can release - the task :param lock_timeout: How long the lock will be stored, in case worker is halted so key is not stored forever in Redis - :param gevent: If `True`, `close_gevent_db_connection` will be called at the end + :param gevent_enabled: If `True`, `close_gevent_db_connection` will be called at the end :return: Instance of redis `Lock` :raises: LockError if lock cannot be acquired """ @@ -80,14 +77,12 @@ def only_one_running_task( lock_name = f"locks:tasks:{task.name}" if lock_name_suffix: lock_name += f":{lock_name_suffix}" - with redis.lock( - lock_name, blocking_timeout=blocking_timeout, timeout=lock_timeout - ) as lock: + with redis.lock(lock_name, blocking=False, timeout=lock_timeout) as lock: try: ACTIVE_LOCKS.add(lock_name) yield lock ACTIVE_LOCKS.remove(lock_name) finally: - if gevent: + if gevent_enabled: # Needed 
for django-db-geventpool close_gevent_db_connection() diff --git a/safe_transaction_service/utils/utils.py b/safe_transaction_service/utils/utils.py index 04571f45..abe971d6 100644 --- a/safe_transaction_service/utils/utils.py +++ b/safe_transaction_service/utils/utils.py @@ -1,3 +1,4 @@ +import socket from functools import wraps from itertools import islice from typing import Any, Iterable, List, Union @@ -5,7 +6,7 @@ from django.core.signals import request_finished from django.db import connection -from gevent.monkey import saved +import gevent.socket class FixedSizeDict(dict): @@ -53,7 +54,7 @@ def chunks_iterable(iterable: Iterable[Any], n: int) -> Iterable[Iterable[Any]]: def running_on_gevent() -> bool: - return "sys" in saved + return socket.socket is gevent.socket.socket def close_gevent_db_connection() -> None: diff --git a/setup.cfg b/setup.cfg index 530c47ea..570c062d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,11 +2,11 @@ max-line-length = 88 select = C,E,F,W,B,B950 extend-ignore = E203,E501,F841,W503 -exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,venv +exclude = .tox,.git,*/static/CACHE/*,docs,node_modules,venv [pycodestyle] max-line-length = 120 -exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,venv +exclude = .tox,.git,*/static/CACHE/*,docs,node_modules,venv [isort] profile = black
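The `running_on_gevent` rewrite above replaces the fragile check against gevent's internal `saved` dict with an identity test on the socket class. A minimal sketch of why that identity test works, assuming gevent is installed (illustrative only, not part of the service):

import socket

import gevent.socket
from gevent import monkey

# Before patching, the stdlib socket class is untouched
assert socket.socket is not gevent.socket.socket

monkey.patch_all()  # what a gevent worker does at startup

# After patch_all(), the stdlib name resolves to gevent's socket class,
# which is exactly the condition the new running_on_gevent() checks
assert socket.socket is gevent.socket.socket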