From 266148ccd336d2de64dbf0e8391a2d030a1c92a9 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Tue, 10 May 2022 22:37:32 +0800 Subject: [PATCH 01/27] [INLONG-4133][Manager] Not pass the type field when querying sources and sinks (#4135) * Add the judgment of equal ID in the update * Delete the requirement that type must be used in paging query * Delete the type parameter of the delete function * Change the exception log * Change the check of the source name Co-authored-by: healchow --- .../api/impl/DefaultInlongStreamBuilder.java | 6 +-- .../client/api/impl/InlongStreamImpl.java | 6 +-- .../api/inner/InnerInlongManagerClient.java | 8 +-- .../dao/mapper/StreamSinkEntityMapper.java | 6 ++- .../mappers/StreamSinkEntityMapper.xml | 11 ++-- .../service/sink/StreamSinkOperation.java | 7 ++- .../service/sink/StreamSinkService.java | 8 ++- .../service/sink/StreamSinkServiceImpl.java | 52 ++++++++++++++----- .../sink/ck/ClickHouseSinkOperation.java | 5 +- .../service/sink/hive/HiveSinkOperation.java | 5 +- .../sink/iceberg/IcebergSinkOperation.java | 5 +- .../sink/kafka/KafkaSinkOperation.java | 5 +- .../source/AbstractSourceOperation.java | 33 +++++++++--- .../service/source/StreamSourceOperation.java | 6 +-- .../service/source/StreamSourceService.java | 14 ++--- .../source/StreamSourceServiceImpl.java | 34 +++++++----- .../AbstractSourceOperateListener.java | 2 +- .../source/listener/SourceDeleteListener.java | 2 +- .../listener/SourceRestartListener.java | 2 +- .../source/listener/SourceStopListener.java | 2 +- .../service/core/impl/AgentServiceTest.java | 2 +- .../sink/ClickHouseStreamSinkServiceTest.java | 6 +-- .../core/sink/HiveStreamSinkServiceTest.java | 8 +-- .../sink/IcebergStreamSinkServiceTest.java | 8 +-- .../core/sink/KafkaStreamSinkServiceTest.java | 6 +-- .../core/source/StreamSourceServiceTest.java | 10 ++-- .../listener/DataSourceListenerTest.java | 5 +- .../web/controller/StreamSinkController.java | 20 +++---- .../controller/StreamSourceController.java | 20 +++---- 29 files changed, 163 insertions(+), 141 deletions(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java index c9b0a8fd072..c744ac40748 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java @@ -212,9 +212,8 @@ private void initOrUpdateSource() { for (SourceListResponse sourceListResponse : sourceListResponses) { final String sourceName = sourceListResponse.getSourceName(); final int id = sourceListResponse.getId(); - final String type = sourceListResponse.getSourceType(); if (sourceRequests.get(sourceName) == null) { - boolean isDelete = managerClient.deleteSource(id, type); + boolean isDelete = managerClient.deleteSource(id); if (!isDelete) { throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse)); } @@ -251,9 +250,8 @@ private void initOrUpdateSink() { for (SinkListResponse sinkListResponse : sinkListResponses) { final String sinkName = sinkListResponse.getSinkName(); final int id = sinkListResponse.getId(); - final String type = sinkListResponse.getSinkType(); if (sinkRequests.get(sinkName) == null) { - boolean isDelete = managerClient.deleteSink(id, type); + boolean isDelete = managerClient.deleteSink(id); if (!isDelete) { throw new RuntimeException(String.format("Delete sink=%s failed", sinkListResponse)); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java index 11da4a2ef63..ebfba5909f8 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java @@ -363,9 +363,8 @@ private void initOrUpdateSource(InlongStreamInfo streamInfo) { for (SourceListResponse sourceListResponse : sourceListResponses) { final String sourceName = sourceListResponse.getSourceName(); final int id = sourceListResponse.getId(); - final String type = sourceListResponse.getSourceType(); if (this.streamSources.get(sourceName) == null) { - boolean isDelete = managerClient.deleteSource(id, type); + boolean isDelete = managerClient.deleteSource(id); if (!isDelete) { throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse)); } @@ -400,9 +399,8 @@ private void initOrUpdateSink(InlongStreamInfo streamInfo) { for (SinkListResponse sinkListResponse : sinkListResponses) { final String sinkName = sinkListResponse.getSinkName(); final int id = sinkListResponse.getId(); - final String type = sinkListResponse.getSinkType(); if (this.streamSinks.get(sinkName) == null) { - boolean isDelete = managerClient.deleteSink(id, type); + boolean isDelete = managerClient.deleteSink(id); if (!isDelete) { throw new RuntimeException(String.format("Delete sink=%s failed", sinkListResponse)); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java index fa6def6cd35..daffd1e4ba7 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java @@ -480,12 +480,10 @@ public Pair updateSource(SourceRequest sourceRequest) { } } - public boolean deleteSource(int id, String type) { + public boolean deleteSource(int id) { AssertUtil.isTrue(id > 0, "sourceId is illegal"); - AssertUtil.notEmpty(type, "sourceType should not be null"); final String path = HTTP_PATH + "/source/delete/" + id; String url = formatUrl(path); - url = String.format("%s&sourceType=%s", url, type); RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), ""); Request request = new Request.Builder() .url(url) @@ -638,12 +636,10 @@ public String createSink(SinkRequest sinkRequest) { } } - public boolean deleteSink(int id, String type) { + public boolean deleteSink(int id) { AssertUtil.isTrue(id > 0, "sinkId is illegal"); - AssertUtil.notEmpty(type, "sinkType should not be null"); final String path = HTTP_PATH + "/sink/delete/" + id; String url = formatUrl(path); - url = String.format("%s&sinkType=%s", url, type); RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), ""); Request request = new Request.Builder() .url(url) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java index 8ec752811cd..b6f88b83318 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java @@ -18,8 +18,8 @@ package org.apache.inlong.manager.dao.mapper; import org.apache.ibatis.annotations.Param; -import org.apache.inlong.manager.common.pojo.sink.SinkInfo; import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse; +import org.apache.inlong.manager.common.pojo.sink.SinkInfo; import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.springframework.stereotype.Repository; @@ -63,9 +63,11 @@ List selectSummary(@Param("groupId") String groupId, * * @param groupId Inlong group id. * @param streamId Inlong stream id. + * @param sinkName Stream sink name. * @return Sink entity list. */ - List selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId); + List selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId, + @Param("sinkName") String sinkName); /** * According to the group id, stream id and sink type, query valid sink entity list. diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml index d7b337a3e95..bddd6086018 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml @@ -94,16 +94,16 @@ and status = #{request.status, jdbcType=INTEGER} - + and inlong_cluster_name = #{request.inlongClusterName, jdbcType=VARCHAR} - + and data_node_name = #{request.dataNodeName, jdbcType=VARCHAR} - + and sort_task_name = #{request.sortTaskName, jdbcType=VARCHAR} - + and sort_consumer_group = #{request.sortConsumerGroup, jdbcType=VARCHAR} order by modify_time desc @@ -131,6 +131,9 @@ and inlong_stream_id = #{streamId, jdbcType=VARCHAR} + + and sink_name = #{sinkName, jdbcType=VARCHAR} + +