Skip to content

Commit

Permalink
feat: initial handling of plugin startup and context management
Browse files Browse the repository at this point in the history
  • Loading branch information
j-lanson authored and alilleybrinker committed Aug 15, 2024
1 parent 51bcf77 commit 8c714d3
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions hipcheck/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ tabled = "0.15.0"
fs_extra = "1.3.0"
tonic = "0.12.1"
prost = "0.13.1"
rand = "0.8.5"
tokio = { version = "1.39.2", features = ["time"] }

# Exactly matching the version of rustls used by ureq
# Get rid of default features since we don't use the AWS backed crypto provider (we use ring).
Expand Down
2 changes: 2 additions & 0 deletions hipcheck/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ mod git2_log_shim;
mod git2_rustls_transport;
mod log_bridge;
mod metric;
#[allow(unused)]
mod plugin;
mod report;
mod session;
mod setup;
Expand Down
125 changes: 125 additions & 0 deletions hipcheck/src/plugin/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use crate::hipcheck::plugin_client::PluginClient;
use crate::plugin::{HcPluginClient, Plugin, PluginContext};
use crate::{hc_error, Result, F64};
use rand::Rng;
use std::collections::HashSet;
use std::ops::Range;
use std::process::Command;
use tokio::time::{sleep_until, Duration, Instant};

/// Spawns plugin subprocesses and establishes gRPC connections to them,
/// retrying with linear backoff plus jitter.
pub struct PluginExecutor {
    /// Max times to attempt spawning a plugin process before giving up.
    max_spawn_attempts: usize,
    /// Max gRPC connection attempts per spawned process.
    max_conn_attempts: usize,
    /// Ports considered when assigning a localhost listen port to a plugin.
    port_range: Range<u16>,
    /// Base interval for the linear connection backoff.
    backoff_interval: Duration,
    /// Percent (0-100) of jitter applied around each backoff interval.
    jitter_percent: u8,
    /// Ports already handed to running plugins. NOTE(review): ports are
    /// inserted on successful startup but never removed in this file —
    /// confirm whether they should be freed when a plugin shuts down.
    est_ports: HashSet<u16>,
}
impl PluginExecutor {
    /// Create a new executor with the given retry/backoff parameters.
    ///
    /// `backoff_interval_micros` is the base backoff in microseconds;
    /// `jitter_percent` must be in `0..=100` or an error is returned.
    pub fn new(
        max_spawn_attempts: usize,
        max_conn_attempts: usize,
        port_range: Range<u16>,
        backoff_interval_micros: u64,
        jitter_percent: u8,
    ) -> Result<Self> {
        if jitter_percent > 100 {
            return Err(hc_error!(
                "jitter_percent must be <= 100, got {}",
                jitter_percent
            ));
        }
        let backoff_interval = Duration::from_micros(backoff_interval_micros);
        Ok(PluginExecutor {
            max_spawn_attempts,
            max_conn_attempts,
            port_range,
            backoff_interval,
            jitter_percent,
            est_ports: HashSet::new(),
        })
    }

    /// Find a port in `port_range` that is neither already assigned to a
    /// plugin nor currently bound on localhost.
    fn get_available_port(&mut self) -> Result<u16> {
        for i in self.port_range.start..self.port_range.end {
            // Probe by binding; the listener is dropped immediately, so the
            // port is only *likely* free by the time the plugin binds it.
            if !self.est_ports.contains(&i)
                && std::net::TcpListener::bind(format!("127.0.0.1:{i}")).is_ok()
            {
                return Ok(i);
            }
        }
        Err(hc_error!("Failed to find available port"))
    }

