From 257fcd9e54d81d2506e518723771847ded81c059 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sun, 15 Dec 2024 23:53:13 +0800 Subject: [PATCH] Add DispatchEventSubscriber (#34069) --- .../subscriber/DispatchEventSubscriber.java | 26 ++++++++++++++++ .../type/CacheEvictedSubscriber.java | 6 ++-- .../type/ComputeNodeStateSubscriber.java | 27 +++++++++------- .../type/DatabaseDataChangedSubscriber.java | 31 ++++++++++--------- ...lobalRuleConfigurationEventSubscriber.java | 4 +-- .../type/ListenerAssistedSubscriber.java | 4 +-- .../type/MetaDataChangedSubscriber.java | 4 +-- .../type/ProcessListChangedSubscriber.java | 26 ++++++++++------ .../type/PropertiesEventSubscriber.java | 4 +-- .../type/QualifiedDataSourceSubscriber.java | 6 ++-- .../type/RuleItemChangedSubscriber.java | 15 ++++++--- .../type/StateChangedSubscriber.java | 10 +++--- .../type/StorageUnitEventSubscriber.java | 4 +-- 13 files changed, 105 insertions(+), 62 deletions(-) create mode 100644 mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java new file mode 100644 index 0000000000000..2a915c5093d22 --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber; + +import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; + +/** + * Dispatch event subscriber. + */ +public interface DispatchEventSubscriber extends EventSubscriber { +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java index de796a986f23d..ef02905d65d6c 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java @@ -18,14 +18,14 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type; import com.google.common.eventbus.Subscribe; -import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent; import org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; +import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; /** * Cache evicted subscriber. */ -public final class CacheEvictedSubscriber implements EventSubscriber { +public final class CacheEvictedSubscriber implements DispatchEventSubscriber { /** * Callback of any {@link DispatchEvent}. diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java index 6eb8b274a64dc..cf282f34c58ff 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java @@ -18,24 +18,30 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type; import com.google.common.eventbus.Subscribe; -import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.instance.ComputeNodeInstance; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.event.dispatch.state.compute.ComputeNodeInstanceStateChangedEvent; import org.apache.shardingsphere.mode.event.dispatch.state.compute.LabelsEvent; import org.apache.shardingsphere.mode.event.dispatch.state.compute.WorkerIdEvent; -import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOfflineEvent; import org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOnlineEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; /** * Compute node state subscriber. */ -@RequiredArgsConstructor -public final class ComputeNodeStateSubscriber implements EventSubscriber { +public final class ComputeNodeStateSubscriber implements DispatchEventSubscriber { private final ContextManager contextManager; + private final ComputeNodeInstanceContext computeNodeInstanceContext; + + public ComputeNodeStateSubscriber(final ContextManager contextManager) { + this.contextManager = contextManager; + computeNodeInstanceContext = contextManager.getComputeNodeInstanceContext(); + } + /** * Renew instance list. * @@ -43,8 +49,7 @@ public final class ComputeNodeStateSubscriber implements EventSubscriber { */ @Subscribe public synchronized void renew(final InstanceOnlineEvent event) { - contextManager.getComputeNodeInstanceContext().addComputeNodeInstance( - contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData())); + computeNodeInstanceContext.addComputeNodeInstance(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData())); } /** @@ -54,7 +59,7 @@ public synchronized void renew(final InstanceOnlineEvent event) { */ @Subscribe public synchronized void renew(final InstanceOfflineEvent event) { - contextManager.getComputeNodeInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData())); + computeNodeInstanceContext.deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData())); } /** @@ -64,7 +69,7 @@ public synchronized void renew(final InstanceOfflineEvent event) { */ @Subscribe public synchronized void renew(final ComputeNodeInstanceStateChangedEvent event) { - contextManager.getComputeNodeInstanceContext().updateStatus(event.getInstanceId(), event.getStatus()); + computeNodeInstanceContext.updateStatus(event.getInstanceId(), event.getStatus()); } /** @@ -74,7 +79,7 @@ public synchronized void renew(final ComputeNodeInstanceStateChangedEvent event) */ @Subscribe public synchronized void renew(final WorkerIdEvent event) { - contextManager.getComputeNodeInstanceContext().updateWorkerId(event.getInstanceId(), event.getWorkerId()); + computeNodeInstanceContext.updateWorkerId(event.getInstanceId(), event.getWorkerId()); } /** @@ -85,6 +90,6 @@ public synchronized void renew(final WorkerIdEvent event) { @Subscribe public synchronized void renew(final LabelsEvent event) { // TODO labels may be empty - contextManager.getComputeNodeInstanceContext().updateLabels(event.getInstanceId(), event.getLabels()); + computeNodeInstanceContext.updateLabels(event.getInstanceId(), event.getLabels()); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java index c092df8e8985e..4264ad3dcc5ea 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java @@ -18,9 +18,6 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type; import com.google.common.eventbus.Subscribe; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; -import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataAddedEvent; import org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataDeletedEvent; import org.apache.shardingsphere.mode.event.dispatch.metadata.data.SchemaDataAddedEvent; @@ -28,14 +25,20 @@ import org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataChangedEvent; import org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataDeletedEvent; import org.apache.shardingsphere.mode.event.dispatch.metadata.data.TableDataChangedEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; +import org.apache.shardingsphere.mode.metadata.manager.ShardingSphereDatabaseDataManager; /** * Database data changed subscriber. */ -@RequiredArgsConstructor -public final class DatabaseDataChangedSubscriber implements EventSubscriber { +public final class DatabaseDataChangedSubscriber implements DispatchEventSubscriber { - private final ContextManager contextManager; + private final ShardingSphereDatabaseDataManager databaseManager; + + public DatabaseDataChangedSubscriber(final ContextManager contextManager) { + databaseManager = contextManager.getMetaDataContextManager().getDatabaseManager(); + } /** * Renew to persist ShardingSphere database data. @@ -44,7 +47,7 @@ public final class DatabaseDataChangedSubscriber implements EventSubscriber { */ @Subscribe public synchronized void renew(final DatabaseDataAddedEvent event) { - contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereDatabaseData(event.getDatabaseName()); + databaseManager.addShardingSphereDatabaseData(event.getDatabaseName()); } /** @@ -54,7 +57,7 @@ public synchronized void renew(final DatabaseDataAddedEvent event) { */ @Subscribe public synchronized void renew(final DatabaseDataDeletedEvent event) { - contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereDatabaseData(event.getDatabaseName()); + databaseManager.dropShardingSphereDatabaseData(event.getDatabaseName()); } /** @@ -64,7 +67,7 @@ public synchronized void renew(final DatabaseDataDeletedEvent event) { */ @Subscribe public synchronized void renew(final SchemaDataAddedEvent event) { - contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName()); + databaseManager.addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName()); } /** @@ -74,7 +77,7 @@ public synchronized void renew(final SchemaDataAddedEvent event) { */ @Subscribe public synchronized void renew(final SchemaDataDeletedEvent event) { - contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName()); + databaseManager.dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName()); } /** @@ -85,10 +88,10 @@ public synchronized void renew(final SchemaDataDeletedEvent event) { @Subscribe public synchronized void renew(final TableDataChangedEvent event) { if (null != event.getAddedTable()) { - contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getAddedTable()); + databaseManager.addShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getAddedTable()); } if (null != event.getDeletedTable()) { - contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable()); + databaseManager.dropShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable()); } } @@ -99,7 +102,7 @@ public synchronized void renew(final TableDataChangedEvent event) { */ @Subscribe public synchronized void renew(final ShardingSphereRowDataChangedEvent event) { - contextManager.getMetaDataContextManager().getDatabaseManager().alterShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData()); + databaseManager.alterShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData()); } /** @@ -109,6 +112,6 @@ public synchronized void renew(final ShardingSphereRowDataChangedEvent event) { */ @Subscribe public synchronized void renew(final ShardingSphereRowDataDeletedEvent event) { - contextManager.getMetaDataContextManager().getDatabaseManager().deleteShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getUniqueKey()); + databaseManager.deleteShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getUniqueKey()); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java index eeff01817b06a..d9ff986b01e3d 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java @@ -22,9 +22,9 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.config.rule.RuleConfiguration; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.mode.event.dispatch.config.AlterGlobalRuleConfigurationEvent; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; import org.apache.shardingsphere.mode.spi.RuleConfigurationPersistDecorator; import java.util.Optional; @@ -33,7 +33,7 @@ * Global rule configuration event subscriber. */ @RequiredArgsConstructor -public final class GlobalRuleConfigurationEventSubscriber implements EventSubscriber { +public final class GlobalRuleConfigurationEventSubscriber implements DispatchEventSubscriber { private final ContextManager contextManager; diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java index cb09c7d43785e..c2d5b80768fd5 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java @@ -20,13 +20,13 @@ import com.google.common.eventbus.Subscribe; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode; import org.apache.shardingsphere.mode.event.dispatch.assisted.CreateDatabaseListenerAssistedEvent; import org.apache.shardingsphere.mode.event.dispatch.assisted.DropDatabaseListenerAssistedEvent; import org.apache.shardingsphere.mode.lock.GlobalLockContext; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.DatabaseMetaDataChangedListener; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; import org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService; import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; @@ -35,7 +35,7 @@ * Listener assisted subscriber. */ @RequiredArgsConstructor -public final class ListenerAssistedSubscriber implements EventSubscriber { +public final class ListenerAssistedSubscriber implements DispatchEventSubscriber { private final ContextManager contextManager; diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java index d096dbd732536..7ed94f407b9f7 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java @@ -22,7 +22,6 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent; import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent; import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent; @@ -31,6 +30,7 @@ import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent; import org.apache.shardingsphere.mode.lock.GlobalLockContext; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; import org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService; import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; @@ -38,7 +38,7 @@ /** * Meta data changed subscriber. */ -public final class MetaDataChangedSubscriber implements EventSubscriber { +public final class MetaDataChangedSubscriber implements DispatchEventSubscriber { private final ContextManager contextManager; diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java index ba8bd0a3cf3a3..b64dec3645df8 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java @@ -18,20 +18,20 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type; import com.google.common.eventbus.Subscribe; -import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.executor.sql.process.Process; import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.metadata.persist.node.ComputeNode; import org.apache.shardingsphere.metadata.persist.node.ProcessNode; -import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessCompletedEvent; import org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessEvent; import org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesCompletedEvent; import org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; +import org.apache.shardingsphere.mode.spi.PersistRepository; import java.sql.SQLException; import java.sql.Statement; @@ -40,12 +40,19 @@ /** * Process list changed subscriber. */ -@RequiredArgsConstructor -public final class ProcessListChangedSubscriber implements EventSubscriber { +public final class ProcessListChangedSubscriber implements DispatchEventSubscriber { private final ContextManager contextManager; - private final YamlProcessListSwapper swapper = new YamlProcessListSwapper(); + private final PersistRepository repository; + + private final YamlProcessListSwapper swapper; + + public ProcessListChangedSubscriber(final ContextManager contextManager) { + this.contextManager = contextManager; + repository = contextManager.getPersistServiceFacade().getRepository(); + swapper = new YamlProcessListSwapper(); + } /** * Report local processes. @@ -59,10 +66,9 @@ public void reportLocalProcesses(final ReportLocalProcessesEvent event) { } Collection processes = ProcessRegistry.getInstance().listAll(); if (!processes.isEmpty()) { - contextManager.getPersistServiceFacade().getRepository().persist( - ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes))); + repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes))); } - contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId())); + repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId())); } /** @@ -93,7 +99,7 @@ public synchronized void killLocalProcess(final KillLocalProcessEvent event) thr each.cancel(); } } - contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId())); + repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId())); } /** diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java index 9f20377a7d0e7..0665147e7033e 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java @@ -20,15 +20,15 @@ import com.google.common.base.Preconditions; import com.google.common.eventbus.Subscribe; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.mode.event.dispatch.config.AlterPropertiesEvent; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; /** * Properties event subscriber. */ @RequiredArgsConstructor -public final class PropertiesEventSubscriber implements EventSubscriber { +public final class PropertiesEventSubscriber implements DispatchEventSubscriber { private final ContextManager contextManager; diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java index a354fa0a0ca30..2de68b4e98b98 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java @@ -23,15 +23,15 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource; import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; -import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.event.dispatch.state.storage.QualifiedDataSourceStateEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; /** * Qualified data source subscriber. */ @RequiredArgsConstructor -public class QualifiedDataSourceSubscriber implements EventSubscriber { +public class QualifiedDataSourceSubscriber implements DispatchEventSubscriber { private final ContextManager contextManager; diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java index 3d84d50e1a030..de4bcd1c403fc 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java @@ -21,8 +21,9 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.mode.event.dispatch.rule.alter.AlterRuleItemEvent; import org.apache.shardingsphere.mode.event.dispatch.rule.drop.DropRuleItemEvent; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; +import org.apache.shardingsphere.mode.metadata.manager.RuleItemManager; import java.sql.SQLException; @@ -30,9 +31,13 @@ * Rule item changed subscriber. */ @RequiredArgsConstructor -public final class RuleItemChangedSubscriber implements EventSubscriber { +public final class RuleItemChangedSubscriber implements DispatchEventSubscriber { - private final ContextManager contextManager; + private final RuleItemManager ruleItemManager; + + public RuleItemChangedSubscriber(final ContextManager contextManager) { + ruleItemManager = contextManager.getMetaDataContextManager().getRuleItemManager(); + } /** * Renew with alter rule item. @@ -42,7 +47,7 @@ public final class RuleItemChangedSubscriber implements EventSubscriber { */ @Subscribe public void renew(final AlterRuleItemEvent event) throws SQLException { - contextManager.getMetaDataContextManager().getRuleItemManager().alterRuleItem(event); + ruleItemManager.alterRuleItem(event); } /** @@ -53,6 +58,6 @@ public void renew(final AlterRuleItemEvent event) throws SQLException { */ @Subscribe public void renew(final DropRuleItemEvent event) throws SQLException { - contextManager.getMetaDataContextManager().getRuleItemManager().dropRuleItem(event); + ruleItemManager.dropRuleItem(event); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java index 05f0d9d2cb5d0..809b3bc400005 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java @@ -18,21 +18,19 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type; import com.google.common.eventbus.Subscribe; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.mode.event.dispatch.state.cluster.ClusterStateEvent; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; /** * State changed subscriber. */ -public final class StateChangedSubscriber implements EventSubscriber { +@RequiredArgsConstructor +public final class StateChangedSubscriber implements DispatchEventSubscriber { private final ContextManager contextManager; - public StateChangedSubscriber(final ContextManager contextManager) { - this.contextManager = contextManager; - } - /** * Renew cluster state. * diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java index ea167f1e09402..cf8627d6d9dac 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java @@ -21,11 +21,11 @@ import com.google.common.eventbus.Subscribe; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitAlteredEvent; import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitRegisteredEvent; import org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitUnregisteredEvent; import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; import java.util.Collections; @@ -33,7 +33,7 @@ * Storage unit event subscriber. */ @RequiredArgsConstructor -public final class StorageUnitEventSubscriber implements EventSubscriber { +public final class StorageUnitEventSubscriber implements DispatchEventSubscriber { private final ContextManager contextManager;