Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix staking #68

Merged
merged 13 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion electron/install.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const process = require('process');
const { spawnSync } = require('child_process');
const Docker = require('dockerode');

const Version = "0.1.0rc9"
const Version = "0.1.0rc12"
const OperateDirectory = `${os.homedir()}/.operate`;
const VenvDir = `${OperateDirectory}/venv`;
const VersionFile = `${OperateDirectory}/version.txt`;
Expand Down
81 changes: 38 additions & 43 deletions operate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def create_app( # pylint: disable=too-many-locals, unused-argument, too-many-st

logger = setup_logger(name="operate")
operate = OperateApp(home=home, logger=logger)
funding_jobs: t.Dict[str, asyncio.Task] = {}

def pull_latest_images() -> None:
"""Pull latest docker images."""
Expand All @@ -151,6 +152,29 @@ def pull_latest_images() -> None:
tag=TENDERMINT_IMAGE_VERSION,
)

def schedule_funding_job(service: str) -> None:
"""Schedule a funding job."""
logger.info(f"Starting funding job for {service}")
if service in funding_jobs:
logger.info(f"Cancelling existing funding job for {service}")
cancel_funding_job(service=service)

loop = asyncio.get_running_loop()
funding_jobs[service] = loop.create_task(
operate.service_manager().funding_job(
hash=service,
loop=loop,
)
)

def cancel_funding_job(service: str) -> None:
"""Cancel funding job."""
if service not in funding_jobs:
return
status = funding_jobs[service].cancel()
if not status:
logger.info(f"Funding job cancellation for {service} failed")

app = FastAPI(
on_startup=[
pull_latest_images,
Expand All @@ -162,8 +186,6 @@ def pull_latest_images() -> None:
allow_methods=["GET", "POST", "PUT", "DELETE"],
)

funding_jobs: t.Dict[str, asyncio.Task] = {}

def with_retries(f: t.Callable) -> t.Callable:
"""Retries decorator."""

Expand All @@ -183,14 +205,14 @@ async def _call(request: Request) -> JSONResponse:
else:
error["error"] = str(e)
errors.append(error)
return errors
return JSONResponse(content={"errors": errors}, status_code=500)
except Exception as e: # pylint: disable=broad-except
errors.append(
{"error": str(e), "traceback": traceback.format_exc()}
)
logger.error(f"Error {e}\n{traceback.format_exc()}")
retries += 1
return {"errors": errors}
return JSONResponse(content={"errors": errors}, status_code=500)

return _call

Expand Down Expand Up @@ -364,16 +386,8 @@ async def _create_services(request: Request) -> JSONResponse:
manager.stake_service_on_chain(hash=service.hash)
manager.fund_service(hash=service.hash)
manager.deploy_service_locally(hash=service.hash)
schedule_funding_job(service=service.hash)

# Start funding job
logger.info(f"Starting funding job for {service}")
loop = asyncio.get_running_loop()
funding_jobs[service.hash] = loop.create_task(
operate.service_manager().funding_job(
hash=service.hash,
loop=loop,
)
)
return JSONResponse(
content=operate.service_manager().create_or_load(hash=service.hash).json
)
Expand All @@ -391,19 +405,11 @@ async def _update_services(request: Request) -> JSONResponse:
)
if template.get("deploy", False):
manager = operate.service_manager()
manager.deploy_service_onchain(hash=service.hash)
manager.deploy_service_onchain(hash=service.hash, update=True)
manager.stake_service_on_chain(hash=service.hash)
manager.fund_service(hash=service.hash)
manager.deploy_service_locally(hash=service.hash)

logger.info(f"Starting funding job for {service}")
loop = asyncio.get_running_loop()
funding_jobs[service.hash] = loop.create_task(
operate.service_manager().funding_job(
hash=service.hash,
loop=loop,
)
)
schedule_funding_job(service=service.hash)
return JSONResponse(content=service.json)

@app.get("/api/services/{service}")
Expand Down Expand Up @@ -494,20 +500,13 @@ async def _build_service_locally(request: Request) -> JSONResponse:
async def _start_service_locally(request: Request) -> JSONResponse:
"""Create a service."""
service = request.path_params["service"]
deployment = operate.service_manager().create_or_load(service).deployment
operate.service_manager().fund_service(service)
deployment.build(force=True)
deployment.start()

# Start funding job
loop = asyncio.get_running_loop()
funding_jobs[service] = loop.create_task(
operate.service_manager().funding_job(
hash=service,
loop=loop,
)
)
return JSONResponse(content=deployment.json)
manager = operate.service_manager()
manager.deploy_service_onchain(hash=service)
manager.stake_service_on_chain(hash=service)
manager.fund_service(hash=service)
manager.deploy_service_locally(hash=service, force=True)
schedule_funding_job(service=service)
return JSONResponse(content=manager.create_or_load(service).deployment)

@app.post("/api/services/{service}/deployment/stop")
@with_retries
Expand All @@ -516,12 +515,8 @@ async def _stop_service_locally(request: Request) -> JSONResponse:
service = request.path_params["service"]
deployment = operate.service_manager().create_or_load(service).deployment
deployment.stop()

if service in funding_jobs:
logger.info(f"Cancelling funding job for {service}")
status = funding_jobs[service].cancel()
if not status:
logger.info(f"Funding job cancellation for {service} failed")
logger.info(f"Cancelling funding job for {service}")
cancel_funding_job(service=service)
return JSONResponse(content=deployment.json)

