Skip to content

Commit

Permalink
Merge pull request superfly#260 from superfly/corro-subs
Browse files Browse the repository at this point in the history
Adds an info and list command for getting more details on subscriptions
  • Loading branch information
somtochiama authored Sep 20, 2024
2 parents 30b65ba + 91b82ef commit 6fa8d9e
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/corro-admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ tokio-serde = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
tripwire = { path = "../tripwire" }
rangemap = { workspace = true }
rangemap = { workspace = true }
uuid = { workspace = true }
67 changes: 67 additions & 0 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tokio_serde::{formats::Json, Framed};
use tokio_util::codec::LengthDelimitedCodec;
use tracing::{debug, error, info, warn};
use tripwire::Tripwire;
use uuid::Uuid;

#[derive(Debug, thiserror::Error)]
pub enum AdminError {
Expand Down Expand Up @@ -96,6 +97,7 @@ pub enum Command {
Locks { top: usize },
Cluster(ClusterCommand),
Actor(ActorCommand),
Subs(SubsCommand),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -104,6 +106,15 @@ pub enum SyncCommand {
ReconcileGaps,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SubsCommand {
Info {
hash: Option<String>,
id: Option<Uuid>,
},
List,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClusterCommand {
Rejoin,
Expand Down Expand Up @@ -521,6 +532,62 @@ async fn handle_conn(

send_success(&mut stream).await;
}
Command::Subs(SubsCommand::List) => {
let handles = agent.subs_manager().get_handles();
let uuid_to_hash = handles
.iter()
.map(|(k, v)| {
json!({
"id": k,
"hash": v.hash(),
"sql": v.sql().lines().map(|c| c.trim()).collect::<Vec<_>>().join(" "),
})
})
.collect::<Vec<_>>();

send(&mut stream, Response::Json(serde_json::json!(uuid_to_hash))).await;
send_success(&mut stream).await;
}
Command::Subs(SubsCommand::Info { hash, id }) => {
let matcher_handle = match (hash, id) {
(Some(hash), _) => agent.subs_manager().get_by_hash(&hash),
(None, Some(id)) => agent.subs_manager().get(&id),
(None, None) => {
send_error(&mut stream, "specify hash or id for subscription").await;
continue;
}
};
match matcher_handle {
Some(matcher) => {
let statements = matcher
.cached_stmts()
.iter()
.map(|(table, stmts)| {
json!({
table: stmts.new_query(),
})
})
.collect::<Vec<_>>();
send(
&mut stream,
Response::Json(serde_json::json!({
"id": matcher.id(),
"hash": matcher.hash(),
"path": matcher.subs_path(),
"last_change_id": matcher.last_change_id_sent(),
"original_query": matcher.sql().lines().map(|c| c.trim()).collect::<Vec<_>>().join(" "),
"statements": statements,
})),
)
.await;
send_success(&mut stream).await;
}
None => {
send_error(&mut stream, "unknown subscription hash or id").await;
continue;
}
};
}
},
Ok(None) => {
debug!("done with admin conn");
Expand Down
38 changes: 38 additions & 0 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ impl SubsManager {
self.0.read().get_by_query(sql)
}

pub fn get_by_hash(&self, hash: &str) -> Option<MatcherHandle> {
self.0.read().get_by_hash(hash)
}

pub fn get_handles(&self) -> BTreeMap<Uuid, MatcherHandle> {
self.0.read().handles.clone()
}

pub fn get_or_insert(
&self,
sql: &str,
Expand Down Expand Up @@ -328,6 +336,13 @@ impl InnerSubsManager {
.and_then(|id| self.handles.get(id).cloned())
}

pub fn get_by_hash(&self, hash: &str) -> Option<MatcherHandle> {
self.handles
.values()
.find(|x| x.inner.hash == hash)
.cloned()
}

fn remove(&mut self, id: &Uuid) -> Option<MatcherHandle> {
let handle = self.handles.remove(id)?;
self.queries.remove(&handle.inner.sql);
Expand Down Expand Up @@ -364,6 +379,9 @@ struct InnerMatcherHandle {
cancel: CancellationToken,
changes_tx: mpsc::Sender<(MatchCandidates, CrsqlDbVersion)>,
last_change_rx: watch::Receiver<ChangeId>,
// some state from the matcher so we can take a look later
subs_path: String,
cached_statements: HashMap<String, MatcherStmt>,
}

type MatchCandidates = IndexMap<TableName, IndexSet<Vec<u8>>>;
Expand All @@ -373,6 +391,10 @@ impl MatcherHandle {
self.inner.id
}

pub fn sql(&self) -> &String {
&self.inner.sql
}

pub fn hash(&self) -> &str {
&self.inner.hash
}
Expand All @@ -385,6 +407,14 @@ impl MatcherHandle {
&self.inner.col_names
}

pub fn subs_path(&self) -> &String {
&self.inner.subs_path
}

pub fn cached_stmts(&self) -> &HashMap<String, MatcherStmt> {
&self.inner.cached_statements
}

pub async fn cleanup(self) {
self.inner.cancel.cancel();
info!(sub_id = %self.inner.id, "Canceled subscription");
Expand Down Expand Up @@ -593,6 +623,12 @@ pub struct MatcherStmt {
temp_query: String,
}

impl MatcherStmt {
pub fn new_query(self: &Self) -> &String {
return &self.new_query;
}
}

const CHANGE_ID_COL: &str = "id";
const CHANGE_TYPE_COL: &str = "type";

Expand Down Expand Up @@ -819,6 +855,8 @@ impl Matcher {
cancel: cancel.clone(),
last_change_rx,
changes_tx,
cached_statements: statements.clone(),
subs_path: sub_path.to_string(),
}),
state: state.clone(),
};
Expand Down
1 change: 0 additions & 1 deletion crates/corro-types/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ impl From<SyncStateV1> for SyncMessage {
}
}


// generates a `SyncMessage` to tell another node what versions we're missing
#[tracing::instrument(skip_all, level = "debug")]
pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncStateV1 {
Expand Down
30 changes: 30 additions & 0 deletions crates/corrosion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,19 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> {
info!("Exited with code: {:?}", exit.code());
std::process::exit(exit.code().unwrap_or(1));
}
Command::Subs(SubsCommand::Info { hash, id }) => {
let mut conn = AdminConn::connect(cli.admin_path()).await?;
conn.send_command(corro_admin::Command::Subs(corro_admin::SubsCommand::Info {
hash: hash.clone(),
id: *id,
}))
.await?;
}
Command::Subs(SubsCommand::List) => {
let mut conn = AdminConn::connect(cli.admin_path()).await?;
conn.send_command(corro_admin::Command::Subs(corro_admin::SubsCommand::List))
.await?;
}
}

Ok(())
Expand Down Expand Up @@ -686,6 +699,10 @@ enum Command {
/// DB-related commands
#[command(subcommand)]
Db(DbCommand),

/// Subscription related commands
#[command(subcommand)]
Subs(SubsCommand),
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -769,3 +786,16 @@ enum DbCommand {
/// Acquires the lock on the DB
Lock { cmd: String },
}

#[derive(Subcommand)]
enum SubsCommand {
/// List all subscriptions on a node
List,
/// Get information on a subscription
Info {
#[arg(long)]
hash: Option<String>,
#[arg(long)]
id: Option<Uuid>,
},
}

0 comments on commit 6fa8d9e

Please sign in to comment.