From 937b3d69051c5579b950efc9e1c213ce5c73fc81 Mon Sep 17 00:00:00 2001 From: KawYang <86555206+KawYang@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:29:32 +0800 Subject: [PATCH] [Feature][Transform] add json path transform (#247) --- .../request/job/transform/JsonPath.java | 39 ++++++++++++++ .../transform/JsonPathTransformOptions.java | 26 +++++++++ .../request/job/transform/Transform.java | 3 +- .../service/impl/ConnectorServiceImpl.java | 4 +- .../app/service/impl/JobTaskServiceImpl.java | 2 + .../impl/JsonPathTransformSwitcher.java | 51 ++++++++++++++++++ .../seatunnel/app/utils/TaskOptionUtils.java | 4 ++ .../dag/canvas/node.tsx | 3 ++ .../dag/images/json-path.png | Bin 0 -> 2878 bytes .../dag/sidebar/index.tsx | 3 ++ .../dag/use-model.ts | 26 ++++++++- .../dag/use-node-setting.ts | 12 +++++ 12 files changed, 169 insertions(+), 4 deletions(-) create mode 100644 seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java create mode 100644 seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java create mode 100644 seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java create mode 100644 seatunnel-ui/src/views/task/synchronization-definition/dag/images/json-path.png diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java new file mode 100644 index 000000000..4989061ab --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.domain.request.job.transform; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.List; + +@Data +@EqualsAndHashCode(callSuper = true) +public class JsonPath extends TransformOption { + + private List columns; +} + +@Data +class JsonPathColumn { + private String src_field; + private String path; + private String destField; + private String destType; + private String columnErrorHandleWay; +} diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java new file mode 100644 index 000000000..cf9fa672b --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.domain.request.job.transform; + +import lombok.Data; + +@Data +public class JsonPathTransformOptions implements TransformOptions { + + private JsonPath jsonPath; +} diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java index 3671a7f7e..a2b2cf3cb 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java @@ -24,5 +24,6 @@ public enum Transform { FIELDMAPPER, FILTERROWKIND, SPLIT, - SQL + SQL, + JSONPATH } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java index 307066a2f..cc9f538c6 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java @@ -123,6 +123,7 @@ public List listTransformsForJob(Long jobId) { .toUpperCase()); if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) { + return connectorCache.getTransform().stream() .filter( connectorInfo -> { @@ -133,7 +134,8 @@ public List listTransformsForJob(Long jobId) { || pluginName.equals("Replace") || pluginName.equals("Copy") || pluginName.equals("MultiFieldSplit") - || pluginName.equals("Sql"); + || pluginName.equals("Sql") + || pluginName.equals("JsonPath"); }) .collect(Collectors.toList()); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java index 107f7fb9e..c5cef3173 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java @@ -350,6 +350,8 @@ private JobTaskCheckRes checkNextTaskSchema( Collections.singletonList(sqlTransformOptions.getSql())); } break; + case JSONPATH: + break; case FILTERROWKIND: case REPLACE: default: diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java new file mode 100644 index 000000000..09579b6ce --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.thirdparty.transfrom.impl; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.app.domain.request.job.TableSchemaReq; +import org.apache.seatunnel.app.domain.request.job.transform.Transform; +import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions; +import org.apache.seatunnel.app.dynamicforms.FormStructure; +import org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper; +import org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcher; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AutoService(TransformConfigSwitcher.class) +public class JsonPathTransformSwitcher implements TransformConfigSwitcher { + @Override + public Transform getTransform() { + return Transform.JSONPATH; + } + + @Override + public FormStructure getFormStructure(OptionRule transformOptionRule) { + return SeaTunnelOptionRuleWrapper.wrapper(transformOptionRule, this.getTransform().name()); + } + + @Override + public Config mergeTransformConfig( + Config transformConfig, TransformOptions transformOption, TableSchemaReq inputSchema) { + return transformConfig; + } +} diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java index 02cc62ad5..579067871 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.app.domain.request.job.transform.CopyTransformOptions; import org.apache.seatunnel.app.domain.request.job.transform.FieldMapperTransformOptions; +import org.apache.seatunnel.app.domain.request.job.transform.JsonPathTransformOptions; import org.apache.seatunnel.app.domain.request.job.transform.SQLTransformOptions; import org.apache.seatunnel.app.domain.request.job.transform.SplitTransformOptions; import org.apache.seatunnel.app.domain.request.job.transform.Transform; @@ -47,6 +48,9 @@ public static T getTransformOption( transformOptionsStr, CopyTransformOptions.class); case SQL: return convertTransformStrToOptions(transformOptionsStr, SQLTransformOptions.class); + case JSONPATH: + return convertTransformStrToOptions( + transformOptionsStr, JsonPathTransformOptions.class); case FILTERROWKIND: case REPLACE: default: diff --git a/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx b/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx index e875761d1..998759ca3 100644 --- a/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx +++ b/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx @@ -23,6 +23,7 @@ import SinkImg from '../images/sink.png' import FieldMapperImg from '../images/field-mapper.png' import FilterEventTypeImg from '../images/filter-event-type.png' import ReplaceImg from '../images/replace.png' +import JsonPathImg from '../images/json-path.png' import SplitImg from '../images/spilt.png' import CopyImg from '../images/copy.png' import SqlImg from '../images/sql.png' @@ -52,6 +53,8 @@ const Node = defineComponent({ icon.value = CopyImg } else if (type === 'transform' && connectorType === 'Sql') { icon.value = SqlImg + } else if (type === 'transform' && connectorType === 'JsonPath') { + icon.value = JsonPathImg } return () => ( diff --git a/seatunnel-ui/src/views/task/synchronization-definition/dag/images/json-path.png b/seatunnel-ui/src/views/task/synchronization-definition/dag/images/json-path.png new file mode 100644 index 0000000000000000000000000000000000000000..b42f69ff3298e1754a81c1e45a68fa66c1410031 GIT binary patch literal 2878 zcmcImXHXN^7LEu)2%!c-Rf=X2&S?L`? zg&<0#2!vi#2q;A~AVmZg6e(_Y-preM^Jd??KX2xq`^|Umd~<%BGv|J{ZLCbeg0g}D z003-;HMZko&tDX$c?pk4y$8?0V}tmNHkX zM)D+l`Ej6UxzMT0DP2UDw-<(sGhL@F#8|`XXRWal_MMd2U$V>LHQLbm+mR6J_;gZd z^0_Q!8u@Imma*uND9QlMyDlWw`F|lD&MCq2jq7}*>scT~jPkfcSKQQ(`QZgspCIom zUaFKQR4IQHEW0YdDILMqPOMM_QdsekI>1WH7amLR>cd8#X4DgfkLmQetMwRsp?Q<4 zGyBU15Sc(kx(8?Ew~cJ&quQW1Bv=oTLoG?GHy7FbUT8*>K$U)taac)CJ|w1d&hllz zoeR+yJ0Sb#xA;yjP)n&LVZTDfgbj|*<~ls1R+EfhXFO=I)vESYgQdPNnpJvsCVb<$ zC4<>2)A1|gp$vP+;8@UVSp*5T@N~YN`EsJXGOt58+Gc7i1adA=dh19&yRCYqD!1pG zzBgodkH7ZC>pBCEk2$b>a;^&o*jsVg&{XIEt)e)qJ?64o?zSUcmb90~xn5loml__c zgCl=0i)u^Px~2Ao!tgshzWKsnrz^C8cR6@e&r$A|;GFr~zCY%2mjbd3C3M|(rPn6W z`&N86YW6b*XTz7_rTWtCOqu!FegV5xEhP!%M&4I-V(~)P> z7M&ev<(Cq=J<l>$cRxD{o zKa)Biq(YPrqlb>ftp&NbrBz52@Uj+Iue8KMH9vd~>f3rGjYma4Wg;w?&<)=@wFH@0 zyp`qJt_pRWk>^goho-Z;n$Hq`M-H8NuVH3l*CC>R9LIQQ+!|B$LAbTgtVqox4L##1 zl_T#{nvIM$_mH!38y8L-tPwHU#IaQ$95R*r1c>W(7!}f+G8eQNynf~;_?pXZ#SEe9 zbE;#}j2GBf_vdis!9oavX#Ms1eWU5Y0Y6^7-XjANVmq26nZuWY7iTXfFG4*z&x{`7 z)LqVk!S*X30@6NGqMV5!m)cWFlDZSUC(?vU^ZIh+1Gp1;m0Wi52p*M9_2rTY+~dCp z&-MZ8{dtyaXKQlhrkNq^SeopWwVt{zMN596}?a-|3S{6mqjh`vI{+5AC*Zc?M zK~2H);~wdNajJ^P#V%3vlf{WTNR=lweeYoYs6u&C5=9BbNJJp1MBTdeppK2i9Y|tk zo3yy|tYTe#-*;dwQPW?>T60ytBzZOWPJerZbgDgf7pMi>K32&c`r-sz$R?mGP)=&> z&QCwp=MYa#G%L*s0e#|l-Ikr|w;!$VK{J+*Nv?~LfoqhW8_@-x{MekyZcpRr?XDN% z5$7G_#0MW~&+t}o^3KwN>Mt#S(~r{q;nj9broyAIGKt2^oBNtzA$u+gvT0W?gIwOY z8(WQ`Z!%1P_79im>QqTMjngW-i6V1M{z4xs5z(5O26n2;TH7Tnyuidnad(R z@Lnji)=veiGIjpTmn$s|F;+UU^spMQ(ahsv(cwx#ZgvK!Y0&{}xJAJfe&}jG;w0Jo z7`g*uea$L+a7Zs)Li7<@MR^M#TXEQerOG;abl8VwlRxiSCtkZwigt;@A*|+BW>sWE z_%jvZ_XFga)M!soowSeX(N2UcW1A_Coam#~Otc$b*xayOIE#9@;L{r!;gz^kldOb&tSf!Z8*DHv`Zz}>l3+rAxr$7`EAQ>0b?4C=4P_Xqk8n{JmBjS|< z;pV|ujhw7xOgtazZJ#SN!)V$y6GY3J-~W_4G1e(LrooA#Kh}jYeZR@W?+TGN*)3@| zTN<^coEv~>BDXIAwcdC5`1Q#7ERz(U2oG7i6C!5o4I3}fM^;RhcG*`Iw3OT?hSt0r z*g)||sWXhmiEEQ=cc@5v;OS-YnYp0an(h1AMq_$*sbagVSCm#v4c%FW-SwD0)gP`iuxD+`%ZgPm*kdD9mJj`_7b{s zbn`}g98p{gqHNt*haV(1`dwO{F&WjoEzE46W#T8crBQ@bjgq}Jaj)cSA z)`d&{P2hcUJ|5*g>v{2900L(B}# match.match(/"([^"]+)"$/)[1]); + + state.outputTableData = state.inputTableData.map(item => ({ ...item })); + destFields.map(item => { + state.outputTableData.push( + { + "format":"", + "type": "TEXT", + "name": item, + "comment": "", + "primaryKey": false, + "defaultValue": '', + "nullable": false, + "unSupport": false, + "outputDataType": "TEXT" + } + ) + }); + } + else { state.outputTableData = t.fields } } diff --git a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts index 62f1b7ddf..e115758ff 100644 --- a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts +++ b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts @@ -241,6 +241,18 @@ export function useNodeSettingModal( tableName: tableInfo[0].tableName, fields: resultSchema.outputTableData }] + }else if(props.nodeInfo.connectorType === 'JsonPath') { + const resultSchema = modelRef.value.getOutputSchema() + const tableInfo = resultSchema.allTableData[0].tableInfos + transformOptions.columns = { + "sourceFieldName": resultSchema.outputTableData[0]?.name || null, + "columns": values.columns + } + modelOutputTableData = [{ + database: tableInfo[0].database, + tableName: tableInfo[0].tableName, + fields: resultSchema.outputTableData + }] } await saveTaskDefinitionItem(