Skip to content

Commit

Permalink
fix: correctly decode mysql timestamp binary value (#2946)
Browse files Browse the repository at this point in the history
* fix: correctly decode mysql timestamp binary value

* fix: ut

* fix: resolve PR comments
  • Loading branch information
MichaelScofield authored Dec 19, 2023
1 parent bbcac3a commit c7b3677
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 19 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ lazy_static.workspace = true
mime_guess = "2.0"
once_cell.workspace = true
openmetrics-parser = "0.4"
opensrv-mysql = "0.6"
# TODO(LFC): Wait for https://github.com/datafuselabs/opensrv/pull/60
opensrv-mysql = { git = "https://github.com/MichaelScofield/opensrv.git", rev = "1676c1d" }
opentelemetry-proto.workspace = true
parking_lot = "0.12"
pgwire = "0.17"
Expand Down
6 changes: 5 additions & 1 deletion src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,9 @@ pub enum Error {
error: FromUtf8Error,
location: Location,
},

#[snafu(display("Failed to convert Mysql value, error: {}", err_msg))]
MysqlValueConversion { err_msg: String, location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -460,7 +463,8 @@ impl ErrorExt for Error {
| PreparedStmtTypeMismatch { .. }
| TimePrecision { .. }
| UrlDecode { .. }
| IncompatibleSchema { .. } => StatusCode::InvalidArguments,
| IncompatibleSchema { .. }
| MysqlValueConversion { .. } => StatusCode::InvalidArguments,

InfluxdbLinesWrite { source, .. }
| PromSeriesWrite { source, .. }
Expand Down
28 changes: 18 additions & 10 deletions src/servers/src/mysql/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use chrono::{NaiveDate, NaiveDateTime};
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_query::Output;
use common_telemetry::{error, logging, tracing, warn};
use common_telemetry::{debug, error, logging, tracing, warn};
use datatypes::prelude::ConcreteDataType;
use itertools::Itertools;
use opensrv_mysql::{
Expand Down Expand Up @@ -291,17 +291,16 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
let plan = match replace_params_with_values(&plan, param_types, &params) {
Ok(plan) => plan,
Err(e) => {
if e.status_code().should_log_error() {
error!(e; "params: {}", params
.iter()
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
.join(", "));
}

w.error(
ErrorKind::ER_TRUNCATED_WRONG_VALUE,
format!(
"err: {}, params: {}",
e.output_msg(),
params
.iter()
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
.join(", ")
)
.as_bytes(),
e.output_msg().as_bytes(),
)
.await?;
return Ok(());
Expand Down Expand Up @@ -420,6 +419,15 @@ fn replace_params_with_values(
) -> Result<LogicalPlan> {
debug_assert_eq!(param_types.len(), params.len());

debug!(
"replace_params_with_values(param_types: {:#?}, params: {:#?})",
param_types,
params
.iter()
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
.join(", ")
);

let mut values = Vec::with_capacity(params.len());

for (i, param) in params.iter().enumerate() {
Expand Down
30 changes: 25 additions & 5 deletions src/servers/src/mysql/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
use std::ops::ControlFlow;
use std::time::Duration;

use chrono::{NaiveDate, NaiveDateTime};
use chrono::NaiveDate;
use common_query::prelude::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{self, Value};
use itertools::Itertools;
use opensrv_mysql::{ParamValue, ValueInner};
use opensrv_mysql::{to_naive_datetime, ParamValue, ValueInner};
use snafu::ResultExt;
use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut};
use sql::statements::statement::Statement;
Expand Down Expand Up @@ -171,9 +171,29 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
let date: common_time::Date = NaiveDate::from(param.value).into();
Ok(ScalarValue::Date32(Some(date.val())))
}
ValueInner::Datetime(_) => Ok(ScalarValue::Date64(Some(
NaiveDateTime::from(param.value).timestamp_millis(),
))),
ValueInner::Datetime(_) => {
let timestamp_millis = to_naive_datetime(param.value)
.map_err(|e| {
error::MysqlValueConversionSnafu {
err_msg: e.to_string(),
}
.build()
})?
.timestamp_millis();

match t {
ConcreteDataType::DateTime(_) => Ok(ScalarValue::Date64(Some(timestamp_millis))),
ConcreteDataType::Timestamp(_) => Ok(ScalarValue::TimestampMillisecond(
Some(timestamp_millis),
None,
)),
_ => error::PreparedStmtTypeMismatchSnafu {
expected: t,
actual: param.coltype,
}
.fail(),
}
}
ValueInner::Time(_) => Ok(ScalarValue::Time64Nanosecond(Some(
Duration::from(param.value).as_millis() as i64,
))),
Expand Down
76 changes: 76 additions & 0 deletions tests-integration/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ macro_rules! sql_tests {
test_postgres_auth,
test_postgres_crud,
test_postgres_parameter_inference,
test_mysql_prepare_stmt_insert_timestamp,
);
)*
};
Expand Down Expand Up @@ -579,3 +580,78 @@ pub async fn test_mysql_async_timestamp(store_type: StorageType) {
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}

pub async fn test_mysql_prepare_stmt_insert_timestamp(store_type: StorageType) {
let (addr, mut guard, server) =
setup_mysql_server(store_type, "test_mysql_prepare_stmt_insert_timestamp").await;

let pool = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap();

sqlx::query("create table demo(i bigint, ts timestamp time index)")
.execute(&pool)
.await
.unwrap();

// Valid timestamp binary encoding: https://mariadb.com/kb/en/resultset-row/#timestamp-binary-encoding

// Timestamp data length = 4, year-month-day(ymd) only:
sqlx::query("insert into demo values(?, ?)")
.bind(0)
.bind(
NaiveDate::from_ymd_opt(2023, 12, 19)
// Though hour, minute and second are provided, `sqlx` will not encode them if they are all zeroes,
// which is just what we desire here.
// See https://github.com/launchbadge/sqlx/blob/bb064e3789d68ad4e9affe7cba34944abb000f72/sqlx-core/src/mysql/types/chrono.rs#L186C22-L186C22
.and_then(|x| x.and_hms_opt(0, 0, 0))
.unwrap(),
)
.execute(&pool)
.await
.unwrap();

// Timestamp data length = 7, ymd and hour-minute-second(hms):
sqlx::query("insert into demo values(?, ?)")
.bind(1)
.bind(
NaiveDate::from_ymd_opt(2023, 12, 19)
.and_then(|x| x.and_hms_opt(13, 19, 1))
.unwrap(),
)
.execute(&pool)
.await
.unwrap();

// Timestamp data length = 11, ymd, hms and microseconds:
sqlx::query("insert into demo values(?, ?)")
.bind(2)
.bind(
NaiveDate::from_ymd_opt(2023, 12, 19)
.and_then(|x| x.and_hms_micro_opt(13, 20, 1, 123456))
.unwrap(),
)
.execute(&pool)
.await
.unwrap();

let rows = sqlx::query("select i, ts from demo order by i")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 3);

let x: DateTime<Utc> = rows[0].get(1);
assert_eq!(x.to_string(), "2023-12-19 00:00:00 UTC");

let x: DateTime<Utc> = rows[1].get(1);
assert_eq!(x.to_string(), "2023-12-19 13:19:01 UTC");

let x: DateTime<Utc> = rows[2].get(1);
assert_eq!(x.to_string(), "2023-12-19 13:20:01.123 UTC");

let _ = server.shutdown().await;
guard.remove_all().await;
}

0 comments on commit c7b3677

Please sign in to comment.