Skip to content

Commit

Permalink
Use ProcessPersistService instead of ProcessListEvent and remove Proc…
Browse files Browse the repository at this point in the history
…essSubscriber (#31411)
  • Loading branch information
menghaoranss authored May 27, 2024
1 parent da31876 commit d0abb75
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 352 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
Expand Down Expand Up @@ -97,7 +96,6 @@ private DistributedLockHolder initDistributedLockHolder(final ClusterPersistRepo
// TODO remove the method, only keep ZooKeeper's events, remove all decouple events
private void createSubscribers(final EventBusContext eventBusContext, final ClusterPersistRepository repository) {
eventBusContext.register(new QualifiedDataSourceStatusSubscriber(repository));
eventBusContext.register(new ClusterProcessSubscriber(repository, eventBusContext));
}

private void registerOnline(final EventBusContext eventBusContext, final ComputeNodeInstanceContext computeNodeInstanceContext,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
package org.apache.shardingsphere.mode.manager.cluster.service;

import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -35,26 +33,32 @@
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ClusterProcessSubscriberTest {
class ClusterProcessPersistServiceTest {

@Mock
private ClusterPersistRepository repository;

private final EventBusContext eventBusContext = new EventBusContext();

private ClusterProcessSubscriber clusterProcessListSubscriber;
private ClusterProcessPersistService processPersistService;

@BeforeEach
void setUp() {
clusterProcessListSubscriber = new ClusterProcessSubscriber(repository, eventBusContext);
processPersistService = new ClusterProcessPersistService(repository);
}

@Test
void assertPostShowProcessListData() {
void getProcessList() {
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
when(repository.query(any())).thenReturn(null);
clusterProcessListSubscriber.postShowProcessListData(new ShowProcessListRequestEvent());
processPersistService.getProcessList();
verify(repository).persist(any(), any());
}

@Test
void killProcess() {
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
processPersistService.killProcess("foo_process_id");
verify(repository).persist(any(), any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
public final class StandaloneEventSubscriberRegistry extends EventSubscriberRegistry {

public StandaloneEventSubscriberRegistry(final ContextManager contextManager) {
super(contextManager,
new StandaloneProcessSubscriber(contextManager.getComputeNodeInstanceContext().getEventBusContext()),
new RuleItemChangedSubscriber(contextManager));
super(contextManager, new RuleItemChangedSubscriber(contextManager));
}
}

This file was deleted.

Loading

0 comments on commit d0abb75

Please sign in to comment.