Skip to content

Commit

Permalink
test: more on processors (#4493)
Browse files Browse the repository at this point in the history
* test: add date test

* test: add epoch test

* test: add letter test and complete some others

* test: add urlencoding test

* chore: typo
  • Loading branch information
shuiyisong authored Aug 4, 2024
1 parent cb4cffe commit 3b701d8
Show file tree
Hide file tree
Showing 8 changed files with 954 additions and 32 deletions.
138 changes: 138 additions & 0 deletions src/pipeline/tests/date.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;

use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;

/// JSON document fed to the pipeline in most tests of this file: a single
/// `input_str` key holding an ISO-8601-style timestamp with millisecond
/// precision and a literal trailing `Z`.
const TEST_INPUT: &str = r#"
{
"input_str": "2024-06-27T06:13:36.991Z"
}"#;

/// Expected value of the `ts` column when `TEST_INPUT` is parsed without a
/// timezone override: 2024-06-27T06:13:36.991 UTC, expressed as nanoseconds
/// since the Unix epoch.
const TEST_VALUE: Option<ValueData> = Some(ValueData::TimestampNanosecondValue(
    1_719_468_816_991_000_000,
));

lazy_static! {
    /// Output schema asserted by every test in this file: the transformed
    /// `ts` field followed by a `greptime_timestamp` column carrying the
    /// `Timestamp` semantic type. Both columns are nanosecond timestamps.
    static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = {
        let ts_field = common::make_column_schema(
            String::from("ts"),
            ColumnDataType::TimestampNanosecond,
            SemanticType::Field,
        );
        let time_index = common::make_column_schema(
            String::from("greptime_timestamp"),
            ColumnDataType::TimestampNanosecond,
            SemanticType::Timestamp,
        );
        vec![ts_field, time_index]
    };
}

/// Runs the `date` processor with a single format that matches `TEST_INPUT`
/// and checks both the output schema and the parsed nanosecond timestamp.
#[test]
fn test_parse_date() {
    // YAML nesting is significant: `fields` and `formats` must be children of
    // the `date` processor, and `type` must sit beside `fields` inside the
    // same transform item. Flattening the indentation changes the document.
    let pipeline_yaml = r#"
processors:
  - date:
      fields:
        - input_str
      formats:
        - "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
  - fields:
      - input_str, ts
    type: time
"#;

    let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);

    assert_eq!(output.schema, *EXPECTED_SCHEMA);
    // Column 0 is `ts`, the renamed `input_str` field.
    assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}

/// Configures the `date` processor with two candidate formats and checks that
/// `TEST_INPUT` still yields the expected timestamp.
///
/// The first format has no fractional seconds and no trailing `Z`, so it is
/// not expected to match the input; the second one is the same format used by
/// `test_parse_date`.
#[test]
fn test_multi_formats() {
    // YAML nesting is significant: both format entries belong to the
    // `formats` list of the `date` processor.
    let pipeline_yaml = r#"
processors:
  - date:
      fields:
        - input_str
      formats:
        - "%Y-%m-%dT%H:%M:%S"
        - "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
  - fields:
      - input_str, ts
    type: time
"#;

    let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);

    assert_eq!(output.schema, *EXPECTED_SCHEMA);
    assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}

/// Feeds an empty JSON object through a `date` processor configured with
/// `ignore_missing: true` and checks that the schema is unchanged while the
/// `ts` column carries no value.
#[test]
fn test_ignore_missing() {
    // No `input_str` key at all.
    let empty_input = r#"{}"#;

    // YAML nesting is significant: `ignore_missing` is an option of the
    // `date` processor, at the same level as `fields` and `formats`.
    let pipeline_yaml = r#"
processors:
  - date:
      fields:
        - input_str
      formats:
        - "%Y-%m-%dT%H:%M:%S"
        - "%Y-%m-%dT%H:%M:%S%.3fZ"
      ignore_missing: true
transform:
  - fields:
      - input_str, ts
    type: time
"#;

    let output = common::parse_and_exec(empty_input, pipeline_yaml);

    assert_eq!(output.schema, *EXPECTED_SCHEMA);
    assert_eq!(output.rows[0].values[0].value_data, None);
}

/// Runs the `date` processor with `timezone: 'Asia/Shanghai'` and checks the
/// resulting timestamp.
///
/// The expected value is exactly 8 hours (28 800 s) earlier than
/// `TEST_VALUE`, i.e. the wall-clock time in `TEST_INPUT` is treated as
/// UTC+8 local time instead of UTC.
#[test]
fn test_timezone() {
    // YAML nesting is significant: `timezone` and `ignore_missing` are
    // options of the `date` processor.
    let pipeline_yaml = r#"
processors:
  - date:
      fields:
        - input_str
      formats:
        - "%Y-%m-%dT%H:%M:%S"
        - "%Y-%m-%dT%H:%M:%S%.3fZ"
      ignore_missing: true
      timezone: 'Asia/Shanghai'
transform:
  - fields:
      - input_str, ts
    type: time
"#;

    let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);

    assert_eq!(output.schema, *EXPECTED_SCHEMA);
    assert_eq!(
        output.rows[0].values[0].value_data,
        Some(ValueData::TimestampNanosecondValue(
            1_719_440_016_991_000_000
        ))
    );
}
150 changes: 146 additions & 4 deletions src/pipeline/tests/dissect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ mod common;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};