    /// Spawn `plugin`'s process and connect to its gRPC server, returning a
    /// `PluginContext` on success.
    ///
    /// Retries spawning up to `max_spawn_attempts` times; each spawn gets up
    /// to `max_conn_attempts` connection tries with linear backoff + jitter.
    pub async fn start_plugin(&mut self, plugin: &Plugin) -> Result<PluginContext> {
        let mut rng = rand::thread_rng();
        // Plugin startup design has inherent TOCTOU flaws since we tell the plugin
        // which port we expect it to bind to. We can try to ensure the port we pass
        // on the cmdline is not already in use, but it is still possible for that
        // port to become unavailable between our check and the plugin's bind attempt.
        // Hence the need for subsequent attempts if we get unlucky
        let mut spawn_attempts: usize = 0;
        while spawn_attempts < self.max_spawn_attempts {
            // Find free port for process. Don't retry if we fail since this means all
            // ports in the desired range are already bound
            let port = self.get_available_port()?;
            let port_str = port.to_string();
            // Spawn plugin process
            let Ok(mut proc) = Command::new(&plugin.entrypoint)
                .args(["--port", port_str.as_str()])
                .spawn()
            else {
                spawn_attempts += 1;
                continue;
            };
            // Attempt to connect to the plugin's gRPC server up to N times, using
            // linear backoff with a percentage jitter.
            let mut conn_attempts = 0;
            let mut opt_grpc: Option<HcPluginClient> = None;
            while conn_attempts < self.max_conn_attempts {
                // Jitter can be positive or negative, so sample from twice the
                // percentage and re-center below. The range is *inclusive* so a
                // `jitter_percent` of 0 yields 0 instead of panicking —
                // `gen_range` panics on an empty range like `0..0`.
                let jitter: i32 = rng.gen_range(0..=(2 * self.jitter_percent)) as i32;
                // Subtract self.jitter_percent to center around 0, then turn the
                // percentage into a multiplier around 1.0.
                let jitter_percent = 1.0 + ((jitter - (self.jitter_percent as i32)) as f64 / 100.0);
                // `new()` bounds jitter_percent to 0..=100, so this always holds.
                debug_assert!(
                    (0.0..=2.0).contains(&jitter_percent),
                    "Math error! We should have better guardrails around PluginExecutor field values."
                );
                // sleep_duration = (backoff * conn_attempts) * (1.0 +/- jitter_percent)
                let sleep_duration: Duration = self
                    .backoff_interval
                    .saturating_mul(conn_attempts as u32)
                    .mul_f64(jitter_percent);
                sleep_until(Instant::now() + sleep_duration).await;
                if let Ok(grpc) =
                    PluginClient::connect(format!("http://127.0.0.1:{port_str}")).await
                {
                    opt_grpc = Some(grpc);
                    break;
                } else {
                    conn_attempts += 1;
                }
            }
            // If opt_grpc is None, we did not manage to connect to the plugin. Kill it
            // and try again
            let Some(grpc) = opt_grpc else {
                if let Err(e) = proc.kill() {
                    println!("Failed to kill child process for plugin: {e}");
                }
                // Reap the killed child so it does not linger as a zombie.
                let _ = proc.wait();
                spawn_attempts += 1;
                continue;
            };
            self.est_ports.insert(port);
            // We now have an open gRPC connection to our plugin process
            return Ok(PluginContext {
                plugin: plugin.clone(),
                port,
                grpc,
                proc,
                channel: None,
            });
        }
        Err(hc_error!(
            "Reached max spawn attempts for plugin {}",
            plugin.name
        ))
    }
}
19 changes: 19 additions & 0 deletions hipcheck/src/plugin/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
mod manager;
mod types;

use crate::plugin::manager::*;
pub use crate::plugin::types::*;

