diff --git a/Cargo.lock b/Cargo.lock index 1fe4e344..a9e011ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -446,6 +446,25 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.59.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", +] + [[package]] name = "bit-set" version = "0.5.2" @@ -783,6 +802,15 @@ dependencies = [ "jobserver", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom 7.1.0", +] + [[package]] name = "cfb-mode" version = "0.6.0" @@ -873,6 +901,17 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0fc239e0f6cb375d2402d48afb92f76f5404fd1df208a41930ec81eda078bea" +[[package]] +name = "clang-sys" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa66045b9cb23c2e9c1520732030608b02ee07e5cfaa5a521ec15ded7fa24c90" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clear_on_drop" version = "0.2.4" @@ -3023,6 +3062,16 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "libloading" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afe203d669ec979b7128619bae5a63b7b42e9203c1b29146079ee05e2f604b52" +dependencies = [ + "cfg-if 1.0.0", + "winapi 0.3.9", +] + [[package]] name = "libm" version = "0.2.1" @@ -3109,6 +3158,18 @@ dependencies = [ "xorf", ] +[[package]] +name = "librocksdb-sys" +version = "6.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"c309a9d2470844aceb9a4a098cf5286154d20596868b75a6b36357d2bb9ca25d" +dependencies = [ + "bindgen", + "cc", + "glob", + "libc", +] + [[package]] name = "libssh2-sys" version = "0.2.23" @@ -4001,6 +4062,12 @@ dependencies = [ "crypto-mac 0.11.1", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "pem" version = "0.8.3" @@ -4477,6 +4544,26 @@ dependencies = [ "warp", ] +[[package]] +name = "radicle-indexer-node" +version = "0.0.1" +dependencies = [ + "anyhow", + "argh", + "async-trait", + "git2", + "librad", + "rocksdb", + "serde", + "serde_json", + "shared", + "thiserror", + "tokio", + "tracing", + "url", + "warp", +] + [[package]] name = "radicle-keystore" version = "0.1.1" @@ -4873,6 +4960,16 @@ dependencies = [ "syn", ] +[[package]] +name = "rocksdb" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a62eca5cacf2c8261128631bed9f045598d40bfbe4b29f5163f0f802f8f44a7" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rpassword" version = "4.0.5" @@ -5274,6 +5371,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "shlex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" + [[package]] name = "signal-hook" version = "0.3.12" diff --git a/Cargo.toml b/Cargo.toml index 6750444e..ed0cd76a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "org-node", "git-server", "service-init", + "indexer-node", "shared", ] diff --git a/indexer-node/Cargo.toml b/indexer-node/Cargo.toml new file mode 100644 index 00000000..46449e60 --- /dev/null +++ b/indexer-node/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "radicle-indexer-node" +license = "MIT OR Apache-2.0" +version = "0.0.1" +authors = 
["Alexis Sellier "]
+edition = "2018"
+build = "../build.rs"
+default-run = "radicle-indexer-node"
+
+[dependencies]
+shared = { path = "../shared", default-features = false }
+async-trait = { version = "0.1" }
+serde = { version = "1", features = ["derive"] }
+serde_json = { version = "1" }
+librad = { version = "0" }
+tokio = { version = "1.2", features = ["macros", "rt", "sync"] }
+thiserror = { version = "1" }
+argh = { version = "0.1.4" }
+tracing = "0.1"
+anyhow = "1.0"
+warp = { version = "0.3.1", features = ["tls"] }
+rocksdb = "0.17.0"
+url = { version = "*" }
+git2 = "0.13"
diff --git a/indexer-node/src/db.rs b/indexer-node/src/db.rs
new file mode 100644
index 00000000..6feb10a3
--- /dev/null
+++ b/indexer-node/src/db.rs
@@ -0,0 +1,84 @@
+//! RocksDB-backed storage for the indexer node.
+
+use rocksdb::{DBWithThreadMode, Direction, Error, IteratorMode, MultiThreaded};
+
+use std::sync::Arc;
+
+/// Key prefix under which repository entries are stored.
+const REPO_PREFIX: &[u8] = b"repo::";
+
+/// Cheaply clonable handle to the indexer's RocksDB database.
+#[derive(Debug, Clone)]
+pub struct Handle {
+    /// Shared, multi-threaded RocksDB instance.
+    pub db: Arc<DBWithThreadMode<MultiThreaded>>,
+}
+
+impl Handle {
+    /// Creates a DB handle and stores its data at the path (folder).
+    ///
+    /// # Errors
+    ///
+    /// Returns the RocksDB error if the database cannot be opened.
+    pub fn from_path(path: &str) -> Result<Handle, Error> {
+        let db = DBWithThreadMode::<MultiThreaded>::open_default(path)?;
+        Ok(Handle { db: Arc::new(db) })
+    }
+
+    /// Stores `v` under the key `repo::<k>`.
+    ///
+    /// Key should hold alias::urn, and value holds the server
+    /// in which repo is hosted.
+    pub fn add_repository<K, V>(&self, k: K, v: V) -> Result<(), Error>
+    where
+        K: AsRef<[u8]>,
+        V: AsRef<[u8]>,
+    {
+        let key = [REPO_PREFIX, k.as_ref()].concat();
+        self.db.put(key, v)
+    }
+
+    /// Converts a raw `(key, value)` entry into owned strings.
+    ///
+    /// `add_repository` accepts arbitrary bytes, so stored data is not
+    /// guaranteed to be UTF-8. Valid UTF-8 is converted without copying;
+    /// invalid sequences are replaced with U+FFFD rather than being
+    /// reinterpreted as a `String` through `unsafe`.
+    fn decode_entry((k, v): (Box<[u8]>, Box<[u8]>)) -> (String, String) {
+        fn decode(bytes: Box<[u8]>) -> String {
+            String::from_utf8(bytes.into_vec())
+                .unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned())
+        }
+        (decode(k), decode(v))
+    }
+
+    /// Iterates through all keys starting with prefix and returns (key, value).
+    fn iterate_prefix<P>(&self, prefix: P) -> impl Iterator<Item = (String, String)> + '_
+    where
+        P: AsRef<[u8]> + 'static,
+    {
+        self.db
+            .iterator(IteratorMode::From(prefix.as_ref(), Direction::Forward))
+            // Stop at the first key that no longer carries the prefix.
+            .take_while(move |(k, _)| k.starts_with(prefix.as_ref()))
+            .map(Self::decode_entry)
+    }
+
+    /// Iterates through all keys starting *from* prefix and returns (key, value).
+    ///
+    /// Unlike `iterate_prefix`, iteration continues past keys that do not
+    /// share the prefix; callers must apply their own stop condition.
+    pub fn iterate_from_prefix<P>(&self, prefix: P) -> impl Iterator<Item = (String, String)> + '_
+    where
+        P: AsRef<[u8]> + 'static,
+    {
+        self.db
+            .iterator(IteratorMode::From(prefix.as_ref(), Direction::Forward))
+            .map(Self::decode_entry)
+    }
+
+    /// Lists all repositories' alias::urn (key) and servers (value) hosting them.
+    pub fn list_repositories(&self) -> impl Iterator<Item = (String, String)> + '_ {
+        self.iterate_prefix(REPO_PREFIX)
+    }
+}
diff --git a/indexer-node/src/error.rs b/indexer-node/src/error.rs
new file mode 100644
index 00000000..36704882
--- /dev/null
+++ b/indexer-node/src/error.rs
@@ -0,0 +1,14 @@
+/// Errors that request handlers can return as [`warp`] rejections.
+#[derive(Debug, thiserror::Error)]
+#[allow(dead_code)]
+pub enum Error {
+    /// The entity was not found.
+    #[error("entity not found")]
+    NotFound,
+
+    /// An error occurred while parsing a URL.
+    #[error(transparent)]
+    ParseError(#[from] url::ParseError),
+}
+
+impl warp::reject::Reject for Error {}
diff --git a/indexer-node/src/git.rs b/indexer-node/src/git.rs
new file mode 100644
index 00000000..5110fa6e
--- /dev/null
+++ b/indexer-node/src/git.rs
@@ -0,0 +1,31 @@
+use std::process::Command;
+
+/// Runs `git` with `args` inside `repo` and returns its output as text.
+///
+/// On success the command's stdout is returned; when stdout is empty,
+/// stderr is returned instead. Invalid UTF-8 is replaced lossily.
+///
+/// # Errors
+///
+/// Fails if the process cannot be spawned, or (carrying the command's
+/// stderr as the message) if it does not exit successfully.
+pub fn git<S: AsRef<std::ffi::OsStr>>(
+    repo: &std::path::Path,
+    args: impl IntoIterator<Item = S>,
+) -> Result<String, anyhow::Error> {
+    let output = Command::new("git").current_dir(repo).args(args).output()?;
+
+    if !output.status.success() {
+        return Err(anyhow::Error::new(std::io::Error::new(
+            std::io::ErrorKind::Other,
+            String::from_utf8_lossy(&output.stderr),
+        )));
+    }
+
+    let text = if output.stdout.is_empty() {
+        &output.stderr
+    } else {
+        &output.stdout
+    };
+    Ok(String::from_utf8_lossy(text).into_owned())
+}
diff --git a/indexer-node/src/lib.rs b/indexer-node/src/lib.rs
new file mode 100644
index 00000000..6dfc5a06
--- /dev/null
+++ b/indexer-node/src/lib.rs
@@ -0,0 +1,274 @@
+#![allow(clippy::if_same_then_else)]
+mod error;
+
+use error::Error;
+
+use std::net;
+use std::path::PathBuf;
+
+use serde_json::json;
+use warp::hyper::StatusCode;
+use warp::reply;
+use warp::reply::Json;
+use warp::{self, filters::BoxedFilter, path, Filter, Rejection, Reply};
+
+mod git;
+
+pub const VERSION: &str = env!("CARGO_PKG_VERSION");
+pub const PAGINATION: usize = 5;
+
+mod db;
+
+#[derive(Debug, Clone)]
+pub struct Options {
+    pub root: PathBuf,
+    pub listen: net::SocketAddr,
+    pub tls_cert: Option<PathBuf>,
+    pub tls_key: Option<PathBuf>,
+    pub theme: String,
+}
+
+#[derive(Debug, Clone)]
+pub struct Context {
+    root: PathBuf,
+    handle: db::Handle,
+}
+
+/// Run the HTTP API.
+///
+/// Opens the RocksDB database at `<root>/indexer_rocksdb_storage`, mounts the
+/// `/`, `/v1/projects` and `/v1/sync` routes and serves them on
+/// `options.listen`. TLS is used only when both `tls_cert` and `tls_key` are set.
+///
+/// # Panics
+///
+/// Panics if the database path is not valid UTF-8 or the database cannot be
+/// opened.
+pub async fn run(options: Options) {
+    let db_path = options.root.join("indexer_rocksdb_storage");
+    let db_path_str = db_path
+        .to_str()
+        .unwrap_or_else(|| panic!("database path {:?} is not valid UTF-8", db_path));
+    let handle = db::Handle::from_path(db_path_str)
+        .unwrap_or_else(|err| panic!("failed to open database at {:?}: {}", db_path, err));
+
+    let ctx = Context {
+        root: options.root,
+        handle,
+    };
+
+    let v1 = path("v1");
+    let projects = path("projects").and(repo_filters(ctx.clone()));
+    let sync = path("sync").and(
+        warp::get()
+            .map(move || ctx.clone())
+            .and(path("seed"))
+            .and(path::param::<String>())
+            .and(path("namespace"))
+            .and(path::param::<String>())
+            .and(path::end())
+            .and_then(sync_handler),
+    );
+
+    let routes = path::end()
+        .and_then(root_handler)
+        .or(v1.and(projects))
+        .or(v1.and(sync))
+        .recover(recover)
+        .with(warp::cors().allow_any_origin())
+        .with(warp::log("indexer-node::api"));
+
+    let server = warp::serve(routes);
+
+    match (options.tls_cert, options.tls_key) {
+        (Some(cert), Some(key)) => {
+            server
+                .tls()
+                .cert_path(cert)
+                .key_path(key)
+                .run(options.listen)
+                .await
+        }
+        (cert, key) => {
+            // A lone certificate or key cannot enable TLS; make the fallback visible
+            // instead of silently serving unencrypted traffic.
+            if cert.is_some() || key.is_some() {
+                tracing::warn!("TLS needs both a certificate and a key; serving plain HTTP");
+            }
+            server.run(options.listen).await
+        }
+    }
+}
+
+/// `GET /`
+///
+/// Describes the service and links to the routes mounted by `run`.
+async fn root_handler() -> Result<Json, Rejection> {
+    let response = json!({
+        "message": "Welcome!",
+        "service": "radicle-indexer-node",
+        "version": VERSION,
+        "path": "/",
+        "links": [
+            {
+                "href": "/v1/projects",
+                "rel": "projects",
+                "type": "GET"
+            },
+            {
+                "href": "/v1/sync/seed/:seed/namespace/:namespace",
+                "rel": "sync",
+                "type": "GET"
+            }
+        ]
+    });
+    Ok(warp::reply::json(&response))
+}
+
+/// Turns a rejection into a JSON error body of the form
+/// `{ "error": <reason>, "code": <status> }`.
+///
+/// Unmatched routes and `Error::NotFound` map to `404 Not Found`; every other
+/// rejection is logged and reported as `400 Bad Request`.
+async fn recover(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
+    let not_found = err.is_not_found() || matches!(err.find::<Error>(), Some(Error::NotFound));
+    let status = if not_found {
+        StatusCode::NOT_FOUND
+    } else {
+        // Log the non-standard errors.
+        tracing::error!("Error: {:?}", err);
+
+        StatusCode::BAD_REQUEST
+    };
+    let res = reply::json(&json!({
+        "error": status.canonical_reason(),
+        "code": status.as_u16()
+    }));
+
+    Ok(reply::with_header(
+        reply::with_status(res, status),
+        "Content-Type",
+        "application/json",
+    ))
+}
+
+/// Combination of all repo filters.
+fn repo_filters(ctx: Context) -> BoxedFilter<(impl Reply,)> {
+    project_root_filter(ctx.clone())
+        .or(project_by_id_filter(ctx.clone()))
+        .or(project_from_filter(ctx))
+        .boxed()
+}
+
+/// `GET /`
+fn project_root_filter(
+    ctx: Context,
+) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+    warp::get()
+        .map(move || ctx.clone())
+        .and(path::end())
+        .and_then(project_root_handler)
+}
+
+/// `GET /:project`
+fn project_by_id_filter(
+    ctx: Context,
+) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+    warp::get()
+        .map(move || ctx.clone())
+        .and(path::param::<String>())
+        .and(path::end())
+        .and_then(project_by_id_handler)
+}
+
+/// `GET /from/:start`
+fn project_from_filter(
+    ctx: Context,
+) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+    warp::get()
+        .map(move || ctx.clone())
+        .and(path("from"))
+        .and(path::param::<String>())
+        .and(path::end())
+        .and_then(project_from_handler)
+}
+
+/// Key prefix of repository entries in the database.
+const REPO_KEY_PREFIX: &str = "repo::";
+
+/// Serializes `(key, value)` entries as a JSON array of single-entry objects.
+fn projects_reply(entries: impl Iterator<Item = (String, String)>) -> Json {
+    let projects: Vec<_> = entries
+        .map(|(key, value)| json!({ key: value }))
+        .collect();
+    warp::reply::json(&projects)
+}
+
+/// List all projects
+///
+/// Returns at most `PAGINATION` entries.
+async fn project_root_handler(ctx: Context) -> Result<Json, Rejection> {
+    Ok(projects_reply(
+        ctx.handle.list_repositories().take(PAGINATION),
+    ))
+}
+
+/// List single project by id
+///
+/// Returns the first entry whose key starts with `repo::<id>`, or an empty
+/// array when no such entry exists.
+async fn project_by_id_handler(ctx: Context, id: String) -> Result<Json, Rejection> {
+    let key = format!("{}{}", REPO_KEY_PREFIX, id);
+    let entries = ctx
+        .handle
+        .iterate_from_prefix(key.clone())
+        // The iterator only seeks to `key`; without this check the next
+        // unrelated project would be returned when `id` is unknown.
+        .take_while(|(k, _)| k.starts_with(&key))
+        .take(1);
+
+    Ok(projects_reply(entries))
+}
+
+/// List next projects from key prefix `from`
+///
+/// `from` is used as a raw database key (including the `repo::` prefix, as
+/// found in the keys of previous responses). Returns at most `PAGINATION`
+/// entries.
+async fn project_from_handler(ctx: Context, from: String) -> Result<Json, Rejection> {
+    let entries = ctx
+        .handle
+        .iterate_from_prefix(from)
+        .take_while(|(k, _)| k.starts_with(REPO_KEY_PREFIX))
+        .take(PAGINATION);
+
+    Ok(projects_reply(entries))
+}
+
+/// `GET /sync/seed/:seed/namespace/:namespace/`
+/// Handle sync requests
+///
+/// Fetches `refs/rad/id` of `namespace` from `http://<seed>/<namespace>` into
+/// the local git storage and, on success, indexes the project. Responds with
+/// `{ "success": <bool> }`.
+async fn sync_handler(ctx: Context, seed: String, namespace: String) -> Result<Json, Rejection> {
+    let seed = url::Url::parse(&format!("http://{}", seed)).map_err(Error::from)?;
+    let url = seed.join(&namespace).map_err(Error::from)?;
+
+    // `namespace` is untrusted and ends up in a refspec: only accept valid URN ids.
+    if librad::git::Urn::try_from_id(&namespace).is_err() {
+        return Ok(warp::reply::json(&json!({ "success": false })));
+    }
+
+    let repo = ctx.root.join("git");
+    let refspec = format!("refs/rad/id:refs/namespaces/{}/refs/rad/id", namespace);
+
+    // `git fetch` and the storage lookup block, so keep them off the async executor.
+    let success = tokio::task::spawn_blocking(move || {
+        match git::git(&repo, ["fetch", "--force", url.as_str(), refspec.as_str()]) {
+            Ok(_) => {
+                sync_with_db(&ctx, &namespace);
+                true
+            }
+            Err(err) => {
+                tracing::warn!("fetching {} failed: {}", url, err);
+                false
+            }
+        }
+    })
+    .await
+    .unwrap_or_else(|err| {
+        tracing::error!("sync task failed: {:?}", err);
+        false
+    });
+
+    Ok(warp::reply::json(&json!({ "success": success })))
+}
+
+/// Records `namespace` in the database if it names a project identity.
+///
+/// The stored value is the `Debug` rendering of the identity. Failures are
+/// logged and otherwise ignored so that a bad request cannot bring down the
+/// server.
+fn sync_with_db(ctx: &Context, namespace: &str) {
+    use librad::git::storage::ReadOnly;
+    use librad::git::Urn;
+    use librad::paths::Paths;
+
+    let urn = match Urn::try_from_id(namespace) {
+        Ok(urn) => urn,
+        Err(err) => {
+            tracing::warn!("not indexing {}: invalid URN id: {:?}", namespace, err);
+            return;
+        }
+    };
+    let paths = match Paths::from_root(&ctx.root) {
+        Ok(paths) => paths,
+        Err(err) => {
+            tracing::error!("failed to resolve storage paths: {:?}", err);
+            return;
+        }
+    };
+    let storage = match ReadOnly::open(&paths) {
+        Ok(storage) => storage,
+        Err(err) => {
+            tracing::error!("failed to open storage: {:?}", err);
+            return;
+        }
+    };
+
+    // Only sync if `namespace` is a Project identity.
+    // Identities of delegates are supposed to have been pushed beforehand.
+    match librad::git::identities::project::get(&storage, &urn) {
+        Ok(Some(identity)) => {
+            let stored = ctx
+                .handle
+                .add_repository(namespace, format!("{:?}", identity));
+            if let Err(err) = stored {
+                tracing::error!("failed to index {}: {}", namespace, err);
+            }
+        }
+        Ok(None) => tracing::debug!("not indexing {}: no such project", namespace),
+        Err(err) => tracing::debug!("not indexing {}: not a project: {:?}", namespace, err),
+    }
+}
diff --git a/indexer-node/src/main.rs b/indexer-node/src/main.rs
new file mode 100644
index 00000000..c48e8f28
--- /dev/null
+++ b/indexer-node/src/main.rs
@@ -0,0 +1,64 @@
+use std::net;
+use std::path::PathBuf;
+
+use argh::FromArgs;
+
+use shared::LogFmt;
+
+use radicle_indexer_node as node;
+
+/// Radicle Indexer Node.
+#[derive(FromArgs)]
+pub struct Options {
+    /// listen on the following address for HTTP connections (default: 0.0.0.0:8779)
+    #[argh(option, default = "std::net::SocketAddr::from(([0, 0, 0, 0], 8779))")]
+    pub listen: net::SocketAddr,
+
+    /// radicle root path, for key and git storage
+    #[argh(option)]
+    pub root: PathBuf,
+
+    /// TLS certificate path
+    #[argh(option)]
+    pub tls_cert: Option<PathBuf>,
+
+    /// TLS key path
+    #[argh(option)]
+    pub tls_key: Option<PathBuf>,
+
+    /// syntax highlight theme
+    #[argh(option, default = r#"String::from("base16-ocean.dark")"#)]
+    pub theme: String,
+
+    /// either "plain" or "gcp" (gcp available only when compiled-in)
+    #[argh(option, default = "LogFmt::Plain")]
+    pub log_format: LogFmt,
+}
+
+impl Options {
+    // Parses the options from the command line via `argh::from_env`.
+    pub fn from_env() -> Options {
+        argh::from_env()
+    }
+}
+
+// Converts the command-line options into the library's options; `log_format`
+// is consumed by `main` and has no counterpart in `node::Options`.
+impl From<Options> for node::Options {
+    fn from(other: Options) -> Self {
+        Self {
+            root: other.root,
+            tls_cert: other.tls_cert,
+            tls_key: other.tls_key,
+            listen: other.listen,
+            theme: other.theme,
+        }
+    }
+}
+
+// Entry point: parse options, set up logging, then serve until shutdown.
+#[tokio::main]
+async fn main() {
+    let options = Options::from_env();
+
+    shared::init_logger(options.log_format);
+    tracing::info!("version {}-{}", env!("CARGO_PKG_VERSION"), env!("GIT_HEAD"));
+
+    node::run(options.into()).await;
+}