Add support for handling billing process for the service offered by the operator #18

Open
wants to merge 21 commits into
base: master
Changes from 16 commits

2,330 changes: 2,233 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -20,9 +20,10 @@ actix-web = "4"
anyhow = "1.0.75"
clap = { version = "4.4.7", features = ["derive"] }
data-encoding = "2.5.0"
ethers = "2.0.11"
hex = "0.4.3"
k256 = { version = "0.13.2", features = ["ecdsa", "ecdsa-core"] }
openssl = { version = "0.10", features = ["vendored"] }
openssl = { version = "0.10.60", features = ["vendored"] }
rand = "0.8.5"
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
138 changes: 138 additions & 0 deletions src/billing_handler.rs
@@ -0,0 +1,138 @@
use crate::model::AppState;

use actix_web::web::{Data, Json};
use actix_web::{get, post, HttpResponse, Responder};
use k256::ecdsa::SigningKey;
use k256::elliptic_curve::generic_array::sequence::Lengthen;
use serde_json::json;
use tiny_keccak::{Hasher, Keccak};

#[derive(Debug, serde::Deserialize)]
pub struct SigningData {
nonce: String,
tx_hashes: Vec<String>,
}

#[get("/billing/inspect")]
pub async fn inspect_bill(appstate: Data<AppState>) -> impl Responder {
HttpResponse::Ok().json(json!({
"bill": appstate.execution_costs.lock().unwrap().clone(),
}))
}

#[get("/billing/latest")]
pub async fn get_last_bill_claim(appstate: Data<AppState>) -> impl Responder {
let mut last_bill_claim_guard = appstate.last_bill_claim.lock().unwrap();
if last_bill_claim_guard.0.is_none() {
return HttpResponse::BadRequest().body("No bill claimed yet!");
}

if last_bill_claim_guard.1.is_some() {
return HttpResponse::Ok().json(json!({
"bill_claim_data": last_bill_claim_guard.0.clone().unwrap(),
"signature": last_bill_claim_guard.1.clone().unwrap(),
}));
}

let bill_claim_data = hex::decode(last_bill_claim_guard.0.clone().unwrap());
if bill_claim_data.is_err() {
return HttpResponse::InternalServerError().body(format!(
"Failed to decode claimed bill data: {}",
bill_claim_data.unwrap_err()
));
}

let bill_claim_data = bill_claim_data.unwrap();
let signature = sign_data(bill_claim_data.as_slice(), &appstate.signer).await;
if signature.is_err() {
return HttpResponse::InternalServerError().body(format!(
"Failed to sign billing data: {}",
signature.unwrap_err()
));
}

let bill_claim_data = hex::encode(bill_claim_data.as_slice());
let signature = signature.unwrap();
last_bill_claim_guard.1 = Some(signature.clone());

HttpResponse::Ok().json(json!({
"bill_claim_data": bill_claim_data,
"signature": signature,
}))
}

#[post("/billing/export")]
pub async fn export_bill(appstate: Data<AppState>, data: Json<SigningData>) -> impl Responder {
let signing_data = data.into_inner();
if signing_data.nonce.is_empty() {
return HttpResponse::BadRequest().body("Nonce must not be empty!");
}

if signing_data.tx_hashes.is_empty() {
return HttpResponse::BadRequest().body("List of transaction hashes must not be empty!");
}

let mut bytes32_nonce = [0u8; 32];
if let Err(err) = hex::decode_to_slice(&signing_data.nonce, &mut bytes32_nonce) {
return HttpResponse::BadRequest()
.body(format!("Failed to decode nonce into 32 bytes: {}", err));
}

let mut bill_claim_data = bytes32_nonce.to_vec();
for tx_hash in signing_data.tx_hashes {
if let Some(cost) = appstate.execution_costs.lock().unwrap().remove(&tx_hash) {
let mut bytes32_tx_hash = [0u8; 32];
if let Err(_) = hex::decode_to_slice(&tx_hash[2..], &mut bytes32_tx_hash) {
continue;
}

bill_claim_data.append(&mut bytes32_tx_hash.to_vec());
bill_claim_data.append(&mut cost.to_be_bytes().to_vec());
}
}

if bill_claim_data.len() == 32 {
return HttpResponse::BadRequest()
.body("Given transaction hashes are not present in billing data");
}

let signature = sign_data(bill_claim_data.as_slice(), &appstate.signer).await;
if signature.is_err() {
Member

If signing fails here, it will fail again when billing/latest is called, because neither the parameters nor any other context data change. A signing failure here looks like a fatal error to me. Double-check where it can fail and how the developers have suggested handling those errors.

Member Author

I looked into the conditions under which signing a prehashed message can fail. If we use a valid secret scalar as the signing key (which I assume we do) and a hashing algorithm that produces a 32-byte hash (which is the case for our Keccak::v256() hasher), an error should be very rare. I added the error checks just in case something unexpected happens (because of an outdated crate version, for example).

Member Author

Btw, if we look more closely, the same aggressive exit happens when serving requests: if signing the request/response data fails, we return an error. So even after getting a successful response from the worker serving the app, we send an error response to the user of the app. Is this acceptable behaviour? @roshanr95 @vg-27

Member

If signing throws an error, it will do so on every sign call. That means the application is not usable at all in production in that case.

So I think simply returning the error here would be sufficient. Any error caused by an implementation change would be hit during the testing phase of a release.

You can remove the special handling here. It looks like you are trying to recover from the error, but you are not, which is kind of misleading.

Member Author

@Ayush-Yadav Feb 19, 2024

I was just handling it the way it is done when serverless requests are processed by the enclave. I think that case is more concerning, since it is part of the code already in production. So shouldn't we add an exception here as well:

pub async fn get_workerd_response(
    port: u16,
    req: HttpRequest,
    body: actix_web::web::Bytes,
    signer: &k256::ecdsa::SigningKey,
    host_header: &str,
) -> Result<HttpResponse, anyhow::Error> {

let mut hasher = Keccak::v256();
hasher.update(b"|oyster-serverless-hasher|");

let timestamp = std::time::SystemTime::now()
    .duration_since(std::time::UNIX_EPOCH)?
    .as_secs();
hasher.update(b"|timestamp|");
hasher.update(&timestamp.to_be_bytes());

hasher.update(b"|request|");
hasher.update(b"|method|");
hasher.update(req.method().to_string().as_bytes());
hasher.update(b"|pathandquery|");
hasher.update(
    req.uri()
        .path_and_query()
        .map(|x| x.as_str())
        .unwrap_or("")
        .as_bytes(),
);
hasher.update(b"|host|");
hasher.update(host_header.as_bytes());
hasher.update(b"|body|");
hasher.update(&body);

let port_str = port.to_string();
let req_url = "http://127.0.0.1:".to_string() + &port_str + "/";
let client = reqwest::Client::new();
let response = req
    .headers()
    .into_iter()
    .fold(
        client.request(req.method().clone(), req_url),
        |req, header| req.header(header.0.clone(), header.1.clone()),
    )
    .body(body)
    .send()
    .await?;
hasher.update(b"|response|");

let mut actix_resp = response.headers().into_iter().fold(
    HttpResponse::build(response.status()),
    |mut resp, header| {
        resp.append_header((header.0.clone(), header.1.clone()));
        resp
    },
);
let response_body = response.bytes().await?;

hasher.update(b"|body|");
hasher.update(&response_body);

let mut hash = [0u8; 32];
hasher.finalize(&mut hash);

let (rs, v) = signer.sign_prehash_recoverable(&hash)?;

Member

In this case, ? is used to propagate the error; no explicit handling is required. For your case too, I was suggesting that you just respond with an InternalServerError. That would be fine.
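
To make the distinction in this comment concrete, here is a minimal sketch, using only the standard library, of propagating a signing error with `?` versus unpacking it by hand with `is_err()` and `unwrap_err()`. `SignError`, `sign`, and both wrapper functions are hypothetical stand-ins and are not code from this PR.

```rust
use std::fmt;

/// Hypothetical error type standing in for the signing crate's error.
#[derive(Debug, PartialEq)]
pub struct SignError(pub String);

impl fmt::Display for SignError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "signing failed: {}", self.0)
    }
}

impl std::error::Error for SignError {}

/// Hypothetical signer: fails on empty input, otherwise returns a dummy value.
pub fn sign(data: &[u8]) -> Result<String, SignError> {
    if data.is_empty() {
        return Err(SignError("empty input".to_string()));
    }
    Ok(format!("sig-over-{}-bytes", data.len()))
}

/// Explicit style: check `is_err()`, then call `unwrap_err()` or `unwrap()`.
pub fn sign_explicit(data: &[u8]) -> Result<String, Box<dyn std::error::Error>> {
    let signature = sign(data);
    if signature.is_err() {
        return Err(Box::new(signature.unwrap_err()));
    }
    Ok(signature.unwrap())
}

/// `?` style: the error is converted and returned to the caller in one step.
pub fn sign_with_question_mark(data: &[u8]) -> Result<String, Box<dyn std::error::Error>> {
    let signature = sign(data)?;
    Ok(signature)
}
```

Both functions return the same values for the same input; the second simply has less code that looks like recovery. In a handler whose return type is `impl Responder` rather than a `Result`, the closest equivalent would be a single `match` that maps the error to an internal server error response.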

Member

@roshanr95 Feb 20, 2024

> If signing fails here, it will fail again when billing/latest is called, because neither the parameters nor any other context data change. A signing failure here looks like a fatal error to me. Double-check where it can fail and how the developers have suggested handling those errors.

Depends on why it can fail and if it's consistent or not. For example, pretty sure it generates a random number inside, which can probably fail inconsistently.

Storing the last response is mainly just a safeguard I guess, if we don't even want to spend time really figuring out the failure conditions.

Member

@roshanr95 Feb 20, 2024

> Btw, if we look more closely, the same aggressive exit happens when serving requests: if signing the request/response data fails, we return an error. So even after getting a successful response from the worker serving the app, we send an error response to the user of the app. Is this acceptable behaviour? @roshanr95 @vg-27

Yes, responses are insecure without a signature. No point in returning a response if not signed.

Member

> For example, pretty sure it generates a random number inside, which can probably fail inconsistently.

Yeah, that's right. Plain ECDSA signing uses a random number. But the implementation here seems to use RFC 6979, where k is computed deterministically.

Anyway, if there is a possibility of inconsistent errors, let's keep the backup of the billing data.
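
The fallback agreed on in this thread can be sketched as follows: remember the encoded claim before reporting a signing failure, so that a later call can retry signing the same bytes. `LastBillClaim`, `export`, `latest`, and the closure-based signer are hypothetical names; the PR itself keeps this state as a tuple behind a mutex in `AppState`.

```rust
/// Hypothetical mirror of the PR's `(Option<String>, Option<String>)` state:
/// the claim payload, and its signature once signing has succeeded.
#[derive(Debug, Default)]
pub struct LastBillClaim {
    pub data: Option<String>,
    pub signature: Option<String>,
}

/// Export path: always remember the payload, even if signing fails.
pub fn export<F>(state: &mut LastBillClaim, data: &str, sign: F) -> Result<String, String>
where
    F: Fn(&str) -> Result<String, String>,
{
    state.data = Some(data.to_string());
    state.signature = None;
    let signature = sign(data)?;
    state.signature = Some(signature.clone());
    Ok(signature)
}

/// Latest path: reuse a stored signature, or retry signing the stored payload.
pub fn latest<F>(state: &mut LastBillClaim, sign: F) -> Result<(String, String), String>
where
    F: Fn(&str) -> Result<String, String>,
{
    let data = state.data.clone().ok_or("No bill claimed yet!")?;
    if let Some(signature) = state.signature.clone() {
        return Ok((data, signature));
    }
    let signature = sign(&data)?;
    state.signature = Some(signature.clone());
    Ok((data, signature))
}
```

If signing is fully deterministic, the retry in `latest` will fail the same way as the original attempt; the backup only helps when the failure is transient, which is the case this thread decided to guard against.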

appstate
.last_bill_claim
.lock()
.unwrap()
.0
.clone_from(&Some(hex::encode(bill_claim_data.as_slice())));

return HttpResponse::InternalServerError().body(format!(
"Failed to sign billing data: {}",
signature.unwrap_err()
));
}

let signature = signature.unwrap();
let bill_claim_data = hex::encode(bill_claim_data.as_slice());

let mut last_bill_claim_guard = appstate.last_bill_claim.lock().unwrap();
last_bill_claim_guard.0 = Some(bill_claim_data.clone());
last_bill_claim_guard.1 = Some(signature.clone());

HttpResponse::Ok().json(json!({
Member

Is it safe to assume that the nonce is supposed to be unique? If so, can we save this data with the given nonce as the key? Then, if the client loses the bill, it can query the bill again by providing the nonce. We could delete the cached entry after some timeout.

This would cover the case where the client gets disconnected from the enclave.

Member Author

Yes, the nonce is supposed to be unique; the contract uses it to catch duplicate bill data. We could cache this data for some time, but I think that once the client receives the data, they are supposed to claim the bill right away, or at least store the bill data until they do. I don't know if we should load the enclave with more data and affect its performance or capacity just to handle this. @roshanr95 Please comment.

Member

Yes, the contract will deduplicate receipts based on the nonce.

Storing all, or even some, of them is probably overkill. We can maybe just store the very last one, which the client can then requery (/billing/latest). Assuming proper persistence (and API usage) is the responsibility of the client, the only real loss vector that remains is network issues while the response is being transmitted; storing just the last one should be fine to handle this.

Member

We can also protect against duplicate requests by checking the nonce and seeing if it has changed since the last one 🤔

Member Author

Why is this required inside the enclave? Our main aim is to prevent duplicate requests to the billing contract, because that's where the actual transactions happen. If the client sends a duplicate nonce to the enclave, that is their fault: the enclave will think some balances have been settled when in fact the contract will reject the claim, leading to lost bill info. If we were to check the nonce inside the enclave, we would have to check it against the list of all nonces received so far and not just the last one, which is probably overkill.

Member

Agree. Nonce check should also be enforced. Also storing the last bill is sufficient to handle the network issue.
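
A minimal sketch of the check discussed in this thread: reject an export request whose nonce equals the one used for the most recently stored bill. It compares against the last nonce only, so it guards against an immediate duplicate and not against arbitrary replays; the contract remains the authority on deduplication. `NonceGuard`, `NonceError`, and `check_and_store` are hypothetical names, not part of this PR.

```rust
/// Hypothetical guard that remembers only the most recent 32-byte nonce.
#[derive(Debug, Default)]
pub struct NonceGuard {
    last_nonce: Option<[u8; 32]>,
}

#[derive(Debug, PartialEq)]
pub enum NonceError {
    /// The nonce matches the one from the previous accepted request.
    RepeatedNonce,
}

impl NonceGuard {
    /// Accepts the nonce unless it equals the last accepted one.
    pub fn check_and_store(&mut self, nonce: [u8; 32]) -> Result<(), NonceError> {
        if self.last_nonce == Some(nonce) {
            return Err(NonceError::RepeatedNonce);
        }
        self.last_nonce = Some(nonce);
        Ok(())
    }
}
```

Because only one nonce is kept, an older nonce is accepted again once a different one has been seen. That is the trade-off raised above: catching every duplicate would require storing every nonce received so far.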

"bill_claim_data": bill_claim_data,
"signature": signature,
}))
}

async fn sign_data(data: &[u8], signer: &SigningKey) -> Result<String, anyhow::Error> {
let mut hasher = Keccak::v256();
hasher.update(data);

let mut hash = [0u8; 32];
hasher.finalize(&mut hash);

let (rs, v) = signer.sign_prehash_recoverable(&hash)?;
let signature = hex::encode(rs.to_bytes().append(27 + v.to_byte()).as_slice());

return Ok(signature);
}
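
The hex string returned by `sign_data` encodes the 64-byte `r || s` signature followed by one recovery byte `v = 27 + recovery_id`, 65 bytes in total. The sketch below rebuilds that layout with the standard library only. `encode_signature` and `to_hex` are hypothetical helpers; in the PR the `r`/`s` bytes and the recovery id come from `sign_prehash_recoverable`.

```rust
/// Concatenates r (32 bytes), s (32 bytes) and v = 27 + recovery_id into 65 bytes.
pub fn encode_signature(r: [u8; 32], s: [u8; 32], recovery_id: u8) -> [u8; 65] {
    let mut out = [0u8; 65];
    out[..32].copy_from_slice(&r);
    out[32..64].copy_from_slice(&s);
    out[64] = 27 + recovery_id;
    out
}

/// Lowercase hex encoding of a byte slice, two characters per byte.
pub fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}
```

A 65-byte signature therefore serializes to 130 hex characters, with the last two characters holding `v`.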
21 changes: 21 additions & 0 deletions src/contracts/billing_contract_abi.json
@@ -0,0 +1,21 @@
[
{
"inputs": [
{
"internalType": "bytes32",
"name": "_txhash",
"type": "bytes32"
}
],
"name": "balanceOf",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
}
]
53 changes: 28 additions & 25 deletions src/handler.rs
@@ -1,12 +1,13 @@
use crate::{cgroups, model::AppState, workerd};
use std::io::{BufRead, BufReader};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};

use crate::model::AppState;
use crate::{cgroups, workerd};

use actix_web::http::{header, StatusCode};
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use anyhow::{anyhow, Context};
use std::io::{BufRead, BufReader};
use std::sync::atomic::Ordering;
use std::time::Duration;
// use std::time::Instant;
use tokio::time::timeout;

pub async fn serverless(
@@ -66,12 +67,14 @@ pub async fn serverless(
workerd_runtime_path,
&appstate.rpc,
&appstate.contract,
&appstate.billing_contract,
)
.await
{
use workerd::ServerlessError::*;
return match err {
CalldataRetrieve(_)
| TxDepositNotEnough
| TxNotFound
| InvalidTxToType
| InvalidTxToValue(_, _)
@@ -91,7 +94,7 @@ pub async fn serverless(
};
}

// let execution_timer_start = Instant::now();
let execution_timer_start = Instant::now();

// reserve cgroup
let cgroup = appstate.cgroups.lock().unwrap().reserve();
@@ -153,22 +156,13 @@ pub async fn serverless(

use workerd::ServerlessError::*;
return match err {
CalldataRetrieve(_)
| TxNotFound
| InvalidTxToType
| InvalidTxToValue(_, _)
| InvalidTxCalldataType
| BadCalldata(_) => HttpResponse::BadRequest().body(format!(
ConfigFileCreate(_) => HttpResponse::InternalServerError().body(format!(
"{:?}",
anyhow!(err).context("failed to create code file")
)),
CodeFileCreate(_) => HttpResponse::InternalServerError().body(format!(
"{:?}",
anyhow!(err).context("failed to create code file")
anyhow!(err).context("failed to create config file")
)),
_ => HttpResponse::InternalServerError().body(format!(
"{:?}",
anyhow!(err).context("unexpected error while trying to create code file")
anyhow!(err).context("unexpected error while trying to create config file")
)),
};
}
@@ -250,6 +244,22 @@ pub async fn serverless(
.context("CRITICAL: failed to clean up code file")
.unwrap_or_else(|err| println!("{err:?}"));

let execution_timer_end = Instant::now();
let execution_time = execution_timer_end
.duration_since(execution_timer_start)
.as_millis();

// TODO: FIX THE VALUE OF FIXED COST AND CONVERSION RATE
let execution_cost = 1 + 2 * execution_time;

appstate
.execution_costs
.lock()
.unwrap()
.entry(tx_hash.to_owned())
.and_modify(|cost| *cost += execution_cost)
.or_insert(execution_cost);

if let Err(err) = response {
return HttpResponse::RequestTimeout()
.body(format!("{:?}", anyhow!(err).context("worker timed out")));
@@ -264,12 +274,5 @@ pub async fn serverless(
}
let response = response.unwrap();

// let execution_timer_end = Instant::now();
// let execution_time = execution_timer_end
// .duration_since(execution_timer_start)
// .as_millis()
// .to_string();
// println!("Execution time: {}ms", execution_time);

return response;
}
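
The cost bookkeeping added to `serverless` can be sketched in isolation: each request adds `1 + 2 * execution_time_ms` to the running total for its transaction hash, where both constants are placeholders per the TODO in the diff. `execution_cost`, `record_execution`, and the `u128` cost type are assumptions; `as_millis()` returns `u128`, but the field type in `AppState` is not shown in this diff.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Placeholder pricing from the diff: fixed cost 1, rate 2 per millisecond.
pub fn execution_cost(elapsed: Duration) -> u128 {
    1 + 2 * elapsed.as_millis()
}

/// Adds the cost of one execution to the running total for `tx_hash`.
pub fn record_execution(costs: &mut HashMap<String, u128>, tx_hash: &str, elapsed: Duration) {
    let cost = execution_cost(elapsed);
    costs
        .entry(tx_hash.to_owned())
        .and_modify(|total| *total += cost)
        .or_insert(cost);
}
```

Two executions under the same transaction hash accumulate into one entry, which is what lets `/billing/export` later remove and bill a hash in a single step.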
12 changes: 12 additions & 0 deletions src/lib.rs
@@ -1,5 +1,17 @@
pub mod billing_handler;
pub mod cgroups;
pub mod handler;
pub mod model;
mod tests;
pub mod workerd;

use ethers::contract::abigen;
use ethers::providers::{Http, Provider};

abigen!(
BillingContract,
"src/contracts/billing_contract_abi.json",
derives(serde::Serialize, serde::Deserialize)
);

type BillContract = BillingContract<Provider<Http>>;
54 changes: 48 additions & 6 deletions src/main.rs
@@ -1,10 +1,17 @@
use actix_web::{web, App, HttpServer};
use anyhow::{anyhow, Context};
use clap::Parser;
use tokio::fs;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use serverless::cgroups::Cgroups;
use serverless::model::AppState;
use serverless::BillingContract;

use actix_web::{web, App, HttpResponse, HttpServer};
use anyhow::{anyhow, Context};
use clap::Parser;
use ethers::providers::{Http, Provider};
use ethers::types::Address;
use tokio::fs;

/// Simple program to greet a person
#[derive(Parser, Debug)]
@@ -13,6 +20,9 @@ struct Args {
#[clap(long, value_parser, default_value = "6001")]
port: u16,

#[clap(long, value_parser)]
billing_port: u16, // TODO: ADD THE DEFAULT PORT

#[clap(long, value_parser, default_value = "./runtime/")]
runtime_path: String,

@@ -33,6 +43,9 @@ struct Args {
)]
contract: String,

#[clap(long, value_parser)]
billing_contract: String, // TODO: ADD A DEFAULT ADDRESS

#[clap(long, value_parser)]
signer: String,
}
@@ -52,6 +65,7 @@ async fn main() -> anyhow::Result<()> {
// println!("{:?}", response);

let port: u16 = cli.port;
let billing_port: u16 = cli.billing_port;

let cgroups = Cgroups::new().context("failed to construct cgroups")?;
if cgroups.free.is_empty() {
@@ -66,14 +80,28 @@ async fn main() -> anyhow::Result<()> {
)
.context("invalid signer key")?;

let rpc_provider = Provider::<Http>::try_from(&cli.rpc)
.context("Failed to connect to the rpc")?
.interval(Duration::from_millis(1000));
let billing_contract = BillingContract::new(
cli.billing_contract
.parse::<Address>()
.context("Failed to parse billing contract address")?,
Arc::new(rpc_provider),
);

let app_data = web::Data::new(AppState {
cgroups: cgroups.into(),
running: std::sync::atomic::AtomicBool::new(true),
runtime_path: cli.runtime_path,
rpc: cli.rpc,
contract: cli.contract,
signer,
signer: signer,
billing_contract: billing_contract,
execution_costs: HashMap::new().into(),
last_bill_claim: (None, None).into(),
});
let app_data_clone = app_data.clone();

let server = HttpServer::new(move || {
App::new()
@@ -86,7 +114,21 @@ async fn main() -> anyhow::Result<()> {

println!("Server started on port {}", port);

server.await?;
let billing_server = HttpServer::new(move || {
App::new()
.app_data(app_data_clone.clone())
.service(serverless::billing_handler::inspect_bill)
.service(serverless::billing_handler::get_last_bill_claim)
.service(serverless::billing_handler::export_bill)
.default_service(web::to(HttpResponse::NotFound))
})
.bind(("0.0.0.0", billing_port))
.context(format!("could not bind to port {billing_port}"))?
.run();

println!("Billing Server started on port {}", billing_port);

tokio::try_join!(server, billing_server)?;

Ok(())
}