Skip to content

Commit

Permalink
fix(11127): Fix ClientWork removeListener and addListener concurrency…
Browse files Browse the repository at this point in the history
… issue. (#11681)
  • Loading branch information
Bo-Qiu authored Feb 2, 2024
1 parent 1b9a22c commit 9fcc4c0
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,16 @@ public void addListeners(String dataId, String group, List<? extends Listener> l
group = blank2defaultGroup(group);
CacheData cache = addCacheDataIfAbsent(dataId, group);
synchronized (cache) {

for (Listener listener : listeners) {
cache.addListener(listener);
}
cache.setDiscard(false);
cache.setConsistentWithServer(false);
// make sure cache exists in cacheMap
if (getCache(dataId, group) != cache) {
putCache(GroupKey.getKey(dataId, group), cache);
}
agent.notifyListenConfig();

}
}

Expand All @@ -185,6 +187,10 @@ public void addTenantListeners(String dataId, String group, List<? extends Liste
}
cache.setDiscard(false);
cache.setConsistentWithServer(false);
// ensure cache present in cacheMap
if (getCache(dataId, group, tenant) != cache) {
putCache(GroupKey.getKeyTenant(dataId, group, tenant), cache);
}
agent.notifyListenConfig();
}

Expand Down Expand Up @@ -213,6 +219,10 @@ public void addTenantListenersWithContent(String dataId, String group, String co
}
cache.setDiscard(false);
cache.setConsistentWithServer(false);
// make sure cache exists in cacheMap
if (getCache(dataId, group, tenant) != cache) {
putCache(GroupKey.getKeyTenant(dataId, group, tenant), cache);
}
agent.notifyListenConfig();
}

Expand Down Expand Up @@ -403,6 +413,20 @@ public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant
return cache;
}

/**
* Put cache.
*
* @param key groupKey
* @param cache cache
*/
private void putCache(String key, CacheData cache) {
synchronized (cacheMap) {
Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get());
copy.put(key, cache);
cacheMap.set(copy);
}
}

