Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for handling billing process for the service offered by the operator #18

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,330 changes: 2,233 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ actix-web = "4"
anyhow = "1.0.75"
clap = { version = "4.4.7", features = ["derive"] }
data-encoding = "2.5.0"
ethers = "2.0.11"
hex = "0.4.3"
k256 = { version = "0.13.2", features = ["ecdsa", "ecdsa-core"] }
openssl = { version = "0.10", features = ["vendored"] }
openssl = { version = "0.10.60", features = ["vendored"] }
rand = "0.8.5"
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
134 changes: 134 additions & 0 deletions src/billing_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::ops::DerefMut;

use crate::model::AppState;

use actix_web::web::{Data, Json};
use actix_web::{get, post, HttpResponse, Responder};
use k256::ecdsa::SigningKey;
use k256::elliptic_curve::generic_array::sequence::Lengthen;
use serde_json::json;
use tiny_keccak::{Hasher, Keccak};

#[derive(Debug, serde::Deserialize)]
pub struct SigningData {
nonce: String,
tx_hashes: Vec<String>,
}

#[get("/billing/inspect")]
pub async fn inspect_bill(appstate: Data<AppState>) -> impl Responder {
// Can replace with 'appstate.execution_costs.lock().unwrap().clone()' if turns out cloning is faster then Json creation!!
HttpResponse::Ok().json(json!({
"bill": *appstate.execution_costs.lock().unwrap(),
}))
}

#[get("/billing/latest")]
pub async fn get_last_bill_claim(appstate: Data<AppState>) -> impl Responder {
let mut last_bill_claim_guard = appstate.last_bill_claim.lock().unwrap();
let last_bill_claim = last_bill_claim_guard.deref_mut();

let Some(bill_data_hex) = &last_bill_claim.0 else {
return HttpResponse::BadRequest().body("No bill claimed yet!");
};

if let Some(signature) = &last_bill_claim.1 {
return HttpResponse::Ok().json(json!({
"bill_claim_data": bill_data_hex,
"signature": signature,
}));
}

let bill_claim_data = hex::decode(bill_data_hex);
let Ok(bill_claim_data) = bill_claim_data else {
return HttpResponse::InternalServerError().body(format!(
"Failed to decode claimed bill data: {}",
bill_claim_data.unwrap_err()
));
};

let signature = sign_data(bill_claim_data.as_slice(), &appstate.signer);
let Ok(signature) = signature else {
return HttpResponse::InternalServerError().body(format!(
"Failed to sign billing data: {}",
signature.unwrap_err()
));
};

let response = HttpResponse::Ok().json(json!({
"bill_claim_data": bill_data_hex,
"signature": signature,
}));

last_bill_claim.1 = Some(signature);
return response;
}

#[post("/billing/export")]
pub async fn export_bill(appstate: Data<AppState>, data: Json<SigningData>) -> impl Responder {
let signing_data = data.into_inner();
if signing_data.nonce.is_empty() {
return HttpResponse::BadRequest().body("Nonce must not be empty!");
}

vg-27 marked this conversation as resolved.
Show resolved Hide resolved
if signing_data.tx_hashes.is_empty() {
return HttpResponse::BadRequest().body("List of transaction hashes must not be empty!");
}

let mut bytes32_nonce = [0u8; 32];
if let Err(err) = hex::decode_to_slice(&signing_data.nonce, &mut bytes32_nonce) {
return HttpResponse::BadRequest()
.body(format!("Failed to decode nonce into 32 bytes: {}", err));
}

let mut bill_claim_data = bytes32_nonce.to_vec();
for tx_hash in signing_data.tx_hashes {
if let Some(cost) = appstate.execution_costs.lock().unwrap().remove(&tx_hash) {
let mut bytes32_tx_hash = [0u8; 32];
if let Err(_) = hex::decode_to_slice(&tx_hash[2..], &mut bytes32_tx_hash) {
continue;
}

bill_claim_data.append(&mut bytes32_tx_hash.to_vec());
bill_claim_data.append(&mut cost.to_be_bytes().to_vec());
}
}

if bill_claim_data.len() == 32 {
return HttpResponse::BadRequest()
.body("Given transaction hashes are not present in billing data");
}

let signature = sign_data(bill_claim_data.as_slice(), &appstate.signer);
let bill_claim_data = hex::encode(bill_claim_data.as_slice());

let Ok(signature) = signature else {
*appstate.last_bill_claim.lock().unwrap() = (Some(bill_claim_data), None);

return HttpResponse::InternalServerError().body(format!(
"Failed to sign billing data: {}",
signature.unwrap_err()
));
};

let response = HttpResponse::Ok().json(json!({
"bill_claim_data": bill_claim_data,
"signature": signature,
}));

*appstate.last_bill_claim.lock().unwrap() = (Some(bill_claim_data), Some(signature));
return response;
}

