Skip to content

Commit

Permalink
[BWC] fix mixedCluster and rolling upgrades (#775) (#793)
Browse files Browse the repository at this point in the history
This commit fixes mixedCluster and rolling upgrades by spoofing OpenSearch
version 1.0.0 as Legacy version 7.10.2. With this commit an OpenSearch 1.x node
can join a legacy (<= 7.10.2) cluster and rolling upgrades work as expected.
Mixed clusters will not work beyond the duration of the upgrade since shards
cannot be replicated from upgraded nodes to nodes running older versions.

Signed-off-by: Nicholas Walter Knize <[email protected]>

Co-authored-by: Shweta Thareja <[email protected]>
  • Loading branch information
nknize and Shweta Thareja committed May 28, 2021
1 parent dbce367 commit 26d5792
Show file tree
Hide file tree
Showing 16 changed files with 80 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public HttpEntity getEntity() {
* @return {@code true} if the input string matches the specification
*/
private static boolean matchWarningHeaderPatternByPrefix(final String s) {
return s.startsWith("299 OpenSearch-");
return s.startsWith("299 OpenSearch-") || s.startsWith("299 Elasticsearch-");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@

- do:
# sometimes IIS is listening on port 0. In that case we fail in other ways and this test isn't useful.
# make sure to stop any local webservers if running this test locally otherwise an s_s_l handshake exception may occur
catch: /connect_exception|IIS Windows Server/
reindex:
body:
Expand Down
3 changes: 3 additions & 0 deletions qa/full-cluster-restart/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ apply plugin: 'opensearch.standalone-test'
apply from : "$rootDir/gradle/bwc-test.gradle"

for (Version bwcVersion : BuildParams.bwcVersions.indexCompatible) {
if (bwcVersion.before('6.3.0')) {
continue;
}
String baseName = "v${bwcVersion}"

testClusters {
Expand Down
1 change: 1 addition & 0 deletions qa/mixed-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
println "Upgrade complete, endpoints are: ${-> testClusters."${baseName}".allHttpSocketURI.join(",")}"
println "Upgrading another node to create a mixed cluster"
testClusters."${baseName}".nextNodeToNextVersion()
println "Upgrading complete, endpoints are: ${-> testClusters."${baseName}".allHttpSocketURI.join(",")}"

nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
Expand Down
3 changes: 3 additions & 0 deletions qa/repository-multi-version/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ dependencies {
}

for (Version bwcVersion : BuildParams.bwcVersions.indexCompatible) {
if (bwcVersion.before('6.3.0')) {
continue;
}
String baseName = "v${bwcVersion}"
String oldClusterName = "${baseName}-old"
String newClusterName = "${baseName}-new"
Expand Down
3 changes: 3 additions & 0 deletions qa/verify-version-constants/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ apply plugin: 'opensearch.standalone-test'
apply from : "$rootDir/gradle/bwc-test.gradle"

for (Version bwcVersion : BuildParams.bwcVersions.indexCompatible) {
if (bwcVersion.before('6.3.0')) {
continue;
}
String baseName = "v${bwcVersion}"

testClusters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,4 @@ setup:
match_all: {}

- is_false: valid
- match: {error: 'org.opensearch.common.ParsingException: request does not support [match_all]'}
- match: {error: '/request\sdoes\snot\ssupport\s\[match_all\]/'}
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private static Version fromStringSlow(String version) {
String[] parts = version.split("[.-]");
// todo: add back optional build number
if (parts.length != 3) {
throw new IllegalArgumentException("the version needs to contain major, minor, and revision" + version);
throw new IllegalArgumentException("the version needs to contain major, minor, and revision: " + version);
}

try {
Expand Down Expand Up @@ -287,7 +287,7 @@ public Version minimumCompatibilityVersion() {

protected Version computeMinCompatVersion() {
if (major == 1) {
return Version.fromId(6080099);
return LegacyESVersion.V_6_8_0;
} else if (major == 6) {
// force the minimum compatibility for version 6 to 5.6 since we don't reference version 5 anymore
return Version.fromId(5060099);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ private <T extends ReportingService.Info> void addInfoIfNonNull(Class<T> clazz,
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(version.id);
if (out.getVersion().before(Version.V_1_0_0)) {
out.writeVInt(LegacyESVersion.V_7_10_2.id);
} else {
out.writeVInt(version.id);
}
Build.writeBuild(build, out);
if (totalIndexingBuffer == null) {
out.writeBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,11 @@ public void writeTo(StreamOutput out) throws IOException {
}
}
}
Version.writeVersion(version, out);
if (out.getVersion().before(Version.V_1_0_0)) {
Version.writeVersion(LegacyESVersion.V_7_10_2, out);
} else {
Version.writeVersion(version, out);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class HeaderWarning {
*/
public static final Pattern WARNING_HEADER_PATTERN = Pattern.compile(
"299 " + // warn code
"OpenSearch-" + // warn agent
"(?:Elasticsearch-|OpenSearch-)" + // warn agent (note: Elasticsearch needed for bwc mixedCluster testing)
"\\d+\\.\\d+\\.\\d+(?:-(?:alpha|beta|rc)\\d+)?(?:-SNAPSHOT)?-" + // warn agent
"(?:[a-f0-9]{7}(?:[a-f0-9]{33})?|unknown) " + // warn agent
"\"((?:\t| |!|[\\x23-\\x5B]|[\\x5D-\\x7E]|[\\x80-\\xFF]|\\\\|\\\\\")*)\"( " + // quoted warning value, captured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.transport;

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -79,7 +80,18 @@ void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeV
// for the request we use the minCompatVersion since we don't know what's the version of the node we talk to
// we also have no payload on the request but the response will contain the actual version of the node we talk
// to as the payload.
final Version minCompatVersion = version.minimumCompatibilityVersion();
Version minCompatVersion = version.minimumCompatibilityVersion();
if(version.onOrAfter(Version.V_1_0_0)) {
// the minCompatibleVersion for OpenSearch 1.x is sent as 6.7.99 instead of 6.8.0
// as this helps in (indirectly) identifying the remote node version during handle HandshakeRequest itself
// and then send appropriate version (7.10.2/ OpenSearch 1.x version) in response.
// The advantage of doing this is early identification of remote node version as otherwise
// if OpenSearch node also sends 6.8.0, there is no way to differentiate ES 7.x version from
// OpenSearch version and OpenSearch node will end up sending BC version to both ES & OpenSearch remote node.
// Sending only BC version to ElasticSearch node provide easy deprecation path for this BC version logic
// in OpenSearch 2.0.0.
minCompatVersion = Version.fromId(6079999);
}
handshakeRequestSender.sendRequest(node, channel, requestId, minCompatVersion);

threadPool.schedule(
Expand All @@ -105,7 +117,16 @@ void handleHandshake(TransportChannel channel, long requestId, StreamInput strea
throw new IllegalStateException("Handshake request not fully read for requestId [" + requestId + "], action ["
+ TransportHandshaker.HANDSHAKE_ACTION_NAME + "], available [" + stream.available() + "]; resetting");
}
channel.sendResponse(new HandshakeResponse(this.version));
// 1. if remote node is 7.x, then StreamInput version would be 6.8.0
// 2. if remote node is 6.8 then it would be 5.6.0
// 3. if remote node is OpenSearch 1.x then it would be 6.7.99
if(this.version.onOrAfter(Version.V_1_0_0) &&
(stream.getVersion().equals(LegacyESVersion.V_6_8_0)
|| stream.getVersion().equals(Version.fromId(5060099)))) {
channel.sendResponse(new HandshakeResponse(LegacyESVersion.V_7_10_2));
} else {
channel.sendResponse(new HandshakeResponse(this.version));
}
}

TransportResponseHandler<HandshakeResponse> removeHandlerForHandshake(long requestId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
Expand Down Expand Up @@ -531,14 +532,22 @@ public HandshakeResponse(StreamInput in) throws IOException {
super(in);
discoveryNode = in.readOptionalWriteable(DiscoveryNode::new);
clusterName = new ClusterName(in);
version = Version.readVersion(in);
Version tmpVersion = Version.readVersion(in);
if (in.getVersion().onOrBefore(LegacyESVersion.V_7_10_2)) {
tmpVersion = LegacyESVersion.V_7_10_2;
}
version = tmpVersion;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(discoveryNode);
clusterName.writeTo(out);
Version.writeVersion(version, out);
if (out.getVersion().before(Version.V_1_0_0)) {
Version.writeVersion(LegacyESVersion.V_7_10_2, out);
} else {
Version.writeVersion(version, out);
}
}

public DiscoveryNode getDiscoveryNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ public class ZenFaultDetectionTests extends OpenSearchTestCase {
protected ThreadPool threadPool;
private CircuitBreakerService circuitBreakerService;

protected static final Version version0 = Version.fromId(/*0*/99);
protected static final Version version0 = Version.fromId(6080099);
protected DiscoveryNode nodeA;
protected MockTransportService serviceA;
private Settings settingsA;

protected static final Version version1 = Version.fromId(199);
protected static final Version version1 = Version.fromId(7100099);
protected DiscoveryNode nodeB;
protected MockTransportService serviceB;
private Settings settingsB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testHandshakeRequestAndResponse() throws IOException {
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);

verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
verify(requestSender).sendRequest(node, channel, reqId, getMinCompatibilityVersionForHandshakeRequest());

assertFalse(versionFuture.isDone());

Expand All @@ -104,7 +104,7 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture());

verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
verify(requestSender).sendRequest(node, channel, reqId, getMinCompatibilityVersionForHandshakeRequest());

TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT);
BytesStreamOutput currentHandshakeBytes = new BytesStreamOutput();
Expand Down Expand Up @@ -140,7 +140,7 @@ public void testHandshakeError() throws IOException {
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);

verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
verify(requestSender).sendRequest(node, channel, reqId, getMinCompatibilityVersionForHandshakeRequest());

assertFalse(versionFuture.isDone());

Expand All @@ -155,7 +155,7 @@ public void testHandshakeError() throws IOException {
public void testSendRequestThrowsException() throws IOException {
PlainActionFuture<Version> versionFuture = PlainActionFuture.newFuture();
long reqId = randomLongBetween(1, 10);
Version compatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();
Version compatibilityVersion = getMinCompatibilityVersionForHandshakeRequest();
doThrow(new IOException("boom")).when(requestSender).sendRequest(node, channel, reqId, compatibilityVersion);

handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
Expand All @@ -171,11 +171,18 @@ public void testHandshakeTimeout() throws IOException {
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(100, TimeUnit.MILLISECONDS), versionFuture);

verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion());
verify(requestSender).sendRequest(node, channel, reqId, getMinCompatibilityVersionForHandshakeRequest());

ConnectTransportException cte = expectThrows(ConnectTransportException.class, versionFuture::actionGet);
assertThat(cte.getMessage(), containsString("handshake_timeout"));

assertNull(handshaker.removeHandlerForHandshake(reqId));
}

private Version getMinCompatibilityVersionForHandshakeRequest() {
if(Version.CURRENT.onOrAfter(Version.V_1_0_0) && Version.CURRENT.major == 1) {
return Version.fromId(6079999);
}
return Version.CURRENT.minimumCompatibilityVersion();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.Constants;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.Version;
Expand Down Expand Up @@ -2002,7 +2003,8 @@ public void testHandshakeUpdatesVersion() throws IOException {
service.start();
service.acceptIncomingRequests();
TransportAddress address = service.boundAddress().publishAddress();
DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", address, emptyMap(), emptySet(), Version.fromString("2.0.0"));
DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", address, emptyMap(), emptySet(),
LegacyESVersion.fromString("2.0.0"));
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
Expand Down Expand Up @@ -2047,7 +2049,9 @@ public void testTcpHandshake() {
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
serviceA.getOriginalTransport().openConnection(node, connectionProfile, future);
try (Transport.Connection connection = future.actionGet()) {
assertEquals(connection.getVersion(), Version.CURRENT);
// OpenSearch sends a handshake version spoofed as Legacy version 7_10_2
// todo change for OpenSearch 2.0.0
assertEquals(LegacyESVersion.V_7_10_2, connection.getVersion());
}
}
}
Expand Down

0 comments on commit 26d5792

Please sign in to comment.