Skip to content

Commit c72413e

Browse files
author
alexsa
committed
Add SubnetAddressTranslator to translate Cassandra node IPs from private network based on its subnet mask
1 parent c9facc3 commit c72413e

File tree

17 files changed

+915
-69
lines changed

17 files changed

+915
-69
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -994,7 +994,48 @@ public enum DefaultDriverOption implements DriverOption {
994994
*
995995
* <p>Value-type: boolean
996996
*/
997-
SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san");
997+
SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san"),
998+
/**
999+
* An address to always translate all node addresses to that same proxy hostname no matter what IP
1000+
* address a node has, but still using its native transport port.
1001+
*
1002+
* <p>Value-Type: {@link String}
1003+
*/
1004+
ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME("advanced.address-translator.advertised-hostname"),
1005+
/**
1006+
* A map of Cassandra node subnets (CIDR notations) to target addresses, for example (note quoted
1007+
* keys):
1008+
*
1009+
* <pre>
1010+
* advanced.address-translator.subnet-addresses {
1011+
* "100.64.0.0/15" = "cassandra.datacenter1.com:9042"
1012+
* "100.66.0.0/15" = "cassandra.datacenter2.com:9042"
1013+
* # IPv6 example:
1014+
* # "::ffff:6440:0/111" = "cassandra.datacenter1.com:9042"
1015+
* # "::ffff:6442:0/111" = "cassandra.datacenter2.com:9042"
1016+
* }
1017+
* </pre>
1018+
*
1019+
* Note: subnets must be represented as prefix blocks, see {@link
1020+
* inet.ipaddr.Address#isPrefixBlock()}.
1021+
*
1022+
* <p>Value type: {@link java.util.Map Map}&#60;{@link String},{@link String}&#62;
1023+
*/
1024+
ADDRESS_TRANSLATOR_SUBNET_ADDRESSES("advanced.address-translator.subnet-addresses"),
1025+
/**
1026+
* A default address to fallback to if Cassandra node IP isn't contained in any of the configured
1027+
* subnets.
1028+
*
1029+
* <p>Value-Type: {@link String}
1030+
*/
1031+
ADDRESS_TRANSLATOR_DEFAULT_ADDRESS("advanced.address-translator.default-address"),
1032+
/**
1033+
* Whether to resolve the addresses on initialization (if true) or on each node (re-)connection
1034+
* (if false). Defaults to false.
1035+
*
1036+
* <p>Value-Type: boolean
1037+
*/
1038+
ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES("advanced.address-translator.resolve-addresses");
9981039

9991040
private final String path;
10001041

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,20 @@ public String toString() {
896896
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS,
897897
GenericType.BOOLEAN);
898898

899+
public static final TypedDriverOption<String> ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME =
900+
new TypedDriverOption<>(
901+
DefaultDriverOption.ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME, GenericType.STRING);
902+
public static final TypedDriverOption<Map<String, String>> ADDRESS_TRANSLATOR_SUBNET_ADDRESSES =
903+
new TypedDriverOption<>(
904+
DefaultDriverOption.ADDRESS_TRANSLATOR_SUBNET_ADDRESSES,
905+
GenericType.mapOf(GenericType.STRING, GenericType.STRING));
906+
public static final TypedDriverOption<String> ADDRESS_TRANSLATOR_DEFAULT_ADDRESS =
907+
new TypedDriverOption<>(
908+
DefaultDriverOption.ADDRESS_TRANSLATOR_DEFAULT_ADDRESS, GenericType.STRING);
909+
public static final TypedDriverOption<Boolean> ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES =
910+
new TypedDriverOption<>(
911+
DefaultDriverOption.ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES, GenericType.BOOLEAN);
912+
899913
/**
900914
* Ordered preference list of remote dcs optionally supplied for automatic failover and included
901915
* in query plan. This feature is enabled only when max-nodes-per-remote-dc is greater than 0.

core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java

Lines changed: 17 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919

2020
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2121
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
22+
import com.datastax.oss.driver.internal.core.util.AddressUtils;
2223
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
2324
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
24-
import java.net.InetAddress;
2525
import java.net.InetSocketAddress;
26-
import java.net.UnknownHostException;
27-
import java.util.Arrays;
2826
import java.util.Collections;
29-
import java.util.HashSet;
3027
import java.util.List;
3128
import java.util.Set;
3229
import org.slf4j.Logger;
@@ -41,7 +38,22 @@ public static Set<EndPoint> merge(
4138

4239
Set<EndPoint> result = Sets.newHashSet(programmaticContactPoints);
4340
for (String spec : configContactPoints) {
44-
for (InetSocketAddress address : extract(spec, resolve)) {
41+
42+
Set<InetSocketAddress> addresses = Collections.emptySet();
43+
try {
44+
addresses = AddressUtils.extract(spec, resolve);
45+
} catch (RuntimeException e) {
46+
LOG.warn("Ignoring invalid contact point {} ({})", spec, e.getMessage(), e);
47+
}
48+
49+
if (addresses.size() > 1) {
50+
LOG.info(
51+
"Contact point {} resolves to multiple addresses, will use them all ({})",
52+
spec,
53+
addresses);
54+
}
55+
56+
for (InetSocketAddress address : addresses) {
4557
DefaultEndPoint endPoint = new DefaultEndPoint(address);
4658
boolean wasNew = result.add(endPoint);
4759
if (!wasNew) {
@@ -51,43 +63,4 @@ public static Set<EndPoint> merge(
5163
}
5264
return ImmutableSet.copyOf(result);
5365
}
54-
55-
private static Set<InetSocketAddress> extract(String spec, boolean resolve) {
56-
int separator = spec.lastIndexOf(':');
57-
if (separator < 0) {
58-
LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec);
59-
return Collections.emptySet();
60-
}
61-
62-
String host = spec.substring(0, separator);
63-
String portSpec = spec.substring(separator + 1);
64-
int port;
65-
try {
66-
port = Integer.parseInt(portSpec);
67-
} catch (NumberFormatException e) {
68-
LOG.warn("Ignoring invalid contact point {} (expecting a number, got {})", spec, portSpec);
69-
return Collections.emptySet();
70-
}
71-
if (!resolve) {
72-
return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port));
73-
} else {
74-
try {
75-
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
76-
if (inetAddresses.length > 1) {
77-
LOG.info(
78-
"Contact point {} resolves to multiple addresses, will use them all ({})",
79-
spec,
80-
Arrays.deepToString(inetAddresses));
81-
}
82-
Set<InetSocketAddress> result = new HashSet<>();
83-
for (InetAddress inetAddress : inetAddresses) {
84-
result.add(new InetSocketAddress(inetAddress, port));
85-
}
86-
return result;
87-
} catch (UnknownHostException e) {
88-
LOG.warn("Ignoring invalid contact point {} (unknown host {})", spec, host);
89-
return Collections.emptySet();
90-
}
91-
}
92-
}
9366
}

core/src/main/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslator.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
*/
1818
package com.datastax.oss.driver.internal.core.addresstranslation;
1919

20+
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME;
21+
2022
import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
21-
import com.datastax.oss.driver.api.core.config.DriverOption;
2223
import com.datastax.oss.driver.api.core.context.DriverContext;
2324
import edu.umd.cs.findbugs.annotations.NonNull;
2425
import java.net.InetSocketAddress;
@@ -37,28 +38,13 @@ public class FixedHostNameAddressTranslator implements AddressTranslator {
3738

3839
private static final Logger LOG = LoggerFactory.getLogger(FixedHostNameAddressTranslator.class);
3940

40-
public static final String ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME =
41-
"advanced.address-translator.advertised-hostname";
42-
43-
public static DriverOption ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME_OPTION =
44-
new DriverOption() {
45-
@NonNull
46-
@Override
47-
public String getPath() {
48-
return ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME;
49-
}
50-
};
51-
5241
private final String advertisedHostname;
5342
private final String logPrefix;
5443

5544
public FixedHostNameAddressTranslator(@NonNull DriverContext context) {
5645
logPrefix = context.getSessionName();
5746
advertisedHostname =
58-
context
59-
.getConfig()
60-
.getDefaultProfile()
61-
.getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME_OPTION);
47+
context.getConfig().getDefaultProfile().getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME);
6248
}
6349

