Skip to content

Commit

Permalink
chore: migrate to diesel 2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
PhotonQuantum committed Oct 11, 2022
1 parent 484afe7 commit ee9e519
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 54 deletions.
55 changes: 40 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions middlewares/delay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ edition = "2021"
[dependencies]
chrono = "0.4"
color-eyre = "0.6"
diesel = { version = "1.4", features = ["chrono", "sqlite", "r2d2"] }
diesel_migrations = "1.4"
diesel = { version = "2.0", features = ["chrono", "sqlite", "r2d2"] }
diesel_migrations = "2.0"
eyre = "0.6"
figment = { version = "0.10", features = ["env"] }
futures-util = { version = "0.3" }
Expand Down
33 changes: 14 additions & 19 deletions middlewares/delay/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#![allow(clippy::extra_unused_lifetimes)]

use std::{fmt::Debug, io::Write};
use std::fmt::Debug;

use chrono::{NaiveDateTime, Utc};
use diesel::{
backend::Backend,
backend::{Backend, RawValue},
deserialize::FromSql,
serialize::{Output, ToSql},
serialize::{IsNull, Output, ToSql},
sql_types,
sqlite::Sqlite,
AsExpression,
FromSqlRow,
Insertable,
Expand Down Expand Up @@ -51,21 +52,19 @@ where
DB: Backend,
String: FromSql<sql_types::Text, DB>,
{
fn from_sql(bytes: Option<&DB::RawValue>) -> diesel::deserialize::Result<Self> {
fn from_sql(bytes: RawValue<'_, DB>) -> diesel::deserialize::Result<Self> {
let s = String::from_sql(bytes)?;
Ok(Self(serde_json::from_str(&s)?))
}
}

impl<T, DB> ToSql<sql_types::Text, DB> for Json<T>
impl<T> ToSql<sql_types::Text, Sqlite> for Json<T>
where
T: Serialize + Debug,
DB: Backend,
String: ToSql<sql_types::Text, DB>,
{
fn to_sql<W: Write>(&self, out: &mut Output<W, DB>) -> diesel::serialize::Result {
let s = serde_json::to_string(&self.0)?;
s.to_sql(out)
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> diesel::serialize::Result {
out.set_value(serde_json::to_string(&self.0)?);
Ok(IsNull::No)
}
}

Expand All @@ -78,19 +77,15 @@ where
DB: Backend,
String: FromSql<sql_types::Text, DB>,
{
fn from_sql(bytes: Option<&DB::RawValue>) -> diesel::deserialize::Result<Self> {
fn from_sql(bytes: RawValue<'_, DB>) -> diesel::deserialize::Result<Self> {
let s = String::from_sql(bytes)?;
Ok(Self(s.parse().unwrap()))
}
}

impl<DB> ToSql<sql_types::Text, DB> for MiddlewaresWrapper
where
DB: Backend,
String: ToSql<sql_types::Text, DB>,
{
fn to_sql<W: Write>(&self, out: &mut Output<W, DB>) -> diesel::serialize::Result {
let s = self.0.to_string();
s.to_sql(out)
impl ToSql<sql_types::Text, Sqlite> for MiddlewaresWrapper {
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> diesel::serialize::Result {
out.set_value(self.0.to_string());
Ok(IsNull::No)
}
}
11 changes: 7 additions & 4 deletions middlewares/delay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ extern crate diesel;
#[macro_use]
extern crate diesel_migrations;

use std::sync::Arc;
use std::{ops::DerefMut, sync::Arc};

use chrono::NaiveDateTime;
use diesel::{
r2d2::{ConnectionManager, Pool},
SqliteConnection,
};
use eyre::{Context, ContextCompat, Result};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use eyre::{eyre, Context, ContextCompat, Result};
use futures_util::StreamExt;
use sg_core::{
models::Event,
Expand All @@ -33,7 +34,7 @@ mod db;
mod scheduler;
mod schema;

embed_migrations!();
const MIGRATIONS: EmbeddedMigrations = embed_migrations!();

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -50,7 +51,9 @@ async fn main() -> Result<()> {
))
.wrap_err("Failed to connect to SQLite database")?;

embedded_migrations::run(&pool.get()?).wrap_err("Failed to run migration script")?;
if let Err(e) = pool.get()?.deref_mut().run_pending_migrations(MIGRATIONS) {
return Err(eyre!(e).wrap_err("Failed to run migrations"));
}

let mq = RabbitMQ::new(&config.amqp_url, &config.amqp_exchange)
.await
Expand Down
33 changes: 19 additions & 14 deletions middlewares/delay/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ impl Scheduler {
}

if persist {
let conn = self.pool.get().unwrap();
let mut conn = self.pool.get().unwrap();
let r = diesel::insert_into(delayed_messages::table())
.values(&msg)
.execute(&conn);
.execute(&mut conn);
match r {
Ok(count) if count == 0 => {
error!(
Expand All @@ -116,9 +116,9 @@ impl Scheduler {

pub fn remove_task(&self, task_id: i64) {
if self.delayed_messages.lock().remove(&task_id).is_some() {
let conn = self.pool.get().expect("No db conn available");
let mut conn = self.pool.get().expect("No db conn available");
if let Err(error) =
diesel::delete(delayed_messages.filter(id.eq(task_id))).execute(&conn)
diesel::delete(delayed_messages.filter(id.eq(task_id))).execute(&mut conn)
{
error!(?error, "Failed to remove task from database");
}
Expand All @@ -132,8 +132,8 @@ impl Scheduler {
}

pub fn load(self: &Arc<Self>) {
let conn = self.pool.get().expect("No db conn available");
let results = delayed_messages.load::<DelayedMessage>(&conn);
let mut conn = self.pool.get().expect("No db conn available");
let results = delayed_messages.load::<DelayedMessage>(&mut conn);
match results {
Ok(messages) => {
for message in messages {
Expand All @@ -147,10 +147,10 @@ impl Scheduler {
}

pub fn cleanup(&self) {
let conn = self.pool.get().expect("No db conn available");
let mut conn = self.pool.get().expect("No db conn available");
let r = diesel::delete(delayed_messages::table())
.filter(deliver_at.lt(now))
.execute(&conn);
.execute(&mut conn);
match r {
Ok(count) => {
info!(count = %count, "Removed misfired delayed messages from database");
Expand All @@ -164,22 +164,23 @@ impl Scheduler {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{ops::DerefMut, sync::Arc};

use chrono::Utc;
use diesel::{
r2d2::{ConnectionManager, Pool},
RunQueryDsl,
SqliteConnection,
};
use diesel_migrations::MigrationHarness;
use sg_core::{
models::Event,
mq::{mock::MockMQ, Middlewares},
};
use tokio::time::sleep;
use uuid::Uuid;

use crate::{delayed_messages, embedded_migrations, DelayedMessage, Scheduler};
use crate::{delayed_messages, DelayedMessage, Scheduler, MIGRATIONS};

#[derive(Debug, Eq, PartialEq)]
enum TestAction {
Expand Down Expand Up @@ -209,8 +210,12 @@ mod tests {
let db_path = temp_file.path().to_string_lossy().to_string();

// Prepare the db.
let pool = Pool::new(ConnectionManager::new(&db_path)).unwrap();
embedded_migrations::run(&pool.get().unwrap()).unwrap();
let pool = Pool::new(ConnectionManager::<SqliteConnection>::new(&db_path)).unwrap();
pool.get()
.unwrap()
.deref_mut()
.run_pending_migrations(MIGRATIONS)
.unwrap();

let mq = MockMQ::default();

Expand Down Expand Up @@ -274,8 +279,8 @@ mod tests {

// And we make sure the entry in db is removed.
let pool = Pool::new(ConnectionManager::<SqliteConnection>::new(&db_path)).unwrap();
let conn = pool.get().expect("No db conn available");
let results = delayed_messages.load::<DelayedMessage>(&conn).unwrap();
let mut conn = pool.get().expect("No db conn available");
let results = delayed_messages.load::<DelayedMessage>(&mut conn).unwrap();
assert!(
results.is_empty(),
"There should be no delayed messages in db"
Expand Down

0 comments on commit ee9e519

Please sign in to comment.