Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: add secure authentication to interfaces in ClusterController #6042

Merged
merged 35 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
6a0a0f3
add secure authentication to interfaces in ClusterController
ggbocoder Nov 17, 2023
6438318
fix codestyle
ggbocoder Nov 17, 2023
7ab84ae
fix codestyle
ggbocoder Nov 17, 2023
abbc2b2
fix
ggbocoder Nov 17, 2023
89adfb3
Merge branch '2.x' into 6012
funky-eyes Nov 17, 2023
34f30e8
use ttl to validate token expired time
ggbocoder Nov 17, 2023
62255d0
Merge remote-tracking branch 'origin/6012' into 6012
ggbocoder Nov 17, 2023
a607b24
Merge branch '2.x' into 6012
ggbocoder Nov 19, 2023
fff0c99
resolve conflicts
ggbocoder Nov 19, 2023
3a941fd
fix
ggbocoder Nov 19, 2023
aa378c9
opt
ggbocoder Nov 21, 2023
d8d3987
Merge branch '2.x' into 6012
funky-eyes Nov 24, 2023
623e43e
fix
ggbocoder Nov 26, 2023
6df8539
Merge branch '2.x' into 6012
funky-eyes Nov 26, 2023
b8a6a7f
Update discovery/seata-discovery-raft/src/main/java/io/seata/discover…
ggbocoder Nov 26, 2023
412700b
Update discovery/seata-discovery-raft/src/main/java/io/seata/discover…
ggbocoder Nov 26, 2023
47680da
Update discovery/seata-discovery-raft/src/main/java/io/seata/discover…
ggbocoder Nov 26, 2023
f8a3d3d
Update discovery/seata-discovery-raft/src/main/java/io/seata/discover…
ggbocoder Nov 26, 2023
509c0a2
add RetryableException
ggbocoder Nov 26, 2023
d9feaba
Merge remote-tracking branch 'origin/6012' into 6012
ggbocoder Nov 26, 2023
b062f88
Merge branch '2.x' into 6012
funky-eyes Nov 27, 2023
862dab2
Merge branch '2.x' into 6012
funky-eyes Nov 27, 2023
a2a2951
fix and add tests
ggbocoder Nov 27, 2023
733e92e
fix
ggbocoder Nov 27, 2023
4fe71b0
fix
ggbocoder Nov 27, 2023
58dd93c
fix
ggbocoder Nov 27, 2023
5f95f06
fix
ggbocoder Nov 27, 2023
631a9ad
fix
ggbocoder Nov 27, 2023
8143a9b
Update discovery/seata-discovery-raft/src/main/java/io/seata/discover…
ggbocoder Nov 28, 2023
3b4fe27
Update discovery/seata-discovery-raft/src/main/java/io/seata/discover…
ggbocoder Nov 28, 2023
129ab4f
Update discovery/seata-discovery-raft/src/main/java/io/seata/discover…
ggbocoder Nov 28, 2023
23109ee
simplify test config
ggbocoder Nov 28, 2023
de2ef50
simplify test config
ggbocoder Nov 28, 2023
4daf69a
fix
ggbocoder Nov 28, 2023
7db947e
Merge branch '2.x' into 6012
funky-eyes Nov 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,9 @@
<artifactId>httpclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.common.exception;


