Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #10797]Enhance the registration logic. #11228

Merged
merged 1 commit into from
Oct 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.packagescan.resource.Resource;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.common.utils.TlsTypeResolve;
import com.alibaba.nacos.common.utils.ThreadFactoryBuilder;
import com.alibaba.nacos.common.utils.TlsTypeResolve;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
Expand All @@ -54,16 +54,15 @@
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;

import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -356,56 +355,59 @@ public Connection connectToServer(ServerInfo serverInfo) {
int port = serverInfo.getServerPort() + rpcPortOffset();
ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel);
if (newChannelStubTemp != null) {

Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
shuntDownChannel(managedChannel);

Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (!(response instanceof ServerCheckResponse)) {
shuntDownChannel(managedChannel);
return null;
}
// submit ability table as soon as possible
// ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(connectionId);
// if not supported, it will be false
if (serverCheckResponse.isSupportAbilityNegotiation()) {
// mark
this.recAbilityContext.reset(grpcConn);
}

//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel(managedChannel);
//send a setup request.
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
// set ability table
conSetupRequest.setAbilityTable(
NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response
if (recAbilityContext.isNeedToSync()) {
// try to wait for notify response
boolean waitForResponse = recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(),
TimeUnit.MILLISECONDS);
if (!waitForResponse) {
// haven't received a response for registration; need to register again.
return null;
}

// submit ability table as soon as possible
// ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(connectionId);
// if not supported, it will be false
if (serverCheckResponse.isSupportAbilityNegotiation()) {
// mark
this.recAbilityContext.reset(grpcConn);
}

//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel(managedChannel);
//send a setup request.
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
// set ability table
conSetupRequest.setAbilityTable(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response
if (recAbilityContext.isNeedToSync()) {
// try to wait for notify response
recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS);
} else {
// leave for adapting old version server
// wait to register connection setup
Thread.sleep(100L);
}
return grpcConn;
} else {
// leave for adapting old version server
// registration is considered successful by default after 100ms
// wait to register connection setup
Thread.sleep(100L);
}
return null;
return grpcConn;
} catch (Exception e) {
LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
// remove and notify
Expand All @@ -425,17 +427,17 @@ public Connection connectToServer(ServerInfo serverInfo) {
protected void afterReset(ConnectResetRequest request) {
recAbilityContext.release(null);
}

/**
* This is for receiving server abilities.
*/
class RecAbilityContext {

static class RecAbilityContext {
/**
* connection waiting for server abilities.
*/
private volatile Connection connection;

/**
* way to block client.
*/
Expand Down Expand Up @@ -484,19 +486,24 @@ public void release(Map<String, Boolean> abilities) {
}
this.needToSync = false;
}

/**
* await for abilities.
* Wait for a specified duration for a condition to be met.
*
* @param timeout timeout.
* @param unit unit.
* @throws InterruptedException by blocker.
* @param timeout The maximum time to wait.
* @param unit The time unit for the timeout.
* @return true if the condition was successfully awaited, false otherwise.
* @throws InterruptedException if the waiting thread is interrupted.
*/
public void await(long timeout, TimeUnit unit) throws InterruptedException {
if (this.blocker != null) {
this.blocker.await(timeout, unit);
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (blocker != null) {
boolean waitForResponse = blocker.await(timeout, unit);
if (waitForResponse) {
needToSync = false;
}
return waitForResponse;
}
this.needToSync = false;
return false;
}
}

Expand Down