Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add funding job #58

Merged
merged 2 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 42 additions & 16 deletions operate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

"""Operate app CLI module."""

import asyncio
import logging
import os
import traceback
Expand Down Expand Up @@ -125,14 +126,16 @@ def create_app( # pylint: disable=too-many-locals, unused-argument, too-many-st

logger = setup_logger(name="operate")
operate = OperateApp(home=home, logger=logger)
app = FastAPI()

app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST", "PUT", "DELETE"],
)

funding_jobs: t.Dict[str, asyncio.Task] = {}

def with_retries(f: t.Callable) -> t.Callable:
"""Retries decorator."""

Expand Down Expand Up @@ -333,6 +336,16 @@ async def _create_services(request: Request) -> JSONResponse:
manager.stake_service_on_chain(hash=service.hash)
manager.fund_service(hash=service.hash)
manager.deploy_service_locally(hash=service.hash)

# Start funding job
logger.info(f"Starting funding job for {service}")
loop = asyncio.get_running_loop()
funding_jobs[service.hash] = loop.create_task(
operate.service_manager().funding_job(
hash=service.hash,
loop=loop,
)
)
return JSONResponse(
content=operate.service_manager().create_or_load(hash=service.hash).json
)
Expand All @@ -354,6 +367,15 @@ async def _update_services(request: Request) -> JSONResponse:
manager.stake_service_on_chain(hash=service.hash)
manager.fund_service(hash=service.hash)
manager.deploy_service_locally(hash=service.hash)

logger.info(f"Starting funding job for {service}")
loop = asyncio.get_running_loop()
funding_jobs[service.hash] = loop.create_task(
operate.service_manager().funding_job(
hash=service.hash,
loop=loop,
)
)
return JSONResponse(content=service.json)

@app.get("/api/services/{service}")
Expand Down Expand Up @@ -443,30 +465,34 @@ async def _build_service_locally(request: Request) -> JSONResponse:
@with_retries
async def _start_service_locally(request: Request) -> JSONResponse:
"""Create a service."""
deployment = (
operate.service_manager()
.create_or_load(
request.path_params["service"],
)
.deployment
)
service = request.path_params["service"]
deployment = operate.service_manager().create_or_load(service).deployment
operate.service_manager().fund_service(service)
deployment.build(force=True)
operate.service_manager().fund_service(hash=request.path_params["service"])
deployment.start()

# Start funding job
loop = asyncio.get_running_loop()
funding_jobs[service] = loop.create_task(
operate.service_manager().funding_job(
hash=service,
loop=loop,
)
)
return JSONResponse(content=deployment.json)

@app.post("/api/services/{service}/deployment/stop")
@with_retries
async def _stop_service_locally(request: Request) -> JSONResponse:
"""Create a service."""
deployment = (
operate.service_manager()
.create_or_load(
request.path_params["service"],
)
.deployment
)
service = request.path_params["service"]
deployment = operate.service_manager().create_or_load(service).deployment
deployment.stop()

logger.info(f"Cancelling funding job for {service}")
status = funding_jobs[service].cancel()
if not status:
logger.info(f"Funding job cancellation for {service} failed")
return JSONResponse(content=deployment.json)

@app.post("/api/services/{service}/deployment/delete")
Expand Down
11 changes: 11 additions & 0 deletions operate/ledger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,24 @@
from operate.types import ChainType, LedgerType


ETHEREUM_PUBLIC_RPC = "https://ethereum.publicnode.com"
GNOSIS_PUBLIC_RPC = "https://gnosis-rpc.publicnode.com"
GOERLI_PUBLIC_RPC = "https://ethereum-goerli.publicnode.com"
SOLANA_PUBLIC_RPC = "https://api.mainnet-beta.solana.com"

ETHEREUM_RPC = os.environ.get("DEV_RPC", "https://ethereum.publicnode.com")
GNOSIS_RPC = os.environ.get(
"DEV_RPC", "https://go.getblock.io/2a1fa1ade5d547ca86eab099c35ce2a7"
)
GOERLI_RPC = os.environ.get("DEV_RPC", "https://ethereum-goerli.publicnode.com")
SOLANA_RPC = os.environ.get("DEV_RPC", "https://api.mainnet-beta.solana.com")

PUBLIC_RPCS = {
ChainType.ETHEREUM: ETHEREUM_PUBLIC_RPC,
ChainType.GNOSIS: GNOSIS_PUBLIC_RPC,
ChainType.GOERLI: GOERLI_PUBLIC_RPC,
ChainType.SOLANA: SOLANA_PUBLIC_RPC,
}

DEFAULT_RPCS = {
ChainType.ETHEREUM: ETHEREUM_RPC,
Expand Down
43 changes: 39 additions & 4 deletions operate/services/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@
# ------------------------------------------------------------------------------
"""Service manager."""

import asyncio
import logging
import typing as t
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from aea.helpers.base import IPFSHash
from aea.helpers.logging import setup_logger
from autonomy.chain.base import registry_contracts

from operate.keys import Key, KeysManager
from operate.ledger import PUBLIC_RPCS
from operate.ledger.profiles import CONTRACTS, OLAS, STAKING
from operate.services.protocol import OnChainManager
from operate.services.service import (
Expand Down Expand Up @@ -355,12 +358,21 @@ def unstake_service_on_chain(self, hash: str) -> None:
service.chain_data.staked = False
service.store()

def fund_service(self, hash: str) -> None:
def fund_service(
self,
hash: str,
rpc: t.Optional[str] = None,
agent_fund_requirement: t.Optional[float] = None,
safe_fund_requirement: t.Optional[float] = None,
) -> None:
"""Fund service if required."""
service = self.create_or_load(hash=hash)
wallet = self.wallet_manager.load(ledger_type=service.ledger_config.type)
ledger_api = wallet.ledger_api(chain_type=service.ledger_config.chain)
agent_fund_requirement = service.chain_data.user_params.fund_requirements.agent
ledger_api = wallet.ledger_api(chain_type=service.ledger_config.chain, rpc=rpc)
agent_fund_requirement = (
agent_fund_requirement
or service.chain_data.user_params.fund_requirements.agent
)

for key in service.keys:
agent_balance = ledger_api.get_balance(address=key.address)
Expand All @@ -377,7 +389,10 @@ def fund_service(self, hash: str) -> None:
)

safe_balanace = ledger_api.get_balance(service.chain_data.multisig)
safe_fund_requirement = service.chain_data.user_params.fund_requirements.safe
safe_fund_requirement = (
safe_fund_requirement
or service.chain_data.user_params.fund_requirements.safe
)
self.logger.info(f"Safe {service.chain_data.multisig} balance: {safe_balanace}")
self.logger.info(f"Required balance: {safe_fund_requirement}")
if safe_balanace < safe_fund_requirement:
Expand All @@ -392,6 +407,26 @@ def fund_service(self, hash: str) -> None:
chain_type=service.ledger_config.chain,
)

async def funding_job(
self,
hash: str,
loop: t.Optional[asyncio.AbstractEventLoop] = None,
) -> None:
"""Start a background funding job."""
loop = loop or asyncio.get_event_loop()
service = self.create_or_load(hash=hash)
with ThreadPoolExecutor() as executor:
while True:
await loop.run_in_executor(
executor,
self.fund_service,
hash,
PUBLIC_RPCS[service.ledger_config.chain],
10000000000000000,
50000000000000000,
)
await asyncio.sleep(60)

def deploy_service_locally(self, hash: str, force: bool = True) -> Deployment:
"""
Deploy service locally
Expand Down
Loading