Skip to content

Commit

Permalink
feat: add dummy plugin and hidden cli command for invoking it
Browse files Browse the repository at this point in the history
  • Loading branch information
j-lanson authored and alilleybrinker committed Aug 23, 2024
1 parent 667d17b commit 79414b1
Show file tree
Hide file tree
Showing 13 changed files with 1,074 additions and 17 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
resolver = "2"

# Members of the workspace.
members = ["hipcheck", "hipcheck-macros", "xtask"]
members = ["hipcheck", "hipcheck-macros", "xtask", "plugins/dummy_rand_data"]

# Make sure Hipcheck is run with `cargo run`.
#
Expand Down
2 changes: 1 addition & 1 deletion hipcheck/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ tonic = "0.12.1"
prost = "0.13.1"
rand = "0.8.5"
kdl = "4.6.0"
tokio = { version = "1.39.2", features = ["rt", "sync", "time"] }
tokio = { version = "1.39.2", features = ["rt", "rt-multi-thread", "sync", "time"] }
futures = "0.3.30"
async-stream = "0.3.5"
num_enum = "0.7.3"
Expand Down
5 changes: 5 additions & 0 deletions hipcheck/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ pub enum FullCommands {
Ready,
Update(UpdateArgs),
Cache(CacheArgs),
Plugin,
PrintConfig,
PrintData,
PrintCache,
Expand All @@ -411,6 +412,7 @@ impl From<&Commands> for FullCommands {
Commands::Scoring => FullCommands::Scoring,
Commands::Update(args) => FullCommands::Update(args.clone()),
Commands::Cache(args) => FullCommands::Cache(args.clone()),
Commands::Plugin => FullCommands::Plugin,
}
}
}
Expand Down Expand Up @@ -439,6 +441,9 @@ pub enum Commands {
Update(UpdateArgs),
/// Manage Hipcheck cache
Cache(CacheArgs),
/// Execute temporary code for exercising plugin engine
#[command(hide = true)]
Plugin,
}

// If no subcommand matched, default to use of '-t <TYPE> <TARGET' syntax. In
Expand Down
44 changes: 44 additions & 0 deletions hipcheck/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::cache::HcCache;
use crate::context::Context as _;
use crate::error::Error;
use crate::error::Result;
use crate::plugin::{HcPluginCore, Plugin, PluginExecutor, PluginWithConfig};
use crate::session::session::Session;
use crate::setup::{resolve_and_transform_source, SourceType};
use crate::shell::verbosity::Verbosity;
Expand Down Expand Up @@ -139,6 +140,7 @@ fn main() -> ExitCode {
Some(FullCommands::Ready) => cmd_ready(&config),
Some(FullCommands::Update(args)) => cmd_update(&args),
Some(FullCommands::Cache(args)) => return cmd_cache(args, &config),
Some(FullCommands::Plugin) => cmd_plugin(),
Some(FullCommands::PrintConfig) => cmd_print_config(config.config()),
Some(FullCommands::PrintData) => cmd_print_data(config.data()),
Some(FullCommands::PrintCache) => cmd_print_home(config.cache()),
Expand Down Expand Up @@ -637,6 +639,48 @@ fn check_github_token() -> StdResult<(), EnvVarCheckError> {
})
}

fn cmd_plugin() {
use tokio::runtime::Runtime;
let tgt_dir = "./target/debug";
let entrypoint = pathbuf![tgt_dir, "dummy_rand_data"];
let plugin = Plugin {
name: "rand_data".to_owned(),
entrypoint: entrypoint.display().to_string(),
};
let plugin_executor = PluginExecutor::new(
/* max_spawn_attempts */ 3,
/* max_conn_attempts */ 5,
/* port_range */ 40000..u16::MAX,
/* backoff_interval_micros */ 1000,
/* jitter_percent */ 10,
)
.unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(async move {
println!("Started executor");
let mut core = match HcPluginCore::new(
plugin_executor,
vec![PluginWithConfig(plugin, serde_json::json!(null))],
)
.await
{
Ok(c) => c,
Err(e) => {
println!("{e}");
return;
}
};
match core.run().await {
Ok(_) => {
println!("HcCore run completed");
}
Err(e) => {
println!("HcCore run failed with '{e}'");
}
};
});
}

