diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 129afaf9d5..9678d9e813 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -177,8 +177,13 @@ public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { Pair regionAndLeader = pdClient.getRegionByKey(backOffer, key); region = cache.putRegion(createRegion(regionAndLeader.first, regionAndLeader.second, backOffer)); + logger.debug( + String.format( + "get region id: %d with leader: %d", + region.getId(), region.getLeader().getStoreId())); } } catch (Exception e) { + logger.warn("Get region failed: ", e); return null; } finally { requestTimer.observeDuration(); @@ -240,6 +245,10 @@ public Pair getRegionStorePairByKey( } logger.info("Store {} is unreachable, try to get the next replica", peer.getStoreId()); } + // Does not set unreachable store to null in case it is incompatible with GrpcForward + if (store == null || !store.isReachable()) { + logger.warn("No TiKV store available for region: " + region); + } } else { List tiflashStores = new ArrayList<>(); for (Peer peer : region.getLearnerList()) { @@ -247,11 +256,8 @@ public Pair getRegionStorePairByKey( if (!s.isReachable()) { continue; } - for (Metapb.StoreLabel label : s.getStore().getLabelsList()) { - if (label.getKey().equals(storeType.getLabelKey()) - && label.getValue().equals(storeType.getLabelValue())) { - tiflashStores.add(s); - } + if (s.isTiFlash()) { + tiflashStores.add(s); } } // select a tiflash with Round-Robin strategy diff --git a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java index 8d305649c4..3ae3f40d1f 100644 --- a/src/main/java/org/tikv/common/region/StoreHealthyChecker.java +++ b/src/main/java/org/tikv/common/region/StoreHealthyChecker.java @@ -20,17 +20,22 @@ import io.grpc.health.v1.HealthCheckRequest; import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthGrpc; +import io.grpc.stub.ClientCalls; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.ReadOnlyPDClient; import org.tikv.common.util.ChannelFactory; import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Mpp; +import org.tikv.kvproto.Mpp.IsAliveRequest; +import org.tikv.kvproto.TikvGrpc; public class StoreHealthyChecker implements Runnable { private static final Logger logger = LoggerFactory.getLogger(StoreHealthyChecker.class); @@ -75,6 +80,30 @@ private List getValidStores() { private boolean checkStoreHealth(TiStore store) { String addressStr = store.getStore().getAddress(); + if (store.isTiFlash()) { + return checkTiFlashHealth(addressStr); + } + return checkTiKVHealth(addressStr); + } + + private boolean checkTiFlashHealth(String addressStr) { + try { + ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); + TikvGrpc.TikvBlockingStub stub = + TikvGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS); + Supplier factory = () -> Mpp.IsAliveRequest.newBuilder().build(); + Mpp.IsAliveResponse resp = + ClientCalls.blockingUnaryCall( + stub.getChannel(), TikvGrpc.getIsAliveMethod(), stub.getCallOptions(), factory.get()); + return resp != null && resp.getAvailable(); + } catch (Exception e) { + logger.info( + "fail to check TiFlash health, regard as unhealthy. TiFlash address: " + addressStr, e); + return false; + } + } + + private boolean checkTiKVHealth(String addressStr) { try { ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); HealthGrpc.HealthBlockingStub stub = @@ -83,6 +112,7 @@ private boolean checkStoreHealth(TiStore store) { HealthCheckResponse resp = stub.check(req); return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING; } catch (Exception e) { + logger.info("fail to check TiKV health, regard as unhealthy. TiKV address: " + addressStr, e); return false; } } diff --git a/src/main/java/org/tikv/common/region/TiStore.java b/src/main/java/org/tikv/common/region/TiStore.java index 8513e2b56e..5feaa246fe 100644 --- a/src/main/java/org/tikv/common/region/TiStore.java +++ b/src/main/java/org/tikv/common/region/TiStore.java @@ -105,4 +105,14 @@ public Metapb.Store getProxyStore() { public long getId() { return this.store.getId(); } + + public boolean isTiFlash() { + for (Metapb.StoreLabel label : store.getLabelsList()) { + if (label.getKey().equals(TiStoreType.TiFlash.getLabelKey()) + && label.getValue().equals(TiStoreType.TiFlash.getLabelValue())) { + return true; + } + } + return false; + } }