Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add healthcheck #270

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions packages/packages.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
"contract/valory/mech_marketplace/0.1.0": "bafybeigulghv7bvx4evl6kmogvv6gjxovyt22tkwlbdmeretmop2crdmem",
"connection/valory/websocket_client/0.1.0": "bafybeic4ag3gqc7kd3k2o3pucddj2odck5yrfbgmwh5veqny7zao5qayli",
"skill/valory/contract_subscription/0.1.0": "bafybeiefuemlp75obgpxrp6iuleb3hn6vcviwh5oetk5djbuprf4xsmgjy",
"skill/valory/mech_abci/0.1.0": "bafybeihshsjc2pngbaglptgeijgpcd6ns7darhhp75ryhfao5t4uuw3hym",
"skill/valory/task_submission_abci/0.1.0": "bafybeihzghxmvmylj2gehkfbleek5e6q3qyggfmnpve2sov437nyzvu5gy",
"skill/valory/task_execution/0.1.0": "bafybeiafc6rgv46qd2hfl5fxqdegjyyb6f2fxhqxe33g6j342lutbb6fsm",
"skill/valory/mech_abci/0.1.0": "bafybeiesfwxmlidywoifnkhuvjlqaknr76lsqprbkzfxdpb4x5op5sgjfe",
"skill/valory/task_submission_abci/0.1.0": "bafybeigxi5we2lfaqhy2vlt7ktp7gnjx4k3e2poy4y33jmjhfyb5fzmqhq",
"skill/valory/task_execution/0.1.0": "bafybeibv46pbo2qntbensudrrkx7dlrnmvufvijpqzqyn3wrztxmr3s6pq",
"skill/valory/websocket_client/0.1.0": "bafybeif7rrvsu6z4evqkhblxj3u6wwv2eqou576hgkyoehxuj7cntw7o2m",
"skill/valory/subscription_abci/0.1.0": "bafybeifilanuxfvuypcccjku7nphurgp27i2iwncdmug3in6xuzfmslgaq",
"agent/valory/mech/0.1.0": "bafybeifvo3s5mjidovenei6wwozejatfh4kjcapqnipuou4q3v6ff6ycl4",
"service/valory/mech/0.1.0": "bafybeic7hnfs3gum33ndinhyzablfa4yf5y4lkvivuxg4bgvhz5zdxn6oa",
"service/valory/mech_quickstart/0.1.0": "bafybeiezswb5cbhuauk23kd6gsdbwnp2ndjysgc4uabk44k3vy2lmwz5mu"
"agent/valory/mech/0.1.0": "bafybeidwavbqnqvd7qudym2hryryitzpbr75hmfm4yncf2pw3yrdt2uyrm",
"service/valory/mech/0.1.0": "bafybeid2jdy7ez75gtll5d6lu6wqy6dzdh3bl7gxjgc33hpquftipv53rm",
"service/valory/mech_quickstart/0.1.0": "bafybeidawm5ni2nkojx3mytwjbkei6zge7pd6sfkonsgbg2yakhawfwe3u"
},
"third_party": {
"protocol/valory/default/1.0.0": "bafybeifqcqy5hfbnd7fjv4mqdjrtujh2vx3p2xhe33y67zoxa6ph7wdpaq",
Expand Down
9 changes: 5 additions & 4 deletions packages/valory/agents/mech/aea-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ skills:
- valory/abstract_abci:0.1.0:bafybeieo7pe5wqjphs5izpz5aujjbubymlxub62b3rhx6yglu65ibalffu
- valory/abstract_round_abci:0.1.0:bafybeibiw4oqwqvo4jccwz5fb73iardzychgvcl66tceiildohoju2ikti
- valory/contract_subscription:0.1.0:bafybeiefuemlp75obgpxrp6iuleb3hn6vcviwh5oetk5djbuprf4xsmgjy
- valory/mech_abci:0.1.0:bafybeihshsjc2pngbaglptgeijgpcd6ns7darhhp75ryhfao5t4uuw3hym
- valory/mech_abci:0.1.0:bafybeiesfwxmlidywoifnkhuvjlqaknr76lsqprbkzfxdpb4x5op5sgjfe
- valory/registration_abci:0.1.0:bafybeib3n6vqkfbrcubcbliebjnuwyywdinxkbzt76n6gbn2kg7ace47dq
- valory/reset_pause_abci:0.1.0:bafybeihkj6lmaypspyxe5qqrjgnolyck62pyvqoylr24ab6ue4steqcw7e
- valory/subscription_abci:0.1.0:bafybeifilanuxfvuypcccjku7nphurgp27i2iwncdmug3in6xuzfmslgaq
- valory/task_execution:0.1.0:bafybeiafc6rgv46qd2hfl5fxqdegjyyb6f2fxhqxe33g6j342lutbb6fsm
- valory/task_submission_abci:0.1.0:bafybeihzghxmvmylj2gehkfbleek5e6q3qyggfmnpve2sov437nyzvu5gy
- valory/task_execution:0.1.0:bafybeibv46pbo2qntbensudrrkx7dlrnmvufvijpqzqyn3wrztxmr3s6pq
- valory/task_submission_abci:0.1.0:bafybeigxi5we2lfaqhy2vlt7ktp7gnjx4k3e2poy4y33jmjhfyb5fzmqhq
- valory/termination_abci:0.1.0:bafybeifi2uodnrjsrivj53g3sjutocmyusbx6mlsb6oanqdyt2mfbyvusy
- valory/transaction_settlement_abci:0.1.0:bafybeigh2vkt74jrad5gtsczrgqcuhcqe7jkgjy7jdw56yamlzwwnaymjy
- valory/websocket_client:0.1.0:bafybeif7rrvsu6z4evqkhblxj3u6wwv2eqou576hgkyoehxuj7cntw7o2m
Expand Down Expand Up @@ -237,4 +237,5 @@ type: connection
config:
host: ${str:0.0.0.0}
target_skill_id: valory/mech_abci:0.1.0
is_abstract: true
port: ${int:9999}
is_abstract: false
2 changes: 1 addition & 1 deletion packages/valory/services/mech/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license: Apache-2.0
fingerprint:
README.md: bafybeif7ia4jdlazy6745ke2k2x5yoqlwsgwr6sbztbgqtwvs3ndm2p7ba
fingerprint_ignore_patterns: []
agent: valory/mech:0.1.0:bafybeifvo3s5mjidovenei6wwozejatfh4kjcapqnipuou4q3v6ff6ycl4
agent: valory/mech:0.1.0:bafybeidwavbqnqvd7qudym2hryryitzpbr75hmfm4yncf2pw3yrdt2uyrm
number_of_agents: 4
deployment:
agent:
Expand Down
2 changes: 1 addition & 1 deletion packages/valory/services/mech_quickstart/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license: Apache-2.0
fingerprint:
README.md: bafybeiaqaedhfzjxxdfxtygjulorvd4x2h3cbwtiw3xgbigjgsc6qfn7zy
fingerprint_ignore_patterns: []
agent: valory/mech:0.1.0:bafybeifvo3s5mjidovenei6wwozejatfh4kjcapqnipuou4q3v6ff6ycl4
agent: valory/mech:0.1.0:bafybeidwavbqnqvd7qudym2hryryitzpbr75hmfm4yncf2pw3yrdt2uyrm
number_of_agents: 1
deployment:
agent:
Expand Down
71 changes: 70 additions & 1 deletion packages/valory/skills/mech_abci/handlers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
# Copyright 2023-2024 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

import json
import re
import time
from datetime import datetime
from enum import Enum
from typing import Callable, Dict, List, Optional, Tuple, Union, cast
Expand Down Expand Up @@ -67,6 +68,11 @@
TendermintHandler = BaseTendermintHandler
IpfsHandler = BaseIpfsHandler

LAST_SUCCESSFUL_READ = "last_successful_read"
LAST_SUCCESSFUL_EXECUTED_TASK = "last_successful_executed_task"
WAS_LAST_READ_SUCCESSFUL = "was_last_read_successful"
LAST_TX = "last_tx"


class HttpCode(Enum):
"""Http codes"""
Expand All @@ -89,6 +95,32 @@ class HttpHandler(BaseHttpHandler):

SUPPORTED_PROTOCOL = HttpMessage.protocol_id

@property
def last_successful_read(self) -> Optional[Tuple[int, float]]:
"""Get the last successful read."""
return cast(
Optional[Tuple[int, float]],
self.context.shared_state.get(LAST_SUCCESSFUL_READ),
)

@property
def last_successful_executed_task(self) -> Optional[Tuple[int, float]]:
"""Get the last successful executed task."""
return cast(
Optional[Tuple[int, float]],
self.context.shared_state.get(LAST_SUCCESSFUL_EXECUTED_TASK),
)

@property
def was_last_read_successful(self) -> bool:
"""Get the last read status."""
return self.context.shared_state.get(WAS_LAST_READ_SUCCESSFUL) is not False

@property
def last_tx(self) -> Optional[Tuple[str, float]]:
"""Get the last transaction."""
return cast(Optional[Tuple[str, float]], self.context.shared_state.get(LAST_TX))

def setup(self) -> None:
"""Implement the setup."""

Expand Down Expand Up @@ -308,6 +340,23 @@ def _handle_get_health(
r.round_id for r in round_sequence._abci_app._previous_rounds[-10:]
]

# ensure we are delivering
grace_period = 300 # 5 min
last_executed_task = (
self.last_successful_executed_task[1]
if self.last_successful_executed_task
else time.time() + grace_period * 2
)
last_tx_made = self.last_tx[1] if self.last_tx else time.time()
we_are_delivering = last_executed_task > last_tx_made + grace_period

# ensure we can get new reqs
last_successful_read = (
self.last_successful_read[1] if self.last_successful_read else time.time()
)
grace_period = 300 # 5 min
we_can_get_new_reqs = last_successful_read > time.time() - grace_period

data = {
"seconds_since_last_transition": seconds_since_last_transition,
"is_tm_healthy": not is_tm_unhealthy,
Expand All @@ -316,6 +365,26 @@ def _handle_get_health(
"current_round": current_round,
"previous_rounds": previous_rounds,
"is_transitioning_fast": is_transitioning_fast,
"last_successful_read": {
"block_number": self.last_successful_read[0],
"timestamp": self.last_successful_read[1],
}
if self.last_successful_read
else None,
"last_successful_executed_task": {
"request_id": self.last_successful_executed_task[0],
"timestamp": self.last_successful_executed_task[1],
}
if self.last_successful_executed_task
else None,
"was_last_read_successful": self.was_last_read_successful,
"last_tx": {
"tx_hash": self.last_tx[0],
"timestamp": self.last_tx[1],
}
if self.last_tx
else None,
"is_ok": (we_are_delivering and we_can_get_new_reqs),
}

self._send_ok_response(http_msg, http_dialogue, data)
4 changes: 2 additions & 2 deletions packages/valory/skills/mech_abci/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fingerprint:
composition.py: bafybeiaorp75iva5xgl4ebk3lg7oenqmd6wg2dxlm33oserb7aszyujml4
dialogues.py: bafybeifhydd6xmstbh2jx5igj33upip5a3hhlcaxttfsc77heszqmru7ri
fsm_specification.yaml: bafybeib5yne2ke3oc4amgehhn75vajexr3sedehdzmuabhyrovfqpmuipe
handlers.py: bafybeibfsyvno2qgcftlftjmhj66aiurmcdqwfj2ac7jm44z7kwwk6illu
handlers.py: bafybeidldql7dchykfm322t4rsjhnbrejirjolke74iq2et3gu7u6uap6q
models.py: bafybeigpimz5vhgzelhc7c3ipo56wh2o7d7whyqcjd2kjigtxos5d6bwqa
fingerprint_ignore_patterns: []
connections:
Expand All @@ -23,7 +23,7 @@ skills:
- valory/abstract_round_abci:0.1.0:bafybeibiw4oqwqvo4jccwz5fb73iardzychgvcl66tceiildohoju2ikti
- valory/registration_abci:0.1.0:bafybeib3n6vqkfbrcubcbliebjnuwyywdinxkbzt76n6gbn2kg7ace47dq
- valory/reset_pause_abci:0.1.0:bafybeihkj6lmaypspyxe5qqrjgnolyck62pyvqoylr24ab6ue4steqcw7e
- valory/task_submission_abci:0.1.0:bafybeihzghxmvmylj2gehkfbleek5e6q3qyggfmnpve2sov437nyzvu5gy
- valory/task_submission_abci:0.1.0:bafybeigxi5we2lfaqhy2vlt7ktp7gnjx4k3e2poy4y33jmjhfyb5fzmqhq
- valory/termination_abci:0.1.0:bafybeifi2uodnrjsrivj53g3sjutocmyusbx6mlsb6oanqdyt2mfbyvusy
- valory/transaction_settlement_abci:0.1.0:bafybeigh2vkt74jrad5gtsczrgqcuhcqe7jkgjy7jdw56yamlzwwnaymjy
- valory/subscription_abci:0.1.0:bafybeifilanuxfvuypcccjku7nphurgp27i2iwncdmug3in6xuzfmslgaq
Expand Down
10 changes: 10 additions & 0 deletions packages/valory/skills/task_execution/behaviours.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from packages.valory.protocols.ipfs import IpfsMessage
from packages.valory.protocols.ipfs.dialogues import IpfsDialogue
from packages.valory.protocols.ledger_api import LedgerApiMessage
from packages.valory.skills.task_execution.handlers import LAST_SUCCESSFUL_EXECUTED_TASK
from packages.valory.skills.task_execution.models import Params
from packages.valory.skills.task_execution.utils.apis import KeyChain
from packages.valory.skills.task_execution.utils.benchmarks import TokenCounterCallback
Expand Down Expand Up @@ -116,6 +117,13 @@ def request_id_to_num_timeouts(self) -> Dict[int, int]:
"""Maps the request id to the number of times it has timed out."""
return self.params.request_id_to_num_timeouts

def set_last_executed_task(self, request_id: int) -> None:
"""Set the last executed task."""
self.context.shared_state[LAST_SUCCESSFUL_EXECUTED_TASK] = (
request_id,
time.time(),
)

def count_timeout(self, request_id: int) -> None:
"""Increase the timeout for a request."""
self.request_id_to_num_timeouts[request_id] += 1
Expand Down Expand Up @@ -552,6 +560,8 @@ def _handle_store_response(self, message: IpfsMessage, dialogue: Dialogue) -> No
request_id=str(req_id),
data=ipfs_hash,
)
# for health check metrics
self.set_last_executed_task(req_id)
done_task = cast(Dict[str, Any], self._done_task)
task_result = to_multihash(ipfs_hash)
cost = get_cost_for_done_task(done_task)
Expand Down
22 changes: 20 additions & 2 deletions packages/valory/skills/task_execution/handlers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
# Copyright 2023-2024 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,8 @@

"""This package contains a scaffold of a handler."""
import threading
from typing import Any, Dict, List, cast
import time
from typing import Any, Dict, List, Optional, cast

from aea.protocols.base import Message
from aea.skills.base import Handler
Expand All @@ -37,6 +38,9 @@
PENDING_TASKS = "pending_tasks"
DONE_TASKS = "ready_tasks"
DONE_TASKS_LOCK = "lock"
LAST_SUCCESSFUL_READ = "last_successful_read"
LAST_SUCCESSFUL_EXECUTED_TASK = "last_successful_executed_task"
WAS_LAST_READ_SUCCESSFUL = "was_last_read_successful"

LEDGER_API_ADDRESS = str(LEDGER_CONNECTION_PUBLIC_ID)

Expand Down Expand Up @@ -131,6 +135,14 @@ def pending_tasks(self) -> List[Dict[str, Any]]:
"""Get pending_tasks."""
return self.context.shared_state[PENDING_TASKS]

def set_last_successful_read(self, block_number: Optional[int]) -> None:
"""Set the last successful read."""
self.context.shared_state[LAST_SUCCESSFUL_READ] = (block_number, time.time())

def set_was_last_read_successful(self, was_successful: bool) -> None:
"""Set the last successful read."""
self.context.shared_state[WAS_LAST_READ_SUCCESSFUL] = was_successful

def handle(self, message: Message) -> None:
"""
Implement the reaction to a contract message.
Expand All @@ -140,6 +152,8 @@ def handle(self, message: Message) -> None:
self.context.logger.info(f"Received message: {message}")
contract_api_msg = cast(ContractApiMessage, message)
if contract_api_msg.performative != ContractApiMessage.Performative.STATE:
# for healthcheck metrics
self.set_was_last_read_successful(False)
self.context.logger.warning(
f"Contract API Message performative not recognized: {contract_api_msg.performative}"
)
Expand All @@ -155,10 +169,14 @@ def _handle_get_undelivered_reqs(self, body: Dict[str, Any]) -> None:
"""Handle get undelivered reqs."""
reqs = body.get("data", [])
if len(reqs) == 0:
# for healthcheck metrics
self.set_last_successful_read(self.params.from_block)
return

self.params.from_block = max([req["block_number"] for req in reqs]) + 1
self.context.logger.info(f"Received {len(reqs)} new requests.")
# for healthcheck metrics
self.set_last_successful_read(self.params.from_block)
reqs = [
req
for req in reqs
Expand Down
4 changes: 2 additions & 2 deletions packages/valory/skills/task_execution/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ license: Apache-2.0
aea_version: '>=1.0.0, <2.0.0'
fingerprint:
__init__.py: bafybeidqhvvlnthkbnmrdkdeyjyx2f2ab6z4xdgmagh7welqnh2v6wczx4
behaviours.py: bafybeihuja3eox24bl7kyym2hbyrkuktkhso5s7yt3cvrjn7ng73bzvvga
behaviours.py: bafybeigt442yaasazy4qlbcvyxswxvmgardufabnphknv4yrzyhauhbqae
dialogues.py: bafybeid4zxalqdlo5mw4yfbuf34hx4jp5ay5z6chm4zviwu4cj7fudtwca
handlers.py: bafybeidbt5ezj74cgfogk3w4uw4si2grlnk5g54veyumw7g5yh6gdscywu
handlers.py: bafybeigzujgdroodhgcegao4pczlpmo3v4hyj6ydbonq5wnehwp7kl77lq
models.py: bafybeicohoprd4f6rxnt6zxgwzzb3djpyk4o72bepoty4lybnf7fdpkgbu
utils/__init__.py: bafybeiccdijaigu6e5p2iruwo5mkk224o7ywedc7nr6xeu5fpmhjqgk24e
utils/apis.py: bafybeigu73lfz3g3mc6iupisrvlsp3fyl4du3oqlyajgdpfvtqypddh3w4
Expand Down
9 changes: 9 additions & 0 deletions packages/valory/skills/task_submission_abci/behaviours.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
)
FILENAME = "usage"
ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"
LAST_TX = "last_tx"


class TaskExecutionBaseBehaviour(BaseBehaviour, ABC):
Expand Down Expand Up @@ -103,6 +104,12 @@ def done_tasks(self) -> List[Dict[str, Any]]:
done_tasks = deepcopy(self.context.shared_state.get(DONE_TASKS, []))
return cast(List[Dict[str, Any]], done_tasks)

def set_tx(self, last_tx: str) -> None:
"""Signal that the transaction was prepared."""
now = time.time()
# store the tx hash and the time it was stored
self.context.shared_state[LAST_TX] = (last_tx, now)

def done_tasks_lock(self) -> threading.Lock:
"""Get done_tasks_lock."""
return self.context.shared_state[DONE_TASKS_LOCK]
Expand Down Expand Up @@ -227,6 +234,8 @@ def check_last_tx_status(self) -> bool:
# ref: https://github.com/valory-xyz/open-autonomy/blob/main/packages/valory/skills/transaction_settlement_abci/rounds.py#L432-L434
try:
final_tx_hash = self.synchronized_data.final_tx_hash
# added for healthcheck purposes
self.set_tx(final_tx_hash)
except Exception as e:
self.context.logger.error(e)
return False
Expand Down
4 changes: 2 additions & 2 deletions packages/valory/skills/task_submission_abci/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license: Apache-2.0
aea_version: '>=1.0.0, <2.0.0'
fingerprint:
__init__.py: bafybeiholqak7ltw6bbmn2c5tn3j7xgzkdlfzp3kcskiqsvmxoih6m4muq
behaviours.py: bafybeigsgqxuwoacedl5yz36riqcl2j2bdohhpkq3m2u5c66rbvsan77ee
behaviours.py: bafybeib6gtwgaodrwuam5wpm2sgdo3h2j5czfm37ct6ndupapq6jrw5sli
dialogues.py: bafybeibmac3m5u5h6ucoyjr4dazay72dyga656wvjl6z6saapluvjo54ne
fsm_specification.yaml: bafybeidtmsmpunr3t77pshd3k2s6dd6hlvhze6inu3gj7xyvlg4wi3tnuu
handlers.py: bafybeibe5n7my2vd2wlwo73sbma65epjqc7kxgtittewlylcmvnmoxtxzq
Expand All @@ -32,7 +32,7 @@ protocols:
skills:
- valory/abstract_round_abci:0.1.0:bafybeibiw4oqwqvo4jccwz5fb73iardzychgvcl66tceiildohoju2ikti
- valory/transaction_settlement_abci:0.1.0:bafybeigh2vkt74jrad5gtsczrgqcuhcqe7jkgjy7jdw56yamlzwwnaymjy
- valory/task_execution:0.1.0:bafybeiafc6rgv46qd2hfl5fxqdegjyyb6f2fxhqxe33g6j342lutbb6fsm
- valory/task_execution:0.1.0:bafybeibv46pbo2qntbensudrrkx7dlrnmvufvijpqzqyn3wrztxmr3s6pq
behaviours:
main:
args: {}
Expand Down
Loading