From 91b5bdffb91ae0bc1ade3a953e498adfde983691 Mon Sep 17 00:00:00 2001 From: brianreicher Date: Mon, 9 Oct 2023 14:19:50 -0400 Subject: [PATCH] Ingestion build with pinenut --- ingestion/Cargo.toml | 9 ++++++++ ingestion/src/main.rs | 53 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 ingestion/Cargo.toml create mode 100644 ingestion/src/main.rs diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml new file mode 100644 index 0000000..a3060c7 --- /dev/null +++ b/ingestion/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "ingestion" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +redis = "0.19" \ No newline at end of file diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs new file mode 100644 index 0000000..6362591 --- /dev/null +++ b/ingestion/src/main.rs @@ -0,0 +1,53 @@ +use redis::{Client, Commands}; +use std::thread; + +pub struct RedisDriver { + client: redis::Client, +} + +impl RedisDriver { + pub fn new(host: &str, port: u16) -> Self { + let url = format!("redis://{}:{}", host, port); + let client = Client::open(url).expect("Failed to create Redis client"); + RedisDriver { client } + } + + pub fn set(&self, key: &str, value: &str) -> redis::RedisResult<()> { + let mut conn = self.client.get_connection()?; + conn.set(key, value)?; + Ok(()) + } + + pub fn get(&self, key: &str) -> redis::RedisResult> { + let mut conn = self.client.get_connection()?; + let result: Option = conn.get(key)?; + Ok(result) + } + + pub fn delete(&self, key: &str) -> redis::RedisResult<()> { + let mut conn = self.client.get_connection()?; + conn.del(key)?; + Ok(()) + } + + pub fn run_queries_in_parallel(&self, keys: Vec) -> Vec>> { + let mut handles = vec![]; + + for key in keys { + let client = self.client.clone(); + let handle = thread::spawn(move || { + let conn = client.get_connection(); + conn.and_then(|mut conn| conn.get(&key)) + }); + + handles.push(handle); + } + + let mut results = vec![]; + for handle in handles { + results.push(handle.join().unwrap()); + } + + results + } +}