@app.post("/api/services/{service}/deployment/delete")
Expand Down
73 changes: 50 additions & 23 deletions operate/services/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from operate.keys import Key, KeysManager
from operate.ledger import PUBLIC_RPCS
from operate.ledger.profiles import CONTRACTS, OLAS, STAKING
from operate.services.protocol import OnChainManager
from operate.services.protocol import OnChainManager, StakingState
from operate.services.service import (
Deployment,
OnChainData,
Expand Down Expand Up @@ -137,16 +137,20 @@ def create_or_load(
on_chain_user_params=on_chain_user_params,
)

def deploy_service_onchain(self, hash: str) -> None:
def deploy_service_onchain( # pylint: disable=too-many-statements
self,
hash: str,
update: bool = False,
) -> None:
"""
Deploy as service on-chain

:param hash: Service hash
:param update: Update the existing deployment
"""
self.logger.info("Loading service")
service = self.create_or_load(hash=hash)
user_params = service.chain_data.user_params
update = service.chain_data.token != -1
keys = service.keys or [
self.keys_manager.get(self.keys_manager.create())
for _ in range(service.helper.config.number_of_agents)
Expand All @@ -158,11 +162,29 @@ def deploy_service_onchain(self, hash: str) -> None:
):
raise ValueError("No staking slots available")

if service.chain_data.token > -1:
self.logger.info("Syncing service state")
info = ocm.info(token_id=service.chain_data.token)
service.chain_data.on_chain_state = OnChainState(info["service_state"])
service.chain_data.instances = info["instances"]
service.chain_data.multisig = info["multisig"]
service.store()
self.logger.info(f"Service state: {service.chain_data.on_chain_state.name}")

if user_params.use_staking:
self.logger.info("Checking staking compatibility")
required_olas = (
user_params.olas_cost_of_bond + user_params.olas_required_to_stake
)
if service.chain_data.on_chain_state in (
OnChainState.NOTMINTED,
OnChainState.MINTED,
):
required_olas = (
user_params.olas_cost_of_bond + user_params.olas_required_to_stake
)
elif service.chain_data.on_chain_state == OnChainState.ACTIVATED:
required_olas = user_params.olas_required_to_stake
else:
required_olas = 0

balance = (
registry_contracts.erc20.get_instance(
ledger_api=ocm.ledger_api,
Expand All @@ -171,23 +193,12 @@ def deploy_service_onchain(self, hash: str) -> None:
.functions.balanceOf(ocm.crypto.address)
.call()
)

if balance < required_olas:
raise ValueError(
"You don't have enough olas to stake, "
f"required olas: {required_olas}; your balance {balance}"
)

if service.chain_data.token > -1:
self.logger.info("Syncing service state")
info = ocm.info(token_id=service.chain_data.token)
service.chain_data.on_chain_state = OnChainState(info["service_state"])
service.chain_data.instances = info["instances"]
service.chain_data.multisig = info["multisig"]

service.store()
self.logger.info(f"Service state: {service.chain_data.on_chain_state.name}")

if service.chain_data.on_chain_state == OnChainState.NOTMINTED:
self.logger.info("Minting service")
service.chain_data.token = t.cast(
Expand Down Expand Up @@ -233,6 +244,11 @@ def deploy_service_onchain(self, hash: str) -> None:
service_id=service.chain_data.token,
instances=instances,
agents=[user_params.agent_id for _ in instances],
token=(
OLAS[service.ledger_config.chain]
if user_params.use_staking
else None
),
)
service.chain_data.on_chain_state = OnChainState.REGISTERED
service.keys = keys
Expand Down Expand Up @@ -323,15 +339,20 @@ def stake_service_on_chain(self, hash: str) -> None:
self.logger.info("Cannot stake service, `use_staking` is set to false")
return

if service.chain_data.staked:
self.logger.info("Cannot stake service, it's already staked")
return

if service.chain_data.on_chain_state != OnChainState.DEPLOYED:
self.logger.info("Cannot stake service, it's not in deployed state")
return

ocm = self.get_on_chain_manager(service=service)
state = ocm.staking_status(
service_id=service.chain_data.token,
staking_contract=STAKING[service.ledger_config.chain],
)
if state == StakingState.STAKED:
service.chain_data.staked = True
service.store()
return

ocm.stake(
service_id=service.chain_data.token,
service_registry=CONTRACTS[service.ledger_config.chain]["service_registry"],
Expand All @@ -351,11 +372,17 @@ def unstake_service_on_chain(self, hash: str) -> None:
self.logger.info("Cannot unstake service, `use_staking` is set to false")
return

if not service.chain_data.staked:
ocm = self.get_on_chain_manager(service=service)
state = ocm.staking_status(
service_id=service.chain_data.token,
staking_contract=STAKING[service.ledger_config.chain],
)
if state != StakingState.STAKED:
self.logger.info("Cannot unstake service, it's not staked")
service.chain_data.staked = False
service.store()
return

ocm = self.get_on_chain_manager(service=service)
ocm.unstake(
service_id=service.chain_data.token,
staking_contract=STAKING[service.ledger_config.chain],
Expand Down
12 changes: 12 additions & 0 deletions operate/services/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,3 +647,15 @@ def unstake(self, service_id: int, staking_contract: str) -> None:
service_id=service_id,
staking_contract=staking_contract,
)

def staking_status(self, service_id: int, staking_contract: str) -> StakingState:
"""Stake the service"""
self._patch()
return StakingManager(
key=self.wallet.key_path,
password=self.wallet.password,
chain_type=self.chain_type,
).status(
service_id=service_id,
staking_contract=staking_contract,
)
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@
"start": "electron .",
"build": "rm -rf dist/ && electron-builder build"
},
"version": "0.1.0-rc9"
"version": "0.1.0-rc12"
}
Loading
Loading