Skip to content

Commit

Permalink
graphman: create GraphQL API to execute commands (graphprotocol#5554)
Browse files Browse the repository at this point in the history
* graphman: define a store for execution data

* store: implement graphman store

* graphman: extract & refactor deployment info, pause, resume commands

* graphman: create graphql server to execute commands

* node: run graphman graphql server on startup

* graphman: use refactored commands in the cli

* graphman: document graphql api usage

* graphman: accept a list of deployments on restart command

* graphman: make docs clearer

* store: rename migration to make it latest
  • Loading branch information
isum authored Oct 1, 2024
1 parent 990ef4d commit 2509212
Show file tree
Hide file tree
Showing 75 changed files with 3,558 additions and 155 deletions.
341 changes: 337 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

31 changes: 27 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
resolver = "2"
members = [
"core",
"core/graphman",
"core/graphman_store",
"chain/*",
"graphql",
"node",
Expand All @@ -24,25 +26,46 @@ repository = "https://github.com/graphprotocol/graph-node"
license = "MIT OR Apache-2.0"

[workspace.dependencies]
diesel = { version = "2.1.3", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono"] }
anyhow = "1.0"
async-graphql = { version = "7.0.6", features = ["chrono", "uuid"] }
async-graphql-axum = "7.0.6"
axum = "0.7.5"
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["derive", "env"] }
diesel = { version = "2.1.3", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid"] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
diesel_derives = "2.1.4"
diesel-dynamic-schema = "0.2.1"
diesel_derives = "2.1.4"
diesel_migrations = "2.1.0"
graph = { path = "./graph" }
graph-core = { path = "./core" }
graph-store-postgres = { path = "./store/postgres" }
graphman-server = { path = "./server/graphman" }
graphman = { path = "./core/graphman" }
graphman-store = { path = "./core/graphman_store" }
itertools = "0.13.0"
lazy_static = "1.5.0"
prost = "0.12.6"
prost-types = "0.12.6"
regex = "1.5.4"
reqwest = "0.12.5"
serde = { version = "1.0.126", features = ["rc"] }
serde_derive = "1.0.125"
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
serde_regex = "1.1.0"
serde_yaml = "0.9.21"
slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
sqlparser = "0.46.0"
strum = { version = "0.26", features = ["derive"] }
syn = { version = "2.0.66", features = ["full"] }
test-store = { path = "./store/test-store" }
thiserror = "1.0.25"
tokio = { version = "1.38.0", features = ["full"] }
tonic = { version = "0.11.0", features = ["tls-roots", "gzip"] }
tonic-build = { version = "0.11.0", features = ["prost"] }
wasmtime = "15.0.1"
tower-http = { version = "0.5.2", features = ["cors"] }
wasmparser = "0.118.1"
clap = { version = "4.5.4", features = ["derive", "env"] }
wasmtime = "15.0.1"

# Incremental compilation on Rust 1.58 causes an ICE on build. As soon as graph node builds again, these can be removed.
[profile.test]
Expand Down
14 changes: 14 additions & 0 deletions core/graphman/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "graphman"
version.workspace = true
edition.workspace = true

[dependencies]
anyhow = { workspace = true }
diesel = { workspace = true }
graph = { workspace = true }
graph-store-postgres = { workspace = true }
graphman-store = { workspace = true }
itertools = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
81 changes: 81 additions & 0 deletions core/graphman/src/commands/deployment/info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use graph::blockchain::BlockPtr;
use graph::components::store::BlockNumber;
use graph::components::store::DeploymentId;
use graph::components::store::StatusStore;
use graph::data::subgraph::schema::SubgraphHealth;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::Store;
use itertools::Itertools;

use crate::deployment::Deployment;
use crate::deployment::DeploymentSelector;
use crate::deployment::DeploymentVersionSelector;
use crate::GraphmanError;

#[derive(Clone, Debug)]
pub struct DeploymentStatus {
pub is_paused: Option<bool>,
pub is_synced: bool,
pub health: SubgraphHealth,
pub earliest_block_number: BlockNumber,
pub latest_block: Option<BlockPtr>,
pub chain_head_block: Option<BlockPtr>,
}

pub fn load_deployments(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
version: &DeploymentVersionSelector,
) -> Result<Vec<Deployment>, GraphmanError> {
let mut primary_conn = primary_pool.get()?;

crate::deployment::load_deployments(&mut primary_conn, &deployment, &version)
}

pub fn load_deployment_statuses(
store: Arc<Store>,
deployments: &[Deployment],
) -> Result<HashMap<i32, DeploymentStatus>, GraphmanError> {
use graph::data::subgraph::status::Filter;

let deployment_ids = deployments
.iter()
.map(|deployment| DeploymentId::new(deployment.id))
.collect_vec();

let deployment_statuses = store
.status(Filter::DeploymentIds(deployment_ids))?
.into_iter()
.map(|status| {
let id = status.id.0;

let chain = status
.chains
.get(0)
.ok_or_else(|| {
GraphmanError::Store(anyhow!(
"deployment status has no chains on deployment '{id}'"
))
})?
.to_owned();

Ok((
id,
DeploymentStatus {
is_paused: status.paused,
is_synced: status.synced,
health: status.health,
earliest_block_number: chain.earliest_block_number.to_owned(),
latest_block: chain.latest_block.map(|x| x.to_ptr()),
chain_head_block: chain.chain_head_block.map(|x| x.to_ptr()),
},
))
})
.collect::<Result<_, GraphmanError>>()?;

Ok(deployment_statuses)
}
3 changes: 3 additions & 0 deletions core/graphman/src/commands/deployment/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod info;
pub mod pause;
pub mod resume;
84 changes: 84 additions & 0 deletions core/graphman/src/commands/deployment/pause.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::sync::Arc;

use anyhow::anyhow;
use graph::components::store::DeploymentLocator;
use graph::components::store::StoreEvent;
use graph_store_postgres::command_support::catalog;
use graph_store_postgres::command_support::catalog::Site;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::NotificationSender;
use thiserror::Error;

use crate::deployment::DeploymentSelector;
use crate::deployment::DeploymentVersionSelector;
use crate::GraphmanError;

pub struct ActiveDeployment {
locator: DeploymentLocator,
site: Site,
}

#[derive(Debug, Error)]
pub enum PauseDeploymentError {
#[error("deployment '{0}' is already paused")]
AlreadyPaused(String),

#[error(transparent)]
Common(#[from] GraphmanError),
}

impl ActiveDeployment {
pub fn locator(&self) -> &DeploymentLocator {
&self.locator
}
}

pub fn load_active_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<ActiveDeployment, PauseDeploymentError> {
let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment(
&mut primary_conn,
deployment,
&DeploymentVersionSelector::All,
)?
.locator();

let mut catalog_conn = catalog::Connection::new(primary_conn);

let site = catalog_conn
.locate_site(locator.clone())
.map_err(GraphmanError::from)?
.ok_or_else(|| {
GraphmanError::Store(anyhow!("deployment site not found for '{locator}'"))
})?;

let (_, is_paused) = catalog_conn
.assignment_status(&site)
.map_err(GraphmanError::from)?
.ok_or_else(|| {
GraphmanError::Store(anyhow!("assignment status not found for '{locator}'"))
})?;

if is_paused {
return Err(PauseDeploymentError::AlreadyPaused(locator.to_string()));
}

Ok(ActiveDeployment { locator, site })
}

pub fn pause_active_deployment(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
active_deployment: ActiveDeployment,
) -> Result<(), GraphmanError> {
let primary_conn = primary_pool.get()?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes = catalog_conn.pause_subgraph(&active_deployment.site)?;
catalog_conn.send_store_event(&notification_sender, &StoreEvent::new(changes))?;

Ok(())
}
84 changes: 84 additions & 0 deletions core/graphman/src/commands/deployment/resume.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::sync::Arc;

use anyhow::anyhow;
use graph::components::store::DeploymentLocator;
use graph::prelude::StoreEvent;
use graph_store_postgres::command_support::catalog;
use graph_store_postgres::command_support::catalog::Site;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::NotificationSender;
use thiserror::Error;

use crate::deployment::DeploymentSelector;
use crate::deployment::DeploymentVersionSelector;
use crate::GraphmanError;

pub struct PausedDeployment {
locator: DeploymentLocator,
site: Site,
}

#[derive(Debug, Error)]
pub enum ResumeDeploymentError {
#[error("deployment '{0}' is not paused")]
NotPaused(String),

#[error(transparent)]
Common(#[from] GraphmanError),
}

impl PausedDeployment {
pub fn locator(&self) -> &DeploymentLocator {
&self.locator
}
}

pub fn load_paused_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<PausedDeployment, ResumeDeploymentError> {
let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment(
&mut primary_conn,
deployment,
&DeploymentVersionSelector::All,
)?
.locator();

let mut catalog_conn = catalog::Connection::new(primary_conn);

let site = catalog_conn
.locate_site(locator.clone())
.map_err(GraphmanError::from)?
.ok_or_else(|| {
GraphmanError::Store(anyhow!("deployment site not found for '{locator}'"))
})?;

let (_, is_paused) = catalog_conn
.assignment_status(&site)
.map_err(GraphmanError::from)?
.ok_or_else(|| {
GraphmanError::Store(anyhow!("assignment status not found for '{locator}'"))
})?;

if !is_paused {
return Err(ResumeDeploymentError::NotPaused(locator.to_string()));
}

Ok(PausedDeployment { locator, site })
}

pub fn resume_paused_deployment(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
paused_deployment: PausedDeployment,
) -> Result<(), GraphmanError> {
let primary_conn = primary_pool.get()?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes = catalog_conn.resume_subgraph(&paused_deployment.site)?;
catalog_conn.send_store_event(&notification_sender, &StoreEvent::new(changes))?;

Ok(())
}
1 change: 1 addition & 0 deletions core/graphman/src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod deployment;
Loading

0 comments on commit 2509212

Please sign in to comment.