Skip to content

Commit

Permalink
[INLONG-10968][SDK] Support Inlong Transform operator annotation (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Sep 3, 2024
1 parent cb4b61b commit f1285ed
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* AndOperator
*
*/
@TransformOperator(values = AndExpression.class)
public class AndOperator implements ExpressionOperator {

private final ExpressionOperator left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* EqualsToOperator
*
*/
@TransformOperator(values = EqualsTo.class)
public class EqualsToOperator implements ExpressionOperator {

private final ValueParser left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* GreaterThanEqualsOperator
*
*/
@TransformOperator(values = GreaterThanEquals.class)
public class GreaterThanEqualsOperator implements ExpressionOperator {

private final ValueParser left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* GreaterThanOperator
*
*/
@TransformOperator(values = GreaterThan.class)
public class GreaterThanOperator implements ExpressionOperator {

private final ValueParser left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* MinorThanEqualsOperator
*
*/
@TransformOperator(values = MinorThanEquals.class)
public class MinorThanEqualsOperator implements ExpressionOperator {

private final ValueParser left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* MinorThanOperator
*
*/
@TransformOperator(values = MinorThan.class)
public class MinorThanOperator implements ExpressionOperator {

private final ValueParser left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* NotEqualsToOperator
*
*/
@TransformOperator(values = NotEqualsTo.class)
public class NotEqualsToOperator implements ExpressionOperator {

private final ValueParser left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* NotOperator
*
*/
@TransformOperator(values = NotExpression.class)
public class NotOperator implements ExpressionOperator {

private final ExpressionOperator node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,83 @@
import org.apache.inlong.sdk.transform.process.parser.ParserTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;
import net.sf.jsqlparser.expression.NotExpression;
import net.sf.jsqlparser.expression.Parenthesis;
import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
import net.sf.jsqlparser.expression.operators.relational.MinorThan;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
import org.apache.commons.lang.ObjectUtils;
import org.reflections.Reflections;
import org.reflections.scanners.Scanners;

import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Map;
import java.util.Set;

/**
* OperatorTools
*
*/
@Slf4j
public class OperatorTools {

private static final String OPERATOR_PATH = "org.apache.inlong.sdk.transform.process.operator";

private final static Map<Class<?>, Class<?>> operatorMap = Maps.newConcurrentMap();

public static final String ROOT_KEY = "$root";

public static final String CHILD_KEY = "$child";

static {
init();
}

private static void init() {
Reflections reflections = new Reflections(OPERATOR_PATH, Scanners.TypesAnnotated);
Set<Class<?>> clazzSet = reflections.getTypesAnnotatedWith(TransformOperator.class);
for (Class<?> clazz : clazzSet) {
if (ExpressionOperator.class.isAssignableFrom(clazz)) {
TransformOperator annotation = clazz.getAnnotation(TransformOperator.class);
if (annotation == null) {
continue;
}
Class<?>[] values = annotation.values();
for (Class<?> value : values) {
operatorMap.compute(value, (key, former) -> {
if (former != null) {
log.warn("find a conflict for parser class [{}], the former one is [{}], new one is [{}]",
key, former.getName(), clazz.getName());
}
return clazz;
});
}
}
}
}

public static ExpressionOperator getTransformOperator(Expression expr) {
Class<?> clazz = operatorMap.get(expr.getClass());
if (clazz == null) {
return null;
}
try {
Constructor<?> constructor = clazz.getDeclaredConstructor(expr.getClass());
return (ExpressionOperator) constructor.newInstance(expr);
} catch (NoSuchMethodException e) {
log.error("transform operator {} needs one constructor that accept one params whose type is {}",
clazz.getName(), expr.getClass().getName(), e);
throw new RuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static ExpressionOperator buildOperator(Expression expr) {
if (expr instanceof AndExpression) {
return new AndOperator((AndExpression) expr);
} else if (expr instanceof OrExpression) {
return new OrOperator((OrExpression) expr);
} else if (expr instanceof Parenthesis) {
return new ParenthesisOperator((Parenthesis) expr);
} else if (expr instanceof NotExpression) {
return new NotOperator((NotExpression) expr);
} else if (expr instanceof EqualsTo) {
return new EqualsToOperator((EqualsTo) expr);
} else if (expr instanceof NotEqualsTo) {
return new NotEqualsToOperator((NotEqualsTo) expr);
} else if (expr instanceof GreaterThan) {
return new GreaterThanOperator((GreaterThan) expr);
} else if (expr instanceof GreaterThanEquals) {
return new GreaterThanEqualsOperator((GreaterThanEquals) expr);
} else if (expr instanceof MinorThan) {
return new MinorThanOperator((MinorThan) expr);
} else if (expr instanceof MinorThanEquals) {
return new MinorThanEqualsOperator((MinorThanEquals) expr);
if (expr != null) {
return getTransformOperator(expr);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* OrOperator
*
*/
@TransformOperator(values = OrExpression.class)
public class OrOperator implements ExpressionOperator {

private final ExpressionOperator left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* ParenthesisOperator
*
*/
@TransformOperator(values = Parenthesis.class)
public class ParenthesisOperator implements ExpressionOperator {

private final ExpressionOperator node;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.operator;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target(TYPE)
public @interface TransformOperator {

Class<?>[] values();
}

0 comments on commit f1285ed

Please sign in to comment.