Skip to content

Commit

Permalink
Add deprecation warning to TransportHandshaker (elastic#123188)
Browse files Browse the repository at this point in the history
This PR adds a deprecation warning log to the TransportHandshaker when connecting with nodes < v8.18.
  • Loading branch information
JVerwolf committed Feb 27, 2025
1 parent 223d50f commit 63436c2
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 14 deletions.
19 changes: 19 additions & 0 deletions docs/changelog/123188.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
pr: 123188
summary: Add deprecation warning to `TransportHandshaker`
area: Infra/Core
type: deprecation
issues: []
deprecation:
title: Deprecate ability to connect to nodes of versions 8.17 and earlier
area: REST API
details: >
Versions 9.0.0 and later of {es} will not support communication with nodes of versions earlier than 8.18.0,
so the ability to connect to nodes of earlier versions is deprecated in this version. This applies both to
communication within a cluster and communication across clusters (e.g. for <<modules-cross-cluster-search,{ccs}>> or
<<xpack-ccr,{ccr}>>).
{es} will report in its <<deprecation-logging, deprecation logging>> each time it opens a connection to a node that
will not be supported from version 9.0.0 onwards. You must upgrade all your clusters to version 8.18.0 or later
before upgrading any of your clusters to 9.0.0 or later.
impact: >
Upgrade all of your clusters to at least 8.18.0 before upgrading any of them to 9.0.0 or later.
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@

import org.elasticsearch.Build;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
Expand Down Expand Up @@ -160,6 +164,7 @@ final class TransportHandshaker {
*/

private static final Logger logger = LogManager.getLogger(TransportHandshaker.class);
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(logger.getName());

static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion.fromId(6_08_00_99);
static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion.fromId(7_17_00_99);
Expand All @@ -171,6 +176,7 @@ final class TransportHandshaker {
);

static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
static final TransportVersion V8_18_FIRST_VERSION = TransportVersions.INDEXING_PRESSURE_THROTTLING_STATS;
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
private final CounterMetric numHandshakes = new CounterMetric();

Expand Down Expand Up @@ -246,17 +252,34 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
assert ignoreDeserializationErrors : exception;
throw exception;
}
ensureCompatibleVersion(version, handshakeRequest.transportVersion, handshakeRequest.releaseVersion, channel);
ensureCompatibleVersion(
version,
handshakeRequest.transportVersion,
handshakeRequest.releaseVersion,
channel,
threadPool.getThreadContext()
);
channel.sendResponse(new HandshakeResponse(this.version, Build.current().version()));
}

