From d0abb75896cce63f8309d1992aa4f4aae5224895 Mon Sep 17 00:00:00 2001 From: Haoran Meng Date: Mon, 27 May 2024 13:41:49 +0800 Subject: [PATCH] Use ProcessPersistService instead of ProcessListEvent and remove ProcessSubscriber (#31411) --- .../mode/process/ProcessSubscriber.java | 44 ------- .../event/KillProcessRequestEvent.java | 31 ----- .../event/ShowProcessListRequestEvent.java | 24 ---- .../event/ShowProcessListResponseEvent.java | 34 ------ .../cluster/ClusterContextManagerBuilder.java | 2 - .../subscriber/ClusterProcessSubscriber.java | 113 ------------------ .../ClusterProcessPersistServiceTest.java} | 24 ++-- .../StandaloneEventSubscriberRegistry.java | 4 +- .../StandaloneProcessSubscriber.java | 59 --------- .../StandaloneProcessPersistServiceTest.java} | 39 +++++- .../admin/executor/KillProcessExecutor.java | 7 +- .../executor/ShowProcessListExecutor.java | 18 +-- .../executor/ShowProcessListExecutorTest.java | 11 +- 13 files changed, 58 insertions(+), 352 deletions(-) delete mode 100644 mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java delete mode 100644 mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java delete mode 100644 mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java delete mode 100644 mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java delete mode 100644 mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java rename mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/{coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java => service/ClusterProcessPersistServiceTest.java} (72%) delete mode 100644 mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java rename mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/{subscriber/StandaloneProcessSubscriberTest.java => service/StandaloneProcessPersistServiceTest.java} (53%) diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java deleted file mode 100644 index 96fb755510cdd..0000000000000 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.process; - -import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent; -import org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent; - -import java.sql.SQLException; - -/** - * Process subscriber. - */ -public interface ProcessSubscriber { - - /** - * Post show process list data. - * - * @param event show process list request event - */ - void postShowProcessListData(ShowProcessListRequestEvent event); - - /** - * Kill process. - * - * @param event kill process request event - * @throws SQLException SQL exception - */ - void killProcess(KillProcessRequestEvent event) throws SQLException; -} diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java deleted file mode 100644 index e69590c04cdf1..0000000000000 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.process.event; - -import lombok.Getter; -import lombok.RequiredArgsConstructor; - -/** - * Kill process request event. - */ -@RequiredArgsConstructor -@Getter -public final class KillProcessRequestEvent { - - private final String id; -} diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java deleted file mode 100644 index 289964b6287d3..0000000000000 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.process.event; - -/** - * Show process list request event. - */ -public final class ShowProcessListRequestEvent { -} diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java deleted file mode 100644 index 4b83a572516ad..0000000000000 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.process.event; - -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.executor.sql.process.Process; - -import java.util.Collection; - -/** - * Show process list response event. - */ -@RequiredArgsConstructor -@Getter -public final class ShowProcessListResponseEvent { - - private final Collection processes; -} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index 72f4874996c57..f6409ee689d7a 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -34,7 +34,6 @@ import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory; -import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator; import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry; @@ -97,7 +96,6 @@ private DistributedLockHolder initDistributedLockHolder(final ClusterPersistRepo // TODO remove the method, only keep ZooKeeper's events, remove all decouple events private void createSubscribers(final EventBusContext eventBusContext, final ClusterPersistRepository repository) { eventBusContext.register(new QualifiedDataSourceStatusSubscriber(repository)); - eventBusContext.register(new ClusterProcessSubscriber(repository, eventBusContext)); } private void registerOnline(final EventBusContext eventBusContext, final ComputeNodeInstanceContext computeNodeInstanceContext, diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java deleted file mode 100644 index 440d8e22dc9b6..0000000000000 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber; - -import com.google.common.eventbus.Subscribe; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; -import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList; -import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper; -import org.apache.shardingsphere.infra.instance.metadata.InstanceType; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; -import org.apache.shardingsphere.metadata.persist.node.ProcessNode; -import org.apache.shardingsphere.mode.process.ProcessSubscriber; -import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent; -import org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent; -import org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent; -import org.apache.shardingsphere.mode.spi.PersistRepository; - -import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * Cluster process subscriber. - */ -@RequiredArgsConstructor -public final class ClusterProcessSubscriber implements ProcessSubscriber, EventSubscriber { - - private final PersistRepository repository; - - private final EventBusContext eventBusContext; - - private final YamlProcessListSwapper swapper = new YamlProcessListSwapper(); - - @Override - @Subscribe - public void postShowProcessListData(final ShowProcessListRequestEvent event) { - String taskId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", ""); - Collection triggerPaths = getShowProcessListTriggerPaths(taskId); - boolean isCompleted = false; - try { - triggerPaths.forEach(each -> repository.persist(each, "")); - isCompleted = ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId, () -> isReady(triggerPaths)); - postShowProcessListData(taskId); - } finally { - repository.delete(ProcessNode.getProcessIdPath(taskId)); - if (!isCompleted) { - triggerPaths.forEach(repository::delete); - } - } - } - - private void postShowProcessListData(final String taskId) { - YamlProcessList yamlProcessList = new YamlProcessList(); - for (String each : repository.getChildrenKeys(ProcessNode.getProcessIdPath(taskId)).stream() - .map(each -> repository.query(ProcessNode.getProcessListInstancePath(taskId, each))).collect(Collectors.toList())) { - yamlProcessList.getProcesses().addAll(YamlEngine.unmarshal(each, YamlProcessList.class).getProcesses()); - } - eventBusContext.post(new ShowProcessListResponseEvent(swapper.swapToObject(yamlProcessList))); - } - - private Collection getShowProcessListTriggerPaths(final String taskId) { - return Stream.of(InstanceType.values()) - .flatMap(each -> repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath -> ComputeNode.getProcessTriggerInstanceNodePath(onlinePath, taskId))) - .collect(Collectors.toList()); - } - - private boolean isReady(final Collection paths) { - return paths.stream().noneMatch(each -> null != repository.query(each)); - } - - @Override - @Subscribe - public void killProcess(final KillProcessRequestEvent event) { - String processId = event.getId(); - Collection triggerPaths = getKillProcessTriggerPaths(processId); - boolean isCompleted = false; - try { - triggerPaths.forEach(each -> repository.persist(each, "")); - isCompleted = ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId, () -> isReady(triggerPaths)); - } finally { - if (!isCompleted) { - triggerPaths.forEach(repository::delete); - } - } - } - - private Collection getKillProcessTriggerPaths(final String processId) { - return Stream.of(InstanceType.values()) - .flatMap(each -> repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath -> ComputeNode.getProcessKillInstanceIdNodePath(onlinePath, processId))) - .collect(Collectors.toList()); - } -} diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistServiceTest.java similarity index 72% rename from mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java rename to mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistServiceTest.java index 5fc60d3e80597..d7f2ce4969e3d 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistServiceTest.java @@ -15,12 +15,10 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber; +package org.apache.shardingsphere.mode.manager.cluster.service; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.metadata.persist.node.ComputeNode; -import org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -35,26 +33,32 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -class ClusterProcessSubscriberTest { +class ClusterProcessPersistServiceTest { @Mock private ClusterPersistRepository repository; - private final EventBusContext eventBusContext = new EventBusContext(); - - private ClusterProcessSubscriber clusterProcessListSubscriber; + private ClusterProcessPersistService processPersistService; @BeforeEach void setUp() { - clusterProcessListSubscriber = new ClusterProcessSubscriber(repository, eventBusContext); + processPersistService = new ClusterProcessPersistService(repository); } @Test - void assertPostShowProcessListData() { + void getProcessList() { when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList()); when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc")); when(repository.query(any())).thenReturn(null); - clusterProcessListSubscriber.postShowProcessListData(new ShowProcessListRequestEvent()); + processPersistService.getProcessList(); + verify(repository).persist(any(), any()); + } + + @Test + void killProcess() { + when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList()); + when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc")); + processPersistService.killProcess("foo_process_id"); verify(repository).persist(any(), any()); } } diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java index b0a4278a9a71b..040973879dc13 100644 --- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java +++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java @@ -27,8 +27,6 @@ public final class StandaloneEventSubscriberRegistry extends EventSubscriberRegistry { public StandaloneEventSubscriberRegistry(final ContextManager contextManager) { - super(contextManager, - new StandaloneProcessSubscriber(contextManager.getComputeNodeInstanceContext().getEventBusContext()), - new RuleItemChangedSubscriber(contextManager)); + super(contextManager, new RuleItemChangedSubscriber(contextManager)); } } diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java deleted file mode 100644 index 8180b6334d615..0000000000000 --- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.manager.standalone.subscriber; - -import com.google.common.eventbus.Subscribe; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.executor.sql.process.Process; -import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; -import org.apache.shardingsphere.mode.process.ProcessSubscriber; -import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent; -import org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent; -import org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent; - -import java.sql.SQLException; -import java.sql.Statement; - -/** - * Standalone process subscriber. - */ -@RequiredArgsConstructor -public final class StandaloneProcessSubscriber implements ProcessSubscriber, EventSubscriber { - - private final EventBusContext eventBusContext; - - @Override - @Subscribe - public void postShowProcessListData(final ShowProcessListRequestEvent event) { - eventBusContext.post(new ShowProcessListResponseEvent(ProcessRegistry.getInstance().listAll())); - } - - @Override - @Subscribe - public void killProcess(final KillProcessRequestEvent event) throws SQLException { - Process process = ProcessRegistry.getInstance().get(event.getId()); - if (null == process) { - return; - } - for (Statement each : process.getProcessStatements().values()) { - each.cancel(); - } - } -} diff --git a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriberTest.java b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistServiceTest.java similarity index 53% rename from mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriberTest.java rename to mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistServiceTest.java index 83e1dae112ce0..8c9e53d028a68 100644 --- a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriberTest.java +++ b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistServiceTest.java @@ -15,29 +15,56 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.standalone.subscriber; +package org.apache.shardingsphere.mode.manager.standalone.service; +import org.apache.shardingsphere.infra.executor.sql.process.Process; import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; -import org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent; import org.apache.shardingsphere.test.mock.AutoMockExtension; import org.apache.shardingsphere.test.mock.StaticMockSettings; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(AutoMockExtension.class) @StaticMockSettings(ProcessRegistry.class) -class StandaloneProcessSubscriberTest { +class StandaloneProcessPersistServiceTest { + + private StandaloneProcessPersistService processPersistService; + + @BeforeEach + void setUp() { + processPersistService = new StandaloneProcessPersistService(); + } @Test - void assertPostShowProcessListData() { + void getProcessList() { ProcessRegistry processRegistry = mock(ProcessRegistry.class); when(ProcessRegistry.getInstance()).thenReturn(processRegistry); - new StandaloneProcessSubscriber(new EventBusContext()).postShowProcessListData(new ShowProcessListRequestEvent()); + processPersistService.getProcessList(); verify(processRegistry).listAll(); } + + @Test + void killProcess() throws SQLException { + ProcessRegistry processRegistry = mock(ProcessRegistry.class); + when(ProcessRegistry.getInstance()).thenReturn(processRegistry); + Process process = mock(Process.class); + Statement statement = mock(Statement.class); + Map processStatements = new ConcurrentHashMap<>(); + processStatements.put(1, statement); + when(process.getProcessStatements()).thenReturn(processStatements); + when(processRegistry.get(any())).thenReturn(process); + processPersistService.killProcess("foo_process_id"); + verify(statement).cancel(); + } } diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java index 87eebb9e610c3..ddea96d89a15a 100644 --- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java +++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java @@ -18,12 +18,13 @@ package org.apache.shardingsphere.proxy.backend.mysql.handler.admin.executor; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.proxy.backend.handler.admin.executor.DatabaseAdminExecutor; import org.apache.shardingsphere.proxy.backend.session.ConnectionSession; import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLKillStatement; +import java.sql.SQLException; + /** * Kill process executor. */ @@ -38,8 +39,8 @@ public final class KillProcessExecutor implements DatabaseAdminExecutor { * @param connectionSession connection session */ @Override - public void execute(final ConnectionSession connectionSession) { + public void execute(final ConnectionSession connectionSession) throws SQLException { String processId = killStatement.getProcessId(); - ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new KillProcessRequestEvent(processId)); + ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getProcessPersistService().killProcess(processId); } } diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java index 8aa4aabf9763a..d4907a6816244 100644 --- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java +++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.proxy.backend.mysql.handler.admin.executor; -import com.google.common.eventbus.Subscribe; import lombok.Getter; import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult; import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData; @@ -29,8 +28,6 @@ import org.apache.shardingsphere.infra.merge.result.MergedResult; import org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult; import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; -import org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent; -import org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.proxy.backend.handler.admin.executor.DatabaseAdminQueryExecutor; import org.apache.shardingsphere.proxy.backend.session.ConnectionSession; @@ -50,8 +47,6 @@ public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor private final boolean showFullProcesslist; - private Collection processes; - @Getter private QueryResultMetaData queryResultMetaData; @@ -63,17 +58,6 @@ public ShowProcessListExecutor(final boolean showFullProcesslist) { ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().register(this); } - /** - * Receive and handle response event. - * - * @param event show process list response event - */ - @SuppressWarnings("unused") - @Subscribe - public void receiveProcessListData(final ShowProcessListResponseEvent event) { - processes = event.getProcesses(); - } - @Override public void execute(final ConnectionSession connectionSession) { queryResultMetaData = createQueryResultMetaData(); @@ -81,7 +65,7 @@ public void execute(final ConnectionSession connectionSession) { } private QueryResult getQueryResult() { - ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new ShowProcessListRequestEvent()); + Collection processes = ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getProcessPersistService().getProcessList(); if (null == processes || processes.isEmpty()) { return new RawMemoryQueryResult(queryResultMetaData, Collections.emptyList()); } diff --git a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java index 5bd4d57c12c8f..16ad4602956a0 100644 --- a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java +++ b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java @@ -28,9 +28,9 @@ import org.apache.shardingsphere.test.mock.StaticMockSettings; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.internal.configuration.plugins.Plugins; import java.sql.SQLException; +import java.util.Collection; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -46,11 +46,11 @@ class ShowProcessListExecutorTest { @Test - void assertExecute() throws SQLException, ReflectiveOperationException { + void assertExecute() throws SQLException { ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS); when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager); + when(contextManager.getPersistServiceFacade().getProcessPersistService().getProcessList()).thenReturn(mockProcessList()); ShowProcessListExecutor showProcessListExecutor = new ShowProcessListExecutor(false); - setupProcesses(showProcessListExecutor); showProcessListExecutor.execute(new ConnectionSession(mock(MySQLDatabaseType.class), new DefaultAttributeMap())); assertThat(showProcessListExecutor.getQueryResultMetaData().getColumnCount(), is(8)); MergedResult mergedResult = showProcessListExecutor.getMergedResult(); @@ -64,11 +64,10 @@ void assertExecute() throws SQLException, ReflectiveOperationException { } } - private void setupProcesses(final ShowProcessListExecutor showProcessListExecutor) throws ReflectiveOperationException { + private Collection mockProcessList() { Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80", 1617939785160L, "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", "foo_db", "root", "127.0.0.1", new AtomicInteger(2), new AtomicInteger(1), new AtomicBoolean(false), new AtomicBoolean()); - Plugins.getMemberAccessor().set( - showProcessListExecutor.getClass().getDeclaredField("processes"), showProcessListExecutor, Collections.singleton(process)); + return Collections.singleton(process); } }