This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Client implementation + benchmark #2

Merged · 22 commits · Apr 3, 2024
9 changes: 8 additions & 1 deletion .gitignore
@@ -13,4 +13,11 @@ Cargo.lock
 # MSVC Windows builds of rustc generate these, which store debugging information
 *.pdb
 
-.vscode
+# vscode folder
+.vscode/
+
+# local parquet file location for client
+client/parquet_files/
+
+bench_files
+.vscode
7 changes: 6 additions & 1 deletion README.md
@@ -1,5 +1,6 @@
 # 15721-s24-cache2
 15-721 Spring 2024 - Cache #2
+
 # LRU Cache Server
 
 This server implements a Least Recently Used (LRU) caching mechanism, providing a simple interface for fetching files from a simulated S3 storage and managing them within an LRU cache. The server is built using Rust and the Rocket framework.
@@ -85,4 +86,8 @@ root@node1:/data> curl -L http://node2:8000/s3/test1.txt # make sure -L flag is
 - **CURL Command**:
 ```sh
 curl -X POST http://localhost:8000/size/<new-size-in-bytes>
-```
+```
+
+## Benchmark
+
+To run the benchmark, simply run `bench.sh`.
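
The README describes the server as an LRU cache over files fetched from simulated S3. The server's own code is not part of this PR's diff, but the policy it describes is small. A minimal sketch, assuming a byte-size budget and a recency queue (the `LruCache` type here is a hypothetical illustration, not the server's actual implementation):

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical illustration of the LRU policy the README describes;
// the actual server code is not part of this PR's diff.
struct LruCache {
    capacity_bytes: usize,
    used_bytes: usize,
    files: HashMap<String, Vec<u8>>,
    recency: VecDeque<String>, // front = least recently used
}

impl LruCache {
    fn get(&mut self, key: &str) -> Option<&Vec<u8>> {
        if self.files.contains_key(key) {
            // Touch the key: move it to the most-recently-used end.
            self.recency.retain(|k| k != key);
            self.recency.push_back(key.to_string());
        }
        self.files.get(key)
    }

    fn put(&mut self, key: String, data: Vec<u8>) {
        self.used_bytes += data.len();
        self.recency.retain(|k| k != &key); // drop any stale entry for this key
        self.recency.push_back(key.clone());
        if let Some(old) = self.files.insert(key, data) {
            self.used_bytes -= old.len();
        }
        // Evict from the least-recently-used end until within budget.
        while self.used_bytes > self.capacity_bytes {
            match self.recency.pop_front() {
                Some(victim) => {
                    if let Some(evicted) = self.files.remove(&victim) {
                        self.used_bytes -= evicted.len();
                    }
                }
                None => break,
            }
        }
    }
}
```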
30 changes: 30 additions & 0 deletions bench.sh
@@ -0,0 +1,30 @@
#!/bin/bash

echo $HOME

pip install pyarrow pandas

rm -rf bench_files
rm client/parquet_files/*

# Create a dir named "bench_files" to store the benchmark parquet files
mkdir -p bench_files

# This generates benchmark parquet files
python pqt_gen.py --num-rows 1000000 --num-cols 10 --num-files 4

cp bench_files/* server/tests/test_s3_files/

# When the server is not available, put the files in the client-side local cache instead
# cp bench_files/* /client/parquet_files/


cd client

# This builds and runs client/src/benchmark.rs; see that file for details
cargo build --bin benchmark

export CLIENT_FILES_DIR=$HOME/15721-s24-cache2/client/parquet_files/

RUST_BACKTRACE=1 cargo run --package client --bin benchmark
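
pqt_gen.py itself is not included in this diff, so its flags above have to be taken at face value. For a sense of what it produces, here is a rough Rust equivalent using the arrow and parquet crates already declared in client/Cargo.toml — an illustrative sketch, not the real script (the `generate` function and output file names are assumptions):

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;

// Illustrative stand-in for pqt_gen.py: writes `num_files` parquet files of
// `num_rows` x `num_cols` i64 data into bench_files/. Names are hypothetical.
fn generate(num_rows: usize, num_cols: usize, num_files: usize) -> anyhow::Result<()> {
    std::fs::create_dir_all("bench_files")?;
    let fields: Vec<Field> = (0..num_cols)
        .map(|c| Field::new(format!("col{c}"), DataType::Int64, false))
        .collect();
    let schema = Arc::new(Schema::new(fields));
    for f in 0..num_files {
        let columns: Vec<ArrayRef> = (0..num_cols)
            .map(|_| {
                let values: Vec<i64> = (0..num_rows as i64).collect();
                Arc::new(Int64Array::from(values)) as ArrayRef
            })
            .collect();
        let batch = RecordBatch::try_new(schema.clone(), columns)?;
        let file = File::create(format!("bench_files/bench_{f}.parquet"))?;
        let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;
        writer.write(&batch)?;
        writer.close()?;
    }
    Ok(())
}
```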

16 changes: 13 additions & 3 deletions client/Cargo.toml
@@ -1,15 +1,25 @@
 [package]
-name = "echo_client"
+name = "client"
 version = "0.1.0"
 edition = "2018"
 authors = ["Your Name <[email protected]>"]
-description = "Echo client application"
+description = "Client application"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
 tokio = { version = "1", features = ["full"] }
 clap = "3"
 anyhow = "1.0"
+parquet = "50.0.0"
+arrow = "50.0.0"
 log = "0.4"
-env_logger = "0.11.1"
+thiserror = "1.0"
+env_logger = "0.11.1"
+async-trait = "0.1"
+rocket = { version = "0.5.0", features = ["json"] }
+reqwest = { version = "0.11", features = ["stream", "json"] }
+
+[[bin]]
+name = "benchmark"
+path = "src/benchmark.rs"
107 changes: 107 additions & 0 deletions client/src/benchmark.rs
@@ -0,0 +1,107 @@
use client::client_api::{StorageClient, StorageRequest, TableId};
use client::storage_client::StorageClientImpl;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::time::Instant;

// This scans the bench_files dir to figure out which test files are present,
// then builds a map of TableId -> filename to initialize the storage client
// (only needed when the catalog is not available), and generates a workload
// over those table ids. Finally, it runs the workload.

#[tokio::main]
async fn main() {
// Call the helper function to create the map
let home = std::env::var("HOME").unwrap();
let bench_files_path = format!("{}/15721-s24-cache2/bench_files", home);
let map = create_table_file_map(&bench_files_path).unwrap();
let client = setup_client(map.clone());
let table_ids: Vec<TableId> = map.keys().cloned().collect();
let load = load_gen_allonce(table_ids.clone());
load_run(&client, load).await;
// let skewed_load = load_gen_skewed(table_ids);
// load_run(&client, skewed_load).await;
}

async fn load_run(client: &dyn StorageClient, requests: Vec<StorageRequest>) {
println!("Start running workload");
let start = Instant::now();
for req in requests {
let id = match req {
StorageRequest::Table(id) => id,
_ => panic!("Invalid request type"),
};
println!("Requesting data for table {:?}", id);

let res = client.request_data_sync(req).await;
assert!(res.is_ok());
println!("Received data for table {:?}", id);

// let local_cache_dir = StorageClientImpl::local_cache_path();
// // iterate files in local cache and delete them
// let entries = fs::read_dir(local_cache_dir).unwrap();
// for entry in entries {
// let entry = entry.unwrap();
// let path = entry.path();
// // if file name ends with "parquet"
// if let Some(file_name) = path.file_name() {
// if let Some(name) = file_name.to_str() {
// if name.ends_with("parquet") {
// fs::remove_file(path).unwrap();
// }
// }
// }
// }
}
let duration = start.elapsed();
println!("Time used: {:?}", duration);
}

// Generate a load of requests for all tables at once
fn load_gen_allonce(table_ids: Vec<TableId>) -> Vec<StorageRequest> {
let mut requests = Vec::new();
for table_id in table_ids {
requests.push(StorageRequest::Table(table_id));
}
requests
}

// Generate a skewed load of requests over the tables.
// Not used in every benchmark run.
#[allow(dead_code)]
fn load_gen_skewed(table_ids: Vec<TableId>) -> Vec<StorageRequest> {
// Read the first table id twice and the last table id not at all, to skew the load
let mut requests = Vec::new();
for table_id in &table_ids {
requests.push(StorageRequest::Table(table_id.clone()));
}
// remove last element
requests.pop();
requests.push(StorageRequest::Table(table_ids[0]));

requests
}

fn setup_client(table_file_map: HashMap<TableId, String>) -> StorageClientImpl {
StorageClientImpl::new_for_test(1, table_file_map)
}

fn create_table_file_map(directory: &str) -> Result<HashMap<TableId, String>, std::io::Error> {
let mut table_file_map: HashMap<TableId, String> = HashMap::new();
let dir = Path::new(directory);

// Read the directory entries
let entries = fs::read_dir(dir)?;

// Iterate over the entries
for (id, entry) in entries.enumerate() {
let entry = entry?;
if entry.path().is_file() {
// If the entry is a file, add it to the map with an incremental ID
let filename = entry.file_name().into_string().unwrap();
table_file_map.insert(id as TableId, filename);
}
}

Ok(table_file_map)
}
38 changes: 38 additions & 0 deletions client/src/client_api.rs
@@ -0,0 +1,38 @@
use anyhow::Result;
use arrow::record_batch::RecordBatch;
use tokio::sync::mpsc::Receiver;

/// Id types for table, column, and record. Need to be consistent among all components
/// (e.g. execution engine). We don't want to make any type generic here just for the id,
/// so we simply define them here. Might refine later.
pub type TableId = u64;
pub type ColumnId = u64;
pub type RecordId = u64;

/// [`StorageRequest`] specifies the requests that the execution engine might issue to
/// the storage node.
///
/// Currently we assume the execution engine only requests the whole table/column. We may
/// add `std::ops::RangeBounds` later to support range query from the execution engine.
pub enum StorageRequest {
/// Requests a whole table from the underlying storage.
Table(TableId),
/// Requests one or more columns from the underlying storage.
Columns(TableId, Vec<ColumnId>),
/// Requests one or more tuples from the underlying storage.
/// FIXME: Do we really need this?
Tuple(Vec<RecordId>),
}

/// [`StorageClient`] provides the interface for the execution engine to query data from the
/// storage node. It resolves the physical location of the tables/columns/tuples by querying
/// the catalog node, and then sends the request to the storage node to get the data from the
/// underlying storage.
#[async_trait::async_trait]
pub trait StorageClient: Send + Sync + 'static {
/// Returns the requested data as a stream.
async fn request_data(&self, request: StorageRequest) -> Result<Receiver<RecordBatch>>;

/// Returns all the requested data as a whole.
async fn request_data_sync(&self, request: StorageRequest) -> Result<Vec<RecordBatch>>;
}
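
The trait leaves the catalog lookup and storage round trip to implementors; the streaming contract itself is easy to demonstrate. A minimal sketch that pushes one hard-coded `RecordBatch` through the channel (`MockStorageClient` is a hypothetical toy, not this PR's `StorageClientImpl`):

```rust
use anyhow::Result;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver};

struct MockStorageClient;

#[async_trait::async_trait]
impl StorageClient for MockStorageClient {
    async fn request_data(&self, _request: StorageRequest) -> Result<Receiver<RecordBatch>> {
        let (tx, rx) = channel(8);
        // One hard-coded batch stands in for data fetched from the storage node.
        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef],
        )?;
        tokio::spawn(async move {
            // Sending completes the stream; dropping tx then closes the channel.
            let _ = tx.send(batch).await;
        });
        Ok(rx)
    }

    async fn request_data_sync(&self, request: StorageRequest) -> Result<Vec<RecordBatch>> {
        // Drain the stream into a Vec, which is all the sync variant promises.
        let mut rx = self.request_data(request).await?;
        let mut batches = Vec::new();
        while let Some(batch) = rx.recv().await {
            batches.push(batch);
        }
        Ok(batches)
    }
}
```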
64 changes: 0 additions & 64 deletions client/src/kv_store.rs

This file was deleted.

2 changes: 2 additions & 0 deletions client/src/lib.rs
@@ -0,0 +1,2 @@
pub mod client_api;
pub mod storage_client;