Commit 0137ac4

chore: cleanup cb_broadcast_processing_logs zset and related functionality
xadahiya committed May 31, 2023
1 parent f399fde commit 0137ac4
Showing 4 changed files with 1 addition and 204 deletions.
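For context, every removed call site followed the same shape: build an update_log dict describing a worker action, JSON-encode it, and ZADD it into a per-broadcast sorted set scored by the current Unix timestamp. Below is a minimal sketch of that pattern, assuming an async Redis client with a redis-py-asyncio-style zadd(name, mapping) signature; the namespace value and the helper name log_processing_update are illustrative, not from the codebase.

import json
import time

# Mirrors the removed template in pooler/utils/redis/redis_keys.py;
# 'UNISWAPV2' stands in for settings.namespace.
cb_broadcast_processing_logs_zset = 'broadcastID:UNISWAPV2:{}:processLogs'

async def log_processing_update(redis_conn, broadcast_id, worker_id, action, info):
    # One JSON-encoded status entry per event, scored by wall-clock time so
    # the log can be read back in chronological order with ZRANGEBYSCORE.
    update_log = {
        'worker': worker_id,
        'update': {'action': action, 'info': info},
    }
    await redis_conn.zadd(
        name=cb_broadcast_processing_logs_zset.format(broadcast_id),
        mapping={json.dumps(update_log): int(time.time())},
    )

Because the member (not the score) is the JSON payload, two identical payloads logged at different times collapse into one zset entry with the newer score; the removed code accepted that trade-off.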
24 changes: 0 additions & 24 deletions pooler/processor_distributor.py
@@ -28,9 +28,6 @@
from pooler.utils.models.settings_model import AggregateOn
from pooler.utils.rabbitmq_helpers import RabbitmqSelectLoopInteractor
from pooler.utils.redis.redis_conn import RedisPoolCache
from pooler.utils.redis.redis_keys import (
cb_broadcast_processing_logs_zset,
)
from pooler.utils.redis.redis_keys import project_finalized_data_zset
from pooler.utils.rpc import RpcHelper
from pooler.utils.snapshot_utils import warm_up_cache_for_snapshot_constructors
@@ -159,27 +156,6 @@ def _send_message_for_processing(self, process_unit, type_, routing_key):
),
)

update_log = {
'worker': self.name,
'update': {
'action': 'RabbitMQ.Publish',
'info': {
'routing_key': f'powerloom-backend-callback:{settings.namespace}'
f':{settings.instance_id}.{type_}',
'exchange': f'{settings.rabbitmq.setup.callbacks.exchange}:{settings.namespace}',
'msg': process_unit.dict(),
},
},
}
self.ev_loop.run_until_complete(
self._redis_conn.zadd(
cb_broadcast_processing_logs_zset.format(
process_unit.broadcastId,
),
{json.dumps(update_log): int(time.time())},
),
)

def _cache_and_forward_to_payload_commit_queue(self, dont_use_ch, method, properties, body):
event_type = method.routing_key.split('.')[-1]

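Note the variant above: _send_message_for_processing runs inside a synchronous RabbitMQ select-loop callback rather than a coroutine, so the removed write drove the async zadd to completion via self.ev_loop.run_until_complete instead of awaiting it. The worker files below performed the same write from async context.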
89 changes: 1 addition & 88 deletions pooler/utils/aggregation_worker.py
@@ -1,8 +1,6 @@
import asyncio
import hashlib
import importlib
import json
import time
from typing import Callable
from typing import List
from typing import Union
@@ -26,9 +24,6 @@
from pooler.utils.models.message_models import PowerloomSnapshotFinalizedMessage
from pooler.utils.models.settings_model import AggregateOn
from pooler.utils.redis.rate_limiter import load_rate_limiter_scripts
from pooler.utils.redis.redis_keys import (
cb_broadcast_processing_logs_zset,
)


class AggregationAsyncWorker(GenericAsyncWorker):
@@ -150,43 +145,8 @@ async def _send_payload_commit_service_queue(
audit_stream,
epoch,
)
# TODO: standardize/unify update log data model
update_log = {
'worker': self._unique_id,
'update': {
'action': f'AggregateBuild-{audit_stream}',
'info': {
'epoch': epoch.dict(),
'status': 'Failed',
},
},
}

await self._redis_conn.zadd(
name=cb_broadcast_processing_logs_zset.format(
epoch.broadcastId,
),
mapping={json.dumps(update_log): int(time.time())},
)

else:
update_log = {
'worker': self._unique_id,
'update': {
'action': f'AggregateBuild-{audit_stream}',
'info': {
'epoch': epoch.dict(),
'status': 'Success',
'snapshot': snapshot.dict(),
},
},
}

await self._redis_conn.zadd(
name=cb_broadcast_processing_logs_zset.format(
epoch.broadcastId,
),
mapping={json.dumps(update_log): int(time.time())},
)

source_chain_details = await get_source_chain_id(
redis_conn=self._redis_conn,
@@ -234,25 +194,6 @@ async def _send_payload_commit_service_queue(
'Sent message to commit payload queue: {}', commit_payload,
)

update_log = {
'worker': self._unique_id,
'update': {
'action': f'AggregateCommit-{audit_stream}',
'info': {
'snapshot': payload,
'epoch': epoch.dict(),
'status': 'Success',
},
},
}

await self._redis_conn.zadd(
name=cb_broadcast_processing_logs_zset.format(
epoch.broadcastId,
),
mapping={json.dumps(update_log): int(time.time())},
)

except Exception as e:
self._logger.opt(exception=True).error(
(
@@ -262,25 +203,6 @@ async def _send_payload_commit_service_queue(
snapshot,
e,
)
update_log = {
'worker': self._unique_id,
'update': {
'action': f'AggregateCommit-{audit_stream}',
'info': {
'snapshot': payload,
'epoch': epoch.dict(),
'status': 'Failed',
'exception': e,
},
},
}

await self._redis_conn.zadd(
name=cb_broadcast_processing_logs_zset.format(
epoch.broadcastId,
),
mapping={json.dumps(update_log): int(time.time())},
)

async def _map_processed_epochs_to_adapters(
self,
@@ -321,15 +243,6 @@ async def _map_processed_epochs_to_adapters(
)
raise e

async def _update_broadcast_processing_status(
self, broadcast_id, update_state,
):
await self._redis_conn.hset(
cb_broadcast_processing_logs_zset.format(self.name),
broadcast_id,
json.dumps(update_state),
)

async def _on_rabbitmq_message(self, message: IncomingMessage):
task_type = message.routing_key.split('.')[-1]
if task_type not in self._task_types:
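One inconsistency this cleanup also retires: the removed _update_broadcast_processing_status helper wrote with HSET to a key built from the zset template (formatted with self.name rather than a broadcast ID), while every other call site used ZADD. A Redis key holds exactly one type, so a key populated through the HSET path could never also accept ZADD entries; deleting the whole family of writers removes that latent WRONGTYPE hazard.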
4 changes: 0 additions & 4 deletions pooler/utils/redis/redis_keys.py
@@ -15,10 +15,6 @@
':{}:{}'
)

cb_broadcast_processing_logs_zset = (
'broadcastID:' + settings.namespace + ':{}:processLogs'
)

cached_block_details_at_height = (
'uniswap:blockDetail:' + settings.namespace + ':blockDetailZset'
)
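For reference, the removed cb_broadcast_processing_logs_zset template minted one sorted set per broadcast ID. A quick sketch of its expansion, with an illustrative namespace value standing in for settings.namespace:

namespace = 'UNISWAPV2'  # stands in for settings.namespace
cb_broadcast_processing_logs_zset = 'broadcastID:' + namespace + ':{}:processLogs'
print(cb_broadcast_processing_logs_zset.format('some-broadcast-id'))
# -> broadcastID:UNISWAPV2:some-broadcast-id:processLogs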
88 changes: 0 additions & 88 deletions pooler/utils/snapshot_worker.py
@@ -1,7 +1,5 @@
import asyncio
import importlib
import json
import time
from typing import Callable
from typing import List
from typing import Union
@@ -20,9 +18,6 @@
from pooler.utils.models.message_models import PowerloomSnapshotProcessMessage
from pooler.utils.models.message_models import SnapshotBase
from pooler.utils.redis.rate_limiter import load_rate_limiter_scripts
from pooler.utils.redis.redis_keys import (
cb_broadcast_processing_logs_zset,
)


class SnapshotAsyncWorker(GenericAsyncWorker):
@@ -106,43 +101,7 @@ async def _send_payload_commit_service_queue(
audit_stream,
epoch,
)
# TODO: standardize/unify update log data model
update_log = {
'worker': self._unique_id,
'update': {
'action': f'SnapshotBuild-{audit_stream}',
'info': {
'epoch': epoch.dict(),
'status': 'Failed',
},
},
}

await self._redis_conn.zadd(
name=cb_broadcast_processing_logs_zset.format(
epoch.broadcastId,
),
mapping={json.dumps(update_log): int(time.time())},
)
else:
update_log = {
'worker': self._unique_id,
'update': {
'action': f'SnapshotBuild-{audit_stream}',
'info': {
'epoch': epoch.dict(),
'status': 'Success',
'snapshot': snapshot.dict(),
},
},
}

await self._redis_conn.zadd(
name=cb_broadcast_processing_logs_zset.format(
epoch.broadcastId,
),
mapping={json.dumps(update_log): int(time.time())},
)
source_chain_details = await get_source_chain_id(
redis_conn=self._redis_conn,
rpc_helper=self._anchor_rpc_helper,
@@ -187,25 +146,6 @@ async def _send_payload_commit_service_queue(
'Sent message to commit payload queue: {}', commit_payload,
)

update_log = {
'worker': self._unique_id,
'update': {
'action': f'SnapshotCommit-{audit_stream}',
'info': {
'snapshot': payload,
'epoch': epoch.dict(),
'status': 'Success',
},
},
}

await self._redis_conn.zadd(
name=cb_broadcast_processing_logs_zset.format(
epoch.broadcastId,
),
mapping={json.dumps(update_log): int(time.time())},
)

except Exception as e:
self._logger.opt(exception=True).error(
(
@@ -215,25 +155,6 @@ async def _send_payload_commit_service_queue(
snapshot,
e,
)
update_log = {
'worker': self._unique_id,
'update': {
'action': f'SnapshotCommit-{audit_stream}',
'info': {
'snapshot': payload,
'epoch': epoch.dict(),
'status': 'Failed',
'exception': e,
},
},
}

await self._redis_conn.zadd(
name=cb_broadcast_processing_logs_zset.format(
epoch.broadcastId,
),
mapping={json.dumps(update_log): int(time.time())},
)

async def _map_processed_epochs_to_adapters(
self,
@@ -269,15 +190,6 @@ async def _map_processed_epochs_to_adapters(
)
raise e

async def _update_broadcast_processing_status(
self, broadcast_id, update_state,
):
await self._redis_conn.hset(
cb_broadcast_processing_logs_zset.format(self.name),
broadcast_id,
json.dumps(update_state),
)

async def _on_rabbitmq_message(self, message: IncomingMessage):
task_type = message.routing_key.split('.')[-1]
if task_type not in self._task_types:
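A hedged follow-up, not part of this commit: removing the writers leaves any previously written process-log zsets behind in Redis. If those keys should be purged as part of the cleanup, a one-off pass with redis-py's scan_iter could look roughly like this; the connection details and namespace are illustrative.

import redis

r = redis.Redis(host='localhost', port=6379)
# SCAN iterates incrementally, so unlike KEYS it will not block the server
# on a large keyspace.
for key in r.scan_iter(match='broadcastID:UNISWAPV2:*:processLogs'):
    r.delete(key)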
