diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java index bca80f45bf791..70b74effc3dd7 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.mode.state; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.state.cluster.ClusterState; import java.util.concurrent.atomic.AtomicReference; @@ -29,6 +30,7 @@ public final class StateContext { private final AtomicReference currentState = new AtomicReference<>(ClusterState.OK); + @Getter private final StateService stateService; /** diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index 27f3f9a686008..ccb9b96a10027 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -35,7 +35,6 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber; -import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service.ClusterStatusService; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber.ClusterStatusSubscriber; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber; @@ -49,6 +48,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties; +import org.apache.shardingsphere.mode.state.StateService; import org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService; import java.sql.SQLException; @@ -74,7 +74,7 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev setContextManagerAware(result); createSubscribers(eventBusContext, repository); registerOnline(eventBusContext, instanceContext, repository, param, result); - setClusterState(repository, result); + setClusterState(result); return result; } @@ -121,13 +121,13 @@ private void registerOnline(final EventBusContext eventBusContext, final Instanc new ClusterEventSubscriberRegistry(contextManager, repository).register(); } - private void setClusterState(final ClusterPersistRepository repository, final ContextManager contextManager) { - ClusterStatusService clusterStatusService = new ClusterStatusService(repository); - Optional clusterState = clusterStatusService.load(); + private void setClusterState(final ContextManager contextManager) { + StateService stateService = contextManager.getStateContext().getStateService(); + Optional clusterState = stateService.load(); if (clusterState.isPresent()) { contextManager.updateClusterState(clusterState.get()); } else { - clusterStatusService.persist(ClusterState.OK); + stateService.persist(ClusterState.OK); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java deleted file mode 100644 index 19a29f473be68..0000000000000 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service; - -import com.google.common.base.Strings; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.state.cluster.ClusterState; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; -import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; - -import java.util.Optional; - -/** - * Cluster status service. - */ -@RequiredArgsConstructor -public final class ClusterStatusService { - - private final ClusterPersistRepository repository; - - /** - * Persist cluster state. - * - * @param state cluster state - */ - public void persist(final ClusterState state) { - if (Strings.isNullOrEmpty(repository.getDirectly(ComputeNode.getClusterStatusNodePath()))) { - repository.persist(ComputeNode.getClusterStatusNodePath(), state.name()); - } - } - - /** - * Load cluster state. - * - * @return cluster state - */ - public Optional load() { - String value = repository.getDirectly(ComputeNode.getClusterStatusNodePath()); - return Strings.isNullOrEmpty(value) ? Optional.empty() : Optional.of(ClusterState.valueOf(value)); - } -} diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java deleted file mode 100644 index fac728a5e3c78..0000000000000 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service; - -import org.apache.shardingsphere.infra.state.cluster.ClusterState; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; -import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -class ClusterStatusServiceTest { - - @Mock - private ClusterPersistRepository repository; - - @Test - void assertPersistClusterStateWithoutPath() { - ClusterStatusService clusterStatusService = new ClusterStatusService(repository); - clusterStatusService.persist(ClusterState.OK); - verify(repository).persist(ComputeNode.getClusterStatusNodePath(), ClusterState.OK.name()); - } - - @Test - void assertPersistClusterStateWithPath() { - ClusterStatusService clusterStatusService = new ClusterStatusService(repository); - when(repository.getDirectly("/nodes/compute_nodes/status")).thenReturn(ClusterState.OK.name()); - clusterStatusService.persist(ClusterState.OK); - verify(repository, times(0)).persist(ComputeNode.getClusterStatusNodePath(), ClusterState.OK.name()); - } - - @Test - void assertLoadClusterStatus() { - new ClusterStatusService(repository).load(); - verify(repository).getDirectly(ComputeNode.getClusterStatusNodePath()); - } -}