Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SQLNESS SLEEP <DURATION_STRING> #67

Merged
merged 10 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sqlness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ readme = { workspace = true }
[dependencies]
async-trait = "0.1"
derive_builder = "0.11"
futures = "0.3.26"
hierarchical_hash_wheel_timer = "1.2.0"
minijinja = "1"
mysql = { version = "23.0.1", optional = true }
once_cell = "1.19.0"
postgres = { version = "0.19.7", optional = true }
prettydiff = { version = "0.6.2", default_features = false }
regex = "1.7.1"
serde_json = "1"
thiserror = "1.0"
toml = "0.5"
uuid = "1.8.0"
walkdir = "2.3"

[dev-dependencies]
Expand Down
14 changes: 8 additions & 6 deletions sqlness/src/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl Query {
where
W: Write,
{
let context = self.before_execute_intercept();
let context = self.before_execute_intercept().await;
for comment in &self.comment_lines {
writer.write_all(comment.as_bytes())?;
writer.write_all("\n".as_bytes())?;
Expand All @@ -160,7 +160,7 @@ impl Query {
.query(context.clone(), format!("{sql};"))
.await
.to_string();
self.after_execute_intercept(&mut result);
self.after_execute_intercept(&mut result).await;
self.write_result(writer, result)?;
}
}
Expand All @@ -172,19 +172,21 @@ impl Query {
///
/// Interceptors may change either the query to be displayed or the query to be executed,
/// so we need to return the query to caller.
fn before_execute_intercept(&mut self) -> QueryContext {
async fn before_execute_intercept(&mut self) -> QueryContext {
let mut context = QueryContext::default();

for interceptor in &self.interceptors {
interceptor.before_execute(&mut self.execute_query, &mut context);
interceptor
.before_execute(&mut self.execute_query, &mut context)
.await;
}

context
}

fn after_execute_intercept(&mut self, result: &mut String) {
async fn after_execute_intercept(&mut self, result: &mut String) {
for interceptor in &self.interceptors {
interceptor.after_execute(result);
interceptor.after_execute(result).await;
}
}

Expand Down
12 changes: 9 additions & 3 deletions sqlness/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ use crate::{
pub mod arg;
pub mod env;
pub mod replace;
pub mod sleep;
pub mod sort_result;
pub mod template;

pub type InterceptorRef = Box<dyn Interceptor>;
pub type InterceptorRef = Box<dyn Interceptor + Send + Sync>;

#[async_trait::async_trait]
pub trait Interceptor {
#[allow(unused_variables)]
fn before_execute(&self, execute_query: &mut Vec<String>, context: &mut QueryContext) {}
async fn before_execute(&self, execute_query: &mut Vec<String>, context: &mut QueryContext) {}

#[allow(unused_variables)]
fn after_execute(&self, result: &mut String) {}
async fn after_execute(&self, result: &mut String) {}
discord9 marked this conversation as resolved.
Show resolved Hide resolved
}

pub type InterceptorFactoryRef = Arc<dyn InterceptorFactory>;
Expand Down Expand Up @@ -93,6 +95,10 @@ fn builtin_interceptors() -> HashMap<String, InterceptorFactoryRef> {
template::PREFIX.to_string(),
Arc::new(TemplateInterceptorFactory {}) as _,
),
(
sleep::PREFIX.to_string(),
Arc::new(sleep::SleepInterceptorFactory {}) as _,
),
]
.into_iter()
.map(|(prefix, factory)| (prefix.to_string(), factory))
Expand Down
3 changes: 2 additions & 1 deletion sqlness/src/interceptor/arg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ pub struct ArgInterceptor {
args: Vec<(String, String)>,
}

#[async_trait::async_trait]
impl Interceptor for ArgInterceptor {
fn before_execute(&self, _: &mut Vec<String>, context: &mut QueryContext) {
async fn before_execute(&self, _: &mut Vec<String>, context: &mut QueryContext) {
for (key, value) in &self.args {
context.context.insert(key.to_string(), value.to_string());
}
Expand Down
3 changes: 2 additions & 1 deletion sqlness/src/interceptor/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ pub struct EnvInterceptor {
data: HashMap<String, String>,
}

#[async_trait::async_trait]
impl Interceptor for EnvInterceptor {
fn before_execute(&self, execute_query: &mut Vec<String>, _: &mut QueryContext) {
async fn before_execute(&self, execute_query: &mut Vec<String>, _: &mut QueryContext) {
for line in execute_query {
for (key, value) in &self.data {
let rendered = line.replace(key, value);
Expand Down
15 changes: 8 additions & 7 deletions sqlness/src/interceptor/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ pub struct ReplaceInterceptor {
replacement: String,
}

#[async_trait::async_trait]
impl Interceptor for ReplaceInterceptor {
fn after_execute(&self, result: &mut String) {
async fn after_execute(&self, result: &mut String) {
let re = Regex::new(&self.pattern).unwrap();
let replaced = re.replace_all(result, &self.replacement);
*result = replaced.to_string();
Expand Down Expand Up @@ -84,21 +85,21 @@ mod tests {
assert!(interceptor.is_err());
}

#[test]
fn replace_without_replacement() {
#[tokio::test]
async fn replace_without_replacement() {
let interceptor = ReplaceInterceptorFactory {}.try_new("0").unwrap();

let mut exec_result = "000010101".to_string();
interceptor.after_execute(&mut exec_result);
interceptor.after_execute(&mut exec_result).await;
assert_eq!(exec_result, "111".to_string());
}

#[test]
fn simple_replace() {
#[tokio::test]
async fn simple_replace() {
let interceptor = ReplaceInterceptorFactory {}.try_new("00 2").unwrap();

let mut exec_result = "0000010101".to_string();
interceptor.after_execute(&mut exec_result);
interceptor.after_execute(&mut exec_result).await;
assert_eq!(exec_result, "22010101".to_string());
}
}
109 changes: 109 additions & 0 deletions sqlness/src/interceptor/sleep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 CeresDB Project Authors. Licensed under Apache-2.0.

use std::pin::Pin;
use std::task::Context;
use std::time::{Duration, Instant};

use futures::Future;
use hierarchical_hash_wheel_timer::thread_timer::*;
use hierarchical_hash_wheel_timer::*;
use once_cell::sync::Lazy;
use uuid::Uuid;

use crate::error::Result;
use crate::interceptor::{Interceptor, InterceptorFactory, InterceptorRef};
use crate::SqlnessError;

pub struct TemplateInterceptorFactory;

/// This create a thread dedicated to the timer wheel.
pub static TIMER_WHEEL: Lazy<
TimerWithThread<Uuid, OneShotClosureState<Uuid>, PeriodicClosureState<Uuid>>,
> = Lazy::new(TimerWithThread::for_uuid_closures);

pub const PREFIX: &str = "SLEEP";

/// Sleep for given milliseconds before executing the query.
///
/// # Example
/// ``` sql
/// -- SQLNESS SLEEP 1500
/// SELECT 1;
/// ```
///
/// Note that this implementation is not accurate and may be affected by the system load.
/// It is guaranteed that the sleep time is at least the given milliseconds, but the lag may be
/// longer.
#[derive(Debug)]
pub struct SleepInterceptor {
milliseconds: u64,
discord9 marked this conversation as resolved.
Show resolved Hide resolved
}

#[async_trait::async_trait]
impl Interceptor for SleepInterceptor {
async fn before_execute(
&self,
_execute_query: &mut Vec<String>,
_context: &mut crate::case::QueryContext,
) {
// impl a cross-runtime sleep
struct Sleep {
now: Instant,
milliseconds: u64,
}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let elapsed = self.now.elapsed().as_millis() as u64;
let remaining = self.milliseconds.saturating_sub(elapsed);
if elapsed < self.milliseconds {
let mut timer = TIMER_WHEEL.timer_ref();
let id = Uuid::new_v4();
// wait for the remaining time
let delay = Duration::from_millis(remaining);
let waker = cx.waker().clone();
timer.schedule_action_once(id, delay, move |_timer_id| {
waker.wake();
});
discord9 marked this conversation as resolved.
Show resolved Hide resolved
std::task::Poll::Pending
} else {
std::task::Poll::Ready(())
}
}
}
Sleep {
now: Instant::now(),
milliseconds: self.milliseconds,
}
.await;
}
}

pub struct SleepInterceptorFactory;

impl InterceptorFactory for SleepInterceptorFactory {
fn try_new(&self, ctx: &str) -> Result<InterceptorRef> {
let milliseconds = ctx
.parse::<u64>()
.map_err(|e| SqlnessError::InvalidContext {
prefix: PREFIX.to_string(),
msg: format!("Failed to parse milliseconds: {}", e),
})?;
Ok(Box::new(SleepInterceptor { milliseconds }))
}
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
async fn wait_1500ms() {
let input = "1500";
let interceptor = SleepInterceptorFactory{}.try_new(input).unwrap();
let now = Instant::now();
interceptor.before_execute(&mut vec![], &mut crate::QueryContext::default()).await;
let elasped = now.elapsed().as_millis() as u64;
assert!(elasped >= 1500);
}
}
discord9 marked this conversation as resolved.
Show resolved Hide resolved
21 changes: 11 additions & 10 deletions sqlness/src/interceptor/sort_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ pub struct SortResultInterceptor {
ignore_tail: usize,
}

#[async_trait::async_trait]
impl Interceptor for SortResultInterceptor {
fn after_execute(&self, result: &mut String) {
async fn after_execute(&self, result: &mut String) {
let mut lines = result.lines().collect::<VecDeque<_>>();
let mut head = Vec::with_capacity(self.ignore_head);
let mut tail = Vec::with_capacity(self.ignore_tail);
Expand Down Expand Up @@ -113,8 +114,8 @@ mod tests {
assert!(interceptor.is_err());
}

#[test]
fn sort_result_full() {
#[tokio::test]
async fn sort_result_full() {
let interceptor = SortResultInterceptorFactory.try_new("").unwrap();

let cases = [
Expand Down Expand Up @@ -145,13 +146,13 @@ mod tests {
];

for (mut input, expected) in cases {
interceptor.after_execute(&mut input);
interceptor.after_execute(&mut input).await;
assert_eq!(input, expected);
}
}

#[test]
fn ignore_head_exceeds_length() {
#[tokio::test]
async fn ignore_head_exceeds_length() {
let interceptor = SortResultInterceptorFactory.try_new("10000").unwrap();

let mut exec_result = String::from(
Expand All @@ -160,12 +161,12 @@ mod tests {
\n1",
);
let expected = exec_result.clone();
interceptor.after_execute(&mut exec_result);
interceptor.after_execute(&mut exec_result).await;
assert_eq!(exec_result, expected);
}

#[test]
fn ignore_tail_exceeds_length() {
#[tokio::test]
async fn ignore_tail_exceeds_length() {
let interceptor = SortResultInterceptorFactory.try_new("0 10000").unwrap();

let mut exec_result = String::from(
Expand All @@ -174,7 +175,7 @@ mod tests {
\n1",
);
let expected = exec_result.clone();
interceptor.after_execute(&mut exec_result);
interceptor.after_execute(&mut exec_result).await;
assert_eq!(exec_result, expected);
}
}
Loading
Loading