Skip to content

Commit

Permalink
Merge pull request #79 from jjcomer/redis
Browse files Browse the repository at this point in the history
Add redis caching and ability to configure db caches
  • Loading branch information
jjcomer authored Nov 29, 2023
2 parents b596974 + 72129b6 commit f7386f1
Show file tree
Hide file tree
Showing 10 changed files with 467 additions and 305 deletions.
402 changes: 215 additions & 187 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ doc = false

[package]
name = 'hogan'
version = '0.13.1'
version = '0.13.2'
authors = [
'Jonathan Morley <[email protected]>',
'Josh Comer <[email protected]>',
Expand All @@ -27,6 +27,7 @@ log = '0.4'
lru_time_cache = '0.11'
lru = '0.12'
parking_lot = '0.12'
redis = '0.23'
riker = '0.4'
riker-patterns = '0.4'
serde_derive = '1.0'
Expand All @@ -38,11 +39,12 @@ tempfile = '3'
thiserror = '1.0'
url = '2'
walkdir = '2'
which = '4.4'
which = '5.0'
zip = '0.6'


[dependencies.rusqlite]
version = '0.29'
version = '0.30'
features = ['bundled']

[dependencies.git2]
Expand Down
8 changes: 6 additions & 2 deletions src/app/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,17 @@ pub enum AppCommand {

///Filepath to the embedded db for storing environments. Will be created if it doesn't exist. If not provided a
/// random temp directory will be created
#[structopt(long = "db", value_name = "PATH", default_value = "hogan.db")]
db_path: String,
#[structopt(long = "db", value_name = "PATH")]
db_path: Option<String>,

///Maximum age of db entries based on number of days
#[structopt(long = "db-max-age", value_name = "DAYS", default_value = "90")]
db_max_age: usize,

///Connection string for redis server. Off by default. Will use "db-max-age" as TTL value
#[structopt(long = "redis", value_name = "CONNECTION")]
redis_connection: Option<String>,

///The delay between background fetches against the git repo
#[structopt(
long = "fetch-poller",
Expand Down
48 changes: 34 additions & 14 deletions src/app/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use crate::app::datadogstatsd::{CustomMetrics, DdMetrics};
use crate::app::fetch_actor;
use crate::app::head_actor;
use crate::storage::cache::{Cache, CleanupActor};
use crate::storage::{lru, multi, sqlite};
use crate::storage::{lru, multi, redis, sqlite};
use actix_web::dev::Service;
use actix_web::middleware::Logger;
use actix_web::{get, middleware, post, web, HttpResponse, HttpServer};
use anyhow::{Context, Result};
use hogan::config::{ConfigDir, EnvironmentDescription};
use hogan::error::HoganError;
use itertools::Itertools;
use parking_lot::Mutex;
use regex::Regex;
use riker::actors::ActorSystem;
Expand Down Expand Up @@ -91,7 +92,8 @@ pub fn start_up_server(
environments_regex: Regex,
datadog: bool,
environment_pattern: String,
db_path: String,
db_path: Option<String>,
redis_connection: Option<String>,
db_max_age: usize,
fetch_poller: u64,
allow_fetch: bool,
Expand All @@ -117,25 +119,43 @@ pub fn start_up_server(
fetch_poller,
);

let cache = Arc::new(multi::MultiCache::new(vec![
Box::new(lru::LruEnvCache::new("lru", cache_size)?),
Box::new(sqlite::SqliteCache::new(&db_path)),
]));
info!("Adding LRU cache of size {}", cache_size);
let mut caches: Vec<Box<dyn Cache + Send + Sync>> =
vec![Box::new(lru::LruEnvCache::new("lru", cache_size)?)];

if let Some(connection_string) = redis_connection {
info!("Adding redis cache at: {}", connection_string);
caches.push(Box::new(redis::RedisCache::new(
"redis",
&connection_string,
db_max_age * 24 * 60 * 60,
)?));
}

CleanupActor::init_db_cleanup_system(
&actor_system,
&[Arc::new(Box::new(sqlite::SqliteCache::new(&db_path)))],
db_max_age,
dd_metrics.clone(),
);
if let Some(db_path) = db_path {
info!("Adding sqlite cache at: {}", db_path);
let sqlite_cache = Box::new(sqlite::SqliteCache::new(&db_path));
caches.push(sqlite_cache.clone());

info!("Starting db cleanup system");
CleanupActor::init_db_cleanup_system(
&actor_system,
&[Arc::new(sqlite_cache.clone())],
db_max_age,
dd_metrics.clone(),
);
}

let cache_order = caches.iter().map(|c| c.id()).join(", ");
info!("Caching order: {}", cache_order);

let cache = Arc::new(multi::MultiCache::new(caches));

let write_lock = Mutex::new(0);

info!("Starting server on {}:{}", address, port);

let state = ServerState {
//environments,
//environment_listings,
cache,
config_dir,
write_lock,
Expand Down
4 changes: 2 additions & 2 deletions src/git.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fn detach_head(repo: &Repository, sha: &str) -> Result<()> {
}

pub fn ext_fetch(path: &Path, remote: &str) -> Result<()> {
info!("Fetching {}", remote);
debug!("Fetching {}", remote);
let mut fetch_cmd = Command::new("git")
.current_dir(path.to_str().unwrap())
.args(["fetch", "--prune", remote])
Expand All @@ -135,7 +135,7 @@ pub fn ext_fetch(path: &Path, remote: &str) -> Result<()> {
}

pub fn ext_maintenance(path: &Path) -> Result<()> {
info!("Performing maintenance");
debug!("Performing maintenance");
let mut maintenance_cmd = Command::new("git")
.current_dir(path.to_str().unwrap())
.args(["maintenance", "run", "--auto"])
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ fn main() -> Result<()> {
db_max_age,
fetch_poller,
allow_fetch,
redis_connection,
} => {
server::start_up_server(
common,
Expand All @@ -60,6 +61,7 @@ fn main() -> Result<()> {
datadog,
environment_pattern,
db_path,
redis_connection,
db_max_age,
fetch_poller,
allow_fetch,
Expand Down
96 changes: 96 additions & 0 deletions src/storage/cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::app::datadogstatsd::CustomMetrics;
use crate::app::datadogstatsd::DdMetrics;
use anyhow::Result;
use compression::prelude::*;
use hogan::config::Environment;
use hogan::config::EnvironmentDescription;
use riker::actors::*;
use serde::Deserialize;
use serde::Serialize;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

Expand All @@ -16,6 +19,99 @@ pub trait Cache {
fn write_env_listing(&self, sha: &str, data: &[EnvironmentDescription]) -> Result<()>;
}

/// Serializable snapshot of a `hogan::config::Environment`, used as the
/// storage representation for cache backends (bincode-encoded, then compressed).
#[derive(Default, Serialize, Deserialize, Debug)]
pub struct WritableEnvironment {
/// Rendered configuration as a JSON string (see the `From<&Environment>` impl,
/// which stringifies `config_data`, and the reverse impl, which re-parses it).
pub config_data: String,
/// Environment name.
pub environment: String,
/// Optional environment type — semantics defined by `hogan::config`.
pub environment_type: Option<String>,
}

impl From<&Environment> for WritableEnvironment {
fn from(environment: &Environment) -> Self {
WritableEnvironment {
config_data: environment.config_data.to_string(),
environment: environment.environment.to_owned(),
environment_type: environment.environment_type.to_owned(),
}
}
}

impl From<WritableEnvironment> for Environment {
    /// Rehydrates an `Environment` from its serializable cache form.
    ///
    /// # Panics
    /// Panics if `config_data` is not valid JSON. Entries are expected to have
    /// been produced by the matching `From<&Environment>` impl, so this only
    /// fires on a corrupted cache payload.
    // NOTE(review): `From` cannot return an error; if corrupted cache entries
    // are a realistic concern, consider migrating this to `TryFrom`.
    fn from(environment: WritableEnvironment) -> Self {
        Environment {
            // `expect` instead of bare `unwrap` so a corrupt entry panics with
            // a diagnosable message.
            config_data: serde_json::from_str(&environment.config_data)
                .expect("invalid JSON in cached environment config_data"),
            // `environment` is consumed here, so move the owned fields instead
            // of cloning them (the original called `.to_owned()` on owned data).
            environment: environment.environment,
            environment_type: environment.environment_type,
        }
    }
}

/// Serializable wrapper around a set of environment descriptions, used as the
/// cache storage form for an environment listing.
#[derive(Default, Serialize, Deserialize, Debug)]
pub struct WritableEnvironmentListing {
/// The environment descriptions being cached.
pub environments: Vec<EnvironmentDescription>,
}

impl From<&[EnvironmentDescription]> for WritableEnvironmentListing {
fn from(environments: &[EnvironmentDescription]) -> Self {
Self {
environments: environments.to_owned(),
}
}
}

/// BZip2-compresses a byte buffer (compression level 6) before cache storage.
fn compress_data(data: Vec<u8>) -> Result<Vec<u8>> {
    let mut encoder = BZip2Encoder::new(6);
    let compressed: Vec<u8> = data
        .into_iter()
        .encode(&mut encoder, Action::Finish)
        .collect::<Result<_, _>>()?;
    Ok(compressed)
}

/// Reverses [`compress_data`], inflating a BZip2-compressed buffer.
fn decompress_data(data: Vec<u8>) -> Result<Vec<u8>> {
    let mut decoder = BZip2Decoder::new();
    let inflated: Vec<u8> = data
        .into_iter()
        .decode(&mut decoder)
        .collect::<Result<_, _>>()?;
    Ok(inflated)
}

/// Serializes an environment for cache storage: convert to the writable form,
/// bincode-encode it, then BZip2-compress the bytes.
pub fn serialize_env(data: &Environment) -> Result<Vec<u8>> {
    let writable: WritableEnvironment = data.into();
    let encoded = bincode::serialize(&writable)?;
    compress_data(encoded)
}

/// Decompresses and decodes a cached environment blob produced by
/// [`serialize_env`].
///
/// # Errors
/// Returns an error if decompression fails or the payload is not a valid
/// bincode-encoded `WritableEnvironment`.
pub fn deserialize_env(data: Vec<u8>) -> Result<Environment> {
    let decompressed_data = decompress_data(data)?;
    // `?` already converts the bincode error into the function's error type;
    // the original matched on the Result only to re-wrap the Err arm.
    let decoded: WritableEnvironment = bincode::deserialize(&decompressed_data)?;
    Ok(decoded.into())
}

/// Serializes an environment listing for cache storage: wrap the slice in the
/// writable form, bincode-encode it, then BZip2-compress the bytes.
pub fn serialize_env_listing(data: &[EnvironmentDescription]) -> Result<Vec<u8>> {
    let writable: WritableEnvironmentListing = data.into();
    let encoded = bincode::serialize(&writable)?;
    compress_data(encoded)
}

/// Decompresses and decodes a cached listing blob produced by
/// [`serialize_env_listing`].
///
/// # Errors
/// Returns an error if decompression fails or the payload is not a valid
/// bincode-encoded `WritableEnvironmentListing`.
pub fn deserialize_env_listing(data: Vec<u8>) -> Result<Vec<EnvironmentDescription>> {
    let decompressed_data = decompress_data(data)?;
    // `?` converts the bincode error directly; no need to match and re-wrap.
    let decoded: WritableEnvironmentListing = bincode::deserialize(&decompressed_data)?;
    Ok(decoded.environments)
}

#[derive(Debug, Clone)]
pub struct ExecuteCleanup {}

Expand Down
1 change: 1 addition & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod cache;
pub mod lru;
pub mod multi;
// Keep the module list alphabetized; `redis` was previously appended after
// `sqlite`, breaking the existing ordering convention.
pub mod redis;
pub mod sqlite;
96 changes: 96 additions & 0 deletions src/storage/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use super::cache;
use crate::storage::cache::Cache;
use anyhow::Result;
use hogan::config::Environment;
use hogan::config::EnvironmentDescription;
use redis;

use redis::Commands as _;
use std::sync::Arc;

/// Cache backend that stores serialized environments and listings in a redis
/// server, with per-entry expiration.
pub struct RedisCache {
// Redis client; each cache operation opens its own connection via
// `get_connection()`.
client: redis::Client,
// Identifier reported through `Cache::id` (used when logging cache order).
id: String,
// Entry time-to-live in seconds: applied on write (SET … EX) and refreshed
// on read (EXPIRE).
ttl: usize,
}

impl RedisCache {
    /// Constructs a redis-backed cache layer with the given id and TTL
    /// (seconds).
    ///
    /// # Errors
    /// Fails when `connection_string` cannot be parsed as a redis URL.
    pub fn new(id: &str, connection_string: &str, ttl: usize) -> Result<Self> {
        let client = redis::Client::open(connection_string)?;
        Ok(Self {
            client,
            id: String::from(id),
            ttl,
        })
    }
}

impl Cache for RedisCache {
    fn id(&self) -> &str {
        &self.id
    }

    /// No-op: redis evicts entries itself via the per-key TTL set on write,
    /// so there is nothing for the cleanup actor to do here.
    fn clean(&self, _max_age: usize) -> Result<()> {
        Ok(())
    }

    /// Fetches a cached environment. A missing key — or any redis error on
    /// the GET — is reported as a miss (`Ok(None)`); only a present but
    /// undecodable payload is an error.
    fn read_env(&self, env: &str, sha: &str) -> Result<Option<Arc<Environment>>> {
        let key = gen_env_key(sha, env);
        let mut connection = self.client.get_connection()?;
        // Decode into an Option so a nil reply is an explicit miss. The
        // original decoded straight into Vec<u8>, relying on the error path
        // to signal a miss, which conflates "not cached" with real failures.
        let serialized_data: Option<Vec<u8>> = connection.get(&key).unwrap_or(None);
        match serialized_data {
            Some(data) => {
                // Sliding expiration: refresh the TTL on every hit. A failed
                // EXPIRE should not fail an otherwise successful read, so the
                // result is deliberately ignored (the original aborted via `?`).
                let _: redis::RedisResult<()> = connection.expire(&key, self.ttl);
                let env = cache::deserialize_env(data)?;
                Ok(Some(Arc::new(env)))
            }
            None => Ok(None),
        }
    }

    fn write_env(&self, env: &str, sha: &str, data: &Environment) -> Result<()> {
        let key = gen_env_key(sha, env);
        let serialized_data = cache::serialize_env(data)?;
        let mut connection = self.client.get_connection()?;

        // SET … EX so the entry expires after `ttl` seconds.
        let opts = redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(self.ttl));
        connection.set_options(&key, serialized_data, opts)?;
        debug!("Wrote env {} to redis.", key);
        Ok(())
    }

    /// Fetches a cached environment listing; same miss/TTL semantics as
    /// [`Self::read_env`].
    fn read_env_listing(&self, sha: &str) -> Result<Option<Arc<Vec<EnvironmentDescription>>>> {
        let key = gen_env_listing_key(sha);
        let mut connection = self.client.get_connection()?;
        let serialized_data: Option<Vec<u8>> = connection.get(&key).unwrap_or(None);
        match serialized_data {
            Some(data) => {
                let _: redis::RedisResult<()> = connection.expire(&key, self.ttl);
                let listing = cache::deserialize_env_listing(data)?;
                Ok(Some(Arc::new(listing)))
            }
            None => Ok(None),
        }
    }

    fn write_env_listing(&self, sha: &str, data: &[EnvironmentDescription]) -> Result<()> {
        let key = gen_env_listing_key(sha);
        let serialized_data = cache::serialize_env_listing(data)?;
        let mut connection = self.client.get_connection()?;

        let opts = redis::SetOptions::default().with_expiration(redis::SetExpiry::EX(self.ttl));
        connection.set_options(&key, serialized_data, opts)?;
        // Log for parity with `write_env`.
        debug!("Wrote env listing {} to redis.", key);
        Ok(())
    }
}

/// Builds the redis key for a cached environment: `env::<sha>::<env>`.
fn gen_env_key(sha: &str, env: &str) -> String {
    ["env", sha, env].join("::")
}

/// Builds the redis key for a cached environment listing: `listing::<sha>`.
fn gen_env_listing_key(sha: &str) -> String {
    let mut key = String::from("listing::");
    key.push_str(sha);
    key
}
Loading

0 comments on commit f7386f1

Please sign in to comment.