fn cmd_ready(config: &CliConfig) {
let ready = ReadyChecks {
hipcheck_version_check: check_hipcheck_version(),
Expand Down
5 changes: 4 additions & 1 deletion hipcheck/src/plugin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::Future;
use rand::Rng;
use std::collections::HashSet;
use std::ops::Range;
use std::process::Command;
use std::process::{Command, Stdio};
use tokio::time::{sleep_until, Duration, Instant};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -69,6 +69,9 @@ impl PluginExecutor {
// Spawn plugin process
let Ok(mut proc) = Command::new(&plugin.entrypoint)
.args(["--port", port_str.as_str()])
// @Temporary - directly forward stdout/stderr from plugin to shell
.stdout(std::io::stdout())
.stderr(std::io::stderr())
.spawn()
else {
spawn_attempts += 1;
Expand Down
29 changes: 26 additions & 3 deletions hipcheck/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ mod manager;
mod parser;
mod types;

use crate::hipcheck::Query;
use crate::plugin::manager::*;
pub use crate::plugin::manager::*;
pub use crate::plugin::types::*;
use crate::Result;
use futures::future::join_all;
Expand Down Expand Up @@ -42,7 +41,7 @@ pub async fn initialize_plugins(
Ok(out)
}

struct HcPluginCore {
pub struct HcPluginCore {
executor: PluginExecutor,
plugins: HashMap<String, PluginTransport>,
}
Expand Down Expand Up @@ -79,4 +78,28 @@ impl HcPluginCore {
// Now we have a set of started and initialized plugins to interact with
Ok(HcPluginCore { executor, plugins })
}
// @Temporary
pub async fn run(&mut self) -> Result<()> {
let channel = self.plugins.get_mut("rand_data").unwrap();
match channel
.send(Query {
id: 1,
request: true,
publisher: "".to_owned(),
plugin: "".to_owned(),
query: "rand_data".to_owned(),
key: serde_json::json!(7),
output: serde_json::json!(null),
})
.await
{
Ok(q) => q,
Err(e) => {
println!("Failed: {e}");
}
};
let resp = channel.recv().await?;
println!("Plugin response: {resp:?}");
Ok(())
}
}
23 changes: 12 additions & 11 deletions hipcheck/src/plugin/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,16 @@ pub enum HcQueryResult {
Err(Error),
}

struct Query {
id: usize,
#[derive(Debug)]
pub struct Query {
pub id: usize,
// if false, response
request: bool,
publisher: String,
plugin: String,
query: String,
key: Value,
output: Value,
pub request: bool,
pub publisher: String,
pub plugin: String,
pub query: String,
pub key: Value,
pub output: Value,
}
impl TryFrom<PluginQuery> for Query {
type Error = Error;
Expand Down Expand Up @@ -283,14 +284,15 @@ impl PluginTransport {
pub fn name(&self) -> &str {
&self.ctx.plugin.name
}
async fn send(&mut self, query: Query) -> Result<()> {
pub async fn send(&mut self, query: Query) -> Result<()> {
let query: PluginQuery = query.try_into()?;
eprintln!("Sending query: {query:?}");
self.tx
.send(query)
.await
.map_err(|e| hc_error!("sending query failed: {}", e))
}
async fn recv(&mut self) -> Result<Option<Query>> {
pub async fn recv(&mut self) -> Result<Option<Query>> {
use QueryState::*;
let Some(mut raw) = self.rx.message().await? else {
// gRPC channel was closed
Expand All @@ -300,7 +302,6 @@ impl PluginTransport {
// As long as we expect successive chunks, keep receiving
if matches!(state, QueryReplyInProgress) {
while matches!(state, QueryReplyInProgress) {
println!("Retrieving next response");
let Some(next) = self.rx.message().await? else {
return Err(hc_error!(
"plugin gRPC channel closed while sending chunked message"
Expand Down
14 changes: 14 additions & 0 deletions plugins/dummy_rand_data/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "dummy_rand_data"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
clap = { version = "4.5.15", features = ["derive"] }
prost = "0.13.1"
rand = "0.8.5"
serde_json = "1.0.125"
tokio = { version = "1.39.2", features = ["rt"] }
tokio-stream = "0.1.15"
tonic = "0.12.1"
Loading

0 comments on commit 79414b1

Please sign in to comment.