Skip to content

Commit

Permalink
Ingestion build with pinenut
Browse files Browse the repository at this point in the history
  • Loading branch information
brianreicher committed Oct 9, 2023
1 parent ccc6d5e commit 91b5bdf
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
9 changes: 9 additions & 0 deletions ingestion/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "ingestion"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
redis = "0.19"
53 changes: 53 additions & 0 deletions ingestion/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use redis::{Client, Commands};
use std::thread;

pub struct RedisDriver {
client: redis::Client,
}

impl RedisDriver {
pub fn new(host: &str, port: u16) -> Self {
let url = format!("redis://{}:{}", host, port);
let client = Client::open(url).expect("Failed to create Redis client");
RedisDriver { client }
}

pub fn set(&self, key: &str, value: &str) -> redis::RedisResult<()> {
let mut conn = self.client.get_connection()?;
conn.set(key, value)?;
Ok(())
}

pub fn get(&self, key: &str) -> redis::RedisResult<Option<String>> {
let mut conn = self.client.get_connection()?;
let result: Option<String> = conn.get(key)?;
Ok(result)
}

pub fn delete(&self, key: &str) -> redis::RedisResult<()> {
let mut conn = self.client.get_connection()?;
conn.del(key)?;
Ok(())
}

pub fn run_queries_in_parallel(&self, keys: Vec<String>) -> Vec<redis::RedisResult<Option<String>>> {
let mut handles = vec![];

for key in keys {
let client = self.client.clone();
let handle = thread::spawn(move || {
let conn = client.get_connection();
conn.and_then(|mut conn| conn.get(&key))
});

handles.push(handle);
}

let mut results = vec![];
for handle in handles {
results.push(handle.join().unwrap());
}

results
}
}

0 comments on commit 91b5bdf

Please sign in to comment.