Skip to content

Commit

Permalink
Make serialization of date types fallible
Browse files Browse the repository at this point in the history
  • Loading branch information
bnaecker committed Oct 14, 2024
1 parent 2750cba commit ff0af17
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 26 deletions.
41 changes: 29 additions & 12 deletions oximeter/db/src/native/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,18 +470,24 @@ impl Precision {
}

/// Convert the provided datetime into a timestamp in the right precision.
pub(crate) fn scale(&self, value: DateTime<impl chrono::TimeZone>) -> i64 {
///
/// This returns `None` if the timestamp cannot be converted to an `i64`,
/// which is how ClickHouse stores the values.
pub(crate) fn scale(
&self,
value: DateTime<impl chrono::TimeZone>,
) -> Option<i64> {
match self.0 {
0 => value.timestamp(),
1 => value.timestamp_millis() / 100,
2 => value.timestamp_millis() / 10,
3 => value.timestamp_millis(),
4 => value.timestamp_micros() / 100,
5 => value.timestamp_micros() / 10,
6 => value.timestamp_micros(),
7 => value.timestamp_nanos_opt().unwrap() / 100,
8 => value.timestamp_nanos_opt().unwrap() / 10,
9 => value.timestamp_nanos_opt().unwrap(),
0 => Some(value.timestamp()),
1 => Some(value.timestamp_millis() / 100),
2 => Some(value.timestamp_millis() / 10),
3 => Some(value.timestamp_millis()),
4 => Some(value.timestamp_micros() / 100),
5 => Some(value.timestamp_micros() / 10),
6 => Some(value.timestamp_micros()),
7 => value.timestamp_nanos_opt().map(|x| x / 100),
8 => value.timestamp_nanos_opt().map(|x| x / 10),
9 => value.timestamp_nanos_opt(),
10.. => unreachable!(),
}
}
Expand Down Expand Up @@ -800,7 +806,8 @@ mod tests {
let now = Utc::now();
for precision in 0..=Precision::MAX {
let prec = Precision(precision);
let timestamp = prec.scale(now);
let timestamp =
prec.scale(now).expect("Current time should fit in an i64");
let conv = prec.as_conv(&Utc);
let recovered = conv(&Utc, timestamp);
let now_with_precision = now.trunc_subsecs(u16::from(prec.0));
Expand All @@ -816,6 +823,16 @@ mod tests {
}
}

#[test]
fn datetime64_scale_checks_range() {
    // chrono's maximum datetime cannot be represented as an i64 count
    // of nanoseconds, so scaling at nanosecond precision must signal
    // the overflow by returning `None` rather than panicking.
    let nanosecond_precision = Precision(9);
    let out_of_range = chrono::DateTime::<Utc>::MAX_UTC;
    assert_eq!(
        nanosecond_precision.scale(out_of_range),
        None,
        "Should fail to scale a timestamp that doesn't fit in \
        the range of an i64"
    );
}

#[test]
fn parse_date_time() {
for (type_, s) in [
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/native/io/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub fn encode(block: Block, mut dst: &mut BytesMut) -> Result<(), Error> {
io::varuint::encode(block.n_columns, &mut dst);
io::varuint::encode(block.n_rows, &mut dst);
for (name, col) in block.columns {
io::column::encode(&name, col, &mut dst);
io::column::encode(&name, col, &mut dst)?;
}
Ok(())
}
Expand Down
108 changes: 95 additions & 13 deletions oximeter/db/src/native/io/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@ use uuid::Uuid;
#[allow(deprecated)]
const EPOCH: NaiveDate = NaiveDate::from_ymd(1970, 01, 01);

// Maximum supported Date in ClickHouse.
//
// See https://clickhouse.com/docs/en/sql-reference/data-types/date
const MAX_DATE: &str = "2149-06-06";

// Maximum supported DateTime in ClickHouse.
//
// See https://clickhouse.com/docs/en/sql-reference/data-types/datetime.
const MAX_DATETIME: &str = "2106-02-07 06:28:15";

// Maximum supported DateTime64 in ClickHouse
//
// See https://clickhouse.com/docs/en/sql-reference/data-types/datetime64.
const MAX_DATETIME64: &str = "2299-12-31 23:59:59.99999999";

/// Helper macro to quickly and unsafely copy POD data from a message from the
/// ClickHouse server into our own column data types.
macro_rules! copyin_pod_values_raw {
Expand Down Expand Up @@ -182,7 +197,7 @@ fn decode_value_array(
// this has exactly `n_rows` chunks of 4 bytes each.
let timestamp = u32::from_le_bytes(chunk.try_into().unwrap());

// Safey: This only panics if the timestamp is out of range,
// Safety: This only panics if the timestamp is out of range,
// which is not possible as this is actually a u32.
values.push(tz.timestamp_opt(i64::from(timestamp), 0).unwrap());
}
Expand All @@ -208,7 +223,7 @@ fn decode_value_array(
// this has exactly `n_rows` chunks of 8 bytes each.
let timestamp = i64::from_le_bytes(chunk.try_into().unwrap());

// Safey: This only panics if the timestamp is out of range,
// Safety: This only panics if the timestamp is out of range,
// which is not expected here. NOTE(review): unlike the DateTime case
// above, this value is an i64, not a u32 — confirm that `conv` cannot
// panic for out-of-range i64 values.
values.push(conv(timezone, timestamp));
}
Expand Down Expand Up @@ -316,17 +331,24 @@ macro_rules! copyout_pod_values {
///
/// This panics if the data type is unsupported. Use `DataType::is_supported()`
/// to check that first.
pub fn encode(name: &str, column: Column, mut dst: &mut BytesMut) {
pub fn encode(
name: &str,
column: Column,
mut dst: &mut BytesMut,
) -> Result<(), Error> {
assert!(column.data_type.is_supported());
io::string::encode(name, &mut dst);
io::string::encode(column.data_type.to_string(), &mut dst);
// Encode the "custom serialization tag". See `decode` for details.
dst.put_u8(0);
encode_value_array(column.values, dst);
encode_value_array(column.values, dst)
}

/// Encode an array of values into a buffer.
fn encode_value_array(values: ValueArray, mut dst: &mut BytesMut) {
fn encode_value_array(
values: ValueArray,
mut dst: &mut BytesMut,
) -> Result<(), Error> {
match values {
ValueArray::UInt8(values) => dst.put(values.as_slice()),
ValueArray::UInt16(values) => copyout_pod_values!(u16, values, dst),
Expand Down Expand Up @@ -362,33 +384,59 @@ fn encode_value_array(values: ValueArray, mut dst: &mut BytesMut) {
ValueArray::Date(values) => {
// Dates are represented in ClickHouse as a 16-bit unsigned number
// of days since the UNIX epoch.
//
// Since these can be constructed from any `NaiveDate`, they can
// have wider values than ClickHouse supports. Check that here
// during conversion to the `u16` format.
dst.reserve(values.len() * std::mem::size_of::<u16>());
for value in values {
let days = value.signed_duration_since(EPOCH).num_days();
dst.put_u16_le(u16::try_from(days).unwrap());
let days =
u16::try_from(days).map_err(|_| Error::OutOfRange {
type_name: String::from("Date"),
min: EPOCH.to_string(),
max: MAX_DATE.to_string(),
value: value.to_string(),
})?;
dst.put_u16_le(days);
}
}
ValueArray::DateTime { values, .. } => {
// DateTimes are always little-endian u32s giving the UNIX
// timestamp.
for value in values {
// Safety: We only construct these today from a u32 in the first
// place, so this must also be safe.
dst.put_u32_le(u32::try_from(value.timestamp()).unwrap());
// DateTime's in ClickHouse must fit in a u32, so validate the
// range here.
let val = u32::try_from(value.timestamp()).map_err(|_| {
Error::OutOfRange {
type_name: String::from("DateTime"),
min: EPOCH.and_hms_opt(0, 0, 0).unwrap().to_string(),
max: MAX_DATETIME.to_string(),
value: value.to_string(),
}
})?;
dst.put_u32_le(val);
}
}
ValueArray::DateTime64 { precision, values, .. } => {
// DateTime64s are always encoded as i64s, in whatever
// resolution is defined by the column type itself.
dst.reserve(values.len() * std::mem::size_of::<i64>());
for value in values {
let timestamp = precision.scale(value);
let Some(timestamp) = precision.scale(value) else {
return Err(Error::OutOfRange {
type_name: String::from("DateTime64"),
min: EPOCH.to_string(),
max: MAX_DATETIME64.to_string(),
value: value.to_string(),
});
};
dst.put_i64_le(timestamp);
}
}
ValueArray::Nullable { is_null, values } => {
copyout_pod_values!(bool, is_null, dst);
encode_value_array(*values, dst);
encode_value_array(*values, dst)?;
}
ValueArray::Enum8 { values, .. } => {
copyout_pod_values!(i8, values, dst)
Expand All @@ -398,10 +446,11 @@ fn encode_value_array(values: ValueArray, mut dst: &mut BytesMut) {
// array, plus the flattened data itself.
encode_array_offsets(&arrays, dst);
for array in arrays {
encode_value_array(array, dst);
encode_value_array(array, dst)?;
}
}
}
Ok(())
}

// Encode the column offsets for an array column into the provided buffer.
Expand Down Expand Up @@ -577,6 +626,7 @@ mod tests {
),
(DataType::Ipv4, ValueArray::Ipv4(vec![Ipv4Addr::LOCALHOST])),
(DataType::Ipv6, ValueArray::Ipv6(vec![Ipv6Addr::LOCALHOST])),
(DataType::Date, ValueArray::Date(vec![now.date_naive()])),
(
DataType::DateTime(Tz::UTC),
ValueArray::DateTime { tz: Tz::UTC, values: vec![now] },
Expand Down Expand Up @@ -637,7 +687,7 @@ mod tests {
let n_rows = values.len();
let col = Column { values, data_type: typ.clone() };
let mut buf = BytesMut::new();
encode("foo", col.clone(), &mut buf);
encode("foo", col.clone(), &mut buf).unwrap();
let (name, decoded) = decode(&mut &buf[..], n_rows)
.expect("Should have succeeded in decoding full column")
.unwrap_or_else(|| {
Expand All @@ -651,4 +701,36 @@ mod tests {
);
}
}

#[test]
fn fail_to_encode_out_of_range_column() {
    // Build the largest datetime chrono can represent, which lies
    // beyond the supported range of every ClickHouse date-like type.
    let max = Tz::from_utc_datetime(
        &Tz::UTC,
        &chrono::DateTime::<Tz>::MAX_UTC.naive_utc(),
    );
    let precision = Precision::new(9).unwrap();
    // See https://clickhouse.com/docs/en/sql-reference/data-types/datetime
    // and related pages for the supported ranges of these types.
    let out_of_range_columns = [
        (DataType::Date, ValueArray::Date(vec![max.date_naive()])),
        (
            DataType::DateTime(Tz::UTC),
            ValueArray::DateTime { tz: Tz::UTC, values: vec![max] },
        ),
        (
            DataType::DateTime64(precision, Tz::UTC),
            ValueArray::DateTime64 {
                precision,
                tz: Tz::UTC,
                values: vec![max],
            },
        ),
    ];
    for (typ, values) in out_of_range_columns {
        let col = Column { values, data_type: typ.clone() };
        let mut buf = BytesMut::new();
        // Each encode attempt must be rejected with an explicit
        // out-of-range error rather than panicking or truncating.
        let err = encode("foo", col.clone(), &mut buf)
            .expect_err("Should fail to encode date-like column with out of range value");
        assert!(matches!(err, Error::OutOfRange { .. }));
    }
}
}
3 changes: 3 additions & 0 deletions oximeter/db/src/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,7 @@ pub enum Error {

#[error("Cannot concatenate blocks with mismatched structure")]
MismatchedBlockStructure,

#[error("Value out of range for corresponding ClickHouse type")]
OutOfRange { type_name: String, min: String, max: String, value: String },
}

0 comments on commit ff0af17

Please sign in to comment.