diff --git a/inlong-dashboard/src/plugins/sources/common/SourceDefaultInfo.ts b/inlong-dashboard/src/plugins/sources/common/SourceDefaultInfo.ts index d5c768d8a9f..970bd181a7e 100644 --- a/inlong-dashboard/src/plugins/sources/common/SourceDefaultInfo.ts +++ b/inlong-dashboard/src/plugins/sources/common/SourceDefaultInfo.ts @@ -118,6 +118,10 @@ export class SourceDefaultInfo implements DataWithBackend, RenderRow, RenderList @I18n('meta.Sources.Name') sourceName: string; + @ColumnDecorator() + @I18n('meta.Sources.ClusterName') + readonly inlongClusterName: string; + @FieldDecorator({ type: 'select', props: { diff --git a/inlong-dashboard/src/ui/locales/cn.json b/inlong-dashboard/src/ui/locales/cn.json index 379a5a83e1a..f6b1e5a4935 100644 --- a/inlong-dashboard/src/ui/locales/cn.json +++ b/inlong-dashboard/src/ui/locales/cn.json @@ -25,6 +25,7 @@ "basic.SuccessfullyRestart": "重启成功", "basic.SuccessfullyStop": "停止成功", "meta.Sources.Name": "数据源名称", + "meta.Sources.ClusterName": "集群名", "meta.Sources.NameRule": "只能包含英文字母、数字、点号(.)、中划线(-)、下划线(_)", "meta.Sources.Type": "类型", "meta.Sources.File.SerializationType": "文件类型", diff --git a/inlong-dashboard/src/ui/locales/en.json b/inlong-dashboard/src/ui/locales/en.json index bc1106fdd6f..473f53d20d8 100644 --- a/inlong-dashboard/src/ui/locales/en.json +++ b/inlong-dashboard/src/ui/locales/en.json @@ -25,6 +25,7 @@ "basic.SuccessfullyRestart": "Successfully restart", "basic.SuccessfullyStop": "Successfully stop", "meta.Sources.Name": "Source name", + "meta.Sources.ClusterName": "Cluster Name", "meta.Sources.NameRule": "Only English letters, numbers, dots(.), minus(-), and underscores(_)", "meta.Sources.Type": "Type", "meta.Sources.File.SerializationType": "File type", diff --git a/inlong-dashboard/src/ui/pages/GroupDetail/DataSources/index.tsx b/inlong-dashboard/src/ui/pages/GroupDetail/DataSources/index.tsx index e1050f61d69..6af108b3c17 100644 --- a/inlong-dashboard/src/ui/pages/GroupDetail/DataSources/index.tsx +++ b/inlong-dashboard/src/ui/pages/GroupDetail/DataSources/index.tsx @@ -209,6 +209,8 @@ const Comp = ({ inlongGroupId, inlongStreamId, readonly }: Props, ref) => { { title: i18n.t('basic.Operating'), dataIndex: 'action', + fixed: 'right', + width: 200, render: (text, record) => readonly ? ( '-' @@ -235,7 +237,7 @@ const Comp = ({ inlongGroupId, inlongStreamId, readonly }: Props, ref) => { }, ]); }, [entityColumns, onDelete, onEdit, readonly]); - + const scroll = { x: 850 }; return ( <> { pagination, loading, onChange, + scroll, }} /> )} diff --git a/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx b/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx index a608dbcb7ef..fa9edff445c 100644 --- a/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx +++ b/inlong-dashboard/src/ui/pages/GroupDetail/DataStream/PreviewModal.tsx @@ -24,6 +24,7 @@ import { useRequest, useUpdateEffect } from '@/ui/hooks'; import i18n from '@/i18n'; import { ColumnsType } from 'antd/es/table'; import dayjs from 'dayjs'; +import { SortOrder } from 'antd/es/table/interface'; export interface Props extends ModalProps { inlongGroupId: string; @@ -87,6 +88,12 @@ const Comp: React.FC = ({ inlongGroupId, inlongStreamId, ...modalProps }) } return; }; + const timestampSorter = (a, b) => { + const dateA = dayjs(a.dt, 'YYYY-MM-DD HH:mm:ss'); + const dateB = dayjs(b.dt, 'YYYY-MM-DD HH:mm:ss'); + return dateA.isBefore(dateB) ? -1 : dateA.isAfter(dateB) ? 1 : 0; + }; + const sortOrder: SortOrder = 'descend'; const detailColumns: ColumnsType = [ { @@ -94,11 +101,13 @@ const Comp: React.FC = ({ inlongGroupId, inlongStreamId, ...modalProps }) key: 'dt', width: 200, dataIndex: 'dt', + sorter: timestampSorter, + defaultSortOrder: sortOrder, }, ].concat( (getColumn() ? getColumn() : []).concat([ { - title: 'operation', + title: i18n.t('basic.Operating'), key: 'operation', fixed: 'right', width: 100, diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java index 0f1952c938a..47b139f1592 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java @@ -37,6 +37,7 @@ public class DataNodeType { public static final String SQLSERVER = "SQLSERVER"; public static final String MONGODB = "MONGODB"; public static final String DORIS = "DORIS"; + public static final String HTTP = "HTTP"; public static final String OCEANBASE = "OCEANBASE"; /** diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index 5d069e33df1..16a1bfd3d81 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -71,6 +71,9 @@ public class SinkType extends StreamType { @SupportSortType(sortType = SortType.SORT_FLINK) public static final String TUBEMQ = "TUBEMQ"; + @SupportSortType(sortType = SortType.SORT_STANDALONE) + public static final String HTTP = "HTTP"; + @SupportSortType(sortType = SortType.SORT_FLINK) public static final String OCEANBASE = "OCEANBASE"; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java index d7e8ba7d4b1..d16ab1d46c5 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java @@ -33,6 +33,7 @@ public class ClusterType { public static final String DATAPROXY = "DATAPROXY"; public static final String KAFKA = "KAFKA"; + public static final String SORT_HTTP = "SORT_HTTP"; public static final String SORT_ES = "SORT_ES"; public static final String SORT_CLS = "SORT_CLS"; public static final String SORT_PULSAR = "SORT_PULSAR"; @@ -48,6 +49,7 @@ public class ClusterType { add(ClusterType.PULSAR); add(ClusterType.DATAPROXY); add(ClusterType.KAFKA); + add(ClusterType.SORT_HTTP); add(ClusterType.SORT_ES); add(ClusterType.SORT_CLS); add(ClusterType.SORT_PULSAR); diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java index eddf41f641c..5f7a46aaedc 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java @@ -40,6 +40,7 @@ public class InlongClusterNodeEntity implements Serializable { private String protocolType; private Integer nodeLoad; private String extParams; + private String operateLog; private String description; private Integer status; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java index fb2068435a6..f168de98d9e 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java @@ -55,6 +55,8 @@ List selectByParentId(@Param("parentId") Integer parent */ int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus, @Param("status") Integer status); + int updateOperateLogById(@Param("id") Integer id, @Param("operateLog") String operateLog); + int deleteById(Integer id); } \ No newline at end of file diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml index d0016a28e3c..2afe00ee6bc 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml @@ -32,6 +32,7 @@ + @@ -42,8 +43,7 @@ - id, parent_id, type, ip, port, username, password, ssh_port, protocol_type, node_load, ext_params, description, - status, is_deleted, creator, modifier, create_time, modify_time, version + id, parent_id, type, ip, port, username, password, ssh_port, protocol_type, node_load, ext_params, operate_log, description, status, is_deleted, creator, modifier, create_time, modify_time, version ext_params = #{extParams,jdbcType=LONGVARCHAR}, + + operate_log = #{operateLog,jdbcType=LONGVARCHAR}, + description = #{description,jdbcType=VARCHAR}, @@ -248,6 +254,12 @@ + + update inlong_cluster_node + set operate_log = #{operateLog,jdbcType=LONGVARCHAR} + where id = #{id,jdbcType=INTEGER} + and is_deleted = 0 + delete from inlong_cluster_node diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java index 5e17a7f9b0f..14eaa7d9847 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java @@ -77,6 +77,9 @@ public class ClusterNodeRequest { @Length(min = 1, max = 163840, message = "length must be between 1 and 163840") private String extParams; + @ApiModelProperty(value = "Operate log") + private String operateLog; + @ApiModelProperty(value = "Description of the cluster node") @Length(max = 256, message = "length must be less than or equal to 256") private String description; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java index 51291fafdce..bfaea9a6327 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeResponse.java @@ -73,6 +73,9 @@ public class ClusterNodeResponse { @ApiModelProperty(value = "Extended params") private String extParams; + @ApiModelProperty(value = "Operate log") + private String operateLog; + @ApiModelProperty(value = "Description of the cluster node") private String description; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java new file mode 100644 index 00000000000..b2660c2df95 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterInfo.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sort.http; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterInfo; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORT_HTTP) +@ApiModel("Inlong cluster info for Sort http") +public class SortHttpClusterInfo extends BaseSortClusterInfo { + + public SortHttpClusterInfo() { + this.setType(ClusterType.SORT_HTTP); + } +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java new file mode 100644 index 00000000000..a7fd4d027ea --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/sort/http/SortHttpClusterRequest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.cluster.sort.http; + +import org.apache.inlong.manager.common.enums.ClusterType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sort.BaseSortClusterRequest; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = ClusterType.SORT_HTTP) +@ApiModel("Inlong cluster request for Sort http") +public class SortHttpClusterRequest extends BaseSortClusterRequest { + + public SortHttpClusterRequest() { + this.setType(ClusterType.SORT_HTTP); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java new file mode 100644 index 00000000000..52fa047606f --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeDTO.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.http; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +/** + * Http service data node info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Http service data node info") +public class HttpDataNodeDTO { + + @ApiModelProperty("HTTP base url") + private String baseUrl; + + @ApiModelProperty("Whether to enable credential") + private Boolean enableCredential; + + @ApiModelProperty("Max connect count") + private Integer maxConnect; + /** + * Get the dto instance from the request + */ + public static HttpDataNodeDTO getFromRequest(HttpDataNodeRequest request, String extParams) { + HttpDataNodeDTO dto = StringUtils.isNotBlank(extParams) + ? HttpDataNodeDTO.getFromJson(extParams) + : new HttpDataNodeDTO(); + return CommonBeanUtils.copyProperties(request, dto, true); + } + + /** + * Get the dto instance from the JSON string. + */ + public static HttpDataNodeDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, HttpDataNodeDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT, + String.format("Failed to parse extParams for Cloud log service node: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java new file mode 100644 index 00000000000..b24224134c3 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeInfo.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.http; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * Cloud log service data node info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.HTTP) +@ApiModel("HTTP data node info") +public class HttpDataNodeInfo extends DataNodeInfo { + + @ApiModelProperty("HTTP base url") + private String baseUrl; + + @ApiModelProperty("Whether to enable credential") + private Boolean enableCredential; + + @ApiModelProperty("Max connect count") + private Integer maxConnect; + + public HttpDataNodeInfo() { + setType(DataNodeType.HTTP); + } + + @Override + public DataNodeRequest genRequest() { + return CommonBeanUtils.copyProperties(this, HttpDataNodeRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java new file mode 100644 index 00000000000..1fba8d9518d --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/http/HttpDataNodeRequest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.http; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Cloud log service data node request + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.HTTP) +@ApiModel("Http service data node request") +public class HttpDataNodeRequest extends DataNodeRequest { + + @ApiModelProperty("HTTP base url") + private String baseUrl; + + @ApiModelProperty("Whether to enable credential") + private Boolean enableCredential; + + @ApiModelProperty("Max connect count") + private Integer maxConnect; + + public HttpDataNodeRequest() { + this.setType(DataNodeType.HTTP); + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java index 09115a9e020..d8df7f4a285 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/BaseStreamSink.java @@ -32,6 +32,9 @@ @ApiModel("Base info of stream sink") public class BaseStreamSink { + @ApiModelProperty("Transform sql") + private String transformSql; + @ApiModelProperty("Start consume time, yyyy-MM-dd HH:mm:ss format") private String startConsumeTime; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java index 8c190a3a848..24c544b943f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java @@ -71,6 +71,9 @@ public abstract class SinkRequest { @Pattern(regexp = "^[a-zA-Z0-9_.-]{1,100}$", message = "sinkName only supports letters, numbers, '.', '-', or '_'") private String sinkName; + @ApiModelProperty("Transform sql") + private String transformSql; + @ApiModelProperty("Sink description") @Length(max = 500, message = "length must be less than or equal to 500") private String description; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java index 7641256de0a..85fd72a1a44 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java @@ -68,6 +68,9 @@ public abstract class StreamSink extends StreamNode { @ApiModelProperty("Sink name, unique in one stream.") private String sinkName; + @ApiModelProperty("Transform sql") + private String transformSql; + @ApiModelProperty("Sink description") private String description; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java new file mode 100644 index 00000000000..fdcd7228fe0 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSink.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.http; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.Map; + +/** + * HTTP sink info + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "HTTP sink info") +@JsonTypeDefine(value = SinkType.HTTP) +public class HttpSink extends StreamSink { + + @ApiModelProperty("HTTP path") + private String path; + + @ApiModelProperty("HTTP method, like POST, GET") + private String method; + + @ApiModelProperty("HTTP headers") + private Map headers; + + @ApiModelProperty("Max retry times") + private Integer maxRetryTimes; + + public HttpSink() { + this.setSinkType(SinkType.HTTP); + } + + @Override + public SinkRequest genSinkRequest() { + return CommonBeanUtils.copyProperties(this, HttpSinkRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java new file mode 100644 index 00000000000..5431e07ec71 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkDTO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.http; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; + +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +import java.util.Map; + +/** + * Sink info of Cloud log service + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class HttpSinkDTO extends BaseStreamSink { + + @ApiModelProperty("HTTP path") + private String path; + + @ApiModelProperty("HTTP method, like POST, GET") + private String method; + + @ApiModelProperty("HTTP headers") + private Map headers; + + @ApiModelProperty("Max retry times") + private Integer maxRetryTimes; + + /** + * Get the dto instance from the request + */ + public static HttpSinkDTO getFromRequest(HttpSinkRequest request, String extParams) { + HttpSinkDTO dto = StringUtils.isNotBlank(extParams) + ? HttpSinkDTO.getFromJson(extParams) + : new HttpSinkDTO(); + return CommonBeanUtils.copyProperties(request, dto, true); + } + + public static HttpSinkDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, HttpSinkDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, + String.format("parse extParams of http SinkDTO failure: %s", e.getMessage())); + } + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java new file mode 100644 index 00000000000..207322aa3a9 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/http/HttpSinkRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.http; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.Map; + +/** + * Http sink request. + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "Http sink request") +@JsonTypeDefine(value = SinkType.HTTP) +public class HttpSinkRequest extends SinkRequest { + + @ApiModelProperty("HTTP path") + private String path; + + @ApiModelProperty("HTTP method, like POST, GET") + private String method; + + @ApiModelProperty("HTTP headers") + private Map headers; + + @ApiModelProperty("Max retry times") + private Integer maxRetryTimes; + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index e5912bd93f8..7caef673dd2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -691,6 +691,7 @@ public Integer saveNode(ClusterNodeRequest request, String operator) { InlongClusterNodeOperator instance = clusterNodeOperatorFactory.getInstance(request.getType()); Integer id = instance.saveOpt(request, operator); if (request.getIsInstall()) { + request.setId(id); InlongClusterNodeInstallOperator clusterNodeInstallOperator = clusterNodeInstallOperatorFactory.getInstance( request.getType()); clusterNodeInstallOperator.install(request, operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java index 6ffdbafae08..b328a39b376 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/SortClusterOperator.java @@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import org.apache.inlong.manager.pojo.cluster.sort.cls.SortClsClusterInfo; import org.apache.inlong.manager.pojo.cluster.sort.es.SortEsClusterInfo; +import org.apache.inlong.manager.pojo.cluster.sort.http.SortHttpClusterInfo; import org.apache.inlong.manager.pojo.cluster.sort.kafka.SortKafkaClusterInfo; import org.apache.inlong.manager.pojo.cluster.sort.pulsar.SortPulsarClusterInfo; import org.apache.inlong.manager.pojo.sort.BaseSortClusterDTO; @@ -50,6 +51,7 @@ public class SortClusterOperator extends AbstractClusterOperator { private static final Set SORT_CLUSTER_SET = new HashSet() { { + add(ClusterType.SORT_HTTP); add(ClusterType.SORT_CLS); add(ClusterType.SORT_PULSAR); add(ClusterType.SORT_ES); @@ -84,6 +86,9 @@ public ClusterInfo getFromEntity(InlongClusterEntity entity) { ClusterInfo sortClusterInfo; switch (entity.getType()) { + case ClusterType.SORT_HTTP: + sortClusterInfo = new SortHttpClusterInfo(); + break; case ClusterType.SORT_CLS: sortClusterInfo = new SortClsClusterInfo(); break; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java index 5d190b902c2..77a848dbb65 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ModuleType; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.AESUtils; +import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity; @@ -28,10 +29,12 @@ import org.apache.inlong.manager.dao.entity.PackageConfigEntity; import org.apache.inlong.manager.dao.entity.UserEntity; import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; +import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper; import org.apache.inlong.manager.dao.mapper.ModuleConfigEntityMapper; import org.apache.inlong.manager.dao.mapper.PackageConfigEntityMapper; import org.apache.inlong.manager.dao.mapper.UserEntityMapper; import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest; +import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO; import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest; import org.apache.inlong.manager.service.cmd.CommandExecutor; @@ -42,6 +45,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import java.io.File; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -49,17 +53,17 @@ @Service public class AgentClusterNodeInstallOperator implements InlongClusterNodeInstallOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(AgentClusterNodeInstallOperator.class); - public static final String INSTALLER_CONF_PATH = "/conf/installer.properties"; public static final String INSTALLER_START_CMD = "/bin/installer.sh start"; + public static final String INSTALLER_RESTART_CMD = "/bin/installer.sh restart"; + public static final String INSTALLER_STOP_CMD = "/bin/installer.sh restart"; public static final String AGENT_MANAGER_AUTH_SECRET_ID = "agent.manager.auth.secretId"; public static final String AGENT_MANAGER_AUTH_SECRET_KEY = "agent.manager.auth.secretKey"; public static final String AGENT_MANAGER_ADDR = "agent.manager.addr"; public static final String AGENT_CLUSTER_NAME = "agent.cluster.name"; public static final String AGENT_CLUSTER_TAG = "agent.cluster.tag"; public static final String AGENT_LOCAL_IP = "agent.local.ip"; - + private static final Logger LOGGER = LoggerFactory.getLogger(AgentClusterNodeInstallOperator.class); @Autowired private InlongClusterEntityMapper clusterEntityMapper; @Autowired @@ -70,6 +74,8 @@ public class AgentClusterNodeInstallOperator implements InlongClusterNodeInstall private PackageConfigEntityMapper packageConfigEntityMapper; @Autowired private UserEntityMapper userEntityMapper; + @Autowired + private InlongClusterNodeEntityMapper clusterNodeEntityMapper; @Value("${agent.install.path:inlong/inlong-installer/}") private String agentInstallPath; @@ -88,44 +94,55 @@ public String getClusterNodeType() { @Override public boolean install(ClusterNodeRequest clusterNodeRequest, String operator) { - LOGGER.info("begin to insert agent inlong cluster node={}", clusterNodeRequest); + LOGGER.info("begin to insert agent cluster node={}", clusterNodeRequest); try { - InlongClusterEntity clusterEntity = clusterEntityMapper.selectById(clusterNodeRequest.getParentId()); AgentClusterNodeRequest request = (AgentClusterNodeRequest) clusterNodeRequest; - commandExecutor.mkdir(request, agentInstallPath); - String downLoadUrl = getInstallerDownLoadUrl(request); - String fileName = downLoadUrl.substring(downLoadUrl.lastIndexOf('/') + 1); - commandExecutor.downLoadPackage(request, agentInstallPath, downLoadUrl); - commandExecutor.tarPackage(request, fileName, agentInstallPath); - String confFile = agentInstallPath + INSTALLER_CONF_PATH; - Map configMap = new HashMap<>(); - configMap.put(AGENT_LOCAL_IP, request.getIp()); - configMap.put(AGENT_MANAGER_ADDR, managerUrl); - UserEntity userInfo = userEntityMapper.selectByName(operator); - Preconditions.expectNotNull(userInfo, "User doesn't exist"); - String secretKey = - new String(AESUtils.decryptAsString(userInfo.getSecretKey(), userInfo.getEncryptVersion())); - configMap.put(AGENT_MANAGER_AUTH_SECRET_ID, operator); - configMap.put(AGENT_MANAGER_AUTH_SECRET_KEY, secretKey); - configMap.put(AGENT_CLUSTER_TAG, clusterEntity.getClusterTags()); - configMap.put(AGENT_CLUSTER_NAME, clusterEntity.getName()); - commandExecutor.modifyConfig(request, configMap, confFile); + deployInstaller(request, operator); + } catch (Exception e) { + clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), e.getMessage()); + String errMsg = String.format("install agent cluster node failed for ip=%s", clusterNodeRequest.getIp()); + LOGGER.error(errMsg, e); + throw new BusinessException(errMsg); + } + LOGGER.info("success to install agent cluster node={}", clusterNodeRequest); + return true; + } + + @Override + public boolean reInstall(ClusterNodeRequest clusterNodeRequest, String operator) { + LOGGER.info("begin to reInstall agent cluster node={}", clusterNodeRequest); + try { + AgentClusterNodeRequest request = (AgentClusterNodeRequest) clusterNodeRequest; + commandExecutor.rmDir(request, agentInstallPath.substring(0, agentInstallPath.lastIndexOf(File.separator))); + deployInstaller(request, operator); String startCmd = agentInstallPath + INSTALLER_START_CMD; commandExecutor.execRemote(request, startCmd); - } catch (Exception e) { - String errMsg = String.format("install installer failed for ip=%s", clusterNodeRequest.getIp()); + clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), e.getMessage()); + String errMsg = String.format("reInstall agent cluster node failed for ip=%s", clusterNodeRequest.getIp()); LOGGER.error(errMsg, e); throw new BusinessException(errMsg); } - LOGGER.info("success to insert agent inlong cluster node={}", clusterNodeRequest); + LOGGER.info("success to re reInstall agent cluster node={}", clusterNodeRequest); return true; } @Override public boolean unload(InlongClusterNodeEntity clusterNodeEntity, String operator) { - // todo Provide agent uninstallation capability - InlongClusterEntity clusterEntity = clusterEntityMapper.selectById(clusterNodeEntity.getParentId()); + try { + AgentClusterNodeRequest request = CommonBeanUtils.copyProperties(clusterNodeEntity, + AgentClusterNodeRequest::new, true); + AgentClusterNodeDTO agentClusterNodeDTO = AgentClusterNodeDTO.getFromJson(clusterNodeEntity.getExtParams()); + CommonBeanUtils.copyProperties(agentClusterNodeDTO, request, true); + String stopCmd = agentInstallPath + INSTALLER_STOP_CMD; + commandExecutor.execRemote(request, stopCmd); + commandExecutor.rmDir(request, agentInstallPath.substring(0, agentInstallPath.lastIndexOf(File.separator))); + } catch (Exception e) { + clusterNodeEntityMapper.updateOperateLogById(clusterNodeEntity.getId(), e.getMessage()); + String errMsg = String.format("unload agent cluster node failed for ip=%s", clusterNodeEntity.getIp()); + LOGGER.error(errMsg, e); + throw new BusinessException(errMsg); + } return true; } @@ -147,4 +164,26 @@ private String getInstallerDownLoadUrl(AgentClusterNodeRequest request) { String.format("can't get installer download url for ip=%s, type=%s", request.getIp(), request.getType())); } + + private void deployInstaller(AgentClusterNodeRequest request, String operator) throws Exception { + InlongClusterEntity clusterEntity = clusterEntityMapper.selectById(request.getParentId()); + commandExecutor.mkdir(request, agentInstallPath); + String downLoadUrl = getInstallerDownLoadUrl(request); + String fileName = downLoadUrl.substring(downLoadUrl.lastIndexOf('/') + 1); + commandExecutor.downLoadPackage(request, agentInstallPath, downLoadUrl); + commandExecutor.tarPackage(request, fileName, agentInstallPath); + String confFile = agentInstallPath + INSTALLER_CONF_PATH; + Map configMap = new HashMap<>(); + configMap.put(AGENT_LOCAL_IP, request.getIp()); + configMap.put(AGENT_MANAGER_ADDR, managerUrl); + UserEntity userInfo = userEntityMapper.selectByName(operator); + Preconditions.expectNotNull(userInfo, "User doesn't exist"); + String secretKey = + new String(AESUtils.decryptAsString(userInfo.getSecretKey(), userInfo.getEncryptVersion())); + configMap.put(AGENT_MANAGER_AUTH_SECRET_ID, operator); + configMap.put(AGENT_MANAGER_AUTH_SECRET_KEY, secretKey); + configMap.put(AGENT_CLUSTER_TAG, clusterEntity.getClusterTags()); + configMap.put(AGENT_CLUSTER_NAME, clusterEntity.getName()); + commandExecutor.modifyConfig(request, configMap, confFile); + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeInstallOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeInstallOperator.java index 8fcb695bdf3..851d085eee9 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeInstallOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeInstallOperator.java @@ -39,6 +39,14 @@ public interface InlongClusterNodeInstallOperator { */ boolean install(ClusterNodeRequest clusterNodeRequest, String operator); + /** + * ReInstalling cluster nodes. + * + * @param clusterNodeRequest cluster request + * @param operator operator + */ + boolean reInstall(ClusterNodeRequest clusterNodeRequest, String operator); + /** * Uninstalling cluster nodes. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutor.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutor.java index ef44719d7ed..6a565fb1c5e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutor.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutor.java @@ -39,4 +39,7 @@ CommandResult downLoadPackage(AgentClusterNodeRequest clusterNodeRequest, String throws Exception; CommandResult mkdir(AgentClusterNodeRequest clusterNodeRequest, String path) throws Exception; + + CommandResult rmDir(AgentClusterNodeRequest clusterNodeRequest, String path) throws Exception; + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java index 1ce4069fe1f..804d04e4b69 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java @@ -139,4 +139,9 @@ public CommandResult mkdir(AgentClusterNodeRequest clusterNodeRequest, String pa return execRemote(clusterNodeRequest, "mkdir " + path); } + @Override + public CommandResult rmDir(AgentClusterNodeRequest clusterNodeRequest, String path) throws Exception { + return execRemote(clusterNodeRequest, "rm -rf " + path); + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java new file mode 100644 index 00000000000..18482d4a040 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/http/HttpDataNodeOperator.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node.http; + +import org.apache.inlong.common.pojo.sort.node.HttpNodeConfig; +import org.apache.inlong.common.pojo.sort.node.NodeConfig; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; +import org.apache.inlong.manager.pojo.node.http.HttpDataNodeDTO; +import org.apache.inlong.manager.pojo.node.http.HttpDataNodeInfo; +import org.apache.inlong.manager.pojo.node.http.HttpDataNodeRequest; +import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class HttpDataNodeOperator extends AbstractDataNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(HttpDataNodeOperator.class); + + @Autowired + private ObjectMapper objectMapper; + + @Override + protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { + HttpDataNodeRequest httpDataNodeRequest = (HttpDataNodeRequest) request; + CommonBeanUtils.copyProperties(httpDataNodeRequest, targetEntity, true); + try { + HttpDataNodeDTO dto = HttpDataNodeDTO.getFromRequest(httpDataNodeRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("Failed to build extParams for Cloud log service node: %s", e.getMessage())); + } + } + + @Override + public Boolean accept(String dataNodeType) { + return DataNodeType.HTTP.equals(dataNodeType); + } + + @Override + public String getDataNodeType() { + return DataNodeType.HTTP; + } + + @Override + public DataNodeInfo getFromEntity(DataNodeEntity entity) { + if (entity == null) { + throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND); + } + HttpDataNodeInfo info = new HttpDataNodeInfo(); + CommonBeanUtils.copyProperties(entity, info); + if (StringUtils.isNotBlank(entity.getExtParams())) { + HttpDataNodeDTO dto = HttpDataNodeDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, info); + } + return info; + } + + @Override + public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) { + DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity); + HttpNodeConfig httpNodeConfig = CommonBeanUtils.copyProperties(dataNodeInfo, HttpNodeConfig::new); + HttpDataNodeDTO dto = HttpDataNodeDTO.getFromJson(dataNodeEntity.getExtParams()); + CommonBeanUtils.copyProperties(dto, httpNodeConfig); + httpNodeConfig.setPassword(dataNodeEntity.getToken()); + httpNodeConfig.setNodeName(dataNodeInfo.getName()); + return httpNodeConfig; + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java index ea46dc3432d..a0b6a29443b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java @@ -115,6 +115,15 @@ public interface StreamSinkService { */ PageResult listByCondition(SinkPageRequest request, String operator); + /** + * Paging query stream sink detail info based on conditions. + * + * @param request paging request + * @param operator operator + * @return sink detail page list + */ + PageResult> listDetail(SinkPageRequest request, String operator); + /** * Paging query stream sink info based on conditions. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 21804003056..9177253d903 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -17,6 +17,8 @@ package org.apache.inlong.manager.service.sink; +import org.apache.inlong.common.constant.Constants; +import org.apache.inlong.common.constant.MQType; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.OperationTarget; @@ -25,21 +27,31 @@ import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; import org.apache.inlong.manager.dao.entity.InlongStreamEntity; +import org.apache.inlong.manager.dao.entity.SortConfigEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; +import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; +import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; +import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo; import org.apache.inlong.manager.pojo.sink.ParseFieldRequest; import org.apache.inlong.manager.pojo.sink.SinkApproveDTO; import org.apache.inlong.manager.pojo.sink.SinkBriefInfo; @@ -93,6 +105,8 @@ import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; +import static org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION; +import static org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQQueueResourceOperator.TUBE_CONSUMER_GROUP; /** * Implementation of sink service interface @@ -104,7 +118,10 @@ public class StreamSinkServiceImpl implements StreamSinkService { private static final Pattern PARSE_FIELD_CSV_SPLITTER = Pattern.compile("[\t\\s,]"); private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3; private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2; - + @Autowired + private SortConfigEntityMapper sortConfigEntityMapper; + @Autowired + private InlongClusterEntityMapper clusterEntityMapper; @Autowired private SinkOperatorFactory operatorFactory; @Autowired @@ -121,7 +138,6 @@ public class StreamSinkServiceImpl implements StreamSinkService { private AutowireCapableBeanFactory autowireCapableBeanFactory; @Autowired private ObjectMapper objectMapper; - // To avoid circular dependencies, you cannot use @Autowired, it will be injected by AutowireCapableBeanFactory private InlongStreamProcessService streamProcessOperation; @@ -297,6 +313,89 @@ public PageResult listByCondition(SinkPageRequest request, return pageResult; } + @Override + public PageResult> listDetail(SinkPageRequest request, String operator) { + PageHelper.startPage(request.getPageNum(), request.getPageSize()); + OrderFieldEnum.checkOrderField(request); + OrderTypeEnum.checkOrderType(request); + Page entityPage = (Page) sinkMapper.selectByCondition(request); + InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId()); + InlongGroupInfo groupInfo = null; + switch (groupEntity.getMqType()) { + case MQType.PULSAR: + groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongPulsarInfo::new, true); + break; + case MQType.TUBEMQ: + groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongTubeMQInfo::new, true); + break; + case MQType.KAFKA: + groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongKafkaInfo::new, true); + default: + throw new BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage()); + } + InlongGroupInfo finalGroupInfo = groupInfo; + List> responseList = entityPage.stream().map(sink -> { + StreamSinkOperator sinkOperator = operatorFactory.getInstance(sink.getSinkType()); + StreamSink streamSink = sinkOperator.getFromEntity(sink); + Map requestMap = JsonUtils.OBJECT_MAPPER.convertValue(streamSink, + new TypeReference>() { + }); + InlongStreamEntity streamEntity = + streamMapper.selectByIdentifier(request.getInlongGroupId(), sink.getInlongStreamId()); + String topic = ""; + String consumeGroup = ""; + switch (groupEntity.getMqType()) { + case MQType.PULSAR: + List pulsarClusters = clusterEntityMapper.selectByKey( + finalGroupInfo.getInlongClusterTag(), null, MQType.PULSAR); + InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams()); + if (CollectionUtils.isEmpty(pulsarClusters)) { + break; + } + String tenant = pulsarDTO.getPulsarTenant(); + if (StringUtils.isBlank(tenant)) { + InlongClusterEntity pulsarCluster = pulsarClusters.get(0); + // Multiple adminUrls should be configured for pulsar, + // otherwise all requests will be sent to the same broker + PulsarClusterDTO pulsarClusterDTO = PulsarClusterDTO.getFromJson(pulsarCluster.getExtParams()); + tenant = pulsarClusterDTO.getPulsarTenant(); + } + String fullTopicName = + tenant + "/" + finalGroupInfo.getMqResource() + "/" + streamEntity.getMqResource(); + topic = "persistent://" + fullTopicName; + consumeGroup = String.format(PULSAR_SUBSCRIPTION, finalGroupInfo.getInlongClusterTag(), + fullTopicName, sink.getId()); + break; + case MQType.TUBEMQ: + topic = streamEntity.getMqResource(); + consumeGroup = String.format(TUBE_CONSUMER_GROUP, groupEntity.getInlongClusterTag(), topic, + sink.getId()); + break; + case MQType.KAFKA: + topic = streamEntity.getMqResource(); + if (topic.equals(streamEntity.getInlongStreamId())) { + // the default mq resource (stream id) is not sufficient to discriminate different kafka topics + topic = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, + finalGroupInfo.getMqResource(), streamEntity.getMqResource()); + } + break; + default: + throw new BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage()); + } + requestMap.put("topic", topic); + requestMap.put("consumerGroup", consumeGroup); + SortConfigEntity sortConfigEntity = sortConfigEntityMapper.selectBySinkId(sink.getId()); + if (sortConfigEntity != null) { + requestMap.put("dataFlowInfo", sortConfigEntity.getConfigParams()); + } + return requestMap; + }).collect(Collectors.toList()); + PageResult> pageResult = new PageResult<>(responseList, entityPage.getTotal(), + entityPage.getPageNum(), entityPage.getPageSize()); + LOGGER.debug("success to list sink detail page, result size {}", pageResult.getList().size()); + return pageResult; + } + @Override public List listByCondition(SinkPageRequest request, UserInfo opInfo) { // check sink id diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java new file mode 100644 index 00000000000..030c0c787af --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/http/HttpSinkOperator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.sink.http; + +import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.sink.HttpSinkConfig; +import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; +import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.node.http.HttpDataNodeDTO; +import org.apache.inlong.manager.pojo.sink.SinkField; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.http.HttpSink; +import org.apache.inlong.manager.pojo.sink.http.HttpSinkDTO; +import org.apache.inlong.manager.pojo.sink.http.HttpSinkRequest; +import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.service.sink.AbstractSinkOperator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Http sink operator + */ +@Service +public class HttpSinkOperator extends AbstractSinkOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkOperator.class); + + @Autowired + private ObjectMapper objectMapper; + @Autowired + private DataNodeEntityMapper dataNodeEntityMapper; + + @Override + protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) { + if (!this.getSinkType().equals(request.getSinkType())) { + throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT, + ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType()); + } + HttpSinkRequest sinkRequest = (HttpSinkRequest) request; + try { + HttpSinkDTO dto = HttpSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, + String.format("serialize extParams of http SinkDTO failure: %s", e.getMessage())); + } + } + + @Override + protected String getSinkType() { + return SinkType.HTTP; + } + + @Override + public Boolean accept(String sinkType) { + return SinkType.HTTP.equals(sinkType); + } + + @Override + public StreamSink getFromEntity(StreamSinkEntity entity) { + HttpSink sink = new HttpSink(); + if (entity == null) { + return sink; + } + + HttpSinkDTO dto = HttpSinkDTO.getFromJson(entity.getExtParams()); + DataNodeEntity dataNodeEntity = dataNodeEntityMapper.selectByUniqueKey(entity.getDataNodeName(), + DataNodeType.HTTP); + HttpDataNodeDTO httpDataNodeDTO = JsonUtils.parseObject(dataNodeEntity.getExtParams(), + HttpDataNodeDTO.class); + CommonBeanUtils.copyProperties(entity, sink, true); + CommonBeanUtils.copyProperties(dto, sink, true); + CommonBeanUtils.copyProperties(httpDataNodeDTO, sink, true); + List sinkFields = getSinkFields(entity.getId()); + sink.setSinkFieldList(sinkFields); + return sink; + } + + @Override + public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) { + HttpSink httpSink = (HttpSink) sink; + HttpSinkConfig sinkConfig = CommonBeanUtils.copyProperties(httpSink, HttpSinkConfig::new); + List fields = sinkFieldMapper.selectBySinkId(sink.getId()).stream().map( + v -> { + FieldConfig fieldConfig = new FieldConfig(); + FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat( + v.getFieldType().toLowerCase()); + fieldConfig.setName(v.getFieldName()); + fieldConfig.setFormatInfo(formatInfo); + return fieldConfig; + }).collect(Collectors.toList()); + sinkConfig.setFieldConfigs(fields); + return sinkConfig; + } +} diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index a1bff1615ce..bfaa28b1cd9 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -138,6 +138,7 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node` `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP', `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node', `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', + `operate_log` text DEFAULT NULL COMMENT 'The operate log', `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node', `status` int(4) DEFAULT '0' COMMENT 'Cluster status', `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index a1388e6368f..8697d3c0e78 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -149,6 +149,7 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node` `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP', `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node', `ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string', + `operate_log` text DEFAULT NULL COMMENT 'The operate log', `description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node', `status` int(4) DEFAULT '0' COMMENT 'Cluster status', `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', diff --git a/inlong-manager/manager-web/sql/changes-1.14.0.sql b/inlong-manager/manager-web/sql/changes-1.14.0.sql new file mode 100644 index 00000000000..c82c699ae66 --- /dev/null +++ b/inlong-manager/manager-web/sql/changes-1.14.0.sql @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- This is the SQL change file from version 1.13.0 to the current version 1.14.0. +-- When upgrading to version 1.14.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. + +SET NAMES utf8mb4; +SET FOREIGN_KEY_CHECKS = 0; + +USE `apache_inlong_manager`; + +ALTER TABLE `inlong_cluster_node` ADD COLUMN `operate_log` text DEFAULT NULL COMMENT 'The operate log'; + diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java index 4fd4eaabb60..b7b8dbd5d40 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java @@ -48,6 +48,7 @@ import org.springframework.web.bind.annotation.RestController; import java.util.List; +import java.util.Map; /** * Stream sink control layer @@ -88,6 +89,12 @@ public Response> listByCondition(@RequestBody S return Response.success(sinkService.listByCondition(request, LoginUserUtils.getLoginUser().getName())); } + @RequestMapping(value = "/sink/listDetail", method = RequestMethod.POST) + @ApiOperation(value = "List stream sinks detail by paginating") + public Response>> listDetail(@RequestBody SinkPageRequest request) { + return Response.success(sinkService.listDetail(request, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/sink/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SINK) @ApiOperation(value = "Update stream sink") diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/build.sh b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/build.sh index f38f535dad4..29759c6c9cc 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/build.sh +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/build.sh @@ -51,6 +51,15 @@ if [ "$(printf '%s\n' "$PYTHON_REQUIRED" "$PYTHON_VERSION" | sort -V | head -n1) exit 1 fi +# Install Python packages from requirements.txt +if [ -f $PY_SDK_DIR/requirements.txt ]; then + echo "Installing Python packages from requirements.txt..." + pip install -r $PY_SDK_DIR/requirements.txt +else + echo "Error: cannot find requirements.txt!" + exit 1 +fi + # Build pybind11(If the pybind11 has been compiled, this step will be skipped) if [ ! -d "$PY_SDK_DIR/pybind11/build" ]; then if [ -d "$PY_SDK_DIR/pybind11" ]; then @@ -58,9 +67,16 @@ if [ ! -d "$PY_SDK_DIR/pybind11/build" ]; then fi git clone https://github.com/pybind/pybind11.git $PY_SDK_DIR/pybind11 mkdir $PY_SDK_DIR/pybind11/build && cd $PY_SDK_DIR/pybind11/build + + # Add a trap command to delete the pybind11 folder if an error occurs + trap 'echo "Error occurred during pybind11 build. Deleting pybind11 folder..."; cd $PY_SDK_DIR; rm -r pybind11; exit 1' ERR + cmake $PY_SDK_DIR/pybind11 cmake --build $PY_SDK_DIR/pybind11/build --config Release --target check make -j 4 + + # Remove the trap command if the build is successful + trap - ERR else echo "Skipped build pybind11" fi diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp index 2fc5eeee83d..07bf79613cc 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/inlong_dataproxy.cpp @@ -63,7 +63,6 @@ PYBIND11_MODULE(inlong_dataproxy, m) { int result = self.InitApi(config_path); return result; }) - .def("add_bid", &inlong::InLongApi::AddBid) .def("send", [](inlong::InLongApi& self, const char* groupId, const char* streamId, const char* msg, int32_t msgLen, py::object pyCallback = py::none()) { if (!pyCallback.is(py::none())) { g_py_callbacks[UserCallBackBridge] = pyCallback.cast(); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/requirements.txt b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/requirements.txt new file mode 100644 index 00000000000..e160152b522 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/requirements.txt @@ -0,0 +1 @@ +pytest>=7.1.2 \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AcosFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AcosFunction.java new file mode 100644 index 00000000000..332716e57b6 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AcosFunction.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * AcosFunction + * description: acos(numeric)--returns the arc cosine of numeric + */ +public class AcosFunction implements ValueParser { + + private ValueParser numberParser; + + /** + * Constructor + * @param expr + */ + public AcosFunction(Function expr) { + numberParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObj = numberParser.parse(sourceData, rowIndex, context); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); + return Math.acos(numberValue.doubleValue()); + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/BinFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/BinFunction.java new file mode 100644 index 00000000000..ae4dbc96e72 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/BinFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.util.List; +/** + * BinFunction + * description: bin(integer)--Returns a string representation of an integer in binary format. If the integer is NULL, NULL is returned. + */ +public class BinFunction implements ValueParser { + + private ValueParser valueParser; + + public BinFunction(Function expr) { + if (expr.getParameters() != null) { + List expressions = expr.getParameters().getExpressions(); + if (expressions != null && expressions.size() == 1) { + valueParser = OperatorTools.buildParser(expressions.get(0)); + } + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + if (valueParser != null) { + Object valueObj = valueParser.parse(sourceData, rowIndex, context); + BigDecimal value = OperatorTools.parseBigDecimal(valueObj); + return Integer.toBinaryString(value.intValue()); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LengthFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LengthFunction.java new file mode 100644 index 00000000000..5a01652c930 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LengthFunction.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +/** + * LengthFunction + * description: length(string) + * - return the length of the string + * - return NULL if the string is NULL + */ +public class LengthFunction implements ValueParser { + + private final ValueParser stringParser; + + public LengthFunction(Function expr) { + stringParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object stringObject = stringParser.parse(sourceData, rowIndex, context); + if (stringObject == null) { + return null; + } + return OperatorTools.parseString(stringObject).length(); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ModuloFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ModuloFunction.java new file mode 100644 index 00000000000..b6b8b3c0af1 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ModuloFunction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; +import java.util.List; + +/** + * ModuloFunction + * description: MOD(NUMERIC1, NUMERIC2) : Return the remainder of numeric1 divided by numeric2. + */ +public class ModuloFunction implements ValueParser { + + private ValueParser dividendParser; + private ValueParser divisorParser; + + public ModuloFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + dividendParser = OperatorTools.buildParser(expressions.get(0)); + divisorParser = OperatorTools.buildParser(expressions.get(1)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object dividendObj = dividendParser.parse(sourceData, rowIndex, context); + Object divisorObj = divisorParser.parse(sourceData, rowIndex, context); + BigDecimal dividend = OperatorTools.parseBigDecimal(dividendObj); + BigDecimal divisor = OperatorTools.parseBigDecimal(divisorObj); + return dividend.remainder(divisor); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ReplaceFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ReplaceFunction.java new file mode 100644 index 00000000000..d9d1d26a210 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ReplaceFunction.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.List; + +/** + * ReplaceFunction + * description: replace(s, s1, s2)--replace string s1 in string s with string s2. + */ +public class ReplaceFunction implements ValueParser { + + private ValueParser stringParser; + private ValueParser targetParser; + private ValueParser replacementParser; + + public ReplaceFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + stringParser = OperatorTools.buildParser(expressions.get(0)); + targetParser = OperatorTools.buildParser(expressions.get(1)); + replacementParser = OperatorTools.buildParser(expressions.get(2)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object strObj = stringParser.parse(sourceData, rowIndex, context); + Object targetObj = targetParser.parse(sourceData, rowIndex, context); + Object replacementObj = replacementParser.parse(sourceData, rowIndex, context); + String str = OperatorTools.parseString(strObj); + String target = OperatorTools.parseString(targetObj); + String replacement = OperatorTools.parseString(replacementObj); + return str.replace(target, replacement); + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ReverseFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ReverseFunction.java new file mode 100644 index 00000000000..5015e0225a9 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ReverseFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; +/** + * ReverseFunction + * description: reverse(string)--returns the string with the order of the characters reversed. + * returns NULL if string is a empty string. + */ +public class ReverseFunction implements ValueParser { + + private ValueParser stringParser; + public ReverseFunction(Function expr) { + stringParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + /** + * parse + * Parse and reverse string by reverse() in StringBuilder. + * @param sourceData + * @param rowIndex + * @param context + * @return + */ + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object stringObj = stringParser.parse(sourceData, rowIndex, context); + if (stringObj == null) { + return null; + } + String str = OperatorTools.parseString(stringObj); + return new StringBuilder(str).reverse().toString(); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 17c486a76a2..c00d10b00f1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -18,6 +18,8 @@ package org.apache.inlong.sdk.transform.process.operator; import org.apache.inlong.sdk.transform.process.function.AbsFunction; +import org.apache.inlong.sdk.transform.process.function.AcosFunction; +import org.apache.inlong.sdk.transform.process.function.BinFunction; import org.apache.inlong.sdk.transform.process.function.CeilFunction; import org.apache.inlong.sdk.transform.process.function.ConcatFunction; import org.apache.inlong.sdk.transform.process.function.CosFunction; @@ -27,14 +29,18 @@ import org.apache.inlong.sdk.transform.process.function.ExpFunction; import org.apache.inlong.sdk.transform.process.function.FloorFunction; import org.apache.inlong.sdk.transform.process.function.FromUnixTimeFunction; +import org.apache.inlong.sdk.transform.process.function.LengthFunction; import org.apache.inlong.sdk.transform.process.function.LnFunction; import org.apache.inlong.sdk.transform.process.function.LocateFunction; import org.apache.inlong.sdk.transform.process.function.Log10Function; import org.apache.inlong.sdk.transform.process.function.Log2Function; import org.apache.inlong.sdk.transform.process.function.LogFunction; +import org.apache.inlong.sdk.transform.process.function.ModuloFunction; import org.apache.inlong.sdk.transform.process.function.NowFunction; import org.apache.inlong.sdk.transform.process.function.PowerFunction; +import org.apache.inlong.sdk.transform.process.function.ReplaceFunction; import org.apache.inlong.sdk.transform.process.function.ReplicateFunction; +import org.apache.inlong.sdk.transform.process.function.ReverseFunction; import org.apache.inlong.sdk.transform.process.function.RoundFunction; import org.apache.inlong.sdk.transform.process.function.SignFunction; import org.apache.inlong.sdk.transform.process.function.SinFunction; @@ -52,24 +58,30 @@ import org.apache.inlong.sdk.transform.process.parser.ColumnParser; import org.apache.inlong.sdk.transform.process.parser.DateParser; import org.apache.inlong.sdk.transform.process.parser.DivisionParser; +import org.apache.inlong.sdk.transform.process.parser.DoubleParser; import org.apache.inlong.sdk.transform.process.parser.LongParser; +import org.apache.inlong.sdk.transform.process.parser.ModuloParser; import org.apache.inlong.sdk.transform.process.parser.MultiplicationParser; import org.apache.inlong.sdk.transform.process.parser.ParenthesisParser; +import org.apache.inlong.sdk.transform.process.parser.SignParser; import org.apache.inlong.sdk.transform.process.parser.StringParser; import org.apache.inlong.sdk.transform.process.parser.SubtractionParser; import org.apache.inlong.sdk.transform.process.parser.TimestampParser; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.DateValue; +import net.sf.jsqlparser.expression.DoubleValue; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; import net.sf.jsqlparser.expression.LongValue; import net.sf.jsqlparser.expression.NotExpression; import net.sf.jsqlparser.expression.Parenthesis; +import net.sf.jsqlparser.expression.SignedExpression; import net.sf.jsqlparser.expression.StringValue; import net.sf.jsqlparser.expression.TimestampValue; import net.sf.jsqlparser.expression.operators.arithmetic.Addition; import net.sf.jsqlparser.expression.operators.arithmetic.Division; +import net.sf.jsqlparser.expression.operators.arithmetic.Modulo; import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication; import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction; import net.sf.jsqlparser.expression.operators.conditional.AndExpression; @@ -91,7 +103,7 @@ /** * OperatorTools - * + * */ public class OperatorTools { @@ -112,6 +124,7 @@ public class OperatorTools { functionMap.put("log2", Log2Function::new); functionMap.put("log", LogFunction::new); functionMap.put("exp", ExpFunction::new); + functionMap.put("reverse", ReverseFunction::new); functionMap.put("substring", SubstringFunction::new); functionMap.put("trim", TrimFunction::new); functionMap.put("replicate", ReplicateFunction::new); @@ -123,7 +136,9 @@ public class OperatorTools { functionMap.put("sin", SinFunction::new); functionMap.put("sinh", SinhFunction::new); functionMap.put("cos", CosFunction::new); + functionMap.put("acos", AcosFunction::new); functionMap.put("tan", TanFunction::new); + functionMap.put("bin", BinFunction::new); functionMap.put("year", func -> new DateExtractFunction(DateExtractFunctionType.YEAR, func)); functionMap.put("quarter", func -> new DateExtractFunction(DateExtractFunctionType.QUARTER, func)); functionMap.put("month", func -> new DateExtractFunction(DateExtractFunctionType.MONTH, func)); @@ -142,8 +157,11 @@ public class OperatorTools { functionMap.put("from_unixtime", FromUnixTimeFunction::new); functionMap.put("unix_timestamp", UnixTimestampFunction::new); functionMap.put("to_timestamp", ToTimestampFunction::new); + functionMap.put("mod", ModuloFunction::new); functionMap.put("to_base64", ToBase64Function::new); functionMap.put("sign", SignFunction::new); + functionMap.put("length", LengthFunction::new); + functionMap.put("replace", ReplaceFunction::new); } public static ExpressionOperator buildOperator(Expression expr) { @@ -178,6 +196,10 @@ public static ValueParser buildParser(Expression expr) { return new StringParser((StringValue) expr); } else if (expr instanceof LongValue) { return new LongParser((LongValue) expr); + } else if (expr instanceof DoubleValue) { + return new DoubleParser((DoubleValue) expr); + } else if (expr instanceof SignedExpression) { + return new SignParser((SignedExpression) expr); } else if (expr instanceof Parenthesis) { return new ParenthesisParser((Parenthesis) expr); } else if (expr instanceof Addition) { @@ -188,6 +210,8 @@ public static ValueParser buildParser(Expression expr) { return new MultiplicationParser((Multiplication) expr); } else if (expr instanceof Division) { return new DivisionParser((Division) expr); + } else if (expr instanceof Modulo) { + return new ModuloParser((Modulo) expr); } else if (expr instanceof DateValue) { return new DateParser((DateValue) expr); } else if (expr instanceof TimestampValue) { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java new file mode 100644 index 00000000000..2f50b5311b2 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; + +import net.sf.jsqlparser.expression.DoubleValue; + +/** + * LongParser + */ +public class DoubleParser implements ValueParser { + + private final Double value; + + public DoubleParser(DoubleValue expr) { + this.value = expr.getValue(); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + return value; + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ModuloParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ModuloParser.java new file mode 100644 index 00000000000..ca0a8411709 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ModuloParser.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import net.sf.jsqlparser.expression.operators.arithmetic.Modulo; + +import java.math.BigDecimal; + +/** + * ModuloParser + * description: analyze the % expression + */ +public class ModuloParser implements ValueParser { + + private ValueParser left; + + private ValueParser right; + + public ModuloParser(Modulo expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object leftObj = this.left.parse(sourceData, rowIndex, context); + Object rightObj = this.right.parse(sourceData, rowIndex, context); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.remainder(rightValue); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java new file mode 100644 index 00000000000..360fc0d0f6e --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SignParser.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import net.sf.jsqlparser.expression.SignedExpression; + +import java.math.BigDecimal; + +/** + * SignParser + * + */ +public class SignParser implements ValueParser { + + private final Integer sign; + private final ValueParser number; + + public SignParser(SignedExpression expr) { + sign = expr.getSign() == '-' ? -1 : 1; + number = OperatorTools.buildParser(expr.getExpression()); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object numberObject = number.parse(sourceData, rowIndex, context); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObject); + return numberValue.multiply(new BigDecimal(sign)); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java index e2acd597834..6e2d0808474 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -75,6 +75,152 @@ public void testSignFunction() throws Exception { Assert.assertEquals(1, output3.size()); Assert.assertEquals(output3.get(0), "result=0"); } + + @Test + public void testModuloFunction() throws Exception { + String transformFunctionSql = "select mod(numeric1,100) from source"; + String transformExpressionSql = "select numeric1 % 100 from source"; + List output1, output2; + String data; + TransformConfig functionConfig = new TransformConfig(transformFunctionSql); + TransformProcessor functionProcessor = TransformProcessor + .create(functionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + TransformConfig expressionConfig = new TransformConfig(transformExpressionSql); + TransformProcessor expressionProcessor = TransformProcessor + .create(expressionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: "mod(3.1415926,100)" and "3.1415926 % 100" + data = "3.1415926|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=3.1415926", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=3.1415926", output2.get(0)); + + // case2: "mod(-3.1415926,100)" and "-3.1415926 % 100" + data = "-3.1415926|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-3.1415926", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-3.1415926", output2.get(0)); + + // case3: "mod(320,100)" and "320 % 100" + data = "320|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=20", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=20", output2.get(0)); + + // case4: "mod(-320,100)" and "-320 % 100" + data = "-320|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-20", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-20", output2.get(0)); + + transformFunctionSql = "select mod(numeric1,-10) from source"; + transformExpressionSql = "select numeric1 % -10 from source"; + functionConfig = new TransformConfig(transformFunctionSql); + functionProcessor = TransformProcessor + .create(functionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + expressionConfig = new TransformConfig(transformExpressionSql); + expressionProcessor = TransformProcessor + .create(expressionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: "mod(9,-10)" and "9 % -10" + data = "9|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=9", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=9", output2.get(0)); + + // case6: "mod(-13,-10)" and "-13 % -10" + data = "-13|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-3", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-3", output2.get(0)); + + // case7: "mod(-13.14,-10)" and "-13.14 % -10" + data = "-13.14|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-3.14", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-3.14", output2.get(0)); + + // case8: "mod(13.14,-10)" and "13.14 % -10" + data = "13.14|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=3.14", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=3.14", output2.get(0)); + + transformFunctionSql = "select mod(numeric1,-3.14) from source"; + transformExpressionSql = "select numeric1 % -3.14 from source"; + functionConfig = new TransformConfig(transformFunctionSql); + functionProcessor = TransformProcessor + .create(functionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + expressionConfig = new TransformConfig(transformExpressionSql); + expressionProcessor = TransformProcessor + .create(expressionConfig, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case9: "mod(9,-3.14)" and "9 % -3.14" + data = "9|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=2.72", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=2.72", output2.get(0)); + + // case10: "mod(-9,-3.14)" and "-9 % -3.14" + data = "-9|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-2.72", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-2.72", output2.get(0)); + + // case11: "mod(-13.14,-3.14)" and "-13.14 % -3.14" + data = "-13.14|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=-0.58", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=-0.58", output2.get(0)); + + // case12: "mod(13.14,-3.14)" and "13.14 % -3.14" + data = "13.14|4a|4|8"; + output1 = functionProcessor.transform(data); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=0.58", output1.get(0)); + output2 = expressionProcessor.transform(data); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals("result=0.58", output2.get(0)); + } @Test public void testRoundFunction() throws Exception { @@ -347,6 +493,27 @@ public void testCosFunction() throws Exception { Assert.assertEquals(output1.get(0), "result=1.0"); } + @Test + public void testAcosFunction() throws Exception { + String transformSql = "select acos(numeric1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: acos(1) + List output1 = processor.transform("1|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=0.0"); + // case2: acos(0) + List output2 = processor.transform("0|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=1.5707963267948966"); + // case3: acos(-1) + List output3 = processor.transform("-1|4|6|8", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=3.141592653589793"); + } + @Test public void testTanFunction() throws Exception { String transformSql = "select tan(numeric1) from source"; @@ -368,4 +535,26 @@ public void testTanFunction() throws Exception { Assert.assertEquals(output3.get(0), "result=-2.185039863261519"); } + @Test + public void testBinFunction() throws Exception { + String transformSql1 = "select bin(numeric1) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case: bin(4) + List output1 = processor1.transform("4|5|6|8", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=100"); + String transformSql2 = "select bin() from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case: bin() + List output2 = processor2.transform("1|2|3|4", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output2.get(0), "result=null"); + } + } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java index 5479b4485aa..b489a892184 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformStringFunctionsProcessor.java @@ -41,6 +41,7 @@ public class TestTransformStringFunctionsProcessor { private static final List dstFields = new ArrayList<>(); private static final CsvSourceInfo csvSource; private static final KvSinkInfo kvSink; + static { for (int i = 1; i < 4; i++) { FieldInfo field = new FieldInfo(); @@ -118,6 +119,7 @@ public void testLocateFunction() throws Exception { Assert.assertEquals(1, output5.size()); Assert.assertEquals(output5.get(0), "result=null"); } + @Test public void testReplicateFunction() throws Exception { String transformSql1 = "select replicate(string1, numeric1) from source"; @@ -184,6 +186,36 @@ public void testTrimFunction() throws Exception { Assert.assertEquals(output3.get(0), "result=in long"); } + @Test + public void testReverseFunction() throws Exception { + String transformSql1 = "select reverse(string1) from source"; + TransformConfig config1 = new TransformConfig(transformSql1); + TransformProcessor processor1 = TransformProcessor + .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: reverse('apple') + List output1 = processor1.transform("apple|banana|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=elppa"); + // case2: reverse('ban ana ') + String transformSql2 = "select reverse(string2) from source"; + TransformConfig config2 = new TransformConfig(transformSql2); + TransformProcessor processor2 = TransformProcessor + .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + List output2 = processor2.transform("apple|ban ana |cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result= ana nab"); + // case3: reverse(12345) + List output3 = processor1.transform("12345|banana|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=54321"); + // case4: reverse(null) + List output4 = processor1.transform("|banana|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result="); + } + @Test public void testToBase64Function() throws Exception { String transformSql = "select to_base64(string1) from source"; @@ -200,4 +232,63 @@ public void testToBase64Function() throws Exception { Assert.assertEquals(1, output2.size()); Assert.assertEquals(output2.get(0), "result=aGVsbG8gd29ybGQ="); } + + @Test + public void testLengthFunction() throws Exception { + String transformSql = "select length(string1) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: length('hello world') + List output1 = processor1.transform("hello world|apple|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=11", output1.get(0)); + + transformSql = "select length(xxd) from source"; + config = new TransformConfig(transformSql); + processor1 = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case2: length(null) + output1 = processor1.transform("hello world|apple|cloud|2|1|3", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals("result=null", output1.get(0)); + } + @Test + public void testReplaceFunction() throws Exception { + String transformSql = "select replace(string1, string2, string3) from source"; + TransformConfig config = new TransformConfig(transformSql); + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + // case1: replace('hooray', 'oray', 'lly') + List output1 = processor.transform("hooray|oray|lly", new HashMap<>()); + Assert.assertEquals(1, output1.size()); + Assert.assertEquals(output1.get(0), "result=holly"); + // case2: replace('hooray', 'hook', 'hoor') + List output2 = processor.transform("hooray|hook|hoor", new HashMap<>()); + Assert.assertEquals(1, output2.size()); + Assert.assertEquals(output2.get(0), "result=hooray"); + // case3: replace('Hello World', 'World', '') + List output3 = processor.transform("Hello World|World|", new HashMap<>()); + Assert.assertEquals(1, output3.size()); + Assert.assertEquals(output3.get(0), "result=Hello "); + // case4: replace('Hello World', '', 'J') + List output4 = processor.transform("Hello World||J", new HashMap<>()); + Assert.assertEquals(1, output4.size()); + Assert.assertEquals(output4.get(0), "result=JHJeJlJlJoJ JWJoJrJlJdJ"); + // case5: replace('', '', '') + List output5 = processor.transform("||", new HashMap<>()); + Assert.assertEquals(1, output5.size()); + Assert.assertEquals(output5.get(0), "result="); + // case6: replace('abababab', 'ab', 'cd') + List output6 = processor.transform("abababab|ab|cd", new HashMap<>()); + Assert.assertEquals(1, output6.size()); + Assert.assertEquals(output6.get(0), "result=cdcdcdcd"); + // case7: replace('aaa', 'aa', 'd') + List output7 = processor.transform("aaa|aa|d", new HashMap<>()); + Assert.assertEquals(1, output7.size()); + Assert.assertEquals(output7.get(0), "result=da"); + } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java new file mode 100644 index 00000000000..43f1cd97615 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/DefaultEvent2HttpRequestHandler.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; +import org.apache.inlong.sort.standalone.utils.UnescapeHelper; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.slf4j.Logger; + +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DefaultEvent2HttpRequestHandler implements IEvent2HttpRequestHandler { + + public static final Logger LOG = InlongLoggerFactory.getLogger(DefaultEvent2HttpRequestHandler.class); + private static final ObjectMapper objectMapper = new ObjectMapper(); + public static final String HTTP_PREFIX = "http://"; + public static final String HTTPS_PREFIX = "https://"; + public static final String INLONG_GROUP_ID_HEADER = "inlongGroupId"; + public static final String INLONG_STREAM_ID_HEADER = "inlongStreamId"; + + @Override + public HttpRequest parse(HttpSinkContext context, ProfileEvent event) + throws URISyntaxException, JsonProcessingException { + String uid = event.getUid(); + HttpIdConfig idConfig = context.getIdConfig(uid); + if (idConfig == null) { + context.addSendResultMetric(event, context.getTaskName(), false, System.currentTimeMillis()); + return null; + } + // get the delimiter + String delimiter = idConfig.getSeparator(); + char cDelimiter = delimiter.charAt(0); + // for tab separator + byte[] bodyBytes = event.getBody(); + String strContext = new String(bodyBytes, idConfig.getSourceCharset()); + // unescape + List columnValues = UnescapeHelper.toFiledList(strContext, cDelimiter); + int valueLength = columnValues.size(); + List fieldList = idConfig.getFieldList(); + int columnLength = fieldList.size(); + // get field value + Map fieldMap = new HashMap<>(); + for (int i = 0; i < columnLength; ++i) { + String fieldName = fieldList.get(i); + String fieldValue = i < valueLength ? columnValues.get(i) : ""; + fieldMap.put(fieldName, fieldValue); + } + + // build + String uriString = context.getBaseUrl() + idConfig.getPath(); + if (!uriString.startsWith(HTTP_PREFIX) && !uriString.startsWith(HTTPS_PREFIX)) { + uriString = HTTP_PREFIX + uriString; + } + URI uri = new URI(uriString); + String jsonData; + HttpUriRequest request; + String requestMethod = idConfig.getMethod().toUpperCase(); + switch (requestMethod) { + case "GET": + String params = fieldMap.entrySet().stream() + .map(entry -> { + try { + return entry.getKey() + "=" + + URLEncoder.encode(entry.getValue(), idConfig.getSinkCharset().name()); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.joining("&")); + request = new HttpGet(uri + "?" + params); + for (Map.Entry entry : idConfig.getHeaders().entrySet()) { + request.setHeader(entry.getKey(), entry.getValue()); + } + request.setHeader(INLONG_GROUP_ID_HEADER, idConfig.getInlongGroupId()); + request.setHeader(INLONG_STREAM_ID_HEADER, idConfig.getInlongStreamId()); + break; + case "POST": + request = new HttpPost(uri); + for (Map.Entry entry : idConfig.getHeaders().entrySet()) { + request.setHeader(entry.getKey(), entry.getValue()); + } + request.setHeader(INLONG_GROUP_ID_HEADER, idConfig.getInlongGroupId()); + request.setHeader(INLONG_STREAM_ID_HEADER, idConfig.getInlongStreamId()); + jsonData = objectMapper.writeValueAsString(fieldMap); + setEntity((HttpEntityEnclosingRequestBase) request, jsonData); + break; + case "PUT": + request = new HttpPut(uri); + for (Map.Entry entry : idConfig.getHeaders().entrySet()) { + request.setHeader(entry.getKey(), entry.getValue()); + } + request.setHeader(INLONG_GROUP_ID_HEADER, idConfig.getInlongGroupId()); + request.setHeader(INLONG_STREAM_ID_HEADER, idConfig.getInlongStreamId()); + jsonData = objectMapper.writeValueAsString(fieldMap); + setEntity((HttpEntityEnclosingRequestBase) request, jsonData); + break; + default: + LOG.error("Unsupported request method: {}", requestMethod); + return null; + } + return new HttpRequest(request, event, idConfig.getMaxRetryTimes()); + } + + private static void setEntity(HttpEntityEnclosingRequestBase request, String jsonData) { + StringEntity requestEntity = new StringEntity(jsonData, ContentType.APPLICATION_JSON); + request.setEntity(requestEntity); + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java new file mode 100644 index 00000000000..59ef12f5e0a --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpCallback.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; + +import org.apache.http.HttpResponse; +import org.apache.http.concurrent.FutureCallback; +import org.slf4j.Logger; + +public class HttpCallback implements FutureCallback { + + public static final Logger LOG = InlongLoggerFactory.getLogger(HttpCallback.class); + + private HttpSinkContext context; + private HttpRequest requestItem; + + public HttpCallback(HttpSinkContext context, HttpRequest requestItem) { + this.context = context; + this.requestItem = requestItem; + } + + @Override + public void completed(HttpResponse httpResponse) { + int statusCode = httpResponse.getStatusLine().getStatusCode(); + ProfileEvent event = requestItem.getEvent(); + long sendTime = requestItem.getSendTime(); + + // is fail + if (statusCode != 200) { + handleFailedRequest(event, sendTime); + } else { + context.addSendResultMetric(event, context.getTaskName(), true, sendTime); + context.releaseDispatchQueue(requestItem); + event.ack(); + } + } + + @Override + public void failed(Exception e) { + LOG.error("Http request failed,errorMsg:{}", e.getMessage(), e); + ProfileEvent event = requestItem.getEvent(); + long sendTime = requestItem.getSendTime(); + handleFailedRequest(event, sendTime); + } + + @Override + public void cancelled() { + LOG.info("Request cancelled"); + } + + private void handleFailedRequest(ProfileEvent event, long sendTime) { + int remainRetryTimes = requestItem.getRemainRetryTimes(); + context.addSendResultMetric(event, context.getTaskName(), false, sendTime); + // if reach the max retry times, release the request + if (remainRetryTimes == 1) { + context.releaseDispatchQueue(requestItem); + return; + } else if (remainRetryTimes > 1) { + requestItem.setRemainRetryTimes(remainRetryTimes - 1); + } + context.backDispatchQueue(requestItem); + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java new file mode 100644 index 00000000000..673051cdab6 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpChannelWorker.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.inlong.sort.standalone.channel.ProfileEvent; + +import org.apache.flume.Channel; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.lifecycle.LifecycleState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpChannelWorker extends Thread { + + public static final Logger LOG = LoggerFactory.getLogger(HttpChannelWorker.class); + + private final HttpSinkContext context; + private final int workerIndex; + + private LifecycleState status; + private IEvent2HttpRequestHandler handler; + + public HttpChannelWorker(HttpSinkContext context, int workerIndex) { + this.context = context; + this.workerIndex = workerIndex; + this.status = LifecycleState.IDLE; + this.handler = context.createHttpRequestHandler(); + } + + @Override + public void run() { + status = LifecycleState.START; + LOG.info("Starting HttpChannelWorker:{},status:{},index:{}", context.getTaskName(), status, workerIndex); + while (status == LifecycleState.START) { + try { + this.doRun(); + } catch (Throwable t) { + LOG.error("Error occurred while starting HttpChannelWorker:{},status:{},index:{}", + context.getTaskName(), status, workerIndex, t); + } + } + } + + public void doRun() { + Channel channel = context.getChannel(); + Transaction tx = channel.getTransaction(); + tx.begin(); + try { + Event event = channel.take(); + if (event == null) { + tx.commit(); + Thread.sleep(context.getProcessInterval()); + return; + } + if (!(event instanceof ProfileEvent)) { + tx.commit(); + this.context.addSendFailMetric(); + Thread.sleep(context.getProcessInterval()); + return; + } + // to profileEvent + ProfileEvent profileEvent = (ProfileEvent) event; + HttpRequest httpRequest = handler.parse(context, profileEvent); + // offer queue + if (httpRequest != null) { + context.offerDispatchQueue(httpRequest); + } else { + context.addSendFailMetric(); + profileEvent.ack(); + } + tx.commit(); + } catch (Throwable t) { + LOG.error("Process event failed!{}", this.getName(), t); + try { + tx.rollback(); + } catch (Throwable e) { + LOG.error("Channel take transaction rollback exception:{}", getName(), e); + } + } finally { + tx.close(); + } + } + + public void close() { + this.status = LifecycleState.STOP; + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java new file mode 100644 index 00000000000..602ef1a7938 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpIdConfig.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; +import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig; +import org.apache.inlong.common.pojo.sort.dataflow.sink.HttpSinkConfig; +import org.apache.inlong.sort.standalone.config.pojo.IdConfig; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@EqualsAndHashCode(callSuper = true) +@Data +@NoArgsConstructor +@AllArgsConstructor +@SuperBuilder +@Slf4j +public class HttpIdConfig extends IdConfig { + + private String path; + private String method; + private Map headers; + private Integer maxRetryTimes; + private String separator = "|"; + private List fieldList; + private Charset sourceCharset; + private Charset sinkCharset; + + public static HttpIdConfig create(DataFlowConfig dataFlowConfig) { + HttpSinkConfig sinkConfig = (HttpSinkConfig) dataFlowConfig.getSinkConfig(); + List fields = sinkConfig.getFieldConfigs() + .stream() + .map(FieldConfig::getName) + .collect(Collectors.toList()); + Charset sourceCharset, sinkCharset; + try { + sinkCharset = Charset.forName(sinkConfig.getEncodingType()); + } catch (Throwable t) { + log.warn("do not support field encoding type={}, dataflow id={}", + sinkConfig.getEncodingType(), dataFlowConfig.getDataflowId()); + sinkCharset = Charset.defaultCharset(); + } + try { + sourceCharset = Charset.forName(dataFlowConfig.getSourceConfig().getEncodingType()); + } catch (Throwable t) { + log.warn("do not support context encoding type={}, dataflow id={}", + dataFlowConfig.getSourceConfig().getEncodingType(), dataFlowConfig.getDataflowId()); + sourceCharset = Charset.defaultCharset(); + } + return HttpIdConfig.builder() + .inlongGroupId(dataFlowConfig.getInlongGroupId()) + .inlongStreamId(dataFlowConfig.getInlongStreamId()) + .path(sinkConfig.getPath()) + .method(sinkConfig.getMethod()) + .headers(sinkConfig.getHeaders()) + .maxRetryTimes(sinkConfig.getMaxRetryTimes()) + .separator("|") + .fieldList(fields) + .sinkCharset(sinkCharset) + .sourceCharset(sourceCharset) + .build(); + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpOutputChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpOutputChannel.java new file mode 100644 index 00000000000..db7e8da9236 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpOutputChannel.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; + +import org.apache.flume.lifecycle.LifecycleState; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.slf4j.Logger; + +import java.io.IOException; + +public class HttpOutputChannel extends Thread { + + public static final Logger LOG = InlongLoggerFactory.getLogger(HttpOutputChannel.class); + + private LifecycleState status; + private HttpSinkContext context; + private CloseableHttpAsyncClient httpClient; + + public HttpOutputChannel(HttpSinkContext context) { + super(context.getTaskName()); + this.context = context; + this.status = LifecycleState.IDLE; + } + + public void init() { + initHttpClient(); + } + + private boolean initHttpClient() { + try { + if (httpClient == null) { + String userName = context.getUsername(); + String password = context.getPassword(); + LOG.info("initHttpAsyncClient:url:{}", context.getBaseUrl()); + + HttpAsyncClientBuilder builder = HttpAsyncClients.custom(); + final CredentialsProvider provider = new BasicCredentialsProvider(); + if (context.getEnableCredential()) { + provider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(userName, password)); + builder.setDefaultCredentialsProvider(provider); + } + + RequestConfig requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(context.getConnectionRequestTimeout()) + .setSocketTimeout(context.getSocketTimeout()) + .setMaxRedirects(context.getMaxRedirects()) + .setConnectTimeout(120 * 1000) + .build(); + + builder.setDefaultRequestConfig(requestConfig) + .setMaxConnTotal(context.getMaxConnect()) + .setMaxConnPerRoute(context.getMaxConnectPerRoute()); + + httpClient = HttpSinkFactory.createHttpAsyncClient(builder); + httpClient.start(); + } + } catch (Exception e) { + LOG.error("init httpclient failed.", e); + httpClient = null; + return false; + } + return true; + } + + public void close() { + status = LifecycleState.STOP; + try { + httpClient.close(); + } catch (IOException e) { + LOG.error(String.format("close HttpClient:%s", e.getMessage()), e); + } + } + + @Override + public void run() { + status = LifecycleState.START; + LOG.info("Starting HttpOutputChannel:{},status:{}", context.getTaskName(), status); + while (status == LifecycleState.START) { + try { + send(); + } catch (Throwable t) { + LOG.error("Error occurred while starting HttpOutputChannel:{},status:{}", context.getTaskName(), status, + t); + } + } + } + + public void send() throws InterruptedException { + HttpRequest httpRequest = null; + try { + // get httpRequest + httpRequest = context.takeDispatchQueue(); + if (httpRequest == null) { + Thread.sleep(context.getProcessInterval()); + return; + } + // get id config + String uid = httpRequest.getEvent().getUid(); + if (context.getIdConfig(uid) == null) { + context.addSendResultMetric(httpRequest.getEvent(), context.getTaskName(), false, + httpRequest.getSendTime()); + return; + } + // send + httpClient.execute(httpRequest.getRequest(), new HttpCallback(context, httpRequest)); + context.addSendMetric(httpRequest.getEvent(), context.getTaskName()); + } catch (Throwable e) { + LOG.error("Failed to send HttpRequest '{}': {}", httpRequest, e.getMessage(), e); + if (httpRequest != null) { + context.backDispatchQueue(httpRequest); + context.addSendResultMetric(httpRequest.getEvent(), context.getTaskName(), false, + httpRequest.getSendTime()); + } + try { + Thread.sleep(context.getProcessInterval()); + } catch (InterruptedException e1) { + LOG.error("Thread interrupted while sleeping, error: {}", e1.getMessage(), e1); + } + } + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java new file mode 100644 index 00000000000..d1731a61b6b --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpRequest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.inlong.sort.standalone.channel.ProfileEvent; + +import org.apache.http.client.methods.HttpUriRequest; + +public class HttpRequest { + + private final HttpUriRequest request; + private final ProfileEvent event; + private final long sendTime; + private int remainRetryTimes; + + public HttpRequest(HttpUriRequest request, ProfileEvent event, int remainRetryTimes) { + this.request = request; + this.event = event; + this.sendTime = System.currentTimeMillis(); + this.remainRetryTimes = remainRetryTimes; + } + + public HttpUriRequest getRequest() { + return request; + } + + public ProfileEvent getEvent() { + return event; + } + + public long getSendTime() { + return sendTime; + } + + public int getRemainRetryTimes() { + return remainRetryTimes; + } + + public void setRemainRetryTimes(int remainRetryTimes) { + this.remainRetryTimes = remainRetryTimes; + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java new file mode 100644 index 00000000000..9da6fa0bffd --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSink.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.inlong.sort.standalone.sink.SinkContext; +import org.apache.inlong.sort.standalone.utils.BufferQueue; + +import org.apache.flume.Context; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.conf.Configurable; +import org.apache.flume.sink.AbstractSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class HttpSink extends AbstractSink implements Configurable { + + public static final Logger LOG = LoggerFactory.getLogger(HttpSink.class); + + private Context parentContext; + private BufferQueue dispatchQueue; + private HttpSinkContext context; + // workers + private List workers = new ArrayList<>(); + // output + private HttpOutputChannel outputChannel; + + @Override + public void start() { + super.start(); + try { + this.dispatchQueue = SinkContext.createBufferQueue(); + this.context = new HttpSinkContext(getName(), parentContext, getChannel(), dispatchQueue); + this.context.start(); + for (int i = 0; i < context.getMaxThreads(); i++) { + HttpChannelWorker worker = new HttpChannelWorker(context, i); + this.workers.add(worker); + worker.start(); + } + this.outputChannel = HttpSinkFactory.createHttpOutputChannel(context); + this.outputChannel.init(); + this.outputChannel.start(); + } catch (Exception e) { + LOG.error("Failed to start HttpSink '{}': {}", this.getName(), e.getMessage()); + } + } + + @Override + public void stop() { + super.stop(); + try { + this.context.close(); + for (HttpChannelWorker worker : this.workers) { + worker.close(); + } + this.workers.clear(); + this.outputChannel.close(); + } catch (Exception e) { + LOG.error("Failed to stop HttpSink '{}': {}", this.getName(), e.getMessage()); + } + } + + @Override + public void configure(Context context) { + LOG.info("Start to configure:{}, context:{}.", this.getName(), context.toString()); + this.parentContext = context; + } + + @Override + public Status process() throws EventDeliveryException { + return Status.BACKOFF; + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java new file mode 100644 index 00000000000..9228cf3957a --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkContext.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.inlong.common.pojo.sort.ClusterTagConfig; +import org.apache.inlong.common.pojo.sort.TaskConfig; +import org.apache.inlong.common.pojo.sort.node.HttpNodeConfig; +import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; +import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; +import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; +import org.apache.inlong.sort.standalone.config.pojo.InlongId; +import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter; +import org.apache.inlong.sort.standalone.metrics.SortMetricItem; +import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils; +import org.apache.inlong.sort.standalone.sink.SinkContext; +import org.apache.inlong.sort.standalone.utils.BufferQueue; +import org.apache.inlong.sort.standalone.utils.Constants; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.ClassUtils; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class HttpSinkContext extends SinkContext { + + public static final Logger LOG = InlongLoggerFactory.getLogger(HttpSinkContext.class); + public static final String KEY_NODE_ID = "nodeId"; + public static final String KEY_BASE_URL = "baseUrl"; + public static final String KEY_ENABLE_CREDENTIAL = "enableCredential"; + public static final String KEY_USERNAME = "username"; + public static final String KEY_PASSWORD = "password"; + public static final String KEY_MAX_CONNECT_TOTAL = "maxConnect"; + public static final String KEY_MAX_CONNECT_PER_ROUTE = "maxConnectPerRoute"; + public static final String KEY_CONNECTION_REQUEST_TIMEOUT = "connectionRequestTimeout"; + public static final String KEY_SOCKET_TIMEOUT = "socketTimeout"; + public static final String KEY_MAX_REDIRECTS = "maxRedirects"; + public static final String KEY_LOG_MAX_LENGTH = "logMaxLength"; + public static final String KEY_EVENT_HTTP_REQUEST_HANDLER = "httpRequestHandler"; + + public static final boolean DEFAULT_ENABLE_CREDENTIAL = false; + public static final int DEFAULT_MAX_CONNECT_TOTAL = 1000; + public static final int DEFAULT_MAX_CONNECT_PER_ROUTE = 1000; + public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 0; + public static final int DEFAULT_SOCKET_TIMEOUT = 0; + public static final int DEFAULT_MAX_REDIRECTS = 0; + public static final int DEFAULT_LOG_MAX_LENGTH = 32 * 1024; + + private Context sinkContext; + private HttpNodeConfig httpNodeConfig; + private String nodeId; + private Map idConfigMap = new ConcurrentHashMap<>(); + private ObjectMapper objectMapper = new ObjectMapper(); + private final BufferQueue dispatchQueue; + private AtomicLong offerCounter = new AtomicLong(0); + private AtomicLong takeCounter = new AtomicLong(0); + private AtomicLong backCounter = new AtomicLong(0); + // rest client + private String baseUrl; + private boolean enableCredential; + private String username; + private String password; + private int maxConnect = DEFAULT_MAX_CONNECT_TOTAL; + private int maxConnectPerRoute = DEFAULT_MAX_CONNECT_PER_ROUTE; + private int connectionRequestTimeout = DEFAULT_CONNECTION_REQUEST_TIMEOUT; + private int socketTimeout = DEFAULT_SOCKET_TIMEOUT; + private int maxRedirects = DEFAULT_MAX_REDIRECTS; + private int logMaxLength = DEFAULT_LOG_MAX_LENGTH; + + public HttpSinkContext(String sinkName, Context context, Channel channel, + BufferQueue dispatchQueue) { + super(sinkName, context, channel); + this.sinkContext = context; + this.dispatchQueue = dispatchQueue; + this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID); + } + + public void reload() { + try { + LOG.info("SortTask:{},dispatchQueue:{},offer:{},take:{},back:{}", + taskName, dispatchQueue.size(), offerCounter.getAndSet(0), + takeCounter.getAndSet(0), backCounter.getAndSet(0)); + TaskConfig newTaskConfig = SortConfigHolder.getTaskConfig(taskName); + SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName); + if ((newTaskConfig == null || newTaskConfig.equals(taskConfig)) + && (newSortTaskConfig == null || newSortTaskConfig.equals(sortTaskConfig))) { + return; + } + LOG.info("get new SortTaskConfig:taskName:{}", taskName); + + if (newTaskConfig != null) { + HttpNodeConfig requestNodeConfig = (HttpNodeConfig) newTaskConfig.getNodeConfig(); + if (httpNodeConfig == null || requestNodeConfig.getVersion() > httpNodeConfig.getVersion()) { + this.httpNodeConfig = requestNodeConfig; + } + } + + this.taskConfig = newTaskConfig; + this.sortTaskConfig = newSortTaskConfig; + + // change current config + Map fromTaskConfig = reloadIdParamsFromTaskConfig(taskConfig); + Map fromSortTaskConfig = reloadIdParamsFromSortTaskConfig(sortTaskConfig); + if (unifiedConfiguration) { + idConfigMap = fromTaskConfig; + reloadClientsFromNodeConfig(httpNodeConfig); + } else { + idConfigMap = fromSortTaskConfig; + reloadClientsFromSortTaskConfig(sortTaskConfig); + } + SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, fromTaskConfig, fromSortTaskConfig); + // log + LOG.info("End to get SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName, + objectMapper.writeValueAsString(idConfigMap)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + private Map reloadIdParamsFromTaskConfig(TaskConfig taskConfig) { + if (taskConfig == null) { + return new HashMap<>(); + } + return taskConfig.getClusterTagConfigs() + .stream() + .map(ClusterTagConfig::getDataFlowConfigs) + .flatMap(Collection::stream) + .map(HttpIdConfig::create) + .collect(Collectors.toMap( + config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()), + v -> v, + (flow1, flow2) -> flow1)); + } + + private Map reloadIdParamsFromSortTaskConfig(SortTaskConfig sortTaskConfig) + throws JsonProcessingException { + if (sortTaskConfig == null) { + return new HashMap<>(); + } + Map newIdConfigMap = new ConcurrentHashMap<>(); + List> idList = this.sortTaskConfig.getIdParams(); + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + for (Map idParam : idList) { + String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID); + String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID); + String uid = InlongId.generateUid(inlongGroupId, inlongStreamId); + String jsonIdConfig = objectMapper.writeValueAsString(idParam); + HttpIdConfig idConfig = objectMapper.readValue(jsonIdConfig, HttpIdConfig.class); + newIdConfigMap.put(uid, idConfig); + } + return newIdConfigMap; + } + + private void reloadClientsFromNodeConfig(HttpNodeConfig httpNodeConfig) { + Map properties = httpNodeConfig.getProperties(); + this.sinkContext = new Context(properties != null ? properties : new HashMap<>()); + this.baseUrl = httpNodeConfig.getBaseUrl(); + this.enableCredential = httpNodeConfig.getEnableCredential();; + this.username = httpNodeConfig.getUsername(); + this.password = httpNodeConfig.getPassword(); + this.maxConnect = httpNodeConfig.getMaxConnect(); + + this.maxConnectPerRoute = sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, DEFAULT_MAX_CONNECT_PER_ROUTE); + this.connectionRequestTimeout = + sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, DEFAULT_CONNECTION_REQUEST_TIMEOUT); + this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); + this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, DEFAULT_MAX_REDIRECTS); + this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, DEFAULT_LOG_MAX_LENGTH); + } + + private void reloadClientsFromSortTaskConfig(SortTaskConfig sortTaskConfig) { + this.sinkContext = new Context(sortTaskConfig.getSinkParams()); + this.baseUrl = sinkContext.getString(KEY_BASE_URL); + this.enableCredential = sinkContext.getBoolean(KEY_ENABLE_CREDENTIAL, DEFAULT_ENABLE_CREDENTIAL); + this.username = sinkContext.getString(KEY_USERNAME); + this.password = sinkContext.getString(KEY_PASSWORD); + this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT_TOTAL, DEFAULT_MAX_CONNECT_TOTAL); + + this.maxConnectPerRoute = sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, DEFAULT_MAX_CONNECT_PER_ROUTE); + this.connectionRequestTimeout = + sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, DEFAULT_CONNECTION_REQUEST_TIMEOUT); + this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); + this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, DEFAULT_MAX_REDIRECTS); + this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, DEFAULT_LOG_MAX_LENGTH); + } + + public void addSendMetric(ProfileEvent currentRecord, String bid) { + Map dimensions = new HashMap<>(); + dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId()); + dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName()); + // metric + fillInlongId(currentRecord, dimensions); + dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName()); + dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, bid); + long msgTime = currentRecord.getRawLogTime(); + long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval(); + dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime)); + SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions); + long count = 1; + long size = currentRecord.getBody().length; + metricItem.sendCount.addAndGet(count); + metricItem.sendSize.addAndGet(size); + } + + public void addSendFailMetric() { + Map dimensions = new HashMap<>(); + dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId()); + dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName()); + long msgTime = System.currentTimeMillis(); + long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval(); + dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime)); + SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions); + metricItem.readFailCount.incrementAndGet(); + } + + public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean result, long sendTime) { + Map dimensions = new HashMap<>(); + dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId()); + dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName()); + // metric + fillInlongId(currentRecord, dimensions); + dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName()); + dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, bid); + final long currentTime = System.currentTimeMillis(); + long msgTime = currentRecord.getRawLogTime(); + long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval(); + dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime)); + SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions); + if (result) { + metricItem.sendSuccessCount.incrementAndGet(); + metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length); + AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord); + if (sendTime > 0) { + long sinkDuration = currentTime - sendTime; + long nodeDuration = currentTime - currentRecord.getFetchTime(); + long wholeDuration = currentTime - currentRecord.getRawLogTime(); + metricItem.sinkDuration.addAndGet(sinkDuration); + metricItem.nodeDuration.addAndGet(nodeDuration); + metricItem.wholeDuration.addAndGet(wholeDuration); + } + } else { + metricItem.sendFailCount.incrementAndGet(); + metricItem.sendFailSize.addAndGet(currentRecord.getBody().length); + } + } + + public HttpIdConfig getIdConfig(String uid) { + return this.idConfigMap.get(uid); + } + + public String getNodeId() { + return nodeId; + } + + public Map getIdConfigMap() { + return idConfigMap; + } + + public Context getSinkContext() { + return sinkContext; + } + + public void setSinkContext(Context sinkContext) { + this.sinkContext = sinkContext; + } + + public void offerDispatchQueue(HttpRequest httpRequest) { + this.offerCounter.incrementAndGet(); + dispatchQueue.acquire(httpRequest.getEvent().getBody().length); + dispatchQueue.offer(httpRequest); + } + + public HttpRequest takeDispatchQueue() { + HttpRequest httpRequest = this.dispatchQueue.pollRecord(); + if (httpRequest != null) { + this.takeCounter.incrementAndGet(); + } + return httpRequest; + } + + public void backDispatchQueue(HttpRequest httpRequest) { + this.backCounter.incrementAndGet(); + dispatchQueue.offer(httpRequest); + } + + public void releaseDispatchQueue(HttpRequest httpRequest) { + dispatchQueue.release(httpRequest.getEvent().getBody().length); + } + + public String getBaseUrl() { + return baseUrl; + } + + public void setBaseUrl(String baseUrl) { + this.baseUrl = baseUrl; + } + + public boolean getEnableCredential() { + return enableCredential; + } + + public void setEnableCredential(boolean enableCredential) { + this.enableCredential = enableCredential; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getMaxConnect() { + return maxConnect; + } + + public int getMaxConnectPerRoute() { + return maxConnectPerRoute; + } + + public int getConnectionRequestTimeout() { + return connectionRequestTimeout; + } + + public int getSocketTimeout() { + return socketTimeout; + } + + public int getMaxRedirects() { + return maxRedirects; + } + + public int getLogMaxLength() { + return logMaxLength; + } + + public void setMaxConnect(int maxConnect) { + this.maxConnect = maxConnect; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public void setIdConfigMap(Map idConfigMap) { + this.idConfigMap = idConfigMap; + } + + public IEvent2HttpRequestHandler createHttpRequestHandler() { + // IEvent2HttpRequestHandler + String httpRequestHandlerClass = CommonPropertiesHolder.getString(KEY_EVENT_HTTP_REQUEST_HANDLER, + DefaultEvent2HttpRequestHandler.class.getName()); + try { + Class handlerClass = ClassUtils.getClass(httpRequestHandlerClass); + Object handlerObject = handlerClass.getDeclaredConstructor().newInstance(); + if (handlerObject instanceof IEvent2HttpRequestHandler) { + return (IEvent2HttpRequestHandler) handlerObject; + } + } catch (Throwable t) { + LOG.error("Fail to init IEvent2HttpRequestHandler,handlerClass:{},error:{}", + httpRequestHandlerClass, t.getMessage(), t); + } + return null; + } + +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java new file mode 100644 index 00000000000..6125297bae1 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/HttpSinkFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; + +public class HttpSinkFactory { + + public static HttpOutputChannel createHttpOutputChannel(HttpSinkContext context) { + return new HttpOutputChannel(context); + } + + public static CloseableHttpAsyncClient createHttpAsyncClient(HttpAsyncClientBuilder builder) { + return builder.build(); + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java new file mode 100644 index 00000000000..65502fb50ee --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/http/IEvent2HttpRequestHandler.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink.http; + +import org.apache.inlong.sort.standalone.channel.ProfileEvent; + +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.net.URISyntaxException; + +public interface IEvent2HttpRequestHandler { + + HttpRequest parse(HttpSinkContext context, ProfileEvent event) throws URISyntaxException, JsonProcessingException; +}