/// Temporary placeholder exercising the plugin startup API: constructs a
/// `Plugin` descriptor and a `PluginExecutor` without using either.
pub fn dummy() {
    let _plugin = Plugin {
        name: "dummy".to_owned(),
        entrypoint: "./dummy".to_owned(),
    };
    // `PluginExecutor::new` returns a `Result`; bind with a leading underscore
    // to make the deliberately-ignored value explicit.
    let _manager = PluginExecutor::new(
        /* max_spawn_attempts */ 3,
        /* max_conn_attempts */ 5,
        /* port_range */ 40000..u16::MAX,
        /* backoff_interval_micros */ 1000,
        /* jitter_percent */ 10,
    );
}
149 changes: 149 additions & 0 deletions hipcheck/src/plugin/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use crate::hipcheck::plugin_client::PluginClient;
use crate::hipcheck::{
Configuration, ConfigurationResult as PluginConfigResult, ConfigurationStatus, Empty,
Schema as PluginSchema,
};
use crate::{hc_error, Result};
use serde_json::Value;
use std::collections::HashMap;
use std::ops::Not;
use std::process::Child;
use tonic::transport::Channel;

pub type HcPluginClient = PluginClient<Channel>;

/// Descriptor for a plugin: a display name plus the command used to spawn
/// its process (passed to `Command::new` by the plugin manager).
#[derive(Clone, Debug)]
pub struct Plugin {
    /// Human-readable plugin name, used in error messages.
    pub name: String,
    /// Path/command to execute to start the plugin process.
    pub entrypoint: String,
}

// Hipcheck-facing version of struct from crate::hipcheck
/// A single query's schemas, with the raw JSON strings parsed into `Value`s.
pub struct Schema {
    /// Name of the query this schema describes.
    pub query_name: String,
    /// Parsed JSON schema for the query's input key.
    pub key_schema: Value,
    /// Parsed JSON schema for the query's output.
    pub output_schema: Value,
}
impl TryFrom<PluginSchema> for Schema {
    type Error = crate::error::Error;
    /// Parse the wire-format schema strings into `serde_json::Value`s,
    /// failing if either string is not valid JSON.
    fn try_from(value: PluginSchema) -> Result<Self> {
        Ok(Schema {
            key_schema: serde_json::from_str(value.key_schema.as_str())?,
            output_schema: serde_json::from_str(value.output_schema.as_str())?,
            query_name: value.query_name,
        })
    }
}

// Hipcheck-facing version of struct from crate::hipcheck
/// Outcome of sending a plugin its configuration.
pub struct ConfigurationResult {
    /// Raw status reported by the plugin (success or one of the error variants).
    pub status: ConfigurationStatus,
    /// Optional detail message; `None` when the plugin sent an empty string.
    pub message: Option<String>,
}
impl TryFrom<PluginConfigResult> for ConfigurationResult {
type Error = crate::error::Error;
fn try_from(value: PluginConfigResult) -> Result<Self> {
let status: ConfigurationStatus = value.status.try_into()?;
let message = value.message.is_empty().not().then_some(value.message);
Ok(ConfigurationResult { status, message })
}
}
// hipcheck::ConfigurationStatus has an enum that captures both error and success
// scenarios. The below code allows interpreting the struct as a Rust Result. If
// the success variant was the status, Ok(()) is returned, otherwise the code
// is stuffed into a custom error type enum that equals the protoc-generated one
// minus the success variant.
impl ConfigurationResult {
pub fn as_result(&self) -> std::result::Result<(), ConfigError> {
let Ok(error) = self.status.try_into() else {
return Ok(());
};
Err(ConfigError::new(error, self.message.clone()))
}
}
/// Mirror of the error variants of the protoc-generated
/// `ConfigurationStatus`, minus the success variant.
pub enum ConfigErrorType {
    Unknown = 0,
    // Discriminant 1 is deliberately skipped; presumably it is the success
    // status in the generated enum — confirm against the .proto definition.
    MissingRequiredConfig = 2,
    UnrecognizedConfig = 3,
    InvalidConfigValue = 4,
}
impl TryFrom<ConfigurationStatus> for ConfigErrorType {
type Error = crate::error::Error;
fn try_from(value: ConfigurationStatus) -> Result<Self> {
use ConfigErrorType::*;
use ConfigurationStatus::*;
Ok(match value as i32 {
x if x == ErrorUnknown as i32 => Unknown,
x if x == ErrorMissingRequiredConfiguration as i32 => MissingRequiredConfig,
x if x == ErrorUnrecognizedConfiguration as i32 => UnrecognizedConfig,
x if x == ErrorInvalidConfigurationValue as i32 => InvalidConfigValue,
x => {
return Err(hc_error!("status value '{}' is not an error", x));
}
})
}
}
/// Error produced when a plugin rejects its configuration: the error kind
/// plus an optional human-readable message from the plugin.
pub struct ConfigError {
    // Which error variant the plugin reported.
    error: ConfigErrorType,
    // Optional detail message; `None` when the plugin sent none.
    message: Option<String>,
}
impl ConfigError {
    /// Bundle an error kind with its optional detail message.
    pub fn new(error: ConfigErrorType, message: Option<String>) -> Self {
        ConfigError { error, message }
    }
}

