Skip to content

Commit

Permalink
Add DispatchEventSubscriber (#34069)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 15, 2024
1 parent ed0db53 commit 257fcd9
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber;

import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;

/**
* Dispatch event subscriber.
*/
public interface DispatchEventSubscriber extends EventSubscriber {
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
import org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;

/**
* Cache evicted subscriber.
*/
public final class CacheEvictedSubscriber implements EventSubscriber {
public final class CacheEvictedSubscriber implements DispatchEventSubscriber {

/**
* Callback of any {@link DispatchEvent}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,38 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.ComputeNodeInstanceStateChangedEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.LabelsEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.WorkerIdEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOfflineEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOnlineEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;

/**
* Compute node state subscriber.
*/
@RequiredArgsConstructor
public final class ComputeNodeStateSubscriber implements EventSubscriber {
public final class ComputeNodeStateSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

private final ComputeNodeInstanceContext computeNodeInstanceContext;

public ComputeNodeStateSubscriber(final ContextManager contextManager) {
this.contextManager = contextManager;
computeNodeInstanceContext = contextManager.getComputeNodeInstanceContext();
}

/**
* Renew instance list.
*
* @param event compute node online event
*/
@Subscribe
public synchronized void renew(final InstanceOnlineEvent event) {
contextManager.getComputeNodeInstanceContext().addComputeNodeInstance(
contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
computeNodeInstanceContext.addComputeNodeInstance(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
}

/**
Expand All @@ -54,7 +59,7 @@ public synchronized void renew(final InstanceOnlineEvent event) {
*/
@Subscribe
public synchronized void renew(final InstanceOfflineEvent event) {
contextManager.getComputeNodeInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
computeNodeInstanceContext.deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
}

/**
Expand All @@ -64,7 +69,7 @@ public synchronized void renew(final InstanceOfflineEvent event) {
*/
@Subscribe
public synchronized void renew(final ComputeNodeInstanceStateChangedEvent event) {
contextManager.getComputeNodeInstanceContext().updateStatus(event.getInstanceId(), event.getStatus());
computeNodeInstanceContext.updateStatus(event.getInstanceId(), event.getStatus());
}

/**
Expand All @@ -74,7 +79,7 @@ public synchronized void renew(final ComputeNodeInstanceStateChangedEvent event)
*/
@Subscribe
public synchronized void renew(final WorkerIdEvent event) {
contextManager.getComputeNodeInstanceContext().updateWorkerId(event.getInstanceId(), event.getWorkerId());
computeNodeInstanceContext.updateWorkerId(event.getInstanceId(), event.getWorkerId());
}

/**
Expand All @@ -85,6 +90,6 @@ public synchronized void renew(final WorkerIdEvent event) {
@Subscribe
public synchronized void renew(final LabelsEvent event) {
// TODO labels may be empty
contextManager.getComputeNodeInstanceContext().updateLabels(event.getInstanceId(), event.getLabels());
computeNodeInstanceContext.updateLabels(event.getInstanceId(), event.getLabels());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,27 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataAddedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataDeletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.SchemaDataAddedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.SchemaDataDeletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataChangedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataDeletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.data.TableDataChangedEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.metadata.manager.ShardingSphereDatabaseDataManager;

/**
* Database data changed subscriber.
*/
@RequiredArgsConstructor
public final class DatabaseDataChangedSubscriber implements EventSubscriber {
public final class DatabaseDataChangedSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;
private final ShardingSphereDatabaseDataManager databaseManager;

public DatabaseDataChangedSubscriber(final ContextManager contextManager) {
databaseManager = contextManager.getMetaDataContextManager().getDatabaseManager();
}

/**
* Renew to persist ShardingSphere database data.
Expand All @@ -44,7 +47,7 @@ public final class DatabaseDataChangedSubscriber implements EventSubscriber {
*/
@Subscribe
public synchronized void renew(final DatabaseDataAddedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereDatabaseData(event.getDatabaseName());
databaseManager.addShardingSphereDatabaseData(event.getDatabaseName());
}

/**
Expand All @@ -54,7 +57,7 @@ public synchronized void renew(final DatabaseDataAddedEvent event) {
*/
@Subscribe
public synchronized void renew(final DatabaseDataDeletedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereDatabaseData(event.getDatabaseName());
databaseManager.dropShardingSphereDatabaseData(event.getDatabaseName());
}

/**
Expand All @@ -64,7 +67,7 @@ public synchronized void renew(final DatabaseDataDeletedEvent event) {
*/
@Subscribe
public synchronized void renew(final SchemaDataAddedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
databaseManager.addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
}

/**
Expand All @@ -74,7 +77,7 @@ public synchronized void renew(final SchemaDataAddedEvent event) {
*/
@Subscribe
public synchronized void renew(final SchemaDataDeletedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
databaseManager.dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
}

/**
Expand All @@ -85,10 +88,10 @@ public synchronized void renew(final SchemaDataDeletedEvent event) {
@Subscribe
public synchronized void renew(final TableDataChangedEvent event) {
if (null != event.getAddedTable()) {
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getAddedTable());
databaseManager.addShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getAddedTable());
}
if (null != event.getDeletedTable()) {
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
databaseManager.dropShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
}
}

Expand All @@ -99,7 +102,7 @@ public synchronized void renew(final TableDataChangedEvent event) {
*/
@Subscribe
public synchronized void renew(final ShardingSphereRowDataChangedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().alterShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
databaseManager.alterShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
}

/**
Expand All @@ -109,6 +112,6 @@ public synchronized void renew(final ShardingSphereRowDataChangedEvent event) {
*/
@Subscribe
public synchronized void renew(final ShardingSphereRowDataDeletedEvent event) {
contextManager.getMetaDataContextManager().getDatabaseManager().deleteShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getUniqueKey());
databaseManager.deleteShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getUniqueKey());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.dispatch.config.AlterGlobalRuleConfigurationEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.spi.RuleConfigurationPersistDecorator;

import java.util.Optional;
Expand All @@ -33,7 +33,7 @@
* Global rule configuration event subscriber.
*/
@RequiredArgsConstructor
public final class GlobalRuleConfigurationEventSubscriber implements EventSubscriber {
public final class GlobalRuleConfigurationEventSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
import org.apache.shardingsphere.mode.event.dispatch.assisted.CreateDatabaseListenerAssistedEvent;
import org.apache.shardingsphere.mode.event.dispatch.assisted.DropDatabaseListenerAssistedEvent;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.DatabaseMetaDataChangedListener;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand All @@ -35,7 +35,7 @@
* Listener assisted subscriber.
*/
@RequiredArgsConstructor
public final class ListenerAssistedSubscriber implements EventSubscriber {
public final class ListenerAssistedSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent;
Expand All @@ -31,14 +30,15 @@
import org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

/**
* Meta data changed subscriber.
*/
public final class MetaDataChangedSubscriber implements EventSubscriber {
public final class MetaDataChangedSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessCompletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesCompletedEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.spi.PersistRepository;

import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -40,12 +40,19 @@
/**
* Process list changed subscriber.
*/
@RequiredArgsConstructor
public final class ProcessListChangedSubscriber implements EventSubscriber {
public final class ProcessListChangedSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();
private final PersistRepository repository;

private final YamlProcessListSwapper swapper;

public ProcessListChangedSubscriber(final ContextManager contextManager) {
this.contextManager = contextManager;
repository = contextManager.getPersistServiceFacade().getRepository();
swapper = new YamlProcessListSwapper();
}

/**
* Report local processes.
Expand All @@ -59,10 +66,9 @@ public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
}
Collection<Process> processes = ProcessRegistry.getInstance().listAll();
if (!processes.isEmpty()) {
contextManager.getPersistServiceFacade().getRepository().persist(
ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
}
contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId()));
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId()));
}

/**
Expand Down Expand Up @@ -93,7 +99,7 @@ public synchronized void killLocalProcess(final KillLocalProcessEvent event) thr
each.cancel();
}
}
contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId()));
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.dispatch.config.AlterPropertiesEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;

/**
* Properties event subscriber.
*/
@RequiredArgsConstructor
public final class PropertiesEventSubscriber implements EventSubscriber {
public final class PropertiesEventSubscriber implements DispatchEventSubscriber {

private final ContextManager contextManager;

Expand Down
Loading

0 comments on commit 257fcd9

Please sign in to comment.