fn sign_data(data: &[u8], signer: &SigningKey) -> Result<String, anyhow::Error> {
let mut hasher = Keccak::v256();
hasher.update(data);

let mut hash = [0u8; 32];
hasher.finalize(&mut hash);

let (rs, v) = signer.sign_prehash_recoverable(&hash)?;
let signature = hex::encode(rs.to_bytes().append(27 + v.to_byte()).as_slice());

return Ok(signature);
}
21 changes: 21 additions & 0 deletions src/contracts/billing_contract_abi.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[
{
"inputs": [
{
"internalType": "bytes32",
"name": "_txhash",
"type": "bytes32"
}
],
"name": "balanceOf",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
}
]
53 changes: 28 additions & 25 deletions src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::{cgroups, model::AppState, workerd};
use std::io::{BufRead, BufReader};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};

use crate::model::AppState;
use crate::{cgroups, workerd};

use actix_web::http::{header, StatusCode};
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use anyhow::{anyhow, Context};
use std::io::{BufRead, BufReader};
use std::sync::atomic::Ordering;
use std::time::Duration;
// use std::time::Instant;
use tokio::time::timeout;

pub async fn serverless(
Expand Down Expand Up @@ -66,12 +67,14 @@ pub async fn serverless(
workerd_runtime_path,
&appstate.rpc,
&appstate.contract,
&appstate.billing_contract,
)
.await
{
use workerd::ServerlessError::*;
return match err {
CalldataRetrieve(_)
| TxDepositNotEnough
| TxNotFound
| InvalidTxToType
| InvalidTxToValue(_, _)
Expand All @@ -91,7 +94,7 @@ pub async fn serverless(
};
}

// let execution_timer_start = Instant::now();
let execution_timer_start = Instant::now();

// reserve cgroup
let cgroup = appstate.cgroups.lock().unwrap().reserve();
Expand Down Expand Up @@ -153,22 +156,13 @@ pub async fn serverless(

use workerd::ServerlessError::*;
return match err {
CalldataRetrieve(_)
| TxNotFound
| InvalidTxToType
| InvalidTxToValue(_, _)
| InvalidTxCalldataType
| BadCalldata(_) => HttpResponse::BadRequest().body(format!(
ConfigFileCreate(_) => HttpResponse::InternalServerError().body(format!(
"{:?}",
anyhow!(err).context("failed to create code file")
)),
CodeFileCreate(_) => HttpResponse::InternalServerError().body(format!(
"{:?}",
anyhow!(err).context("failed to create code file")
anyhow!(err).context("failed to create config file")
)),
_ => HttpResponse::InternalServerError().body(format!(
"{:?}",
anyhow!(err).context("unexpected error while trying to create code file")
anyhow!(err).context("unexpected error while trying to create config file")
)),
};
}
Expand Down Expand Up @@ -250,6 +244,22 @@ pub async fn serverless(
.context("CRITICAL: failed to clean up code file")
.unwrap_or_else(|err| println!("{err:?}"));

let execution_timer_end = Instant::now();
let execution_time = execution_timer_end
.duration_since(execution_timer_start)
.as_millis();

// TODO: FIX THE VALUE OF FIXED COST AND CONVERSION RATE
let execution_cost = 1 + 2 * execution_time;

appstate
.execution_costs
.lock()
.unwrap()
.entry(tx_hash.to_owned())
.and_modify(|cost| *cost += execution_cost)
.or_insert(execution_cost);

if let Err(err) = response {
return HttpResponse::RequestTimeout()
.body(format!("{:?}", anyhow!(err).context("worker timed out")));
Expand All @@ -264,12 +274,5 @@ pub async fn serverless(
}
let response = response.unwrap();

// let execution_timer_end = Instant::now();
// let execution_time = execution_timer_end
// .duration_since(execution_timer_start)
// .as_millis()
// .to_string();
// println!("Execution time: {}ms", execution_time);

return response;
}
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
pub mod billing_handler;
pub mod cgroups;
pub mod handler;
pub mod model;
mod tests;
pub mod workerd;

use ethers::contract::abigen;
use ethers::providers::{Http, Provider};

abigen!(
BillingContract,
"src/contracts/billing_contract_abi.json",
derives(serde::Serialize, serde::Deserialize)
);

type BillContract = BillingContract<Provider<Http>>;
53 changes: 47 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use actix_web::{web, App, HttpServer};
use anyhow::{anyhow, Context};
use clap::Parser;
use tokio::fs;
use std::collections::HashMap;
use std::sync::Arc;

use serverless::cgroups::Cgroups;
use serverless::model::AppState;
use serverless::BillingContract;

use actix_web::{web, App, HttpResponse, HttpServer};
use anyhow::{anyhow, Context};
use clap::Parser;
use ethers::providers::{Http, Provider, ProviderExt};
use ethers::types::Address;
use tokio::fs;

/// Simple program to greet a person
#[derive(Parser, Debug)]
Expand All @@ -13,6 +19,9 @@ struct Args {
#[clap(long, value_parser, default_value = "6001")]
port: u16,

#[clap(long, value_parser)]
billing_port: u16, // TODO: ADD THE DEFAULT PORT

#[clap(long, value_parser, default_value = "./runtime/")]
runtime_path: String,

Expand All @@ -33,6 +42,9 @@ struct Args {
)]
contract: String,

#[clap(long, value_parser)]
billing_contract: String, // TODO: ADD A DEFAULT ADDRESS

#[clap(long, value_parser)]
signer: String,
}
Expand All @@ -52,6 +64,7 @@ async fn main() -> anyhow::Result<()> {
// println!("{:?}", response);

let port: u16 = cli.port;
let billing_port: u16 = cli.billing_port;

let cgroups = Cgroups::new().context("failed to construct cgroups")?;
if cgroups.free.is_empty() {
Expand All @@ -66,14 +79,28 @@ async fn main() -> anyhow::Result<()> {
)
.context("invalid signer key")?;

let rpc_provider = Provider::<Http>::try_connect(&cli.rpc)
.await
.context("Failed to connect to the rpc")?;
let billing_contract = BillingContract::new(
cli.billing_contract
.parse::<Address>()
.context("Failed to parse billing contract address")?,
Arc::new(rpc_provider),
);

let app_data = web::Data::new(AppState {
cgroups: cgroups.into(),
running: std::sync::atomic::AtomicBool::new(true),
runtime_path: cli.runtime_path,
rpc: cli.rpc,
contract: cli.contract,
signer,
signer: signer,
billing_contract: billing_contract,
execution_costs: HashMap::new().into(),
last_bill_claim: (None, None).into(),
});
let app_data_clone = app_data.clone();

let server = HttpServer::new(move || {
App::new()
Expand All @@ -86,7 +113,21 @@ async fn main() -> anyhow::Result<()> {

println!("Server started on port {}", port);

server.await?;
let billing_server = HttpServer::new(move || {
App::new()
.app_data(app_data_clone.clone())
.service(serverless::billing_handler::inspect_bill)
.service(serverless::billing_handler::get_last_bill_claim)
.service(serverless::billing_handler::export_bill)
.default_service(web::to(HttpResponse::NotFound))
})
.bind(("0.0.0.0", billing_port))
.context(format!("could not bind to port {billing_port}"))?
.run();

println!("Billing Server started on port {}", billing_port);

tokio::try_join!(server, billing_server)?;

Ok(())
}
Loading