From 1d39768b265a59952d5c5c4be2e7a99f6eacdeee Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Sun, 28 Apr 2024 20:12:45 +0800 Subject: [PATCH] implement admin offset service --- .../build.gradle | 31 +++ .../gradle.properties | 16 ++ .../offsetmgmt/admin/AdminOffsetService.java | 206 ++++++++++++++++++ ...etmgmt.api.storage.OffsetManagementService | 16 ++ .../api/storage/OffsetManagementService.java | 2 +- .../runtime/connector/ConnectorRuntime.java | 2 +- settings.gradle | 1 + 7 files changed, 272 insertions(+), 2 deletions(-) create mode 100644 eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/build.gradle create mode 100644 eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/gradle.properties create mode 100644 eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java create mode 100644 eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/build.gradle b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/build.gradle new file mode 100644 index 0000000000..70defef627 --- /dev/null +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + implementation project(":eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api") + implementation project(":eventmesh-common") + testImplementation "org.mockito:mockito-core" + + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + + implementation "io.grpc:grpc-core" + implementation "io.grpc:grpc-protobuf" + implementation "io.grpc:grpc-stub" + implementation "io.grpc:grpc-netty" + implementation "io.grpc:grpc-netty-shaded" +} diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/gradle.properties b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/gradle.properties new file mode 100644 index 0000000000..a9fd83fea0 --- /dev/null +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/gradle.properties @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# \ No newline at end of file diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java new file mode 100644 index 0000000000..70edb4605b --- /dev/null +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.openconnect.offsetmgmt.admin; + +import org.apache.eventmesh.common.config.ConfigService; +import org.apache.eventmesh.common.config.offset.OffsetStorageConfig; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceBlockingStub; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.ConnectorRecordPartition; +import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.KeyValueStore; +import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.MemoryBasedKeyValueStore; +import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +import io.grpc.Channel; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; + +import com.fasterxml.jackson.core.type.TypeReference; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AdminOffsetService implements OffsetManagementService { + + private String adminServerAddr; + + private ManagedChannel channel; + + private AdminServiceStub adminServiceStub; + + private AdminServiceBlockingStub adminServiceBlockingStub; + + StreamObserver responseObserver; + + StreamObserver requestObserver; + + public KeyValueStore positionStore; + + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public void configure(OffsetStorageConfig config) { + OffsetManagementService.super.configure(config); + } + + @Override + public void persist() { + + } + + @Override + public void load() { + + } + + @Override + public void synchronize() { + Metadata metadata = Metadata.newBuilder() + .setType() + .build(); + Payload payload = Payload.newBuilder() + .setMetadata() + .setBody() + .build(); + } + + @Override + public Map getPositionMap() { + // get from memory storage first + if (positionStore.getKVMap() == null || positionStore.getKVMap().isEmpty()) { + // get position from adminService + Metadata metadata = Metadata.newBuilder() + .setType() + .build(); + Payload payload = Payload.newBuilder() + .setMetadata() + .setBody() + .build(); + + adminServiceBlockingStub.invoke() + try { + Map configMap = JsonUtils.parseTypeReferenceObject(configService.getConfig(dataId, group, 5000L), + new TypeReference>() { + }); + log.info("position map {}", configMap); + return configMap; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + log.info("memory position map {}", positionStore.getKVMap()); + return positionStore.getKVMap(); + } + + @Override + public RecordOffset getPosition(ConnectorRecordPartition partition) { + // get from memory storage first + if (positionStore.get(partition) == null) { + // get position from adminService + try { + Map recordMap = JacksonUtils.toObj(configService.getConfig(dataId, group, 5000L), + new TypeReference>() { + }); + log.info("nacos record position {}", recordMap.get(partition)); + return recordMap.get(partition); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + log.info("memory record position {}", positionStore.get(partition)); + return positionStore.get(partition); + } + + @Override + public void putPosition(Map positions) { + positionStore.putAll(positions); + } + + @Override + public void putPosition(ConnectorRecordPartition partition, RecordOffset position) { + positionStore.put(partition, position); + } + + @Override + public void removePosition(List partitions) { + if (partitions == null) { + return; + } + for (ConnectorRecordPartition partition : partitions) { + positionStore.remove(partition); + } + } + + @Override + public void initialize(OffsetStorageConfig offsetStorageConfig) { + this.adminServerAddr = offsetStorageConfig.getOffsetStorageAddr(); + this.channel = ManagedChannelBuilder.forTarget(adminServerAddr) + .usePlaintext() + .build(); + this.adminServiceStub = AdminServiceGrpc.newStub(channel).withWaitForReady(); + this.adminServiceBlockingStub = AdminServiceGrpc.newBlockingStub(channel).withWaitForReady(); + + responseObserver = new StreamObserver() { + @Override + public void onNext(Payload response) { + log.info("receive message: {} ", response); + } + + @Override + public void onError(Throwable t) { + log.error("receive error message: {}", t.getMessage()); + } + + @Override + public void onCompleted() { + log.info("finished receive message and completed"); + } + }; + + requestObserver = adminServiceStub.invokeBiStream(responseObserver); + + this.positionStore = new MemoryBasedKeyValueStore<>(); + Map initialRecordOffsetMap = JsonUtils.parseTypeReferenceObject(offsetStorageConfig.getExtensions().get("offset"), + new TypeReference>(){ + }); + log.info("init record offset {}", initialRecordOffsetMap); + positionStore.putAll(initialRecordOffsetMap); + } +} diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService new file mode 100644 index 0000000000..11b4466d79 --- /dev/null +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +admin=org.apache.eventmesh.openconnect.offsetmgmt.admin.AdminOffsetService \ No newline at end of file diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetManagementService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetManagementService.java index 1d9eb751df..25c927aa0c 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetManagementService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetManagementService.java @@ -86,6 +86,6 @@ default void configure(OffsetStorageConfig config) { */ void removePosition(List partitions); - void initialize(OffsetStorageConfig connectorConfig); + void initialize(OffsetStorageConfig offsetStorageConfig); } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 74c26be8c2..93403ae594 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -148,7 +148,7 @@ public void onCompleted() { } private void initStorageService() { - + // TODO: init producer & consumer producer = StoragePluginFactory.getMeshMQProducer(runtimeInstanceConfig.getStoragePluginType()); consumer = StoragePluginFactory.getMeshMQPushConsumer(runtimeInstanceConfig.getStoragePluginType()); diff --git a/settings.gradle b/settings.gradle index 32d2c2e261..a67978ae24 100644 --- a/settings.gradle +++ b/settings.gradle @@ -55,6 +55,7 @@ include 'eventmesh-transformer' include 'eventmesh-openconnect:eventmesh-openconnect-java' include 'eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api' +include 'eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-admin' include 'eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-nacos' include 'eventmesh-connectors:eventmesh-connector-openfunction'