From 60d6ec439c49253bbbab87257f8e0ad6e54febb7 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Sat, 24 Aug 2024 21:38:11 +0800 Subject: [PATCH 1/9] feat: use curator instead of zkclient in config model --- .../config/ConfigurationChangeEvent.java | 11 + config/seata-config-zk/pom.xml | 8 + .../config/zk/ZookeeperConfiguration.java | 266 +++++++++++------- .../seata/config/zk/ZkConfigurationTest.java | 122 ++++++++ dependencies/pom.xml | 14 +- 5 files changed, 320 insertions(+), 101 deletions(-) create mode 100644 config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java diff --git a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java index 215805a3011..714f8b17678 100644 --- a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java +++ b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeEvent.java @@ -141,4 +141,15 @@ public ConfigurationChangeEvent setNamespace(String namespace) { this.namespace = namespace; return this; } + + @Override + public String toString() { + return "ConfigurationChangeEvent{" + + "dataId='" + dataId + '\'' + + ", oldValue='" + oldValue + '\'' + + ", newValue='" + newValue + '\'' + + ", namespace='" + namespace + '\'' + + ", changeType=" + changeType + + '}'; + } } diff --git a/config/seata-config-zk/pom.xml b/config/seata-config-zk/pom.xml index eacb1611367..3e847041fc0 100644 --- a/config/seata-config-zk/pom.xml +++ b/config/seata-config-zk/pom.xml @@ -45,6 +45,14 @@ + + org.apache.curator + curator-recipes + + + org.apache.curator + curator-framework + diff --git a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java index 8c29ad20865..b045984f668 100644 --- a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java +++ b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/ZookeeperConfiguration.java @@ -17,7 +17,8 @@ package org.apache.seata.config.zk; import java.io.IOException; -import java.lang.reflect.Constructor; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Enumeration; import java.util.Map; import java.util.Properties; @@ -29,7 +30,12 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.retry.RetryNTimes; import org.apache.seata.common.exception.NotSupportYetException; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.CollectionUtils; @@ -41,10 +47,7 @@ import org.apache.seata.config.ConfigurationChangeType; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.config.processor.ConfigProcessor; -import org.I0Itec.zkclient.IZkDataListener; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.serialize.ZkSerializer; -import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +57,6 @@ /** * The type Zookeeper configuration. - * */ public class ZookeeperConfiguration extends AbstractConfiguration { private final static Logger LOGGER = LoggerFactory.getLogger(ZookeeperConfiguration.class); @@ -75,15 +77,17 @@ public class ZookeeperConfiguration extends AbstractConfiguration { private static final int DEFAULT_CONNECT_TIMEOUT = 2000; private static final String DEFAULT_CONFIG_PATH = ROOT_PATH + "/seata.properties"; private static final String FILE_CONFIG_KEY_PREFIX = FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + CONFIG_TYPE - + FILE_CONFIG_SPLIT_CHAR; + + FILE_CONFIG_SPLIT_CHAR; private static final ExecutorService CONFIG_EXECUTOR = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, - Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), - new NamedThreadFactory("ZKConfigThread", THREAD_POOL_NUM)); - private static volatile ZkClient zkClient; + Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), + new NamedThreadFactory("ZKConfigThread", THREAD_POOL_NUM)); + private static volatile CuratorFramework zkClient; private static final int MAP_INITIAL_CAPACITY = 8; - private static final ConcurrentMap> CONFIG_LISTENERS_MAP - = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY); + private static final ConcurrentMap> CONFIG_LISTENERS_MAP + = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY); private static volatile Properties seataConfig = new Properties(); + static final Charset CHARSET = StandardCharsets.UTF_8; + private static Map nodeCacheMap = new ConcurrentHashMap<>(); /** * Instantiates a new Zookeeper configuration. @@ -93,26 +97,51 @@ public ZookeeperConfiguration() { if (zkClient == null) { synchronized (ZookeeperConfiguration.class) { if (zkClient == null) { - ZkSerializer zkSerializer = getZkSerializer(); String serverAddr = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY); int sessionTimeout = FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT); int connectTimeout = FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + CONNECT_TIMEOUT_KEY, DEFAULT_CONNECT_TIMEOUT); - zkClient = new ZkClient(serverAddr, sessionTimeout, connectTimeout, zkSerializer); + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .connectString(serverAddr) + .retryPolicy(new RetryNTimes(1, 1000)) + .connectionTimeoutMs(connectTimeout) + .sessionTimeoutMs(sessionTimeout); String username = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_USERNAME); String password = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_PASSWORD); if (!StringUtils.isBlank(username) && !StringUtils.isBlank(password)) { StringBuilder auth = new StringBuilder(username).append(":").append(password); - zkClient.addAuthInfo("digest", auth.toString().getBytes()); + builder.authorization("digest", auth.toString().getBytes()); } + zkClient = builder.build(); + zkClient.start(); } } - if (!zkClient.exists(ROOT_PATH)) { - zkClient.createPersistent(ROOT_PATH, true); + if (!checkExists(ROOT_PATH)) { + createPersistent(ROOT_PATH); } initSeataConfig(); } } + public void createPersistent(String path) { + try { + zkClient.create().forPath(path); + } catch (KeeperException.NodeExistsException e) { + LOGGER.warn("ZNode " + path + " already exists.", e); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + public boolean checkExists(String path) { + try { + if (zkClient.checkExists().forPath(path) != null) { + return true; + } + } catch (Exception e) { + } + return false; + } + @Override public String getTypeName() { return CONFIG_TYPE; @@ -125,13 +154,13 @@ public String getLatestConfig(String dataId, String defaultValue, long timeoutMi return value; } FutureTask future = new FutureTask<>(() -> { - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - if (!zkClient.exists(path)) { + String path = buildPath(dataId); + if (!checkExists(path)) { LOGGER.warn("config {} is not existed, return defaultValue {} ", - dataId, defaultValue); + dataId, defaultValue); return defaultValue; } - String value1 = zkClient.readData(path); + String value1 = readData(path); return StringUtils.isNullOrEmpty(value1) ? defaultValue : value1; }); CONFIG_EXECUTOR.execute(future); @@ -139,25 +168,37 @@ public String getLatestConfig(String dataId, String defaultValue, long timeoutMi return future.get(timeoutMills, TimeUnit.MILLISECONDS); } catch (Exception e) { LOGGER.error("getConfig {} error or timeout, return defaultValue {}, exception:{} ", - dataId, defaultValue, e.getMessage()); + dataId, defaultValue, e.getMessage()); return defaultValue; } } + public String readData(String path) { + try { + byte[] dataBytes = zkClient.getData().forPath(path); + return (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, CHARSET); + } catch (KeeperException.NoNodeException e) { + // ignore NoNode Exception. + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + return null; + } + @Override public boolean putConfig(String dataId, String content, long timeoutMills) { if (!seataConfig.isEmpty()) { seataConfig.setProperty(dataId, content); - zkClient.writeData(getConfigPath(), getSeataConfigStr()); + createPersistent(getConfigPath(), getSeataConfigStr()); return true; } FutureTask future = new FutureTask<>(() -> { - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - if (!zkClient.exists(path)) { - zkClient.create(path, content, CreateMode.PERSISTENT); + String path = buildPath(dataId); + if (!checkExists(path)) { + createPersistent(path, content); } else { - zkClient.writeData(path, content); + createPersistent(path, content); } return true; }); @@ -166,11 +207,31 @@ public boolean putConfig(String dataId, String content, long timeoutMills) { return future.get(timeoutMills, TimeUnit.MILLISECONDS); } catch (Exception e) { LOGGER.error("putConfig {}, value: {} is error or timeout, exception: {}", - dataId, content, e.getMessage()); + dataId, content, e.getMessage()); return false; } } + public String buildPath(String dataId) { + String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; + return path; + } + + protected void createPersistent(String path, String data) { + byte[] dataBytes = data.getBytes(CHARSET); + try { + zkClient.create().forPath(path, dataBytes); + } catch (KeeperException.NodeExistsException e) { + try { + zkClient.setData().forPath(path, dataBytes); + } catch (Exception e1) { + throw new IllegalStateException(e.getMessage(), e1); + } + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + @Override public boolean putConfigIfAbsent(String dataId, String content, long timeoutMills) { throw new NotSupportYetException("not support atomic operation putConfigIfAbsent"); @@ -180,13 +241,13 @@ public boolean putConfigIfAbsent(String dataId, String content, long timeoutMill public boolean removeConfig(String dataId, long timeoutMills) { if (!seataConfig.isEmpty()) { seataConfig.remove(dataId); - zkClient.writeData(getConfigPath(), getSeataConfigStr()); + createPersistent(getConfigPath(), getSeataConfigStr()); return true; } FutureTask future = new FutureTask<>(() -> { - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - return zkClient.delete(path); + String path = buildPath(dataId); + return deletePath(path); }); CONFIG_EXECUTOR.execute(future); try { @@ -198,25 +259,36 @@ public boolean removeConfig(String dataId, long timeoutMills) { } + protected boolean deletePath(String path) { + try { + zkClient.delete().deletingChildrenIfNeeded().forPath(path); + return true; + } catch (KeeperException.NoNodeException ignored) { + return true; + } catch (Exception e) { + LOGGER.error("deletePath {} is error or timeout", path, e); + return false; + } + } + @Override public void addConfigListener(String dataId, ConfigurationChangeListener listener) { if (StringUtils.isBlank(dataId) || listener == null) { return; } - + String path = buildPath(dataId); if (!seataConfig.isEmpty()) { - ZKListener zkListener = new ZKListener(dataId, listener); + NodeCacheListenerImpl zkListener = new NodeCacheListenerImpl(dataId, listener); + CuratorCacheListener.builder().forAll(zkListener).build(); CONFIG_LISTENERS_MAP.computeIfAbsent(dataId, key -> new ConcurrentHashMap<>()) - .put(listener, zkListener); + .put(listener, zkListener); return; } - - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - if (zkClient.exists(path)) { - ZKListener zkListener = new ZKListener(path, listener); + if (checkExists(path)) { + NodeCacheListenerImpl zkListener = new NodeCacheListenerImpl(path, listener); CONFIG_LISTENERS_MAP.computeIfAbsent(dataId, key -> new ConcurrentHashMap<>()) - .put(listener, zkListener); - zkClient.subscribeDataChanges(path, zkListener); + .put(listener, zkListener); + addDataListener(path, zkListener); } } @@ -227,18 +299,18 @@ public void removeConfigListener(String dataId, ConfigurationChangeListener list } Set configChangeListeners = getConfigListeners(dataId); if (CollectionUtils.isNotEmpty(configChangeListeners)) { - String path = ROOT_PATH + ZK_PATH_SPLIT_CHAR + dataId; - if (zkClient.exists(path)) { + String path = buildPath(dataId); + if (checkExists(path)) { for (ConfigurationChangeListener entry : configChangeListeners) { if (listener.equals(entry)) { - ZKListener zkListener = null; - Map configListeners = CONFIG_LISTENERS_MAP.get(dataId); + NodeCacheListenerImpl zkListener = null; + Map configListeners = CONFIG_LISTENERS_MAP.get(dataId); if (configListeners != null) { zkListener = configListeners.get(listener); configListeners.remove(entry); } if (zkListener != null) { - zkClient.unsubscribeDataChanges(path, zkListener); + removeDataListener(path, zkListener); } break; } @@ -249,7 +321,7 @@ public void removeConfigListener(String dataId, ConfigurationChangeListener list @Override public Set getConfigListeners(String dataId) { - ConcurrentMap configListeners = CONFIG_LISTENERS_MAP.get(dataId); + ConcurrentMap configListeners = CONFIG_LISTENERS_MAP.get(dataId); if (CollectionUtils.isNotEmpty(configListeners)) { return configListeners.keySet(); } else { @@ -259,15 +331,14 @@ public Set getConfigListeners(String dataId) { private void initSeataConfig() { String configPath = getConfigPath(); - String config = zkClient.readData(configPath, true); + String config = readData(configPath); if (StringUtils.isNotBlank(config)) { try { seataConfig = ConfigProcessor.processConfig(config, getZkDataType()); } catch (IOException e) { LOGGER.error("init config properties error", e); } - ZKListener zkListener = new ZKListener(configPath, null); - zkClient.subscribeDataChanges(configPath, zkListener); + addDataListener(configPath, new NodeCacheListenerImpl(configPath, null)); } } @@ -292,28 +363,25 @@ private static String getSeataConfigStr() { return sb.toString(); } - /** - * The type Zk listener. - */ - public static class ZKListener implements IZkDataListener { - + public static class NodeCacheListenerImpl implements CuratorCacheListener { private String path; private ConfigurationChangeListener listener; - /** - * Instantiates a new Zk listener. - * - * @param path the path - * @param listener the listener - */ - public ZKListener(String path, ConfigurationChangeListener listener) { + public NodeCacheListenerImpl(String path, ConfigurationChangeListener listener) { this.path = path; this.listener = listener; } @Override - public void handleDataChange(String s, Object o) { - if (s.equals(getConfigPath())) { + public void event(Type type, ChildData oldData, ChildData data) { + + String o; + if (type == Type.NODE_DELETED) { + o = ""; + } else { + o = new String(data.getData()); + } + if (path.equals(getConfigPath())) { Properties seataConfigNew = new Properties(); if (StringUtils.isNotBlank(o.toString())) { try { @@ -325,17 +393,17 @@ public void handleDataChange(String s, Object o) { } } - for (Map.Entry> entry : CONFIG_LISTENERS_MAP.entrySet()) { + for (Map.Entry> entry : CONFIG_LISTENERS_MAP.entrySet()) { String listenedDataId = entry.getKey(); String propertyOld = seataConfig.getProperty(listenedDataId, ""); String propertyNew = seataConfigNew.getProperty(listenedDataId, ""); if (!propertyOld.equals(propertyNew)) { ConfigurationChangeEvent event = new ConfigurationChangeEvent() - .setDataId(listenedDataId) - .setNewValue(propertyNew) - .setChangeType(ConfigurationChangeType.MODIFY); + .setDataId(listenedDataId) + .setNewValue(propertyNew) + .setChangeType(ConfigurationChangeType.MODIFY); - ConcurrentMap configListeners = entry.getValue(); + ConcurrentMap configListeners = entry.getValue(); for (ConfigurationChangeListener configListener : configListeners.keySet()) { configListener.onProcessEvent(event); } @@ -344,42 +412,42 @@ public void handleDataChange(String s, Object o) { seataConfig = seataConfigNew; return; + } else { + if (type == Type.NODE_DELETED) { + // Node is deleted. + String dataId = path.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, ""); + ConfigurationChangeEvent event = new ConfigurationChangeEvent().setDataId(dataId).setChangeType( + ConfigurationChangeType.DELETE); + listener.onProcessEvent(event); + } else { + // Node is changed. + String dataId = path.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, ""); + ConfigurationChangeEvent event = new ConfigurationChangeEvent().setDataId(dataId).setNewValue(o.toString()) + .setChangeType(ConfigurationChangeType.MODIFY); + listener.onProcessEvent(event); + } } - String dataId = s.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, ""); - ConfigurationChangeEvent event = new ConfigurationChangeEvent().setDataId(dataId).setNewValue(o.toString()) - .setChangeType(ConfigurationChangeType.MODIFY); - listener.onProcessEvent(event); - } - - @Override - public void handleDataDeleted(String s) { - String dataId = s.replaceFirst(ROOT_PATH + ZK_PATH_SPLIT_CHAR, ""); - ConfigurationChangeEvent event = new ConfigurationChangeEvent().setDataId(dataId).setChangeType( - ConfigurationChangeType.DELETE); - listener.onProcessEvent(event); } } - private ZkSerializer getZkSerializer() { - ZkSerializer zkSerializer = null; - String serializer = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERIALIZER_KEY); - if (StringUtils.isNotBlank(serializer)) { - try { - Class clazz = Class.forName(serializer); - Constructor constructor = clazz.getDeclaredConstructor(); - constructor.setAccessible(true); - zkSerializer = (ZkSerializer) constructor.newInstance(); - } catch (ClassNotFoundException cfe) { - LOGGER.warn("No zk serializer class found, serializer:{}", serializer, cfe); - } catch (Throwable cause) { - LOGGER.warn("found zk serializer encountered an unknown exception", cause); + protected void addDataListener(String path, NodeCacheListenerImpl nodeCacheListener) { + try { + CuratorCache nodeCache = CuratorCache.build(zkClient, path); + if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) { + return; } + nodeCache.listenable().addListener(nodeCacheListener); + nodeCache.start(); + } catch (Exception e) { + throw new IllegalStateException("Add nodeCache listener for path:" + path, e); } - if (zkSerializer == null) { - zkSerializer = new DefaultZkSerializer(); - LOGGER.info("Use default zk serializer: org.apache.seata.config.zk.DefaultZkSerializer."); - } - return zkSerializer; } + protected void removeDataListener(String path, NodeCacheListenerImpl nodeCacheListener) { + CuratorCache nodeCache = nodeCacheMap.get(path); + if (nodeCache != null) { + nodeCache.listenable().removeListener(nodeCacheListener); + } + nodeCacheListener.listener = null; + } } diff --git a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java new file mode 100644 index 00000000000..987e982b977 --- /dev/null +++ b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.config.zk; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.curator.test.TestingServer; +import org.apache.seata.config.ConfigurationChangeEvent; +import org.apache.seata.config.ConfigurationChangeListener; +import org.apache.seata.config.ConfigurationChangeType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * The type zk configuration test + */ +public class ZkConfigurationTest { + + protected static TestingServer server = null; + + @BeforeAll + public static void adBeforeClass() throws Exception { + System.setProperty("config.type", "zk"); + System.setProperty("config.zk.serverAddr", "127.0.0.1:2181"); + server = new TestingServer(2182, true); + server.start(); + } + + @AfterAll + public static void adAfterClass() throws Exception { + if (server != null) { + server.stop(); + } + } + + @Test + public void testCheckExist() { + ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(); + boolean exist = zookeeperConfiguration.checkExists("/"); + Assertions.assertTrue(exist); + } + + @Test + public void testPutConfig() { + ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(); + CountDownLatch countDownLatch = new CountDownLatch(1); + final boolean[] listened = {false}; + String dataId = "mockDataId"; + zookeeperConfiguration.putConfig(dataId, "value"); + ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + Assertions.assertEquals("value2", event.getNewValue()); + Assertions.assertEquals(ConfigurationChangeType.MODIFY, event.getChangeType()); + countDownLatch.countDown(); + listened[0] = true; + } + }; + zookeeperConfiguration.addConfigListener(dataId, changeListener); + zookeeperConfiguration.putConfig(dataId, "value2"); + try { + countDownLatch.await(3000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + Assertions.assertTrue(listened[0]); + + zookeeperConfiguration.removeConfig(dataId); + + zookeeperConfiguration.removeConfigListener(dataId, changeListener); + } + + @Test + public void testRemoveConfig() { + ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(); + CountDownLatch countDownLatch = new CountDownLatch(1); + final boolean[] listened = {false}; + String dataId = "mockDataId"; + zookeeperConfiguration.putConfig(dataId, "value"); + ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + Assertions.assertNull(event.getNewValue()); + Assertions.assertEquals(ConfigurationChangeType.DELETE, event.getChangeType()); + countDownLatch.countDown(); + listened[0] = true; + } + }; + + zookeeperConfiguration.addConfigListener(dataId, changeListener); + + zookeeperConfiguration.putConfig(dataId, "value2"); + + zookeeperConfiguration.deletePath(zookeeperConfiguration.buildPath(dataId)); + boolean remove = zookeeperConfiguration.removeConfig(dataId); + Assertions.assertTrue(remove); + try { + countDownLatch.await(10000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Assertions.assertTrue(listened[0]); + } + +} diff --git a/dependencies/pom.xml b/dependencies/pom.xml index e4887f5ca42..29f24dcdedc 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -50,7 +50,7 @@ 1.0 0.11 3.7.2 - 5.1.0 + 5.1.0 1.0.2 2.0.1 1.10.18 @@ -352,10 +352,20 @@ + + org.apache.curator + curator-recipes + ${curator.version} + + + org.apache.curator + curator-framework + ${curator.version} + org.apache.curator curator-test - ${curator-test.version} + ${curator.version} com.alipay.sofa From 58dfd883e78a14568cf8bf7adc96b21b406878f6 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Mon, 26 Aug 2024 11:16:58 +0800 Subject: [PATCH 2/9] chore: add mvn dep pom --- config/seata-config-zk/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/config/seata-config-zk/pom.xml b/config/seata-config-zk/pom.xml index 3e847041fc0..bb145f33dd3 100644 --- a/config/seata-config-zk/pom.xml +++ b/config/seata-config-zk/pom.xml @@ -53,6 +53,10 @@ org.apache.curator curator-framework + + org.apache.curator + curator-test + From a10fd3f81cde8f7f05ff582daaff8f49e1200eb0 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Mon, 26 Aug 2024 11:26:24 +0800 Subject: [PATCH 3/9] chore: add mvn dep pom --- .../org/apache/seata/config/zk/ZkConfigurationTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java index 987e982b977..347148731ef 100644 --- a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java +++ b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java @@ -38,7 +38,7 @@ public class ZkConfigurationTest { public static void adBeforeClass() throws Exception { System.setProperty("config.type", "zk"); System.setProperty("config.zk.serverAddr", "127.0.0.1:2181"); - server = new TestingServer(2182, true); + server = new TestingServer(2181, true); server.start(); } @@ -61,7 +61,7 @@ public void testPutConfig() { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(); CountDownLatch countDownLatch = new CountDownLatch(1); final boolean[] listened = {false}; - String dataId = "mockDataId"; + String dataId = "putMockDataId"; zookeeperConfiguration.putConfig(dataId, "value"); ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { @Override @@ -92,7 +92,7 @@ public void testRemoveConfig() { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(); CountDownLatch countDownLatch = new CountDownLatch(1); final boolean[] listened = {false}; - String dataId = "mockDataId"; + String dataId = "removeMockDataId"; zookeeperConfiguration.putConfig(dataId, "value"); ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { @Override From c4c3168bcb13e0a6d0418b06027ce5b07428bad3 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Thu, 29 Aug 2024 09:48:11 +0800 Subject: [PATCH 4/9] chore: add mvn dep pom --- config/seata-config-zk/pom.xml | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/config/seata-config-zk/pom.xml b/config/seata-config-zk/pom.xml index bb145f33dd3..c84196de83b 100644 --- a/config/seata-config-zk/pom.xml +++ b/config/seata-config-zk/pom.xml @@ -35,16 +35,6 @@ seata-config-core ${project.version} - - com.101tec - zkclient - - - log4j - log4j - - - org.apache.curator curator-recipes From 50b0a035791facafd18b5b370278717d50b77872 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Mon, 2 Sep 2024 09:58:56 +0800 Subject: [PATCH 5/9] chore: add mvn dep pom --- .../seata/config/zk/DefaultZkSerializer.java | 42 ------------------- 1 file changed, 42 deletions(-) delete mode 100644 config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java diff --git a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java b/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java deleted file mode 100644 index bd764d75c39..00000000000 --- a/config/seata-config-zk/src/main/java/org/apache/seata/config/zk/DefaultZkSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.config.zk; - -import org.I0Itec.zkclient.exception.ZkMarshallingError; -import org.I0Itec.zkclient.serialize.ZkSerializer; - -import java.nio.charset.StandardCharsets; - -/** - * Default zk serializer. - *

- * If the user is not configured in config.zk.serializer configuration item, then use default serializer. - * - * @since 1.3.0 - */ -public class DefaultZkSerializer implements ZkSerializer { - - @Override - public byte[] serialize(Object data) throws ZkMarshallingError { - return String.valueOf(data).getBytes(StandardCharsets.UTF_8); - } - - @Override - public Object deserialize(byte[] bytes) throws ZkMarshallingError { - return new String(bytes, StandardCharsets.UTF_8); - } -} From 4797062e22b43cffc7eec7c06d477bd7244f4594 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Wed, 11 Sep 2024 08:18:56 +0800 Subject: [PATCH 6/9] doc: add doc --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index f20ffd37ecd..6222b711786 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -88,6 +88,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6819](https://github.com/apache/incubator-seata/pull/6819)] merge the packaging processes of namingserver and seata-server - [[#6827](https://github.com/apache/incubator-seata/pull/6827)] rename namingserver registry type - [[#6836](https://github.com/apache/incubator-seata/pull/6836)] add independent nacos for the CI process +- [[#6779](https://github.com/apache/incubator-seata/pull/6779)] use curator instead of zkclient in config model ### refactor: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 05d8cb958f4..ba3490cbc45 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -88,7 +88,7 @@ - [[#6819](https://github.com/apache/incubator-seata/pull/6819)] namingserver与server的合并打包 - [[#6827](https://github.com/apache/incubator-seata/pull/6827)] 重命名namingserver注册类型改为seata - [[#6836](https://github.com/apache/incubator-seata/pull/6836)] 为CI流程增加独立nacos - +- [[#6779](https://github.com/apache/incubator-seata/pull/6779)] 在config模块中使用curator替代zkclient ### refactor: From 8105b6d0e717a77d263de1aea3d827b23e8a0716 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Wed, 11 Sep 2024 08:26:44 +0800 Subject: [PATCH 7/9] doc: add doc --- .../java/org/apache/seata/config/zk/ZkConfigurationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java index 347148731ef..0be69f00ef9 100644 --- a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java +++ b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java @@ -38,7 +38,7 @@ public class ZkConfigurationTest { public static void adBeforeClass() throws Exception { System.setProperty("config.type", "zk"); System.setProperty("config.zk.serverAddr", "127.0.0.1:2181"); - server = new TestingServer(2181, true); + server = new TestingServer(2181); server.start(); } @@ -75,7 +75,7 @@ public void onChangeEvent(ConfigurationChangeEvent event) { zookeeperConfiguration.addConfigListener(dataId, changeListener); zookeeperConfiguration.putConfig(dataId, "value2"); try { - countDownLatch.await(3000, TimeUnit.MILLISECONDS); + countDownLatch.await(10000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } From 954535c1e9ca7f1a279bf296081743a185df6ca6 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Wed, 11 Sep 2024 10:32:06 +0800 Subject: [PATCH 8/9] test: try to fix test case --- .../seata/config/zk/ZkConfigurationTest.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java index 0be69f00ef9..d55cc0e42c1 100644 --- a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java +++ b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java @@ -62,16 +62,17 @@ public void testPutConfig() { CountDownLatch countDownLatch = new CountDownLatch(1); final boolean[] listened = {false}; String dataId = "putMockDataId"; - zookeeperConfiguration.putConfig(dataId, "value"); ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { @Override public void onChangeEvent(ConfigurationChangeEvent event) { - Assertions.assertEquals("value2", event.getNewValue()); - Assertions.assertEquals(ConfigurationChangeType.MODIFY, event.getChangeType()); - countDownLatch.countDown(); - listened[0] = true; + if (event.getChangeType() == ConfigurationChangeType.MODIFY) { + Assertions.assertEquals("value2", event.getNewValue()); + countDownLatch.countDown(); + listened[0] = true; + } } }; + zookeeperConfiguration.createPersistent(zookeeperConfiguration.buildPath(dataId), "value"); zookeeperConfiguration.addConfigListener(dataId, changeListener); zookeeperConfiguration.putConfig(dataId, "value2"); try { @@ -93,22 +94,20 @@ public void testRemoveConfig() { CountDownLatch countDownLatch = new CountDownLatch(1); final boolean[] listened = {false}; String dataId = "removeMockDataId"; - zookeeperConfiguration.putConfig(dataId, "value"); + zookeeperConfiguration.createPersistent(zookeeperConfiguration.buildPath(dataId), "value"); ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { @Override public void onChangeEvent(ConfigurationChangeEvent event) { - Assertions.assertNull(event.getNewValue()); - Assertions.assertEquals(ConfigurationChangeType.DELETE, event.getChangeType()); - countDownLatch.countDown(); - listened[0] = true; + if (event.getChangeType() == ConfigurationChangeType.DELETE) { + Assertions.assertNull(event.getNewValue()); + countDownLatch.countDown(); + listened[0] = true; + } } }; zookeeperConfiguration.addConfigListener(dataId, changeListener); - zookeeperConfiguration.putConfig(dataId, "value2"); - - zookeeperConfiguration.deletePath(zookeeperConfiguration.buildPath(dataId)); boolean remove = zookeeperConfiguration.removeConfig(dataId); Assertions.assertTrue(remove); try { From 4328e05f8e4ce56a89e86728680a26af7f214d36 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Wed, 11 Sep 2024 11:29:17 +0800 Subject: [PATCH 9/9] test: try to fix test case --- .../apache/seata/config/zk/ZkConfigurationTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java index d55cc0e42c1..a5cc19442f9 100644 --- a/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java +++ b/config/seata-config-zk/src/test/java/org/apache/seata/config/zk/ZkConfigurationTest.java @@ -26,12 +26,16 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The type zk configuration test */ public class ZkConfigurationTest { + private static final Logger LOGGER = LoggerFactory.getLogger(ZkConfigurationTest.class); + protected static TestingServer server = null; @BeforeAll @@ -65,10 +69,11 @@ public void testPutConfig() { ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { @Override public void onChangeEvent(ConfigurationChangeEvent event) { + LOGGER.info("onChangeEvent:{}", event); if (event.getChangeType() == ConfigurationChangeType.MODIFY) { Assertions.assertEquals("value2", event.getNewValue()); - countDownLatch.countDown(); listened[0] = true; + countDownLatch.countDown(); } } }; @@ -98,10 +103,11 @@ public void testRemoveConfig() { ConfigurationChangeListener changeListener = new ConfigurationChangeListener() { @Override public void onChangeEvent(ConfigurationChangeEvent event) { + LOGGER.info("onChangeEvent:{}", event); if (event.getChangeType() == ConfigurationChangeType.DELETE) { Assertions.assertNull(event.getNewValue()); - countDownLatch.countDown(); listened[0] = true; + countDownLatch.countDown(); } } };