// State for managing an actively running plugin process
pub struct PluginContext {
    /// Descriptor of the plugin this context manages.
    pub plugin: Plugin,
    /// Localhost port the plugin's gRPC server was told to bind.
    pub port: u16,
    /// Open gRPC client connection to the plugin process.
    pub grpc: HcPluginClient,
    /// Handle to the spawned plugin child process (killed on drop).
    pub proc: Child,
    // NOTE(review): only ever initialized to `None` in this file; its
    // intended purpose is not visible here — confirm before relying on it.
    pub channel: Option<String>,
}
// Redefinition of `grpc` field's functions with more useful types, additional
// error & sanity checking
impl PluginContext {
    /// Stream all query schemas from the plugin, reassembling chunked
    /// messages by query name, and parse them into `Schema` values.
    pub async fn get_query_schemas(&mut self) -> Result<Vec<Schema>> {
        let mut res = self.grpc.get_query_schemas(Empty {}).await?;
        let stream = res.get_mut();
        let mut schema_builder: HashMap<String, PluginSchema> = HashMap::new();
        while let Some(msg) = stream.message().await? {
            // If we received a PluginSchema msg with this query name before,
            // treat as a chunked msg and append its strings to existing entry
            if let Some(existing) = schema_builder.get_mut(&msg.query_name) {
                existing.key_schema.push_str(msg.key_schema.as_str());
                existing.output_schema.push_str(msg.output_schema.as_str());
            } else {
                schema_builder.insert(msg.query_name.clone(), msg);
            }
        }
        // Convert the aggregated PluginSchemas to Schema objects
        schema_builder
            .into_values()
            .map(TryInto::try_into)
            .collect()
    }

    /// Serialize `conf` to JSON and send it to the plugin, returning the
    /// plugin's accept/reject result.
    pub async fn set_configuration(&mut self, conf: &Value) -> Result<ConfigurationResult> {
        let conf_query = Configuration {
            configuration: serde_json::to_string(&conf)?,
        };
        let res = self.grpc.set_configuration(conf_query).await?;
        res.into_inner().try_into()
    }

    // TODO - the String in the result should be replaced with a structured
    // type once the policy expression code is integrated
    /// Ask the plugin for its default policy expression string.
    pub async fn get_default_policy_expression(&mut self) -> Result<String> {
        let res = self.grpc.get_default_policy_expression(Empty {}).await?;
        // Consume the response instead of cloning out of a borrow
        // (`get_ref().to_owned()`); consistent with `set_configuration`.
        Ok(res.into_inner().policy_expression)
    }
}
impl Drop for PluginContext {
    /// Ensure the plugin subprocess does not outlive its context.
    fn drop(&mut self) {
        if let Err(e) = self.proc.kill() {
            // Report to stderr: this is a diagnostic, not program output.
            eprintln!("Failed to kill child: {e}");
        }
        // Reap the killed child; without `wait()` it remains a zombie
        // process on Unix until the parent exits.
        let _ = self.proc.wait();
    }
}

0 comments on commit 8c714d3

Please sign in to comment.