Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: restructure server binaries #1941

Open
wants to merge 1 commit into
base: 01-24-fix_workflows_allow_op_ctx_to_do_all_the_things
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 74 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions Cargo.toml

Large diffs are not rendered by default.

30 changes: 17 additions & 13 deletions packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ impl Database for DatabaseFdbSqliteNats {
// Set up sqlite
let pulled_workflows = futures_util::stream::iter(partial_workflows)
.map(|partial| async move {
let pool = self.pools.sqlite(partial.workflow_id).await?;
let pool = self.pools.sqlite(db_name(partial.workflow_id)).await?;
sqlite::init(partial.workflow_id, &pool).await?;

// Fetch all events
Expand Down Expand Up @@ -1170,7 +1170,7 @@ impl Database for DatabaseFdbSqliteNats {
version: usize,
loop_location: Option<&Location>,
) -> WorkflowResult<Option<SignalData>> {
let pool = self.pools.sqlite(workflow_id).await?;
let pool = self.pools.sqlite(db_name(workflow_id)).await?;

let owned_filter = filter
.into_iter()
Expand Down Expand Up @@ -1840,7 +1840,7 @@ impl Database for DatabaseFdbSqliteNats {
body: &serde_json::value::RawValue,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

// Insert history event
self.query(|| async {
Expand Down Expand Up @@ -1901,7 +1901,7 @@ impl Database for DatabaseFdbSqliteNats {
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
if TAGGED_SIGNALS_ENABLED {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

// Insert history event
self.query(|| async {
Expand Down Expand Up @@ -1965,7 +1965,7 @@ impl Database for DatabaseFdbSqliteNats {
loop_location: Option<&Location>,
unique: bool,
) -> WorkflowResult<Uuid> {
let pool = self.pools.sqlite(workflow_id).await?;
let pool = self.pools.sqlite(db_name(workflow_id)).await?;

// Insert history event
self.query(|| async {
Expand Down Expand Up @@ -2127,7 +2127,7 @@ impl Database for DatabaseFdbSqliteNats {
res: Result<&serde_json::value::RawValue, &str>,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(workflow_id).await?;
let pool = self.pools.sqlite(db_name(workflow_id)).await?;
let input_hash = event_id.input_hash.to_be_bytes();

match res {
Expand Down Expand Up @@ -2233,7 +2233,7 @@ impl Database for DatabaseFdbSqliteNats {
body: &serde_json::value::RawValue,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2269,7 +2269,7 @@ impl Database for DatabaseFdbSqliteNats {
output: Option<&serde_json::value::RawValue>,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(workflow_id).await?;
let pool = self.pools.sqlite(db_name(workflow_id)).await?;

self.query(|| async {
// Attempt to use an existing connection
Expand Down Expand Up @@ -2443,7 +2443,7 @@ impl Database for DatabaseFdbSqliteNats {
deadline_ts: i64,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2474,7 +2474,7 @@ impl Database for DatabaseFdbSqliteNats {
location: &Location,
state: SleepState,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand All @@ -2501,7 +2501,7 @@ impl Database for DatabaseFdbSqliteNats {
version: usize,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2532,7 +2532,7 @@ impl Database for DatabaseFdbSqliteNats {
event_name: Option<&str>,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2563,7 +2563,7 @@ impl Database for DatabaseFdbSqliteNats {
version: usize,
loop_location: Option<&Location>,
) -> WorkflowResult<()> {
let pool = self.pools.sqlite(from_workflow_id).await?;
let pool = self.pools.sqlite(db_name(from_workflow_id)).await?;

self.query(|| async {
sql_execute!(
Expand Down Expand Up @@ -2594,3 +2594,7 @@ struct PartialWorkflow {
pub ray_id: Uuid,
pub input: Box<serde_json::value::RawValue>,
}

fn db_name(workflow_id: Uuid) -> String {
format!("{workflow_id}-internal")
}
2 changes: 1 addition & 1 deletion packages/common/chirp-workflow/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use crate::{
activity::Activity as ActivityTrait,
ctx::workflow::Loop,
ctx::*,
db,
db::{self, Database},
error::{WorkflowError, WorkflowResult},
executable::Executable,
history::removed::*,
Expand Down
1 change: 0 additions & 1 deletion packages/common/chirp-workflow/core/tests/integration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use chirp_workflow::db::Database;
use chirp_workflow::prelude::*;
use serde_json::json;
use uuid::Uuid;
Expand Down
4 changes: 2 additions & 2 deletions packages/common/migrate/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ pub async fn up(config: rivet_config::Config, services: &[SqlService]) -> Result
tracing::info!(sql_services = ?services.len(), "running sql migrations");

let server_config = config.server.as_ref().context("missing server")?;
let is_development = server_config.rivet.auth.access_kind
== rivet_config::config::rivet::AccessKind::Development;
// let is_development = server_config.rivet.auth.access_kind
// == rivet_config::config::rivet::AccessKind::Development;

let crdb = rivet_pools::db::crdb::setup(config.clone())
.await
Expand Down
9 changes: 4 additions & 5 deletions packages/common/pools/src/db/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use sqlx::{
Sqlite,
};
use tokio::sync::Mutex;
use uuid::Uuid;

use crate::Error;

Expand All @@ -16,7 +15,7 @@ pub type SqlitePool = sqlx::SqlitePool;
#[derive(Clone)]
pub struct SqlitePoolManager {
// TODO: Somehow remove old pools
pools: Arc<Mutex<HashMap<Uuid, SqlitePool>>>,
pools: Arc<Mutex<HashMap<String, SqlitePool>>>,
}

impl SqlitePoolManager {
Expand All @@ -27,10 +26,10 @@ impl SqlitePoolManager {
}

/// Get or creates an sqlite pool for the given key
pub async fn get(&self, key: Uuid) -> Result<SqlitePool, Error> {
pub async fn get(&self, key: &str) -> Result<SqlitePool, Error> {
let mut pools_guard = self.pools.lock().await;

let pool = if let Some(pool) = pools_guard.get(&key) {
let pool = if let Some(pool) = pools_guard.get(key) {
pool.clone()
} else {
// TODO: Hardcoded for testing
Expand Down Expand Up @@ -61,7 +60,7 @@ impl SqlitePoolManager {
// Run at the start of every connection
setup_pragma(&pool).await.map_err(Error::BuildSqlx)?;

pools_guard.insert(key, pool.clone());
pools_guard.insert(key.to_string(), pool.clone());

tracing::debug!(?key, "sqlite connected");

Expand Down
5 changes: 2 additions & 3 deletions packages/common/pools/src/pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use global_error::{ensure_with, prelude::*, GlobalResult};
use rivet_config::Config;
use tokio_util::sync::{CancellationToken, DropGuard};
use uuid::Uuid;

use crate::{
db::sqlite::SqlitePoolManager, ClickHousePool, CrdbPool, Error, FdbPool, NatsPool, RedisPool,
Expand Down Expand Up @@ -168,8 +167,8 @@ impl Pools {
self.0.fdb.clone().ok_or(Error::MissingFdbPool)
}

pub async fn sqlite(&self, key: Uuid) -> Result<SqlitePool, Error> {
self.0.sqlite.get(key).await
pub async fn sqlite(&self, key: impl AsRef<str>) -> Result<SqlitePool, Error> {
self.0.sqlite.get(key.as_ref()).await
}

#[tracing::instrument(skip_all)]
Expand Down
Loading
Loading