From 5b1cc2d079a995c38373876dd3c74b1edb0d941b Mon Sep 17 00:00:00 2001 From: Ayush-Yadav Date: Wed, 31 Jan 2024 06:55:35 +0530 Subject: [PATCH] run server to retrieve bill info --- src/bill_handler.rs | 33 ++++++++++++++++++++ src/billing_job.rs | 73 --------------------------------------------- src/handler.rs | 20 ++++++++----- src/lib.rs | 2 +- src/main.rs | 51 +++++++++++-------------------- src/model.rs | 8 ++--- src/tests.rs | 14 ++------- src/workerd.rs | 36 +++++++++++----------- 8 files changed, 87 insertions(+), 150 deletions(-) create mode 100644 src/bill_handler.rs delete mode 100644 src/billing_job.rs diff --git a/src/bill_handler.rs b/src/bill_handler.rs new file mode 100644 index 0000000..de6311b --- /dev/null +++ b/src/bill_handler.rs @@ -0,0 +1,33 @@ +use crate::model::AppState; + +use actix_web::{web::Data, HttpResponse}; +use anyhow::Context; +use k256::elliptic_curve::generic_array::sequence::Lengthen; +use serde_json::json; +use tiny_keccak::{Hasher, Keccak}; + +pub async fn bill_data(appstate: Data) -> HttpResponse { + let mut costs_gaurd = appstate.execution_costs.lock().unwrap(); + let costs_map = costs_gaurd.clone(); + + let mut hash = [0u8; 32]; + let mut hasher_gaurd = appstate.billing_hasher.lock().unwrap(); + hasher_gaurd.clone().finalize(&mut hash); + let sign = appstate + .signer + .sign_prehash_recoverable(&hash) + .context("Failed to sign requests data"); + if sign.is_err() { + return HttpResponse::InternalServerError().body(format!("{:?}", sign.unwrap_err())); + } + let (rs, v) = sign.unwrap(); + let signature = hex::encode(rs.to_bytes().append(27 + v.to_byte()).as_slice()); + + costs_gaurd.clear(); + hasher_gaurd.clone_from(&Keccak::v256()); + + HttpResponse::Ok().json(json!({ + "execution_costs": costs_map, + "signature": signature, + })) +} diff --git a/src/billing_job.rs b/src/billing_job.rs deleted file mode 100644 index 7c12e1f..0000000 --- a/src/billing_job.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use crate::model::AppState; - -use actix_web::web::Data; -use anyhow::{anyhow, Context, Error}; -use ethers::{abi::Token, prelude::*}; -use k256::elliptic_curve::generic_array::sequence::Lengthen; -use tiny_keccak::{Hasher, Keccak}; - -pub async fn billing_scheduler(appstate: Data,) -> Result { - let provider = Provider::::try_from(&appstate.rpc) - .context("Unable to connect to the RPC")? - .interval(Duration::from_millis(1000)); - let wallet: LocalWallet = appstate.operator_wallet_key - .parse() - .context("Unable to parse operator private key")?; - - let client = SignerMiddleware::new(provider, wallet); - let contract = Contract::new( - appstate.billing_contract.as_str() - .parse::
() - .context("Unable to parse contract address")?, - appstate.abi.to_owned(), - Arc::new(client)); - - let mut costs_gaurd = appstate.execution_costs.lock().await; - let mut tx_hashes = Vec::new(); - let mut amounts = Vec::new(); - for (tx_hash, amount) in costs_gaurd.clone().iter() { - let mut bytes32_tx_hash = [0u8; 32]; - hex::decode_to_slice(&tx_hash[2..], &mut bytes32_tx_hash) - .context("failed to decode tx hash to bytes")?; - tx_hashes.push(bytes32_tx_hash); - - amounts.push(U256::from(amount.to_owned())); - } - - let mut hash = [0u8; 32]; - let mut hasher_gaurd = appstate.billing_hasher.lock().await; - hasher_gaurd.clone().finalize(&mut hash); - let (rs, v) = appstate.signer - .sign_prehash_recoverable(&hash) - .context("failed to sign requests")?; - let signature = hex::encode(rs - .to_bytes() - .append(27 + v.to_byte()) - .as_slice()); - - let tx_request = contract.method::<_, H256>( - "settle", - (tx_hashes, amounts, Token::Bytes(signature.into_bytes()))) - .context("failed to build transaction request for billing")?; - let pending_tx = tx_request - .send() - .await - .context("Error while sending the billing transaction")?; - - let tx_receipt = pending_tx - .confirmations(7) - .await - .context("failed to receive confirmation for billing")?; - let tx_hash = tx_receipt - .ok_or(anyhow!("Failed to parse transaction receipt!"))? - .transaction_hash - .to_string(); - - costs_gaurd.clear(); - hasher_gaurd.clone_from(&Keccak::v256()); - - Ok(tx_hash) -} \ No newline at end of file diff --git a/src/handler.rs b/src/handler.rs index 1ab93e3..4d27796 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -5,7 +5,7 @@ use std::time::{Duration, Instant}; use crate::{cgroups, model::AppState, workerd}; use actix_web::http::{header, StatusCode}; -use actix_web::{HttpRequest, HttpResponse, Responder, web}; +use actix_web::{web, HttpRequest, HttpResponse, Responder}; use anyhow::{anyhow, Context}; use tiny_keccak::Hasher; use tokio::time::timeout; @@ -68,7 +68,6 @@ pub async fn serverless( &appstate.rpc, &appstate.contract, &appstate.billing_contract, - &appstate.abi, ) .await { @@ -272,14 +271,19 @@ pub async fn serverless( let execution_time = execution_timer_end .duration_since(execution_timer_start) .as_millis(); - + // TODO: FIX THE VALUE OF FIXED COST AND CONVERSION RATE - let execution_cost = 1 + 2*execution_time; - let mut costs_guard = appstate.execution_costs.lock().await; - let amount = costs_guard.entry(tx_hash.to_string()).or_insert(0); - *amount += execution_cost; + let execution_cost = 1 + 2 * execution_time; + + appstate + .execution_costs + .lock() + .unwrap() + .entry(tx_hash.to_owned()) + .and_modify(|cost| *cost += execution_cost) + .or_insert(execution_cost); - appstate.billing_hasher.lock().await.update(&hash); + appstate.billing_hasher.lock().unwrap().update(&hash); return response; } diff --git a/src/lib.rs b/src/lib.rs index b53d19a..5334c5a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -pub mod billing_job; +pub mod bill_handler; pub mod cgroups; pub mod handler; pub mod model; diff --git a/src/main.rs b/src/main.rs index ec2bddb..b79f345 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,16 +1,13 @@ use std::collections::HashMap; -use serverless::billing_job::billing_scheduler; use serverless::cgroups::Cgroups; use serverless::model::AppState; -use actix_web::{App, HttpServer, web}; +use actix_web::{web, App, HttpServer}; use anyhow::{anyhow, Context}; use clap::Parser; -use ethers::abi::Abi; use tiny_keccak::Keccak; use tokio::fs; -use tokio::time::{Duration, interval}; /// Simple program to greet a person #[derive(Parser, Debug)] @@ -19,6 +16,9 @@ struct Args { #[clap(long, value_parser, default_value = "6001")] port: u16, + #[clap(long, value_parser)] + bill_port: u16, // TODO: ADD THE DEFAULT PORT + #[clap(long, value_parser, default_value = "./runtime/")] runtime_path: String, @@ -40,10 +40,7 @@ struct Args { contract: String, #[clap(long, value_parser)] - billing_contract: String, - - #[clap(long, value_parser)] - operator_wallet_key: String, + billing_contract: String, // TODO: ADD A DEFAULT ADDRESS #[clap(long, value_parser)] signer: String, @@ -64,6 +61,7 @@ async fn main() -> anyhow::Result<()> { // println!("{:?}", response); let port: u16 = cli.port; + let bill_port: u16 = cli.bill_port; let cgroups = Cgroups::new().context("failed to construct cgroups")?; if cgroups.free.is_empty() { @@ -77,12 +75,6 @@ async fn main() -> anyhow::Result<()> { .as_slice(), ) .context("invalid signer key")?; - - let abi_json_string = fs::read_to_string("src/contract_abi.json") - .await - .context("failed to read contract ABI")?; - let abi = serde_json::from_str::(&abi_json_string) - .context("failed to deserialize ABI")?; let app_data = web::Data::new(AppState { cgroups: cgroups.into(), @@ -92,8 +84,6 @@ async fn main() -> anyhow::Result<()> { contract: cli.contract, billing_contract: cli.billing_contract, signer: signer, - abi: abi, - operator_wallet_key: cli.operator_wallet_key, execution_costs: HashMap::new().into(), billing_hasher: Keccak::v256().into(), }); @@ -110,23 +100,18 @@ async fn main() -> anyhow::Result<()> { println!("Server started on port {}", port); - server.await?; - - tokio::spawn(async move { - // TODO: FIX THE REGULAR INTERVAL - let mut interval = interval(Duration::from_secs(600)); - loop { - interval.tick().await; - - if !app_data_clone.execution_costs.lock().await.is_empty() { - - match billing_scheduler(app_data_clone.clone()).await { - Ok(tx_hash) => println!("Transaction sent for billing: {}", tx_hash), - Err(err) => println!("Error while sending billing transaction: {:?}", err), - } - } - } - }); + let bill_server = HttpServer::new(move || { + App::new() + .app_data(app_data_clone.clone()) + .default_service(web::to(serverless::bill_handler::bill_data)) + }) + .bind(("0.0.0.0", bill_port)) + .context(format!("could not bind to port {bill_port}"))? + .run(); + + println!("Bill Server started on port {}", bill_port); + + tokio::try_join!(server, bill_server)?; Ok(()) } diff --git a/src/model.rs b/src/model.rs index 3198e99..17544a0 100644 --- a/src/model.rs +++ b/src/model.rs @@ -1,14 +1,12 @@ -use std::sync::atomic::AtomicBool; use std::collections::HashMap; +use std::sync::{atomic::AtomicBool, Mutex}; use crate::cgroups::Cgroups; -use ethers::abi::Abi; use tiny_keccak::Keccak; -use tokio::sync::Mutex; pub struct AppState { - pub cgroups: std::sync::Mutex, + pub cgroups: Mutex, // IMPORTANT: we use Relaxed ordering here since we do not need to synchronize any memory // not even with reads/writes to the same atomic (we just serve a few more requests at worst) // be very careful adding more operations associated with the draining state @@ -18,8 +16,6 @@ pub struct AppState { pub contract: String, pub billing_contract: String, pub signer: k256::ecdsa::SigningKey, - pub abi: Abi, - pub operator_wallet_key: String, pub execution_costs: Mutex>, pub billing_hasher: Mutex, } diff --git a/src/tests.rs b/src/tests.rs index 87a421f..f2537b1 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -5,7 +5,6 @@ #[cfg(test)] pub mod serverlesstest { - use std::{collections::HashMap, fs::File, io::Read, sync::atomic::AtomicBool}; use crate::cgroups::Cgroups; use crate::handler; use crate::model::AppState; @@ -15,8 +14,8 @@ pub mod serverlesstest { error::Error, http, test, web, App, }; - use ethers::abi::Abi; use serde_json::json; + use std::{collections::HashMap, sync::atomic::AtomicBool}; use tiny_keccak::Keccak; fn new_app() -> App< @@ -27,12 +26,7 @@ pub mod serverlesstest { InitError = (), Error = Error, >, - > { - let mut abi_json = String::new(); - let mut file = File::open("src/contract_abi.json").unwrap(); - file.read_to_string(&mut abi_json).unwrap(); - let abi = serde_json::from_str::(&abi_json).unwrap(); - + > { App::new() .app_data(web::Data::new(AppState { cgroups: Cgroups::new().unwrap().into(), @@ -40,10 +34,8 @@ pub mod serverlesstest { runtime_path: "./runtime/".to_owned(), rpc: "https://sepolia-rollup.arbitrum.io/rpc".to_owned(), contract: "0x44fe06d2940b8782a0a9a9ffd09c65852c0156b1".to_owned(), - billing_contract: String::new(), // TODO: ADD BILLING CONTRACT FOR TESTS - abi: abi, + billing_contract: String::new(), // TODO: ADD BILLING CONTRACT FOR TESTS signer: k256::ecdsa::SigningKey::random(&mut rand::rngs::OsRng), - operator_wallet_key: String::new(), //TODO: ADD A WALLET KEY FOR RUNNING TESTS execution_costs: HashMap::new().into(), billing_hasher: Keccak::v256().into(), })) diff --git a/src/workerd.rs b/src/workerd.rs index 014d0e7..990758e 100644 --- a/src/workerd.rs +++ b/src/workerd.rs @@ -1,6 +1,6 @@ -use std::{convert::TryFrom, sync::Arc}; use std::process::Child; use std::time::{Duration, Instant}; +use std::{convert::TryFrom, sync::Arc}; use crate::cgroups::{Cgroups, CgroupsError}; @@ -13,10 +13,10 @@ use reqwest::Client; use serde_json::{json, Value}; use thiserror::Error; use tiny_keccak::{Hasher, Keccak}; -use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; use tokio::time::sleep; +use tokio::{fs, fs::File}; #[derive(Error, Debug)] pub enum ServerlessError { @@ -53,25 +53,27 @@ pub enum ServerlessError { } async fn get_current_deposit( - tx_hash: &str, - rpc: &str, + tx_hash: &str, + rpc: &str, billing_contract_add: &str, - abi: &Abi, ) -> Result { - let provider = Provider::::try_from(rpc)? - .interval(Duration::from_millis(1000)); + let abi_json_string = fs::read_to_string("src/contract_abi.json").await?; + let abi = serde_json::from_str::(&abi_json_string)?; + + let provider = Provider::::try_from(rpc)?.interval(Duration::from_millis(1000)); let contract = Contract::new( - billing_contract_add.parse::
()?, - abi.to_owned(), - Arc::new(provider)); + billing_contract_add.parse::
()?, + abi.to_owned(), + Arc::new(provider), + ); let mut bytes32_tx_hash = [0u8; 32]; - hex::decode_to_slice(&tx_hash[2..], &mut bytes32_tx_hash)?; + hex::decode_to_slice(&tx_hash[2..], &mut bytes32_tx_hash)?; let req = contract.method::<_, U256>("balanceOf", bytes32_tx_hash)?; - let deposit = req.call().await?; + let deposit = req.call().await?; - Ok(deposit) + Ok(deposit) } async fn get_transaction_data(tx_hash: &str, rpc: &str) -> Result { @@ -100,7 +102,6 @@ pub async fn create_code_file( rpc: &str, contract: &str, billing_contract: &str, - abi: &Abi, ) -> Result<(), ServerlessError> { // get tx data let mut tx_data = match get_transaction_data(tx_hash, rpc).await?["result"].take() { @@ -129,12 +130,12 @@ pub async fn create_code_file( }?; // get tx deposit - let tx_deposit = get_current_deposit(tx_hash, rpc, billing_contract, abi) + let tx_deposit = get_current_deposit(tx_hash, rpc, billing_contract) .await .map_err(|_| ServerlessError::TxDepositNotFound)?; // TODO: FIX THE FIXED MINIMUM VALUE - if tx_deposit <= U256::from(200) { - return Err(ServerlessError::TxDepositNotEnough); + if tx_deposit <= U256::from(200) { + return Err(ServerlessError::TxDepositNotEnough); } // hex decode calldata by skipping to the code bytes @@ -249,7 +250,6 @@ pub async fn get_workerd_response( body: actix_web::web::Bytes, signer: &k256::ecdsa::SigningKey, host_header: &str, - ) -> Result<(HttpResponse, [u8; 32]), anyhow::Error> { let mut hasher = Keccak::v256(); hasher.update(b"|oyster-serverless-hasher|");