feat(pipeline): gsub processor (#4121)
* chore: add log http ingester scaffold

* chore: add some example code

* chore: add log inserter

* chore: add log handler file

* chore: add pipeline lib

* chore: import log handler

* chore: add pipeline http handler

* chore: add pipeline private table

* chore: add pipeline API

* chore: improve error handling

* chore: merge main

* chore: add multi content type support for log handler

* refactor: remove servers dep on pipeline

* refactor: move define_into_tonic_status to common-error

* refactor: bring in pipeline 3eb890c551b8d7f60c4491fcfec18966e2b210a4

* chore: fix typo

* refactor: bring in pipeline a95c9767d7056ab01dd8ca5fa1214456c6ffc72c

* chore: fix typo and license header

* refactor: move http event handler to a separate file

* chore: add test for pipeline

* chore: fmt

* refactor: bring in pipeline 7d2402701877901871dd1294a65ac937605a6a93

* refactor: move `pipeline_operator` to `pipeline` crate

* chore: minor update

* refactor: bring in pipeline 1711f4d46687bada72426d88cda417899e0ae3a4

* chore: add log

* chore: add log

* chore: remove open hook

* chore: minor update

* chore: fix fmt

* chore: minor update

* chore: rename desc for pipeline table

* refactor: remove updated_at in pipelines

* chore: add more content type support for log inserter api

* chore: introduce pipeline crate

* chore: update upload pipeline api

* chore: fix by pr commit

* chore: add some doc for pub fn/struct

* chore: some minor fixes

* chore: add pipeline version support

* chore: impl log pipeline version

* gsub processor

* chore: add test

* chore: update commit

Co-authored-by: dennis zhuang <[email protected]>

---------

Co-authored-by: paomian <[email protected]>
Co-authored-by: shuiyisong <[email protected]>
Co-authored-by: shuiyisong <[email protected]>
Co-authored-by: dennis zhuang <[email protected]>
5 people authored Jun 17, 2024
1 parent 0aceebf commit 0fc18b6
Showing 4 changed files with 314 additions and 0 deletions.
229 changes: 229 additions & 0 deletions src/pipeline/src/etl/processor/gsub.rs
@@ -0,0 +1,229 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use regex::Regex;

use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
    yaml_bool, yaml_field, yaml_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
    IGNORE_MISSING_NAME,
};
use crate::etl::value::{Array, Map, Value};

pub(crate) const PROCESSOR_GSUB: &str = "gsub";

const REPLACEMENT_NAME: &str = "replacement";
const PATTERN_NAME: &str = "pattern";

/// A processor that replaces all matches of a pattern in a string field with a replacement.
/// Only string values and arrays of string values are supported.
#[derive(Debug, Default)]
pub struct GsubProcessor {
    fields: Fields,
    pattern: Option<Regex>,
    replacement: Option<String>,
    ignore_missing: bool,
}

impl GsubProcessor {
    fn with_fields(&mut self, fields: Fields) {
        self.fields = fields;
    }

    fn with_ignore_missing(&mut self, ignore_missing: bool) {
        self.ignore_missing = ignore_missing;
    }

    fn try_pattern(&mut self, pattern: &str) -> Result<(), String> {
        self.pattern = Some(Regex::new(pattern).map_err(|e| e.to_string())?);
        Ok(())
    }

    fn with_replacement(&mut self, replacement: impl Into<String>) {
        self.replacement = Some(replacement.into());
    }

    // Both `pattern` and `replacement` are required; validated when the pipeline is parsed.
    fn check(self) -> Result<Self, String> {
        if self.pattern.is_none() {
            return Err("pattern is required".to_string());
        }

        if self.replacement.is_none() {
            return Err("replacement is required".to_string());
        }

        Ok(self)
    }

    // Replace every match of the pattern in a single string value.
    fn process_string_field(&self, val: &str, field: &Field) -> Result<Map, String> {
        let replacement = self.replacement.as_ref().unwrap();
        let new_val = self
            .pattern
            .as_ref()
            .unwrap()
            .replace_all(val, replacement)
            .to_string();
        let val = Value::String(new_val);

        let key = match field.target_field {
            Some(ref target_field) => target_field,
            None => field.get_field(),
        };

        Ok(Map::one(key, val))
    }

    // Replace every match of the pattern in each element of a string array.
    fn process_array_field(&self, arr: &Array, field: &Field) -> Result<Map, String> {
        let key = match field.target_field {
            Some(ref target_field) => target_field,
            None => field.get_field(),
        };

        let re = self.pattern.as_ref().unwrap();
        let replacement = self.replacement.as_ref().unwrap();

        let mut result = Array::default();
        for val in arr.iter() {
            match val {
                Value::String(haystack) => {
                    let new_val = re.replace_all(haystack, replacement).to_string();
                    result.push(Value::String(new_val));
                }
                _ => {
                    return Err(format!(
                        "{} processor: expect string or array string, but got {val:?}",
                        self.kind()
                    ))
                }
            }
        }

        Ok(Map::one(key, Value::Array(result)))
    }
}

impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor {
    type Error = String;

    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self, Self::Error> {
        let mut processor = GsubProcessor::default();

        for (k, v) in value.iter() {
            let key = k
                .as_str()
                .ok_or(format!("key must be a string, but got {k:?}"))?;
            match key {
                FIELD_NAME => {
                    processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?));
                }
                FIELDS_NAME => {
                    processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
                }
                PATTERN_NAME => {
                    processor.try_pattern(&yaml_string(v, PATTERN_NAME)?)?;
                }
                REPLACEMENT_NAME => {
                    processor.with_replacement(yaml_string(v, REPLACEMENT_NAME)?);
                }

                IGNORE_MISSING_NAME => {
                    processor.with_ignore_missing(yaml_bool(v, IGNORE_MISSING_NAME)?);
                }

                _ => {}
            }
        }

        processor.check()
    }
}

impl crate::etl::processor::Processor for GsubProcessor {
    fn kind(&self) -> &str {
        PROCESSOR_GSUB
    }

    fn ignore_missing(&self) -> bool {
        self.ignore_missing
    }

    fn fields(&self) -> &Fields {
        &self.fields
    }

    fn exec_field(&self, val: &Value, field: &Field) -> Result<Map, String> {
        match val {
            Value::String(val) => self.process_string_field(val, field),
            Value::Array(arr) => self.process_array_field(arr, field),
            _ => Err(format!(
                "{} processor: expect string or array string, but got {val:?}",
                self.kind()
            )),
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::etl::field::Field;
    use crate::etl::processor::gsub::GsubProcessor;
    use crate::etl::processor::Processor;
    use crate::etl::value::{Map, Value};

    #[test]
    fn test_string_value() {
        let mut processor = GsubProcessor::default();
        processor.try_pattern(r"\d+").unwrap();
        processor.with_replacement("xxx");

        let field = Field::new("message");
        let val = Value::String("123".to_string());
        let result = processor.exec_field(&val, &field).unwrap();

        assert_eq!(
            result,
            Map::one("message", Value::String("xxx".to_string()))
        );
    }

    #[test]
    fn test_array_string_value() {
        let mut processor = GsubProcessor::default();
        processor.try_pattern(r"\d+").unwrap();
        processor.with_replacement("xxx");

        let field = Field::new("message");
        let val = Value::Array(
            vec![
                Value::String("123".to_string()),
                Value::String("456".to_string()),
            ]
            .into(),
        );
        let result = processor.exec_field(&val, &field).unwrap();

        assert_eq!(
            result,
            Map::one(
                "message",
                Value::Array(
                    vec![
                        Value::String("xxx".to_string()),
                        Value::String("xxx".to_string())
                    ]
                    .into()
                )
            )
        );
    }
}
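For reference, a minimal sketch of how this processor might be declared in a pipeline definition, based on the keys the TryFrom<&yaml_rust::yaml::Hash> impl above accepts (field/fields, pattern, replacement, ignore_missing); the column name and values here are illustrative only:

processors:
  - gsub:
      field: message        # or `fields` to list several columns
      pattern: "\\d+"       # regular expression matched against the value
      replacement: "xxx"    # every match is replaced (replace_all)
      ignore_missing: true  # optional; defaults to false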
3 changes: 3 additions & 0 deletions src/pipeline/src/etl/processor/mod.rs
@@ -17,6 +17,7 @@ pub mod csv;
pub mod date;
pub mod dissect;
pub mod epoch;
pub mod gsub;
pub mod letter;
pub mod regex;
pub mod urlencoding;
@@ -29,6 +30,7 @@ use csv::CsvProcessor;
use date::DateProcessor;
use dissect::DissectProcessor;
use epoch::EpochProcessor;
use gsub::GsubProcessor;
use letter::LetterProcessor;
use regex::RegexProcessor;
use urlencoding::UrlEncodingProcessor;
@@ -163,6 +165,7 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<Arc<dyn Processor>, String> {
        date::PROCESSOR_DATE => Arc::new(DateProcessor::try_from(value)?),
        dissect::PROCESSOR_DISSECT => Arc::new(DissectProcessor::try_from(value)?),
        epoch::PROCESSOR_EPOCH => Arc::new(EpochProcessor::try_from(value)?),
        gsub::PROCESSOR_GSUB => Arc::new(GsubProcessor::try_from(value)?),
        letter::PROCESSOR_LETTER => Arc::new(LetterProcessor::try_from(value)?),
        regex::PROCESSOR_REGEX => Arc::new(RegexProcessor::try_from(value)?),
        urlencoding::PROCESSOR_URL_ENCODING => Arc::new(UrlEncodingProcessor::try_from(value)?),
12 changes: 12 additions & 0 deletions src/pipeline/src/etl/value/array.rs
@@ -45,6 +45,12 @@ impl std::ops::Deref for Array {
    }
}

impl std::ops::DerefMut for Array {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.values
    }
}

impl IntoIterator for Array {
    type Item = Value;

@@ -54,3 +60,9 @@ impl IntoIterator for Array {
        self.values.into_iter()
    }
}

impl From<Vec<Value>> for Array {
    fn from(values: Vec<Value>) -> Self {
        Array { values }
    }
}
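The new From<Vec<Value>> impl is what lets the gsub unit tests build arrays with `vec![...].into()`. A minimal sketch of that pattern in isolation, assuming `Array` and `Value` from the pipeline crate are in scope:

    // Build an Array directly from a Vec<Value> via the new From impl.
    let arr: Array = vec![
        Value::String("123".to_string()),
        Value::String("456".to_string()),
    ]
    .into();
    // Deref to Vec<Value> still works (DerefMut additionally allows in-place mutation).
    assert_eq!(arr.len(), 2);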
70 changes: 70 additions & 0 deletions src/pipeline/tests/gsub.rs
@@ -0,0 +1,70 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use greptime_proto::v1::value::ValueData::TimestampMillisecondValue;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};

#[test]
fn test_gsub() {
    let input_value_str = r#"
[
  {
    "reqTimeSec": "1573840000.000"
  }
]
"#;
    let input_value: Value = serde_json::from_str::<serde_json::Value>(input_value_str)
        .expect("failed to parse input value")
        .try_into()
        .expect("failed to convert input value");

    let pipeline_yaml = r#"
---
description: Pipeline for Akamai DataStream2 Log
processors:
  - gsub:
      field: reqTimeSec
      pattern: "\\."
      replacement: ""
  - epoch:
      field: reqTimeSec
      resolution: millisecond
      ignore_missing: true
transform:
  - field: reqTimeSec
    type: epoch, millisecond
    index: timestamp
"#;

    let yaml_content = Content::Yaml(pipeline_yaml.into());
    let pipeline: Pipeline<GreptimeTransformer> =
        parse(&yaml_content).expect("failed to parse pipeline");
    let output = pipeline.exec(input_value).expect("failed to exec pipeline");

    let expected_schema = vec![ColumnSchema {
        column_name: "reqTimeSec".to_string(),
        datatype: ColumnDataType::TimestampMillisecond.into(),
        semantic_type: SemanticType::Timestamp.into(),
        datatype_extension: None,
    }];

    assert_eq!(output.schema, expected_schema);
    assert_eq!(
        output.rows[0].values[0].value_data,
        Some(TimestampMillisecondValue(1573840000000))
    );
}
