Skip to content

Commit

Permalink
Add ZK multi set with updater to ZKBaseDataAccesor (apache#2770)
Browse files Browse the repository at this point in the history
* Add ZK multi set with updated to ZKBaseAccesor. Allows for version based transactional sets.

* Fix exception handling for multi set.
  • Loading branch information
zpinto authored Mar 4, 2024
1 parent 4a77e86 commit 7b724aa
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 0 deletions.
13 changes: 13 additions & 0 deletions helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
*/

import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
Expand Down Expand Up @@ -91,6 +93,17 @@ default boolean create(String path, T record, int options, long ttl) {
*/
boolean update(String path, DataUpdater<T> updater, int options);

/**
* This will attempt to update the data using the updater using
* each updater for the corresponding path.
* This should be used on existing ZNodes only.
* @param updaterByPath updaters for each path to update
* @return true if all the updates succeeded, false otherwise
*/
default boolean multiSet(Map<String, DataUpdater<T>> updaterByPath) {
throw new NotImplementedException("multiSet is not implemented");
}

/**
* This will remove the ZNode and all its descendants if any
* @param path path to the root ZNode to remove
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
import org.slf4j.Logger;
Expand Down Expand Up @@ -485,6 +488,53 @@ public AccessResult doUpdate(String path, DataUpdater<T> updater, int options) {
return result;
}

/**
* transactional sync set
*/
@Override
public boolean multiSet(Map<String, DataUpdater<T>> updaterByPath) {
AccessResult result = doMultiSet(updaterByPath);
return result._retCode == RetCode.OK;
}

private AccessResult doMultiSet(Map<String, DataUpdater<T>> updaterByPath) {
AccessResult result = new AccessResult();
boolean retry;
do {
retry = false;
List<Op> ops = new ArrayList<>();
try {
for (Map.Entry<String, DataUpdater<T>> entry : updaterByPath.entrySet()) {
String path = entry.getKey();
DataUpdater<T> updater = entry.getValue();
Stat readStat = new Stat();
T oldData = (T) _zkClient.readData(path, readStat);
T newData = updater.update(oldData);
if (newData != null) {
ops.add(Op.setData(path, _zkClient.serialize(newData, path), readStat.getVersion()));
}
}
} catch (Exception e1) {
LOG.error("Exception while reading paths: " + updaterByPath.keySet(), e1);
result._retCode = RetCode.ERROR;
return result;
}

try {
_zkClient.multi(ops);
} catch (ZkBadVersionException e) {
retry = true;
} catch (Exception e1) {
LOG.error("Exception while updating paths: " + updaterByPath.keySet(), e1);
result._retCode = RetCode.ERROR;
return result;
}
} while (retry);

result._retCode = RetCode.OK;
return result;
}

/**
* sync get
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -419,6 +420,84 @@ public ZNRecord update(ZNRecord currentData) {
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testSyncMultiSet() {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;

System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));

String path1 = String.format("/%s/%s", _rootPath, "foo");
String path2 = String.format("/%s/%s", _rootPath, "bar");
ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
accessor.create(path1, new ZNRecord("foo"), AccessOption.PERSISTENT);
accessor.create(path2, new ZNRecord("bar"), AccessOption.PERSISTENT);

boolean success = accessor.multiSet(Map.of(path1, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
currentData.setMapField("key", Map.of("key1", "value1"));
return currentData;
}
}, path2, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
currentData.setSimpleField("key", "value");
return currentData;
}
}));
Assert.assertTrue(success);
ZNRecord getRecord1 = _gZkClient.readData(path1);
ZNRecord getRecord2 = _gZkClient.readData(path2);

Assert.assertNotNull(getRecord1);
Assert.assertEquals(getRecord1.getId(), "foo");
Assert.assertEquals(getRecord1.getMapField("key"), Map.of("key1", "value1"));

Assert.assertNotNull(getRecord2);
Assert.assertEquals(getRecord2.getId(), "bar");
Assert.assertEquals(getRecord2.getSimpleField("key"), "value");

System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testSyncMultiSetOneRecordNoExist() {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;

System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));

String path1 = String.format("/%s/%s", _rootPath, "foo");
String path2 = String.format("/%s/%s", _rootPath, "bar");
ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
accessor.create(path1, new ZNRecord("foo"), AccessOption.PERSISTENT);

boolean success = accessor.multiSet(Map.of(path1, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
currentData.setMapField("key", Map.of("key1", "value1"));
return currentData;
}
}, path2, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
currentData.setSimpleField("key", "value");
return currentData;
}
}));
Assert.assertFalse(success);
ZNRecord getRecord1 = _gZkClient.readData(path1);

Assert.assertNotNull(getRecord1);
Assert.assertEquals(getRecord1.getId(), "foo");
Assert.assertNotSame(getRecord1.getMapField("key"), Map.of("key1", "value1"));

System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testSyncRemove() {
String className = TestHelper.getTestClassName();
Expand Down

0 comments on commit 7b724aa

Please sign in to comment.