/// Shorthand for the column schema most dissect tests expect: a column called
/// `name` with the `String` data type and the `Field` semantic type.
fn make_string_column_schema(name: String) -> greptime_proto::v1::ColumnSchema {
    let data_type = ColumnDataType::String;
    let semantic_type = SemanticType::Field;
    common::make_column_schema(name, data_type, semantic_type)
}

#[test]
fn test_dissect_pattern() {
let input_value_str = r#"
Expand All @@ -43,8 +47,8 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);

let expected_schema = vec![
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
Expand Down Expand Up @@ -91,8 +95,8 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);

let expected_schema = vec![
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
Expand All @@ -111,3 +115,141 @@ transform:
Some(StringValue("456".to_string()))
);
}

/// Feeds an empty JSON object through a `dissect` processor configured with
/// `ignore_missing: true` and checks that columns `a` and `b` exist in the
/// schema but carry no value.
#[test]
fn test_ignore_missing() {
    // No `str` key at all.
    let empty_str = r#"{}"#;

    // YAML nesting is significant: `field`, `patterns` and `ignore_missing`
    // are options of the `dissect` processor, and `type` belongs to the same
    // transform item as `fields`.
    let pipeline_yaml = r#"
processors:
  - dissect:
      field: str
      patterns:
        - "%{a} %{b}"
      ignore_missing: true
transform:
  - fields:
      - a
      - b
    type: string
"#;

    let output = common::parse_and_exec(empty_str, pipeline_yaml);

    let expected_schema = vec![
        make_string_column_schema("a".to_string()),
        make_string_column_schema("b".to_string()),
        common::make_column_schema(
            "greptime_timestamp".to_string(),
            ColumnDataType::TimestampNanosecond,
            SemanticType::Timestamp,
        ),
    ];

    assert_eq!(output.schema, expected_schema);

    assert_eq!(output.rows[0].values[0].value_data, None);
    assert_eq!(output.rows[0].values[1].value_data, None);
}

/// Exercises the dissect pattern modifiers (`+`, `/n`, `->`, `?`, `*`, `&`)
/// on an eight-token input and checks the extracted columns.
///
/// As pinned by the assertions below: the two `+key3` parts are joined into
/// `key3` as `"key3 key4"`, and the `*key_7` / `&key_7` pair produces a field
/// named `key7` whose value is `"key8"`. `key6` is not listed in the
/// transform and therefore is not part of the expected schema.
#[test]
fn test_modifier() {
    let input_value_str = r#"
{
"str": "key1 key2 key3 key4 key5 key6 key7 key8"
}"#;

    // YAML nesting is significant: `field` and `patterns` are options of the
    // `dissect` processor, and `type` belongs to the same transform item as
    // `fields`.
    let pipeline_yaml = r#"
processors:
  - dissect:
      field: str
      patterns:
        - "%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6} %{*key_7} %{&key_7}"
transform:
  - fields:
      - key1
      - key2
      - key3
      - key5
      - key7
    type: string
"#;

    let output = common::parse_and_exec(input_value_str, pipeline_yaml);

    let expected_schema = vec![
        make_string_column_schema("key1".to_string()),
        make_string_column_schema("key2".to_string()),
        make_string_column_schema("key3".to_string()),
        make_string_column_schema("key5".to_string()),
        make_string_column_schema("key7".to_string()),
        common::make_column_schema(
            "greptime_timestamp".to_string(),
            ColumnDataType::TimestampNanosecond,
            SemanticType::Timestamp,
        ),
    ];

    assert_eq!(output.schema, expected_schema);

    // Expected values, in the same order as the transform's field list.
    let expected_values = ["key1", "key2", "key3 key4", "key5", "key8"];
    for (idx, expected) in expected_values.iter().copied().enumerate() {
        assert_eq!(
            output.rows[0].values[idx].value_data,
            Some(StringValue(expected.to_string())),
            "unexpected value in column {}",
            idx
        );
    }
}

/// Checks that `append_separator` controls how appended (`+`) dissect parts
/// are joined: with `"_"` configured, the two tokens of the input end up in
/// `key1` as `"key1_key2"`.
#[test]
fn test_append_separator() {
    let input_value_str = r#"
{
"str": "key1 key2"
}"#;

    // YAML nesting is significant: `append_separator` is an option of the
    // `dissect` processor, at the same level as `field` and `patterns`.
    let pipeline_yaml = r#"
processors:
  - dissect:
      field: str
      patterns:
        - "%{+key1} %{+key1}"
      append_separator: "_"
transform:
  - fields:
      - key1
    type: string
"#;

    let output = common::parse_and_exec(input_value_str, pipeline_yaml);

    let expected_schema = vec![
        make_string_column_schema("key1".to_string()),
        common::make_column_schema(
            "greptime_timestamp".to_string(),
            ColumnDataType::TimestampNanosecond,
            SemanticType::Timestamp,
        ),
    ];

    assert_eq!(output.schema, expected_schema);
    assert_eq!(
        output.rows[0].values[0].value_data,
        Some(StringValue("key1_key2".to_string()))
    );
}
Loading

0 comments on commit 3b701d8

Please sign in to comment.