-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce pelikan_bloomcache #468
base: master
Are you sure you want to change the base?
Changes from all commits
3206be9
0868694
305e366
d2e780a
ec6709f
0f86703
dcfc7fb
7634047
6846472
5b36a9d
2ab7c1e
eceae18
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
// Copyright 2021 Twitter, Inc. | ||
// Licensed under the Apache License, Version 2.0 | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
use crate::units::MB; | ||
|
||
use serde::{Deserialize, Serialize}; | ||
|
||
const BLOOM_DEFAULT_SIZE: usize = 16 * MB; | ||
const BLOOM_DEFAULT_HASHES: usize = 64; | ||
|
||
fn size() -> usize { | ||
BLOOM_DEFAULT_SIZE | ||
} | ||
|
||
fn hashes() -> usize { | ||
BLOOM_DEFAULT_HASHES | ||
} | ||
|
||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct Bloom { | ||
/// The size of the bloom filter in bytes. | ||
#[serde(default = "size")] | ||
pub size: usize, | ||
|
||
/// The number of hash functions that are evaluated for each value inserted.F | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typo? |
||
#[serde(default = "hashes")] | ||
pub hashes: usize, | ||
This comment was marked as spam.
Sorry, something went wrong. |
||
} | ||
|
||
impl Default for Bloom { | ||
fn default() -> Self { | ||
Self { | ||
size: BLOOM_DEFAULT_SIZE, | ||
hashes: BLOOM_DEFAULT_HASHES, | ||
} | ||
} | ||
} | ||
|
||
pub trait BloomConfig { | ||
fn bloom(&self) -> &Bloom; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
// Copyright 2022 Twitter, Inc. | ||
// Licensed under the Apache License, Version 2.0 | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
use crate::*; | ||
|
||
use serde::{Deserialize, Serialize}; | ||
|
||
use std::io::Read; | ||
|
||
// constants to define default values | ||
const DAEMONIZE: bool = false; | ||
const PID_FILENAME: Option<String> = None; | ||
const DLOG_INTERVAL: usize = 500; | ||
|
||
// helper functions | ||
fn daemonize() -> bool { | ||
DAEMONIZE | ||
} | ||
|
||
fn pid_filename() -> Option<String> { | ||
PID_FILENAME | ||
} | ||
|
||
fn dlog_interval() -> usize { | ||
DLOG_INTERVAL | ||
} | ||
|
||
// struct definitions | ||
#[derive(Serialize, Deserialize, Debug)] | ||
pub struct BloomcacheConfig { | ||
// top-level | ||
#[serde(default = "daemonize")] | ||
daemonize: bool, | ||
#[serde(default = "pid_filename")] | ||
pid_filename: Option<String>, | ||
#[serde(default = "dlog_interval")] | ||
dlog_interval: usize, | ||
|
||
// application modules | ||
#[serde(default)] | ||
admin: Admin, | ||
#[serde(default)] | ||
server: Server, | ||
#[serde(default)] | ||
worker: Worker, | ||
#[serde(default)] | ||
time: Time, | ||
#[serde(default)] | ||
tls: Tls, | ||
#[serde(default)] | ||
bloom: Bloom, | ||
|
||
// ccommon | ||
#[serde(default)] | ||
buf: Buf, | ||
#[serde(default)] | ||
debug: Debug, | ||
#[serde(default)] | ||
klog: Klog, | ||
#[serde(default)] | ||
sockio: Sockio, | ||
#[serde(default)] | ||
tcp: Tcp, | ||
} | ||
|
||
impl AdminConfig for BloomcacheConfig { | ||
fn admin(&self) -> &Admin { | ||
&self.admin | ||
} | ||
} | ||
|
||
impl BufConfig for BloomcacheConfig { | ||
fn buf(&self) -> &Buf { | ||
&self.buf | ||
} | ||
} | ||
|
||
impl DebugConfig for BloomcacheConfig { | ||
fn debug(&self) -> &Debug { | ||
&self.debug | ||
} | ||
} | ||
|
||
impl KlogConfig for BloomcacheConfig { | ||
fn klog(&self) -> &Klog { | ||
&self.klog | ||
} | ||
} | ||
|
||
impl ServerConfig for BloomcacheConfig { | ||
fn server(&self) -> &Server { | ||
&self.server | ||
} | ||
} | ||
|
||
impl SockioConfig for BloomcacheConfig { | ||
fn sockio(&self) -> &Sockio { | ||
&self.sockio | ||
} | ||
} | ||
|
||
impl TcpConfig for BloomcacheConfig { | ||
fn tcp(&self) -> &Tcp { | ||
&self.tcp | ||
} | ||
} | ||
|
||
impl TimeConfig for BloomcacheConfig { | ||
fn time(&self) -> &Time { | ||
&self.time | ||
} | ||
} | ||
|
||
impl TlsConfig for BloomcacheConfig { | ||
fn tls(&self) -> &Tls { | ||
&self.tls | ||
} | ||
} | ||
|
||
impl WorkerConfig for BloomcacheConfig { | ||
fn worker(&self) -> &Worker { | ||
&self.worker | ||
} | ||
|
||
fn worker_mut(&mut self) -> &mut Worker { | ||
&mut self.worker | ||
} | ||
} | ||
|
||
impl BloomConfig for BloomcacheConfig { | ||
fn bloom(&self) -> &Bloom { | ||
&self.bloom | ||
} | ||
} | ||
|
||
// implementation | ||
impl BloomcacheConfig { | ||
pub fn load(file: &str) -> Result<Self, std::io::Error> { | ||
let mut file = std::fs::File::open(file)?; | ||
let mut content = String::new(); | ||
file.read_to_string(&mut content)?; | ||
match toml::from_str(&content) { | ||
Ok(t) => Ok(t), | ||
Err(e) => { | ||
error!("{}", e); | ||
Err(std::io::Error::new( | ||
std::io::ErrorKind::Other, | ||
"Error parsing config", | ||
)) | ||
} | ||
} | ||
} | ||
|
||
pub fn daemonize(&self) -> bool { | ||
self.daemonize | ||
} | ||
|
||
pub fn pid_filename(&self) -> Option<String> { | ||
self.pid_filename.clone() | ||
} | ||
|
||
pub fn dlog_interval(&self) -> usize { | ||
self.dlog_interval | ||
} | ||
} | ||
|
||
// trait implementations | ||
impl Default for BloomcacheConfig { | ||
fn default() -> Self { | ||
Self { | ||
daemonize: daemonize(), | ||
pid_filename: pid_filename(), | ||
dlog_interval: dlog_interval(), | ||
|
||
admin: Default::default(), | ||
server: Default::default(), | ||
worker: Default::default(), | ||
time: Default::default(), | ||
bloom: Default::default(), | ||
|
||
buf: Default::default(), | ||
debug: Default::default(), | ||
klog: Default::default(), | ||
sockio: Default::default(), | ||
tcp: Default::default(), | ||
tls: Default::default(), | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,4 +18,6 @@ config = { path = "../config" } | |
protocol-common = { path = "../protocol/common" } | ||
protocol-memcache = { path = "../protocol/memcache" } | ||
protocol-ping = { path = "../protocol/ping" } | ||
seg = { path = "../storage/seg" } | ||
protocol-http = { path = "../protocol/http" } | ||
seg = { path = "../storage/seg" } | ||
bloom = { path = "../storage/bloom" } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would prefer alphabetical ordering |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
// Copyright 2021 Twitter, Inc. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 2022 |
||
// Licensed under the Apache License, Version 2.0 | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
use protocol_common::Execute; | ||
use protocol_http::{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather not use this style here and would prefer two use statements. |
||
request::{ParseData, Request, RequestData}, | ||
Headers, Response, Storage, | ||
}; | ||
|
||
use crate::Bloom; | ||
|
||
impl Execute<ParseData, Response> for Bloom { | ||
fn execute(&mut self, data: &ParseData) -> Response { | ||
let request = match &data.0 { | ||
Ok(request) => request, | ||
Err(e) => return e.to_response(), | ||
}; | ||
|
||
let Request { headers, data } = request; | ||
match data { | ||
RequestData::Get(key) => self.get(key, headers), | ||
RequestData::Put(key, value) => self.put(key, value, headers), | ||
RequestData::Delete(key) => self.delete(key, headers), | ||
} | ||
} | ||
} | ||
|
||
impl Storage for Bloom { | ||
fn get(&mut self, key: &[u8], _headers: &Headers) -> Response { | ||
if self.data.contains(key) { | ||
Response::builder(200).body(b"") | ||
} else { | ||
Response::builder(404).body(b"") | ||
} | ||
} | ||
|
||
fn put(&mut self, key: &[u8], _value: &[u8], _headers: &Headers) -> Response { | ||
let exists = self.data.contains(key); | ||
self.data.insert(key); | ||
|
||
if exists { | ||
Response::builder(200).body(b"") | ||
} else { | ||
Response::builder(201).body(b"") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we us named status codes? Had to look this one up. |
||
} | ||
} | ||
|
||
fn delete(&mut self, _key: &[u8], _headers: &Headers) -> Response { | ||
let mut builder = Response::builder(405); | ||
builder.header("Content-Type", b"text/plain"); | ||
builder.body(b"DELETE method not supported") | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
// Copyright 2021 Twitter, Inc. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 2022 |
||
// Licensed under the Apache License, Version 2.0 | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
use bloom::BloomFilter; | ||
use config::BloomConfig; | ||
|
||
use crate::EntryStore; | ||
|
||
mod http; | ||
|
||
/// A wrapper around [`bloom::BloomFilter`] which implements `EntryStore` | ||
/// protocol traits. | ||
pub struct Bloom { | ||
data: BloomFilter<[u8]>, | ||
} | ||
|
||
impl Bloom { | ||
/// Create a bloom filter storage based on the config. | ||
pub fn new<T: BloomConfig>(config: &T) -> Result<Self, std::io::Error> { | ||
// TODO: Validate the config here and return an error. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Generally we've been putting names on the TODOs - eg: |
||
|
||
let config = config.bloom(); | ||
Ok(Self { | ||
data: BloomFilter::new(config.size * 8, config.hashes), | ||
}) | ||
} | ||
} | ||
|
||
impl EntryStore for Bloom { | ||
fn clear(&mut self) { | ||
self.data.clear(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
[package] | ||
name = "pelikan_bloomcache" | ||
|
||
version = { workspace = true } | ||
edition = { workspace = true } | ||
homepage = { workspace = true } | ||
repository = { workspace = true } | ||
license = { workspace = true } | ||
|
||
[dependencies] | ||
backtrace = { workspace = true } | ||
clap = { workspace = true } | ||
common = { path = "../../common" } | ||
config = { path = "../../config" } | ||
entrystore = { path = "../../entrystore" } | ||
logger = { path = "../../logger" } | ||
protocol-http = { path = "../../protocol/http" } | ||
rustcommon-metrics = { workspace = true } | ||
server = { path = "../../core/server" } |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
// Copyright 2021 Twitter, Inc. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 2022 |
||
// Licensed under the Apache License, Version 2.0 | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
use config::*; | ||
use entrystore::Bloom; | ||
use logger::*; | ||
use protocol_http::{request::ParseData, RequestParser, Response}; | ||
use server::{Process, ProcessBuilder}; | ||
|
||
type Parser = RequestParser; | ||
type Storage = Bloom; | ||
|
||
pub struct BloomCache { | ||
process: Process, | ||
} | ||
|
||
impl BloomCache { | ||
/// Create a new bloom cache from the provided config. | ||
pub fn new(config: BloomcacheConfig) -> std::io::Result<Self> { | ||
common::metrics::init(); | ||
|
||
let log = configure_logging(&config); | ||
let storage = Storage::new(&config)?; | ||
let parser = Parser::new(); | ||
|
||
let builder = ProcessBuilder::<Parser, ParseData, Response, Storage>::new( | ||
&config, log, parser, storage, | ||
)? | ||
.version(env!("CARGO_PKG_VERSION")); | ||
|
||
Ok(Self { | ||
process: builder.spawn(), | ||
}) | ||
} | ||
|
||
/// Wait for all threads to complete. Blocks until the process has fully | ||
/// terminated. Under normal conditions, this will block indefinitely. | ||
pub fn wait(self) { | ||
self.process.wait() | ||
} | ||
|
||
/// Triggers a shutdown of the process and blocks until the process has | ||
/// fully terminated. This is more likely to be used for running integration | ||
/// tests or other automated testing. | ||
pub fn shutdown(self) { | ||
self.process.shutdown() | ||
} | ||
} | ||
|
||
common::metrics::test_no_duplicates!(); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
// Copyright 2021 Twitter, Inc. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 2022 |
||
// Licensed under the Apache License, Version 2.0 | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
#[macro_use] | ||
extern crate logger; | ||
|
||
use backtrace::Backtrace; | ||
use clap::{App, Arg}; | ||
use config::BloomcacheConfig; | ||
use pelikan_bloomcache::BloomCache; | ||
use rustcommon_metrics::*; | ||
use server::PERCENTILES; | ||
|
||
fn main() { | ||
// custom panic hook to terminate whole process after unwinding | ||
std::panic::set_hook(Box::new(|s| { | ||
error!("{}", s); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change this to eprintln please. We don't guarantee the log is flushed before terminating and having the error message lost is not a great experience. |
||
eprintln!("{:?}", Backtrace::new()); | ||
std::process::exit(101); | ||
})); | ||
|
||
// parse command line options | ||
let matches = App::new(env!("CARGO_BIN_NAME")) | ||
.version(env!("CARGO_PKG_VERSION")) | ||
.version_short("v") | ||
.arg( | ||
Arg::with_name("stats") | ||
.short("s") | ||
.long("stats") | ||
.help("List all metrics in stats") | ||
.takes_value(false), | ||
) | ||
.arg( | ||
Arg::with_name("CONFIG") | ||
.help("Server configuration file") | ||
.index(1), | ||
) | ||
.get_matches(); | ||
|
||
if matches.is_present("stats") { | ||
println!("{:<31} {:<15} DESCRIPTION", "NAME", "TYPE"); | ||
|
||
let mut metrics = Vec::new(); | ||
|
||
for metric in &rustcommon_metrics::metrics() { | ||
let any = match metric.as_any() { | ||
Some(any) => any, | ||
None => { | ||
continue; | ||
} | ||
}; | ||
|
||
if any.downcast_ref::<Counter>().is_some() { | ||
metrics.push(format!("{:<31} counter", metric.name())); | ||
} else if any.downcast_ref::<Gauge>().is_some() { | ||
metrics.push(format!("{:<31} gauge", metric.name())); | ||
} else if any.downcast_ref::<Heatmap>().is_some() { | ||
for (label, _) in PERCENTILES { | ||
let name = format!("{}_{}", metric.name(), label); | ||
metrics.push(format!("{:<31} percentile", name)); | ||
} | ||
} else { | ||
continue; | ||
} | ||
} | ||
|
||
metrics.sort(); | ||
for metric in metrics { | ||
println!("{}", metric); | ||
} | ||
std::process::exit(0); | ||
} | ||
|
||
// load config from file | ||
let config = if let Some(file) = matches.value_of("CONFIG") { | ||
debug!("loading config: {}", file); | ||
match BloomcacheConfig::load(file) { | ||
Ok(c) => c, | ||
Err(e) => { | ||
eprintln!("error launching bloomcache: {}", e); | ||
std::process::exit(1); | ||
} | ||
} | ||
} else { | ||
Default::default() | ||
}; | ||
|
||
match BloomCache::new(config) { | ||
Ok(s) => s.wait(), | ||
Err(e) => { | ||
eprintln!("error launching bloomcache: {}", e); | ||
std::process::exit(1); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2022