diff --git a/Cargo.lock b/Cargo.lock index 97a509658e09..ee7bde92e7b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5477,8 +5477,7 @@ dependencies = [ [[package]] name = "opensrv-mysql" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6b6a785aafb26a97c26078b9457e96cb238b386781583783a3a3d3de47fa841" +source = "git+https://github.com/MichaelScofield/opensrv.git?rev=1676c1d#1676c1d166cf33e7745e1b5db54b3fe2b7defec9" dependencies = [ "async-trait", "byteorder", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 8b8ed8ac5358..448549fdf609 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -54,7 +54,8 @@ lazy_static.workspace = true mime_guess = "2.0" once_cell.workspace = true openmetrics-parser = "0.4" -opensrv-mysql = "0.6" +# TODO(LFC): Wait for https://github.com/datafuselabs/opensrv/pull/60 +opensrv-mysql = { git = "https://github.com/MichaelScofield/opensrv.git", rev = "1676c1d" } opentelemetry-proto.workspace = true parking_lot = "0.12" pgwire = "0.17" diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 665ecbaae1e3..4c21b9832693 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -408,6 +408,9 @@ pub enum Error { error: FromUtf8Error, location: Location, }, + + #[snafu(display("Failed to convert Mysql value, error: {}", err_msg))] + MysqlValueConversion { err_msg: String, location: Location }, } pub type Result = std::result::Result; @@ -460,7 +463,8 @@ impl ErrorExt for Error { | PreparedStmtTypeMismatch { .. } | TimePrecision { .. } | UrlDecode { .. } - | IncompatibleSchema { .. } => StatusCode::InvalidArguments, + | IncompatibleSchema { .. } + | MysqlValueConversion { .. } => StatusCode::InvalidArguments, InfluxdbLinesWrite { source, .. } | PromSeriesWrite { source, .. } diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 29ad263f224d..7dace0c415f7 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -24,7 +24,7 @@ use chrono::{NaiveDate, NaiveDateTime}; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_query::Output; -use common_telemetry::{error, logging, tracing, warn}; +use common_telemetry::{debug, error, logging, tracing, warn}; use datatypes::prelude::ConcreteDataType; use itertools::Itertools; use opensrv_mysql::{ @@ -291,17 +291,16 @@ impl AsyncMysqlShim for MysqlInstanceShi let plan = match replace_params_with_values(&plan, param_types, ¶ms) { Ok(plan) => plan, Err(e) => { + if e.status_code().should_log_error() { + error!(e; "params: {}", params + .iter() + .map(|x| format!("({:?}, {:?})", x.value, x.coltype)) + .join(", ")); + } + w.error( ErrorKind::ER_TRUNCATED_WRONG_VALUE, - format!( - "err: {}, params: {}", - e.output_msg(), - params - .iter() - .map(|x| format!("({:?}, {:?})", x.value, x.coltype)) - .join(", ") - ) - .as_bytes(), + e.output_msg().as_bytes(), ) .await?; return Ok(()); @@ -420,6 +419,15 @@ fn replace_params_with_values( ) -> Result { debug_assert_eq!(param_types.len(), params.len()); + debug!( + "replace_params_with_values(param_types: {:#?}, params: {:#?})", + param_types, + params + .iter() + .map(|x| format!("({:?}, {:?})", x.value, x.coltype)) + .join(", ") + ); + let mut values = Vec::with_capacity(params.len()); for (i, param) in params.iter().enumerate() { diff --git a/src/servers/src/mysql/helper.rs b/src/servers/src/mysql/helper.rs index 4414ebb7a17b..e5d99a087ad1 100644 --- a/src/servers/src/mysql/helper.rs +++ b/src/servers/src/mysql/helper.rs @@ -15,12 +15,12 @@ use std::ops::ControlFlow; use std::time::Duration; -use chrono::{NaiveDate, NaiveDateTime}; +use chrono::NaiveDate; use common_query::prelude::ScalarValue; use datatypes::prelude::ConcreteDataType; use datatypes::value::{self, Value}; use itertools::Itertools; -use opensrv_mysql::{ParamValue, ValueInner}; +use opensrv_mysql::{to_naive_datetime, ParamValue, ValueInner}; use snafu::ResultExt; use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut}; use sql::statements::statement::Statement; @@ -171,9 +171,29 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result Ok(ScalarValue::Date64(Some( - NaiveDateTime::from(param.value).timestamp_millis(), - ))), + ValueInner::Datetime(_) => { + let timestamp_millis = to_naive_datetime(param.value) + .map_err(|e| { + error::MysqlValueConversionSnafu { + err_msg: e.to_string(), + } + .build() + })? + .timestamp_millis(); + + match t { + ConcreteDataType::DateTime(_) => Ok(ScalarValue::Date64(Some(timestamp_millis))), + ConcreteDataType::Timestamp(_) => Ok(ScalarValue::TimestampMillisecond( + Some(timestamp_millis), + None, + )), + _ => error::PreparedStmtTypeMismatchSnafu { + expected: t, + actual: param.coltype, + } + .fail(), + } + } ValueInner::Time(_) => Ok(ScalarValue::Time64Nanosecond(Some( Duration::from(param.value).as_millis() as i64, ))), diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index d3c7914b4291..2b747a554a75 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -60,6 +60,7 @@ macro_rules! sql_tests { test_postgres_auth, test_postgres_crud, test_postgres_parameter_inference, + test_mysql_prepare_stmt_insert_timestamp, ); )* }; @@ -579,3 +580,78 @@ pub async fn test_mysql_async_timestamp(store_type: StorageType) { let _ = fe_mysql_server.shutdown().await; guard.remove_all().await; } + +pub async fn test_mysql_prepare_stmt_insert_timestamp(store_type: StorageType) { + let (addr, mut guard, server) = + setup_mysql_server(store_type, "test_mysql_prepare_stmt_insert_timestamp").await; + + let pool = MySqlPoolOptions::new() + .max_connections(2) + .connect(&format!("mysql://{addr}/public")) + .await + .unwrap(); + + sqlx::query("create table demo(i bigint, ts timestamp time index)") + .execute(&pool) + .await + .unwrap(); + + // Valid timestamp binary encoding: https://mariadb.com/kb/en/resultset-row/#timestamp-binary-encoding + + // Timestamp data length = 4, year-month-day(ymd) only: + sqlx::query("insert into demo values(?, ?)") + .bind(0) + .bind( + NaiveDate::from_ymd_opt(2023, 12, 19) + // Though hour, minute and second are provided, `sqlx` will not encode them if they are all zeroes, + // which is just what we desire here. + // See https://github.com/launchbadge/sqlx/blob/bb064e3789d68ad4e9affe7cba34944abb000f72/sqlx-core/src/mysql/types/chrono.rs#L186C22-L186C22 + .and_then(|x| x.and_hms_opt(0, 0, 0)) + .unwrap(), + ) + .execute(&pool) + .await + .unwrap(); + + // Timestamp data length = 7, ymd and hour-minute-second(hms): + sqlx::query("insert into demo values(?, ?)") + .bind(1) + .bind( + NaiveDate::from_ymd_opt(2023, 12, 19) + .and_then(|x| x.and_hms_opt(13, 19, 1)) + .unwrap(), + ) + .execute(&pool) + .await + .unwrap(); + + // Timestamp data length = 11, ymd, hms and microseconds: + sqlx::query("insert into demo values(?, ?)") + .bind(2) + .bind( + NaiveDate::from_ymd_opt(2023, 12, 19) + .and_then(|x| x.and_hms_micro_opt(13, 20, 1, 123456)) + .unwrap(), + ) + .execute(&pool) + .await + .unwrap(); + + let rows = sqlx::query("select i, ts from demo order by i") + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(rows.len(), 3); + + let x: DateTime = rows[0].get(1); + assert_eq!(x.to_string(), "2023-12-19 00:00:00 UTC"); + + let x: DateTime = rows[1].get(1); + assert_eq!(x.to_string(), "2023-12-19 13:19:01 UTC"); + + let x: DateTime = rows[2].get(1); + assert_eq!(x.to_string(), "2023-12-19 13:20:01.123 UTC"); + + let _ = server.shutdown().await; + guard.remove_all().await; +}