Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature:[loom] replace the usages of synchronized with ReentrantLock #7073

Open
wants to merge 12 commits into
base: 2.x
Choose a base branch
from
3 changes: 2 additions & 1 deletion changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ Add changes here for all PR submitted to the 2.x branch.

### feature:

- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] support virtual thread,replace the usages of synchronized with ReentrantLock
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation


### bugfix:

- [[#PR_NO](https://github.com/apache/incubator-seata/pull/#PR_NO)] fix XXX
Expand Down Expand Up @@ -36,6 +36,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [slievrly](https://github.com/slievrly)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [remind](https://github.com/remind)
- [lightClouds917](https://github.com/lightClouds917)
- [PeppaO](https://github.com/PeppaO)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
2 changes: 2 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### feature:

- [[#7073](https://github.com/apache/incubator-seata/pull/7073)] 支持虚拟线程,用ReentrantLock替换synchronized的用法
- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换

### bugfix:
Expand Down Expand Up @@ -35,6 +36,7 @@
- [slievrly](https://github.com/slievrly)
- [lyl2008dsg](https://github.com/lyl2008dsg)
- [remind](https://github.com/remind)
- [lightClouds917](https://github.com/lightClouds917)
- [PeppaO](https://github.com/PeppaO)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.lock;

import java.util.concurrent.locks.ReentrantLock;

/**
* The ResourceLock extends ReentrantLock and implements AutoCloseable,
* allowing it to be used in try-with-resources blocks without needing
* to unlock in a finally block.
*
* <h3>Example</h3>
* <pre>
* {@code
* private final ResourceLock resourceLock = new ResourceLock();
* try (ResourceLock lock = resourceLock.obtain()) {
* // do something while holding the resource lock
* }
* }
* </pre>
*/
public class ResourceLock extends ReentrantLock implements AutoCloseable {

/**
* Obtain the lock.
*
* @return this ResourceLock
*/
public ResourceLock obtain() {
lock();
return this;
}


/**
* Unlock the resource lock.
*
* <p>This is typically used in try-with-resources blocks to automatically
* unlock the resource lock when the block is exited, regardless of whether
* an exception is thrown or not.
*/
@Override
public void close() {
this.unlock();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
*/
package org.apache.seata.common.util;

import org.apache.seata.common.lock.ResourceLock;

/**
* The type Uuid generator.
*/
public class UUIDGenerator {

private static volatile IdWorker idWorker;
private final static ResourceLock RESOURCE_LOCK = new ResourceLock();

/**
* generate UUID using snowflake algorithm
Expand All @@ -30,7 +33,7 @@ public class UUIDGenerator {
*/
public static long generateUUID() {
if (idWorker == null) {
synchronized (UUIDGenerator.class) {
try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
if (idWorker == null) {
init(null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.lock;

import org.apache.seata.common.util.CollectionUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.concurrent.ConcurrentHashMap;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(MockitoExtension.class)
public class ResourceLockTest {

@Test
public void testObtainAndClose() {
ResourceLock resourceLock = new ResourceLock();

// Test obtaining the lock
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
}

// After try-with-resources, lock should be released
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after try-with-resources");
}

@Test
public void testMultipleObtainAndClose() {
ResourceLock resourceLock = new ResourceLock();

// First obtain and release
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
}
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after first try-with-resources");

// Second obtain and release
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
}
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after second try-with-resources");
}

@Test
public void testResourceLockAutoRemovalFromMap() {
ConcurrentHashMap<String, ResourceLock> lockMap = new ConcurrentHashMap<>();
String key = "testKey";
// Use try-with-resources to obtain and release the lock
try (ResourceLock ignored = CollectionUtils.computeIfAbsent(lockMap, key, k -> new ResourceLock()).obtain()) {
// Do something while holding the lock
assertTrue(lockMap.containsKey(key));
assertTrue(lockMap.get(key).isHeldByCurrentThread());
} finally {
assertFalse(lockMap.get(key).isHeldByCurrentThread());
assertTrue(lockMap.containsKey(key));
// Remove the lock from the map
lockMap.remove(key);
assertFalse(lockMap.containsKey(key));
}
// Ensure the lock is removed from the map
assertFalse(lockMap.containsKey(key));
}

@Test
public void testConcurrentLocking() throws InterruptedException {
ResourceLock resourceLock = new ResourceLock();

Thread t1 = new Thread(() -> {
try (ResourceLock lock = resourceLock.obtain()) {
try {
Thread.sleep(100); // Hold the lock for 100ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

Thread t2 = new Thread(() -> {
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should not be held by current thread before t1 releases it");
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread after t1 releases it");
}
});

t1.start();
t2.start();

t1.join();
t2.join();

assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after both threads complete");
}

@Test
public void testLockInterruptibly() throws InterruptedException {
ResourceLock resourceLock = new ResourceLock();

Thread t1 = new Thread(() -> {
try (ResourceLock lock = resourceLock.obtain()) {
try {
Thread.sleep(1000); // Hold the lock for 1000ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

t1.start();
Thread.sleep(50); // Wait for t1 to acquire the lock

Thread t2 = new Thread(() -> {
try {
resourceLock.lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

t2.start();
Thread.sleep(50); // Wait for t2 to attempt to acquire the lock

t2.interrupt(); // Interrupt t2

t1.join();
t2.join();

assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after t1 completes");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.netflix.discovery.EurekaEventListener;
import com.netflix.discovery.shared.Application;
import org.apache.seata.common.exception.EurekaRegistryException;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis
private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
private static final ConcurrentMap<String, List<EurekaEventListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, List<InetSocketAddress>> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Object> CLUSTER_LOCK = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, ResourceLock> CLUSTER_LOCK = new ConcurrentHashMap<>();

private static volatile ApplicationInfoManager applicationInfoManager;
private static volatile CustomEurekaInstanceConfig instanceConfig;
Expand Down Expand Up @@ -140,8 +141,8 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
}
String clusterUpperName = clusterName.toUpperCase();
if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) {
Object lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new Object());
synchronized (lock) {
ResourceLock lock = CLUSTER_LOCK.computeIfAbsent(clusterUpperName, k -> new ResourceLock());
try (ResourceLock ignored = lock.obtain()) {
if (!LISTENER_SERVICE_MAP.containsKey(clusterUpperName)) {
refreshCluster(clusterUpperName);
subscribe(clusterUpperName, event -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.seata.common.lock.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TccHookManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TccHookManager.class);
private static final ResourceLock LOCK = new ResourceLock();


private TccHookManager() {

Expand All @@ -40,7 +43,7 @@ private TccHookManager() {
*/
public static List<TccHook> getHooks() {
if (CACHED_UNMODIFIABLE_HOOKS == null) {
synchronized (TccHookManager.class) {
try (ResourceLock ignored = LOCK.obtain()) {
if (CACHED_UNMODIFIABLE_HOOKS == null) {
CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.integration.tx.api.remoting.RemotingDesc;
import org.apache.seata.integration.tx.api.remoting.RemotingParser;
Expand All @@ -43,6 +44,8 @@ public class DefaultRemotingParser {
*/
protected static Map<Object, RemotingDesc> remotingServiceMap = new ConcurrentHashMap<>();

private final ResourceLock resourceLock = new ResourceLock();

private static class SingletonHolder {
private static final DefaultRemotingParser INSTANCE = new DefaultRemotingParser();
}
Expand Down Expand Up @@ -79,7 +82,7 @@ protected void initRemotingParser() {
* @param remotingParser
*/
public boolean registerRemotingParser(RemotingParser remotingParser) {
synchronized (this) {
try (ResourceLock ignored = resourceLock.obtain()) {
return allRemotingParsers.add(remotingParser);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.seata.integration.tx.api.util;

import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.integration.tx.api.interceptor.handler.DefaultInvocationHandler;
import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
import org.apache.seata.integration.tx.api.interceptor.parser.DefaultInterfaceParser;
Expand All @@ -31,6 +32,7 @@
public class ProxyUtil {

private static final Map<Object, Object> PROXYED_SET = new HashMap<>();
private static final ResourceLock RESOURCE_LOCK = new ResourceLock();

public static <T> T createProxy(T target) {
return createProxy(target, target.getClass().getName());
Expand All @@ -53,7 +55,7 @@ public static <T> T createProxy(T target) {
*/
public static <T> T createProxy(T target, String beanName) {
try {
synchronized (PROXYED_SET) {
try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
if (PROXYED_SET.containsKey(target)) {
return (T) PROXYED_SET.get(target);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.seata.rm.datasource.util;

import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.rm.BaseDataSourceResource;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.sqlparser.SqlParserType;
Expand All @@ -33,10 +34,11 @@
public final class JdbcUtils {

private static volatile DbTypeParser dbTypeParser;
private final static ResourceLock RESOURCE_LOCK = new ResourceLock();

static DbTypeParser getDbTypeParser() {
if (dbTypeParser == null) {
synchronized (JdbcUtils.class) {
try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
if (dbTypeParser == null) {
dbTypeParser = EnhancedServiceLoader.load(DbTypeParser.class, SqlParserType.SQL_PARSER_TYPE_DRUID);
}
Expand Down
Loading
Loading