static void ensureCompatibleVersion(
TransportVersion localTransportVersion,
TransportVersion remoteTransportVersion,
String releaseVersion,
Object channel
String remoteReleaseVersion,
Object channel,
ThreadContext threadContext
) {
if (TransportVersion.isCompatible(remoteTransportVersion)) {
// Prevent log message headers from being added to the handshake response.
try (var ignored = threadContext.stashContext()) {
if (remoteTransportVersion.before(V8_18_FIRST_VERSION)) {
deprecationLogger.warn(
DeprecationCategory.OTHER,
"handshake_version",
getDeprecationMessage(localTransportVersion, remoteTransportVersion, remoteReleaseVersion, channel)
);
}
}
if (remoteTransportVersion.onOrAfter(localTransportVersion)) {
// Remote is newer than us, so we will be using our transport protocol and it's up to the other end to decide whether it
// knows how to do that.
Expand All @@ -273,7 +296,7 @@ static void ensureCompatibleVersion(
"""
Rejecting unreadable transport handshake from remote node with version [%s/%s] received on [%s] since this node has \
version [%s/%s] which has an incompatible wire format.""",
releaseVersion,
remoteReleaseVersion,
remoteTransportVersion,
channel,
Build.current().version(),
Expand All @@ -284,6 +307,24 @@ static void ensureCompatibleVersion(

}

// Non-private for testing
static String getDeprecationMessage(
TransportVersion localTransportVersion,
TransportVersion remoteTransportVersion,
String remoteReleaseVersion,
Object channel
) {
return Strings.format(
"Performed a handshake with a remote node with version [%s/%s] received on [%s] which "
+ "will be incompatible after this node on version [%s/%s] is upgraded to 9.x.",
remoteReleaseVersion,
remoteTransportVersion,
channel,
Build.current().version(),
localTransportVersion
);
}

TransportResponseHandler<HandshakeResponse> removeHandlerForHandshake(long requestId) {
return pendingHandshakes.remove(requestId);
}
Expand Down Expand Up @@ -323,7 +364,13 @@ public Executor executor() {
public void handleResponse(HandshakeResponse response) {
if (isDone.compareAndSet(false, true)) {
ActionListener.completeWith(listener, () -> {
ensureCompatibleVersion(version, response.getTransportVersion(), response.getReleaseVersion(), channel);
ensureCompatibleVersion(
version,
response.getTransportVersion(),
response.getReleaseVersion(),
channel,
threadPool.getThreadContext()
);
final var resultVersion = TransportVersion.min(TransportHandshaker.this.version, response.getTransportVersion());
assert TransportVersion.current().before(version) // simulating a newer-version transport service for test purposes
|| resultVersion.isKnown() : "negotiated unknown version " + resultVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.transport.TransportHandshaker.V8_18_FIRST_VERSION;
import static org.elasticsearch.transport.TransportHandshaker.getDeprecationMessage;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -100,10 +102,8 @@ public void testHandshakeRequestAndResponse() throws IOException {

@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
public void testIncompatibleHandshakeRequest() throws IOException {
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(
getRandomIncompatibleTransportVersion(),
randomIdentifier()
);
var remoteVersion = getRandomIncompatibleTransportVersion();
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(remoteVersion, randomIdentifier());
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
bytesStreamOutput.setTransportVersion(HANDSHAKE_REQUEST_VERSION);
handshakeRequest.writeTo(bytesStreamOutput);
Expand Down Expand Up @@ -132,6 +132,7 @@ public void testIncompatibleHandshakeRequest() throws IOException {
"Rejecting unreadable transport handshake * incompatible wire format."
)
);
assertDeprecationMessageIsLogged(remoteVersion, remoteVersion.toReleaseVersion(), channel);
}

public void testHandshakeResponseFromOlderNode() throws Exception {
Expand All @@ -143,10 +144,13 @@ public void testHandshakeResponseFromOlderNode() throws Exception {
assertFalse(versionFuture.isDone());

final var remoteVersion = TransportVersionUtils.randomCompatibleVersion(random());
handler.handleResponse(new TransportHandshaker.HandshakeResponse(remoteVersion, randomIdentifier()));
var releaseVersion = randomIdentifier();
handler.handleResponse(new TransportHandshaker.HandshakeResponse(remoteVersion, releaseVersion));

assertTrue(versionFuture.isDone());
assertEquals(remoteVersion, versionFuture.result());

assertDeprecationMessageIsLogged(remoteVersion, releaseVersion, channel);
}

@TestLogging(reason = "testing WARN logging", value = "org.elasticsearch.transport.TransportHandshaker:WARN")
Expand All @@ -158,10 +162,9 @@ public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() {

assertFalse(versionFuture.isDone());

final var handshakeResponse = new TransportHandshaker.HandshakeResponse(
getRandomIncompatibleTransportVersion(),
randomIdentifier()
);
var remoteVersion = getRandomIncompatibleTransportVersion();
var releaseVersion = randomIdentifier();
final var handshakeResponse = new TransportHandshaker.HandshakeResponse(remoteVersion, releaseVersion);

MockLog.assertThatLogger(
() -> handler.handleResponse(handshakeResponse),
Expand All @@ -184,6 +187,13 @@ public void testHandshakeResponseFromOlderNodeWithPatchedProtocol() {
containsString("which has an incompatible wire format")
)
);
assertDeprecationMessageIsLogged(remoteVersion, releaseVersion, channel);
}

private void assertDeprecationMessageIsLogged(TransportVersion remoteVersion, String remoteReleaseVersion, Object channel) {
if (remoteVersion.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE) && remoteVersion.before(V8_18_FIRST_VERSION)) {
assertWarnings(getDeprecationMessage(TransportVersion.current(), remoteVersion, remoteReleaseVersion, channel));
}
}

private static TransportVersion getRandomIncompatibleTransportVersion() {
Expand Down

0 comments on commit 63436c2

Please sign in to comment.