Skip to content

Commit

Permalink
sync 2.x to branch 2.0.0-test (#6053)
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly authored Nov 18, 2023
1 parent 3e9a46e commit f5fddf1
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 122 deletions.
3 changes: 3 additions & 0 deletions changes/en-us/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ The version is updated as follows:
- [[#6018](https://github.com/seata/seata/pull/6018)] fix incorrect metric report
- [[#6024](https://github.com/seata/seata/pull/6024)] fix the white screen after click the "View Global Lock" button on the transaction info page in the console
- [[#6015](https://github.com/seata/seata/pull/6015)] fix can't integrate dubbo with spring
- [[#6049](https://github.com/seata/seata/pull/6049)] fix registry type for raft under the network interruption did not carry out the sleep 1s
- [[#6050](https://github.com/seata/seata/pull/6050)] change RaftServer#destroy to wait all shutdown procedures done

### optimize:
- [[#6033](https://github.com/seata/seata/pull/6033)] optimize the isReference judgment logic in HSFRemotingParser, remove unnecessary judgment about FactoryBean
Expand Down Expand Up @@ -152,6 +154,7 @@ The version is updated as follows:
- [[#5951](https://github.com/seata/seata/pull/5951)] remove un support config in jdk17
- [[#5959](https://github.com/seata/seata/pull/5959)] modify code style and remove unused import
- [[#6002](https://github.com/seata/seata/pull/6002)] remove fst serialization
- [[#6045](https://github.com/seata/seata/pull/6045)] optimize derivative product check base on mysql


### security:
Expand Down
3 changes: 3 additions & 0 deletions changes/zh-cn/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#6018](https://github.com/seata/seata/pull/6018)] 修复错误的 metric 上报
- [[#6024](https://github.com/seata/seata/pull/6024)] 修复控制台点击事务信息页面中的"查看全局锁"按钮之后白屏的问题
- [[#6015](https://github.com/seata/seata/pull/6015)] 修复在spring环境下无法集成dubbo
- [[#6049](https://github.com/seata/seata/pull/6049)] 修复客户端在raft注册中心类型下,网络中断时,watch线程未暂停一秒等待重试的问题
- [[#6050](https://github.com/seata/seata/pull/6050)] 修改 RaftServer#destroy 为等待所有关闭流程结束


### optimize:
Expand Down Expand Up @@ -153,6 +155,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#5951](https://github.com/seata/seata/pull/5951)] 删除在 jdk17 中不支持的配置项
- [[#5959](https://github.com/seata/seata/pull/5959)] 修正代码风格问题及去除无用的类引用
- [[#6002](https://github.com/seata/seata/pull/6002)] 移除fst序列化模块
- [[#6045](https://github.com/seata/seata/pull/6045)] 优化MySQL衍生数据库判断逻辑


### security:
Expand Down
5 changes: 5 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,10 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
127 changes: 127 additions & 0 deletions common/src/main/java/io/seata/common/util/HttpClientUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.common.util;


import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author funkye
*/
public class HttpClientUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientUtil.class);

private static final Map<Integer/*timeout*/, CloseableHttpClient> HTTP_CLIENT_MAP = new ConcurrentHashMap<>();

private static final PoolingHttpClientConnectionManager POOLING_HTTP_CLIENT_CONNECTION_MANAGER =
new PoolingHttpClientConnectionManager();

static {
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(10);
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(10);
Runtime.getRuntime().addShutdownHook(new Thread(() -> HTTP_CLIENT_MAP.values().parallelStream().forEach(client -> {
try {
client.close();
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
})));
}

// post request
public static CloseableHttpResponse doPost(String url, Map<String, String> params, Map<String, String> header,
int timeout) throws IOException {
try {
URIBuilder builder = new URIBuilder(url);
URI uri = builder.build();
HttpPost httpPost = new HttpPost(uri);
if (header != null) {
header.forEach(httpPost::addHeader);
}
List<NameValuePair> nameValuePairs = new ArrayList<>();
params.forEach((k, v) -> {
nameValuePairs.add(new BasicNameValuePair(k, v));
});
String requestBody = URLEncodedUtils.format(nameValuePairs, StandardCharsets.UTF_8);

StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED);
httpPost.setEntity(stringEntity);
httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
CloseableHttpClient client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout).setConnectTimeout(timeout).build())
.build());
return client.execute(httpPost);
} catch (URISyntaxException | ClientProtocolException e) {
LOGGER.error(e.getMessage(), e);
}
return null;
}

// get request
public static CloseableHttpResponse doGet(String url, Map<String, String> param, Map<String, String> header,
int timeout) throws IOException {
try {
URIBuilder builder = new URIBuilder(url);
if (param != null) {
for (String key : param.keySet()) {
builder.addParameter(key, param.get(key));
}
}
URI uri = builder.build();
HttpGet httpGet = new HttpGet(uri);
if (header != null) {
header.forEach(httpGet::addHeader);
}
CloseableHttpClient client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout).setConnectTimeout(timeout).build())
.build());
return client.execute(httpGet);
} catch (URISyntaxException | ClientProtocolException e) {
LOGGER.error(e.getMessage(), e);
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -41,26 +40,15 @@
import io.seata.common.metadata.Node;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.HttpClientUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigChangeListener;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.discovery.registry.RegistryService;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,11 +78,6 @@ public class RaftRegistryServiceImpl implements RegistryService<ConfigChangeList

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final PoolingHttpClientConnectionManager POOLING_HTTP_CLIENT_CONNECTION_MANAGER =
new PoolingHttpClientConnectionManager();

private static final Map<Integer/*timeout*/, CloseableHttpClient> HTTP_CLIENT_MAP = new ConcurrentHashMap<>();

private static volatile String CURRENT_TRANSACTION_SERVICE_GROUP;

private static volatile String CURRENT_TRANSACTION_CLUSTER_NAME;
Expand All @@ -108,11 +91,6 @@ public class RaftRegistryServiceImpl implements RegistryService<ConfigChangeList
*/
private static final Map<String, List<InetSocketAddress>> ALIVE_NODES = new ConcurrentHashMap<>();

static {
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(10);
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(10);
}

private RaftRegistryServiceImpl() {}

/**
Expand Down Expand Up @@ -191,13 +169,6 @@ protected static void startQueryMetadata() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
CLOSED.compareAndSet(false, true);
REFRESH_METADATA_EXECUTOR.shutdown();
HTTP_CLIENT_MAP.values().parallelStream().forEach(client -> {
try {
client.close();
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
});
}));
}
}
Expand Down Expand Up @@ -274,13 +245,13 @@ private static boolean watch() {
for (String group : groupTerms.keySet()) {
String tcAddress = queryHttpAddress(clusterName, group);
try (CloseableHttpResponse response =
doPost("http://" + tcAddress + "/metadata/v1/watch", param, null, 30000)) {
HttpClientUtil.doPost("http://" + tcAddress + "/metadata/v1/watch", param, null, 30000)) {
if (response != null) {
StatusLine statusLine = response.getStatusLine();
return statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK;
}
} catch (Exception e) {
LOGGER.error("watch cluster fail: {}", e.getMessage());
} catch (IOException e) {
LOGGER.error("watch cluster node: {}, fail: {}", tcAddress, e.getMessage());
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
Expand Down Expand Up @@ -320,7 +291,7 @@ private static void acquireClusterMetaData(String clusterName, String group) {
param.put("group", group);
String response = null;
try (CloseableHttpResponse httpResponse =
doGet("http://" + tcAddress + "/metadata/v1/cluster", param, null, 1000)) {
HttpClientUtil.doGet("http://" + tcAddress + "/metadata/v1/cluster", param, null, 1000)) {
if (httpResponse != null && httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8);
}
Expand All @@ -339,64 +310,6 @@ private static void acquireClusterMetaData(String clusterName, String group) {
}
}

public static CloseableHttpResponse doGet(String url, Map<String, String> param, Map<String, String> header,
int timeout) {
CloseableHttpClient client;
try {
URIBuilder builder = new URIBuilder(url);
if (param != null) {
for (String key : param.keySet()) {
builder.addParameter(key, param.get(key));
}
}
URI uri = builder.build();
HttpGet httpGet = new HttpGet(uri);
if (header != null) {
header.forEach(httpGet::addHeader);
}
client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout).setConnectTimeout(timeout).build())
.build());
return client.execute(httpGet);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
return null;
}

public static CloseableHttpResponse doPost(String url, Map<String, String> params, Map<String, String> header,
int timeout) {
CloseableHttpClient client = null;
try {
URIBuilder builder = new URIBuilder(url);
URI uri = builder.build();
HttpPost httpPost = new HttpPost(uri);
if (header != null) {
header.forEach(httpPost::addHeader);
}
List<NameValuePair> nameValuePairs = new ArrayList<>();
params.forEach((k, v) -> {
nameValuePairs.add(new BasicNameValuePair(k, v));
});
String requestBody = URLEncodedUtils.format(nameValuePairs, StandardCharsets.UTF_8);

StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED);
httpPost.setEntity(stringEntity);
httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout).setConnectTimeout(timeout).build())
.build());
return client.execute(httpPost);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
return null;
}

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
String clusterName = getServiceGroup(key);
Expand Down
Loading

0 comments on commit f5fddf1

Please sign in to comment.