/**
* Exception indicating authentication failure. This exception is typically thrown
* when the authentication process fails, and it extends SecurityException to
* signal that it is a security-related issue.
*/
public class AuthenticationFailedException extends SecurityException {

/**
* Constructs a new AuthenticationFailedException with no detailed message.
*/
public AuthenticationFailedException() {
super();
}

/**
* Constructs a new AuthenticationFailedException with the specified detail message.
*
* @param message the detail message (which is saved for later retrieval
* by the getMessage() method).
*/
public AuthenticationFailedException(String message) {
super(message);
}

/**
* Constructs a new AuthenticationFailedException with the specified cause.
*
* @param cause the cause (which is saved for later retrieval by the
* getCause() method).
*/
public AuthenticationFailedException(Throwable cause) {
super(cause);
}

/**
* Constructs a new AuthenticationFailedException with the specified detail
* message and cause.
*
* @param message the detail message (which is saved for later retrieval
* by the getMessage() method).
* @param cause the cause (which is saved for later retrieval by the
* getCause() method).
*/
public AuthenticationFailedException(String message, Throwable cause) {
super(message, cause);
}
}
35 changes: 23 additions & 12 deletions common/src/main/java/io/seata/common/util/HttpClientUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.seata.common.util;


import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class HttpClientUtil {
private static final Map<Integer/*timeout*/, CloseableHttpClient> HTTP_CLIENT_MAP = new ConcurrentHashMap<>();

private static final PoolingHttpClientConnectionManager POOLING_HTTP_CLIENT_CONNECTION_MANAGER =
new PoolingHttpClientConnectionManager();
new PoolingHttpClientConnectionManager();

static {
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(10);
Expand All @@ -66,25 +67,35 @@ public class HttpClientUtil {
})));
}


// post request
public static CloseableHttpResponse doPost(String url, Map<String, String> params, Map<String, String> header,
int timeout) throws IOException {
int timeout) throws IOException {
try {
URIBuilder builder = new URIBuilder(url);
URI uri = builder.build();
HttpPost httpPost = new HttpPost(uri);
String contentType = "";
if (header != null) {
header.forEach(httpPost::addHeader);
contentType = header.get("Content-Type");
}
if (StringUtils.isNotBlank(contentType)) {
if (ContentType.APPLICATION_FORM_URLENCODED.getMimeType().equals(contentType)) {
List<NameValuePair> nameValuePairs = new ArrayList<>();
params.forEach((k, v) -> {
nameValuePairs.add(new BasicNameValuePair(k, v));
});
String requestBody = URLEncodedUtils.format(nameValuePairs, StandardCharsets.UTF_8);
StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED);
httpPost.setEntity(stringEntity);
} else if (ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) {
ObjectMapper objectMapper = new ObjectMapper();
String requestBody = objectMapper.writeValueAsString(params);
StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_JSON);
httpPost.setEntity(stringEntity);
}
}
List<NameValuePair> nameValuePairs = new ArrayList<>();
params.forEach((k, v) -> {
nameValuePairs.add(new BasicNameValuePair(k, v));
});
String requestBody = URLEncodedUtils.format(nameValuePairs, StandardCharsets.UTF_8);

StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED);
httpPost.setEntity(stringEntity);
httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
CloseableHttpClient client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
Expand All @@ -99,7 +110,7 @@ public static CloseableHttpResponse doPost(String url, Map<String, String> param

// get request
public static CloseableHttpResponse doGet(String url, Map<String, String> param, Map<String, String> header,
int timeout) throws IOException {
int timeout) throws IOException {
try {
URIBuilder builder = new URIBuilder(url);
if (param != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
ggbocoder marked this conversation as resolved.
Show resolved Hide resolved
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -33,8 +34,10 @@
import java.util.stream.Stream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.seata.common.ConfigurationKeys;
import io.seata.common.exception.AuthenticationFailedException;
import io.seata.common.metadata.Metadata;
import io.seata.common.metadata.MetadataResponse;
import io.seata.common.metadata.Node;
Expand All @@ -49,7 +52,9 @@
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.util.EntityUtils;
import org.apache.http.protocol.HTTP;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,13 +64,31 @@
* @author funkye
*/
public class RaftRegistryServiceImpl implements RegistryService<ConfigChangeListener> {

private static final Logger LOGGER = LoggerFactory.getLogger(RaftRegistryServiceImpl.class);

private static final String REGISTRY_TYPE = "raft";

private static final String PRO_SERVER_ADDR_KEY = "serverAddr";

private static final String PRO_USERNAME_KEY = "username";

private static final String PRO_PASSWORD_KEY = "password";

private static final String AUTHORIZATION_HEADER = "Authorization";

private static final String TOKEN_VALID_TIME_MS_KEY = "tokenValidityInMilliseconds";

private static final long TOKEN_EXPIRE_TIME_IN_MILLISECONDS;

private static final String USERNAME;

private static final String PASSWORD;

public static String jwtToken;

private static long tokenTimeStamp = -1;

private static volatile RaftRegistryServiceImpl instance;

private static final Configuration CONFIG = ConfigurationFactory.getInstance();
Expand All @@ -91,7 +114,20 @@ public class RaftRegistryServiceImpl implements RegistryService<ConfigChangeList
*/
private static final Map<String, List<InetSocketAddress>> ALIVE_NODES = new ConcurrentHashMap<>();

private RaftRegistryServiceImpl() {}
static {
TOKEN_EXPIRE_TIME_IN_MILLISECONDS = CONFIG.getLong(getTokenExpireTimeInMillisecondsKey(), 29 * 60 * 1000L);
USERNAME = CONFIG.getConfig(getRaftUserNameKey());
PASSWORD = CONFIG.getConfig(getRaftPassWordKey());
}

private RaftRegistryServiceImpl() {

try {
refreshToken();
} catch (IOException e) {
throw new RuntimeException("Init fetch token failed!", e);
}
}

/**
* Gets instance.
Expand Down Expand Up @@ -215,6 +251,29 @@ private static String getRaftAddrFileKey() {
REGISTRY_TYPE, PRO_SERVER_ADDR_KEY);
}

private static String getRaftUserNameKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY,
REGISTRY_TYPE, PRO_USERNAME_KEY);
}

private static String getRaftPassWordKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY,
REGISTRY_TYPE, PRO_PASSWORD_KEY);
}

private static String getTokenExpireTimeInMillisecondsKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY,
REGISTRY_TYPE, TOKEN_VALID_TIME_MS_KEY);
}

private static boolean isTokenExpired() {
if (tokenTimeStamp == -1) {
return true;
}
long tokenExpiredTime = tokenTimeStamp + TOKEN_EXPIRE_TIME_IN_MILLISECONDS;
return System.currentTimeMillis() >= tokenExpiredTime;
}

private InetSocketAddress convertInetSocketAddress(Node node) {
Node.Endpoint endpoint = node.getTransaction();
return new InetSocketAddress(endpoint.getHost(), endpoint.getPort());
Expand All @@ -238,17 +297,35 @@ public List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
}

private static boolean watch() {
Map<String, String> header = new HashMap<>();
header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
Map<String, String> param = new HashMap<>();
String clusterName = CURRENT_TRANSACTION_CLUSTER_NAME;
Map<String, Long> groupTerms = METADATA.getClusterTerm(clusterName);
groupTerms.forEach((k, v) -> param.put(k, String.valueOf(v)));
for (String group : groupTerms.keySet()) {
String tcAddress = queryHttpAddress(clusterName, group);
try (CloseableHttpResponse response =
HttpClientUtil.doPost("http://" + tcAddress + "/metadata/v1/watch", param, null, 30000)) {
if (response != null) {
StatusLine statusLine = response.getStatusLine();
return statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK;
try {
if (isTokenExpired()) {
refreshToken();
}
if (!Objects.isNull(jwtToken)) {
ggbocoder marked this conversation as resolved.
Show resolved Hide resolved
header.put(AUTHORIZATION_HEADER, jwtToken);
}
try (CloseableHttpResponse response =
HttpClientUtil.doPost("http://" + tcAddress + "/metadata/v1/watch", param, header, 30000)) {
if (response != null) {
StatusLine statusLine = response.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
if (StringUtils.isNotBlank(USERNAME) && StringUtils.isNotBlank(PASSWORD)) {
LOGGER.warn("Authentication failed!");
throw new IOException();
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new AuthenticationFailedException("Authentication failed! you should configure the correct username and password.");
}
}
return statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK;
}
}
} catch (IOException e) {
LOGGER.error("watch cluster node: {}, fail: {}", tcAddress, e.getMessage());
Expand All @@ -265,7 +342,7 @@ private static boolean watch() {

@Override
public List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
List<InetSocketAddress> aliveAddress) {
List<InetSocketAddress> aliveAddress) {
if (METADATA.isRaftMode()) {
Node leader = METADATA.getLeader(getServiceGroup(transactionServiceGroup));
InetSocketAddress leaderAddress = convertInetSocketAddress(leader);
Expand All @@ -286,14 +363,35 @@ private static void acquireClusterMetaDataByClusterName(String clusterName) {

private static void acquireClusterMetaData(String clusterName, String group) {
String tcAddress = queryHttpAddress(clusterName, group);
Map<String, String> header = new HashMap<>();
header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
if (isTokenExpired()) {
try {
refreshToken();
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
}
if (StringUtils.isNotBlank(jwtToken)) {
header.put(AUTHORIZATION_HEADER, jwtToken);
}
if (StringUtils.isNotBlank(tcAddress)) {
Map<String, String> param = new HashMap<>();
param.put("group", group);
String response = null;
try (CloseableHttpResponse httpResponse =
HttpClientUtil.doGet("http://" + tcAddress + "/metadata/v1/cluster", param, null, 1000)) {
if (httpResponse != null && httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8);
HttpClientUtil.doGet("http://" + tcAddress + "/metadata/v1/cluster", param, header, 1000)) {
if (httpResponse != null) {
if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8);
} else if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
if (StringUtils.isNotBlank(USERNAME) && StringUtils.isNotBlank(PASSWORD)) {
LOGGER.warn("Authentication failed !");
throw new IOException();
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new AuthenticationFailedException("Authentication failed! you should configure the correct username and password.");
}
}
}
MetadataResponse metadataResponse;
if (StringUtils.isNotBlank(response)) {
Expand All @@ -310,6 +408,45 @@ private static void acquireClusterMetaData(String clusterName, String group) {
}
}

public static void refreshToken() throws IOException {
// if username and password is not in config , return
if (Objects.isNull(USERNAME) || Objects.isNull(PASSWORD)) {
ggbocoder marked this conversation as resolved.
Show resolved Hide resolved
return;
}
String raftClusterAddress = CONFIG.getConfig(getRaftAddrFileKey());
// get token and set it in cache
if (StringUtils.isNotBlank(raftClusterAddress)) {
String[] tcAddressList = raftClusterAddress.split(",");
String tcAddress = tcAddressList[0];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the original ip1 goes offline?

Map<String, String> param = new HashMap<>();
param.put(PRO_USERNAME_KEY, USERNAME);
param.put(PRO_PASSWORD_KEY, PASSWORD);
Map<String, String> header = new HashMap<>();
header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
String response = null;
tokenTimeStamp = System.currentTimeMillis();
try (CloseableHttpResponse httpResponse =
HttpClientUtil.doPost("http://" + tcAddress + "/api/v1/auth/login", param, header, 1000)) {
if (httpResponse != null) {
if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8);
JsonNode jsonNode = OBJECT_MAPPER.readTree(response);
String codeStatus = jsonNode.get("code").asText();
if (!StringUtils.equals(codeStatus, "200")) {
//authorized failed,throw exception to kill process
throw new AuthenticationFailedException("Authentication failed! you should configure the correct username and password.");
}
jwtToken = jsonNode.get("data").asText();
} else {
//authorized failed,throw exception to kill process
throw new AuthenticationFailedException("Authentication failed! you should configure the correct username and password.");
}
}
}
}
}


@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
String clusterName = getServiceGroup(key);
Expand Down
Loading
Loading