From 08a0d4bf754f2058995691c2e2124338c6ac4404 Mon Sep 17 00:00:00 2001 From: Tiemo Bang Date: Wed, 17 Apr 2024 11:51:41 -0700 Subject: [PATCH] Helper function for SpanStore aggregation --- .../python_samples/spanstore_aggregate.py | 59 +++++++++++++++++++ baselines/src/sky_pie_baselines.rs | 48 +++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 baselines/python_samples/spanstore_aggregate.py diff --git a/baselines/python_samples/spanstore_aggregate.py b/baselines/python_samples/spanstore_aggregate.py new file mode 100644 index 0000000..03705e1 --- /dev/null +++ b/baselines/python_samples/spanstore_aggregate.py @@ -0,0 +1,59 @@ +from sky_pie_baselines import spanstore_aggregate +from pydantic import BaseModel +from datetime import datetime +from typing import Literal +from collections import defaultdict +import time + +class Request(BaseModel): + timestamp: datetime + op: Literal["read", "write", "evict"] + issue_region: str + obj_key: str + size: float + + +def spanstore_aggregate_py(requests, objects_in_access_set): + put_counts = defaultdict(int) + get_counts = defaultdict(int) + ingress_counts = defaultdict(float) + egress_counts = defaultdict(float) + + for i in range(len(requests)): + request = requests[i] + if request.obj_key in objects_in_access_set: + if request.op == "write": + put_counts[request.issue_region] += 1 + ingress_counts[request.issue_region] += request.size + elif request.op == "read": + get_counts[request.issue_region] += 1 + egress_counts[request.issue_region] += request.size + return put_counts, get_counts, ingress_counts, egress_counts + +if __name__ == "__main__": + no_objects = 10000 + objects_in_access_set = [ f"key{i}" for i in range(no_objects)] + region = ['gcp:europe-west1-b','azure:eastus','azure:westus','gcp:us-east1-b', 'gcp:us-west1-a', 'azure:westeurope', 'aws:us-east-1', 'aws:eu-west-1', 'azure:westus'] + timestamp = datetime.now() + requests = [ + Request(timestamp=timestamp, op="read" if (i % 2) == 0 else "write", issue_region=region[i % len(region)], obj_key=objects_in_access_set[i], size=float(i)) + for i in range(no_objects) + ] + #print(requests) + start1 = time.time_ns() + res1 = spanstore_aggregate(requests, objects_in_access_set) + end1 = time.time_ns() + duration1 = (end1 - start1) / 1e6 + print(f"Time taken by Rust function: {duration1} ms") + + start2 = time.time_ns() + res2 = spanstore_aggregate_py(requests, objects_in_access_set) + end2 = time.time_ns() + duration2 = (end2 - start2) / 1e6 + assert(res1 == res2) + print(f"Time taken by Python function: {duration2} ms") + print(f"Speedup: {duration2/duration1}") + #put_counts, get_counts, ingress_counts, egress_counts = res1 + #put_counts2, get_counts2, ingress_counts2, egress_counts2 = res2 + #print(put_counts, get_counts, ingress_counts, egress_counts) + #print(put_counts2, get_counts2, ingress_counts2, egress_counts2) \ No newline at end of file diff --git a/baselines/src/sky_pie_baselines.rs b/baselines/src/sky_pie_baselines.rs index ae76683..4d5257e 100644 --- a/baselines/src/sky_pie_baselines.rs +++ b/baselines/src/sky_pie_baselines.rs @@ -1,4 +1,6 @@ use pyo3::prelude::*; +use pyo3::types::{PyList, PyTuple}; +use std::collections::HashMap; mod optimizer; use optimizer::Optimizer; @@ -12,11 +14,57 @@ use workload::Workload; mod py_loader; use py_loader::PyLoader; + + +#[pyfunction] +fn spanstore_aggregate(requests: &PyList, objects_in_access_set: Vec<&str>) -> (HashMap,HashMap,HashMap,HashMap) /* PyResult */ { + + let mut put_counts = HashMap::::new(); + let mut get_counts = HashMap::::new(); + let mut ingress_counts = HashMap::::new(); + let mut egress_counts = HashMap::::new(); + + // Convert object_in_access_set to a HashSet + let objects_in_access_set: std::collections::HashSet<&str> = objects_in_access_set.iter().cloned().collect(); + + // Iterate over the list of requests and print them + for request in requests.iter() { + // Get request type + let obj_key = request.getattr("obj_key").unwrap().extract::().unwrap(); + if objects_in_access_set.contains(&obj_key.as_str()) { + let op = request.getattr("op").unwrap().extract::().unwrap(); + let issue_region = request.getattr("issue_region").unwrap().extract::().unwrap(); + let size = request.getattr("size").unwrap().extract::().unwrap(); + + if op == "write" { + let put_count = put_counts.entry(issue_region.clone()).or_insert(0); + *put_count += 1; + let ingress_count = ingress_counts.entry(issue_region.clone()).or_insert(0.0); + *ingress_count += size; + } else if op == "read" { + let get_count = get_counts.entry(issue_region.clone()).or_insert(0); + *get_count += 1; + let egress_count = egress_counts.entry(issue_region.clone()).or_insert(0.0); + *egress_count += size; + } + } + + } + + /* println!("put_counts: {:?} ", put_counts); + println!("get_counts: {:?}", get_counts); + println!("ingress_counts: {:?}", ingress_counts); + println!("egress_counts: {:?}", egress_counts); */ + + return (put_counts, get_counts, ingress_counts, egress_counts); //.into_py(); +} + #[pymodule] fn sky_pie_baselines(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_wrapped(wrap_pyfunction!(spanstore_aggregate))?; Ok(()) } \ No newline at end of file