private void increaseTaskIdCount(int taskId) {
taskIdCacheCountList.get(taskId).incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,57 @@

package com.alibaba.nacos.client.config.impl;

import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.listener.AbstractListener;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.env.NacosClientProperties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;

public class ClientWorkerTest {

private static final String TEST_NAMESPACE = "TEST_NAMESPACE";

private ClientWorker clientWorker;

private ClientWorker clientWorkerSpy;

@Before
public void setUp() {
Properties properties = new Properties();
properties.put(PropertyKeyConst.NAMESPACE, TEST_NAMESPACE);
ConfigFilterChainManager filter = new ConfigFilterChainManager(properties);
ServerListManager serverListManager = Mockito.mock(ServerListManager.class);
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
try {
clientWorker = new ClientWorker(filter, serverListManager, nacosClientProperties);
} catch (NacosException e) {
throw new RuntimeException(e);
}
clientWorkerSpy = Mockito.spy(clientWorker);
}

@Test
public void testConstruct() throws NacosException {
Properties prop = new Properties();
Expand Down Expand Up @@ -205,4 +240,129 @@ public void testIsHealthServer() throws NacosException, NoSuchFieldException, Il
Mockito.when(client.isHealthServer()).thenReturn(Boolean.FALSE);
Assert.assertEquals(false, clientWorker.isHealthServer());
}

@Test
public void testPutCache() throws Exception {
// 反射调用私有方法putCacheIfAbsent
Method putCacheMethod = ClientWorker.class.getDeclaredMethod("putCache", String.class, CacheData.class);
putCacheMethod.setAccessible(true);
Properties prop = new Properties();
ConfigFilterChainManager filter = new ConfigFilterChainManager(new Properties());
ServerListManager agent = Mockito.mock(ServerListManager.class);
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop);
ClientWorker clientWorker = new ClientWorker(filter, agent, nacosClientProperties);
String key = "testKey";
CacheData cacheData = new CacheData(filter, "env", "dataId", "group");
putCacheMethod.invoke(clientWorker, key, cacheData);
Field cacheMapField = ClientWorker.class.getDeclaredField("cacheMap");
cacheMapField.setAccessible(true);
AtomicReference<Map<String, CacheData>> cacheMapRef = (AtomicReference<Map<String, CacheData>>) cacheMapField.get(
clientWorker);
// 检查cacheMap是否包含特定的key
assertNotNull(cacheMapRef.get().get(key));
Assert.assertEquals(cacheData, cacheMapRef.get().get(key));
// 测试再次插入相同的key将覆盖原始的值
CacheData newCacheData = new CacheData(filter, "newEnv", "newDataId", "newGroup");
putCacheMethod.invoke(clientWorker, key, newCacheData);
// 检查key对应的value是否改变为newCacheData
Assert.assertEquals(newCacheData, cacheMapRef.get().get(key));
}

@Test
public void testAddListenersEnsureCacheDataSafe()
throws NacosException, IllegalAccessException, NoSuchFieldException {
String dataId = "testDataId";
String group = "testGroup";
// 将key-cacheData插入到cacheMap中
CacheData cacheData = new CacheData(null, "env", dataId, group);
Field cacheMapField = ClientWorker.class.getDeclaredField("cacheMap");
cacheMapField.setAccessible(true);
AtomicReference<Map<String, CacheData>> cacheMapRef = (AtomicReference<Map<String, CacheData>>) cacheMapField.get(
clientWorker);
String key = GroupKey.getKey(dataId, group);
cacheMapRef.get().put(key, cacheData);
// 当addCacheDataIfAbsent得到的differentCacheData,同cacheMap中该key对应的cacheData不一致
CacheData differentCacheData = new CacheData(null, "env", dataId, group);
doReturn(differentCacheData).when(clientWorkerSpy).addCacheDataIfAbsent(anyString(), anyString());
// 使用addListeners将differentCacheData插入到cacheMap中
clientWorkerSpy.addListeners(dataId, group, Collections.EMPTY_LIST);
CacheData cacheDataFromCache1 = clientWorker.getCache(dataId, group);
assertNotNull(cacheDataFromCache1);
assertEquals(cacheDataFromCache1, differentCacheData);
assertFalse(cacheDataFromCache1.isDiscard());
assertFalse(cacheDataFromCache1.isConsistentWithServer());
// 再次调用addListeners,此时addCacheDataIfAbsent得到的cacheData同cacheMap中该key对应的cacheData一致,均为differentCacheData
clientWorkerSpy.addListeners(dataId, group, Collections.EMPTY_LIST);
CacheData cacheDataFromCache2 = clientWorker.getCache(dataId, group);
assertNotNull(cacheDataFromCache2);
assertEquals(cacheDataFromCache2, differentCacheData);
assertFalse(cacheDataFromCache2.isDiscard());
assertFalse(cacheDataFromCache2.isConsistentWithServer());
}

@Test
public void testAddTenantListenersEnsureCacheDataSafe()
throws NacosException, IllegalAccessException, NoSuchFieldException {
String dataId = "testDataId";
String group = "testGroup";
// 将key-cacheData插入到cacheMap中
CacheData cacheData = new CacheData(null, "env", dataId, group);
Field cacheMapField = ClientWorker.class.getDeclaredField("cacheMap");
cacheMapField.setAccessible(true);
AtomicReference<Map<String, CacheData>> cacheMapRef = (AtomicReference<Map<String, CacheData>>) cacheMapField.get(
clientWorker);
String key = GroupKey.getKeyTenant(dataId, group, TEST_NAMESPACE);
cacheMapRef.get().put(key, cacheData);
// 当addCacheDataIfAbsent得到的differentCacheData,同cacheMap中该key对应的cacheData不一致
CacheData differentCacheData = new CacheData(null, "env", dataId, group);
doReturn(differentCacheData).when(clientWorkerSpy)
.addCacheDataIfAbsent(anyString(), anyString(), eq(TEST_NAMESPACE));
// 使用addListeners将differentCacheData插入到cacheMap中
clientWorkerSpy.addTenantListeners(dataId, group, Collections.EMPTY_LIST);
CacheData cacheDataFromCache1 = clientWorker.getCache(dataId, group, TEST_NAMESPACE);
assertNotNull(cacheDataFromCache1);
assertEquals(cacheDataFromCache1, differentCacheData);
assertFalse(cacheDataFromCache1.isDiscard());
assertFalse(cacheDataFromCache1.isConsistentWithServer());
// 再次调用addListeners,此时addCacheDataIfAbsent得到的cacheData同cacheMap中该key对应的cacheData一致,均为differentCacheData
clientWorkerSpy.addTenantListeners(dataId, group, Collections.EMPTY_LIST);
CacheData cacheDataFromCache2 = clientWorker.getCache(dataId, group, TEST_NAMESPACE);
assertNotNull(cacheDataFromCache2);
assertEquals(cacheDataFromCache2, differentCacheData);
assertFalse(cacheDataFromCache2.isDiscard());
assertFalse(cacheDataFromCache2.isConsistentWithServer());
}

@Test
public void testAddTenantListenersWithContentEnsureCacheDataSafe()
throws NacosException, IllegalAccessException, NoSuchFieldException {
String dataId = "testDataId";
String group = "testGroup";
// 将key-cacheData插入到cacheMap中
CacheData cacheData = new CacheData(null, "env", dataId, group);
Field cacheMapField = ClientWorker.class.getDeclaredField("cacheMap");
cacheMapField.setAccessible(true);
AtomicReference<Map<String, CacheData>> cacheMapRef = (AtomicReference<Map<String, CacheData>>) cacheMapField.get(
clientWorker);
String key = GroupKey.getKeyTenant(dataId, group, TEST_NAMESPACE);
cacheMapRef.get().put(key, cacheData);
// 当addCacheDataIfAbsent得到的differentCacheData,同cacheMap中该key对应的cacheData不一致
CacheData differentCacheData = new CacheData(null, "env", dataId, group);
doReturn(differentCacheData).when(clientWorkerSpy)
.addCacheDataIfAbsent(anyString(), anyString(), eq(TEST_NAMESPACE));
// 使用addListeners将differentCacheData插入到cacheMap中
clientWorkerSpy.addTenantListenersWithContent(dataId, group, "", "", Collections.EMPTY_LIST);
CacheData cacheDataFromCache1 = clientWorker.getCache(dataId, group, TEST_NAMESPACE);
assertNotNull(cacheDataFromCache1);
assertEquals(cacheDataFromCache1, differentCacheData);
assertFalse(cacheDataFromCache1.isDiscard());
assertFalse(cacheDataFromCache1.isConsistentWithServer());
// 再次调用addListeners,此时addCacheDataIfAbsent得到的cacheData同cacheMap中该key对应的cacheData一致,均为differentCacheData
clientWorkerSpy.addTenantListenersWithContent(dataId, group, "", "", Collections.EMPTY_LIST);
CacheData cacheDataFromCache2 = clientWorker.getCache(dataId, group, TEST_NAMESPACE);
assertNotNull(cacheDataFromCache2);
assertEquals(cacheDataFromCache2, differentCacheData);
assertFalse(cacheDataFromCache2.isDiscard());
assertFalse(cacheDataFromCache2.isConsistentWithServer());
}
}

0 comments on commit 9fcc4c0

Please sign in to comment.