6450
@NonNull
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.internal.core.addresstranslation;
19+
20+
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
21+
import com.datastax.oss.driver.shaded.guava.common.base.Splitter;
22+
import java.net.InetAddress;
23+
import java.net.UnknownHostException;
24+
import java.util.Arrays;
25+
import java.util.List;
26+
27+
class Subnet {
28+
private final byte[] subnet;
29+
private final byte[] networkMask;
30+
private final byte[] upper;
31+
private final byte[] lower;
32+
33+
private Subnet(byte[] subnet, byte[] networkMask) {
34+
this.subnet = subnet;
35+
this.networkMask = networkMask;
36+
37+
byte[] upper = new byte[subnet.length];
38+
byte[] lower = new byte[subnet.length];
39+
for (int i = 0; i < subnet.length; i++) {
40+
upper[i] = (byte) (subnet[i] | ~networkMask[i]);
41+
lower[i] = (byte) (subnet[i] & networkMask[i]);
42+
}
43+
this.upper = upper;
44+
this.lower = lower;
45+
}
46+
47+
static Subnet parse(String subnetCIDR) throws UnknownHostException {
48+
List<String> parts = Splitter.on("/").splitToList("/");
49+
if (parts.size() != 2) {
50+
throw new IllegalArgumentException("Invalid subnet: " + subnetCIDR);
51+
}
52+
53+
boolean isIPv6 = parts.get(0).contains(":");
54+
byte[] subnet = InetAddress.getByName(parts.get(0)).getAddress();
55+
if (isIPv4(subnet) && isIPv6) {
56+
subnet = toIPv6(subnet);
57+
}
58+
int prefixLength = Integer.parseInt(parts.get(1));
59+
validatePrefixLength(subnet, prefixLength);
60+
61+
byte[] networkMask = toNetworkMask(subnet, prefixLength);
62+
validateSubnetIsPrefixBlock(subnet, networkMask, subnetCIDR);
63+
return new Subnet(subnet, networkMask);
64+
}
65+
66+
private static byte[] toNetworkMask(byte[] subnet, int prefixLength) {
67+
int fullBytes = prefixLength / 8;
68+
int remainingBits = prefixLength % 8;
69+
byte[] mask = new byte[subnet.length];
70+
Arrays.fill(mask, 0, fullBytes, (byte) 0xFF);
71+
if (remainingBits > 0) {
72+
mask[fullBytes] = (byte) (0xFF << (8 - remainingBits));
73+
}
74+
return mask;
75+
}
76+
77+
private static void validatePrefixLength(byte[] subnet, int prefixLength) {
78+
int max_prefix_length = subnet.length * 8;
79+
if (prefixLength < 0 || max_prefix_length < prefixLength) {
80+
throw new IllegalArgumentException(
81+
String.format(
82+
"Prefix length %s must be within [0; %s]", prefixLength, max_prefix_length));
83+
}
84+
}
85+
86+
private static void validateSubnetIsPrefixBlock(
87+
byte[] subnet, byte[] networkMask, String subnetCIDR) {
88+
byte[] prefixBlock = toPrefixBlock(subnet, networkMask);
89+
if (!Arrays.equals(subnet, prefixBlock)) {
90+
throw new IllegalArgumentException(
91+
String.format("Subnet %s must be represented as a network prefix block", subnetCIDR));
92+
}
93+
}
94+
95+
private static byte[] toPrefixBlock(byte[] subnet, byte[] networkMask) {
96+
byte[] prefixBlock = new byte[subnet.length];
97+
for (int i = 0; i < subnet.length; i++) {
98+
prefixBlock[i] = (byte) (subnet[i] & networkMask[i]);
99+
}
100+
return prefixBlock;
101+
}
102+
103+
@VisibleForTesting
104+
byte[] getSubnet() {
105+
return Arrays.copyOf(subnet, subnet.length);
106+
}
107+
108+
@VisibleForTesting
109+
byte[] getNetworkMask() {
110+
return Arrays.copyOf(networkMask, networkMask.length);
111+
}
112+
113+
byte[] getUpper() {
114+
return Arrays.copyOf(upper, upper.length);
115+
}
116+
117+
byte[] getLower() {
118+
return Arrays.copyOf(lower, lower.length);
119+
}
120+
121+
boolean isIPv4() {
122+
return isIPv4(subnet);
123+
}
124+
125+
boolean isIPv6() {
126+
return isIPv6(subnet);
127+
}
128+
129+
boolean contains(byte[] ip) {
130+
if (isIPv4() && !isIPv4(ip)) {
131+
return false;
132+
}
133+
if (isIPv6() && isIPv4(ip)) {
134+
ip = toIPv6(ip);
135+
}
136+
if (subnet.length != ip.length) {
137+
throw new IllegalArgumentException(
138+
"IP version is unknown: " + Arrays.toString(toZeroBasedByteArray(ip)));
139+
}
140+
for (int i = 0; i < subnet.length; i++) {
141+
if (subnet[i] != (byte) (ip[i] & networkMask[i])) {
142+
return false;
143+
}
144+
}
145+
return true;
146+
}
147+
148+
private static boolean isIPv4(byte[] ip) {
149+
return ip.length == 4;
150+
}
151+
152+
private static boolean isIPv6(byte[] ip) {
153+
return ip.length == 16;
154+
}
155+
156+
private static byte[] toIPv6(byte[] ipv4) {
157+
byte[] ipv6 = new byte[16];
158+
ipv6[10] = (byte) 0xFF;
159+
ipv6[11] = (byte) 0xFF;
160+
System.arraycopy(ipv4, 0, ipv6, 12, 4);
161+
return ipv6;
162+
}
163+
164+
@Override
165+
public String toString() {
166+
return Arrays.toString(toZeroBasedByteArray(subnet));
167+
}
168+
169+
private static int[] toZeroBasedByteArray(byte[] bytes) {
170+
int[] res = new int[bytes.length];
171+
for (int i = 0; i < bytes.length; i++) {
172+
res[i] = bytes[i] & 0xFF;
173+
}
174+
return res;
175+
}
176+
}

0 commit comments

Comments
 (0)