diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java index 110b6810ef..162ffb05d2 100644 --- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java @@ -20,7 +20,9 @@ */ import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; @@ -91,6 +93,17 @@ default boolean create(String path, T record, int options, long ttl) { */ boolean update(String path, DataUpdater updater, int options); + /** + * This will attempt to update the data using the updater using + * each updater for the corresponding path. + * This should be used on existing ZNodes only. + * @param updaterByPath updaters for each path to update + * @return true if all the updates succeeded, false otherwise + */ + default boolean multiSet(Map> updaterByPath) { + throw new NotImplementedException("multiSet is not implemented"); + } + /** * This will remove the ZNode and all its descendants if any * @param path path to the root ZNode to remove diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java index 0c7a94f1b6..e074a22e68 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java @@ -56,7 +56,10 @@ import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; import org.slf4j.Logger; @@ -485,6 +488,53 @@ public AccessResult doUpdate(String path, DataUpdater updater, int options) { return result; } + /** + * transactional sync set + */ + @Override + public boolean multiSet(Map> updaterByPath) { + AccessResult result = doMultiSet(updaterByPath); + return result._retCode == RetCode.OK; + } + + private AccessResult doMultiSet(Map> updaterByPath) { + AccessResult result = new AccessResult(); + boolean retry; + do { + retry = false; + List ops = new ArrayList<>(); + try { + for (Map.Entry> entry : updaterByPath.entrySet()) { + String path = entry.getKey(); + DataUpdater updater = entry.getValue(); + Stat readStat = new Stat(); + T oldData = (T) _zkClient.readData(path, readStat); + T newData = updater.update(oldData); + if (newData != null) { + ops.add(Op.setData(path, _zkClient.serialize(newData, path), readStat.getVersion())); + } + } + } catch (Exception e1) { + LOG.error("Exception while reading paths: " + updaterByPath.keySet(), e1); + result._retCode = RetCode.ERROR; + return result; + } + + try { + _zkClient.multi(ops); + } catch (ZkBadVersionException e) { + retry = true; + } catch (Exception e1) { + LOG.error("Exception while updating paths: " + updaterByPath.keySet(), e1); + result._retCode = RetCode.ERROR; + return result; + } + } while (retry); + + result._retCode = RetCode.OK; + return result; + } + /** * sync get */ diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java index 8d9cd29d97..bf4f342ddb 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import com.google.common.collect.ImmutableList; @@ -419,6 +420,84 @@ public ZNRecord update(ZNRecord currentData) { System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis())); } + @Test + public void testSyncMultiSet() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String testName = className + "_" + methodName; + + System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis())); + + String path1 = String.format("/%s/%s", _rootPath, "foo"); + String path2 = String.format("/%s/%s", _rootPath, "bar"); + ZkBaseDataAccessor accessor = new ZkBaseDataAccessor(_gZkClient); + accessor.create(path1, new ZNRecord("foo"), AccessOption.PERSISTENT); + accessor.create(path2, new ZNRecord("bar"), AccessOption.PERSISTENT); + + boolean success = accessor.multiSet(Map.of(path1, new DataUpdater() { + @Override + public ZNRecord update(ZNRecord currentData) { + currentData.setMapField("key", Map.of("key1", "value1")); + return currentData; + } + }, path2, new DataUpdater() { + @Override + public ZNRecord update(ZNRecord currentData) { + currentData.setSimpleField("key", "value"); + return currentData; + } + })); + Assert.assertTrue(success); + ZNRecord getRecord1 = _gZkClient.readData(path1); + ZNRecord getRecord2 = _gZkClient.readData(path2); + + Assert.assertNotNull(getRecord1); + Assert.assertEquals(getRecord1.getId(), "foo"); + Assert.assertEquals(getRecord1.getMapField("key"), Map.of("key1", "value1")); + + Assert.assertNotNull(getRecord2); + Assert.assertEquals(getRecord2.getId(), "bar"); + Assert.assertEquals(getRecord2.getSimpleField("key"), "value"); + + System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testSyncMultiSetOneRecordNoExist() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String testName = className + "_" + methodName; + + System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis())); + + String path1 = String.format("/%s/%s", _rootPath, "foo"); + String path2 = String.format("/%s/%s", _rootPath, "bar"); + ZkBaseDataAccessor accessor = new ZkBaseDataAccessor(_gZkClient); + accessor.create(path1, new ZNRecord("foo"), AccessOption.PERSISTENT); + + boolean success = accessor.multiSet(Map.of(path1, new DataUpdater() { + @Override + public ZNRecord update(ZNRecord currentData) { + currentData.setMapField("key", Map.of("key1", "value1")); + return currentData; + } + }, path2, new DataUpdater() { + @Override + public ZNRecord update(ZNRecord currentData) { + currentData.setSimpleField("key", "value"); + return currentData; + } + })); + Assert.assertFalse(success); + ZNRecord getRecord1 = _gZkClient.readData(path1); + + Assert.assertNotNull(getRecord1); + Assert.assertEquals(getRecord1.getId(), "foo"); + Assert.assertNotSame(getRecord1.getMapField("key"), Map.of("key1", "value1")); + + System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis())); + } + @Test public void testSyncRemove() { String className = TestHelper.getTestClassName();