Skip to content

Commit

Permalink
Helper function for SpanStore aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tiemo Bang committed Apr 17, 2024
1 parent ce5fd2e commit 08a0d4b
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 0 deletions.
59 changes: 59 additions & 0 deletions baselines/python_samples/spanstore_aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from sky_pie_baselines import spanstore_aggregate
from pydantic import BaseModel
from datetime import datetime
from typing import Literal
from collections import defaultdict
import time

class Request(BaseModel):
timestamp: datetime
op: Literal["read", "write", "evict"]
issue_region: str
obj_key: str
size: float


def spanstore_aggregate_py(requests, objects_in_access_set):
put_counts = defaultdict(int)
get_counts = defaultdict(int)
ingress_counts = defaultdict(float)
egress_counts = defaultdict(float)

for i in range(len(requests)):
request = requests[i]
if request.obj_key in objects_in_access_set:
if request.op == "write":
put_counts[request.issue_region] += 1
ingress_counts[request.issue_region] += request.size
elif request.op == "read":
get_counts[request.issue_region] += 1
egress_counts[request.issue_region] += request.size
return put_counts, get_counts, ingress_counts, egress_counts

if __name__ == "__main__":
no_objects = 10000
objects_in_access_set = [ f"key{i}" for i in range(no_objects)]
region = ['gcp:europe-west1-b','azure:eastus','azure:westus','gcp:us-east1-b', 'gcp:us-west1-a', 'azure:westeurope', 'aws:us-east-1', 'aws:eu-west-1', 'azure:westus']
timestamp = datetime.now()
requests = [
Request(timestamp=timestamp, op="read" if (i % 2) == 0 else "write", issue_region=region[i % len(region)], obj_key=objects_in_access_set[i], size=float(i))
for i in range(no_objects)
]
#print(requests)
start1 = time.time_ns()
res1 = spanstore_aggregate(requests, objects_in_access_set)
end1 = time.time_ns()
duration1 = (end1 - start1) / 1e6
print(f"Time taken by Rust function: {duration1} ms")

start2 = time.time_ns()
res2 = spanstore_aggregate_py(requests, objects_in_access_set)
end2 = time.time_ns()
duration2 = (end2 - start2) / 1e6
assert(res1 == res2)
print(f"Time taken by Python function: {duration2} ms")
print(f"Speedup: {duration2/duration1}")
#put_counts, get_counts, ingress_counts, egress_counts = res1
#put_counts2, get_counts2, ingress_counts2, egress_counts2 = res2
#print(put_counts, get_counts, ingress_counts, egress_counts)
#print(put_counts2, get_counts2, ingress_counts2, egress_counts2)
48 changes: 48 additions & 0 deletions baselines/src/sky_pie_baselines.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use pyo3::prelude::*;
use pyo3::types::{PyList, PyTuple};
use std::collections::HashMap;

mod optimizer;
use optimizer::Optimizer;
Expand All @@ -12,11 +14,57 @@ use workload::Workload;
mod py_loader;
use py_loader::PyLoader;



#[pyfunction]
fn spanstore_aggregate(requests: &PyList, objects_in_access_set: Vec<&str>) -> (HashMap<String, i64>,HashMap<String, i64>,HashMap<String, f64>,HashMap<String, f64>) /* PyResult<PyTuple> */ {

let mut put_counts = HashMap::<String, i64>::new();
let mut get_counts = HashMap::<String, i64>::new();
let mut ingress_counts = HashMap::<String, f64>::new();
let mut egress_counts = HashMap::<String, f64>::new();

// Convert object_in_access_set to a HashSet
let objects_in_access_set: std::collections::HashSet<&str> = objects_in_access_set.iter().cloned().collect();

// Iterate over the list of requests and print them
for request in requests.iter() {
// Get request type
let obj_key = request.getattr("obj_key").unwrap().extract::<String>().unwrap();
if objects_in_access_set.contains(&obj_key.as_str()) {
let op = request.getattr("op").unwrap().extract::<String>().unwrap();
let issue_region = request.getattr("issue_region").unwrap().extract::<String>().unwrap();
let size = request.getattr("size").unwrap().extract::<f64>().unwrap();

if op == "write" {
let put_count = put_counts.entry(issue_region.clone()).or_insert(0);
*put_count += 1;
let ingress_count = ingress_counts.entry(issue_region.clone()).or_insert(0.0);
*ingress_count += size;
} else if op == "read" {
let get_count = get_counts.entry(issue_region.clone()).or_insert(0);
*get_count += 1;
let egress_count = egress_counts.entry(issue_region.clone()).or_insert(0.0);
*egress_count += size;
}
}

}

/* println!("put_counts: {:?} ", put_counts);
println!("get_counts: {:?}", get_counts);
println!("ingress_counts: {:?}", ingress_counts);
println!("egress_counts: {:?}", egress_counts); */

return (put_counts, get_counts, ingress_counts, egress_counts); //.into_py();
}

#[pymodule]
fn sky_pie_baselines(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<ProfitBasedOptimizer>()?;
m.add_class::<KmeansOptimizer>()?;
m.add_class::<Workload>()?;
m.add_class::<PyLoader>()?;
m.add_wrapped(wrap_pyfunction!(spanstore_aggregate))?;
Ok(())
}

0 comments on commit 08a0d4b

Please sign in to comment.