Skip to content

Commit

Permalink
Add optional partition filters for ZK partitioning store (#1021)
Browse files Browse the repository at this point in the history
Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Aug 6, 2024
1 parent 9995de0 commit f203207
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected void startUp() throws Exception {
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework);
searchMetadataStore = new SearchMetadataStore(curatorFramework, false);
cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework);
cacheNodeAssignmentStore = new CacheNodeAssignmentStore(curatorFramework);
cacheNodeAssignmentStore = new CacheNodeAssignmentStore(curatorFramework, cacheNodeId);
cacheNodeMetadataStore = new CacheNodeMetadataStore(curatorFramework);

if (Boolean.getBoolean(ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.slack.astra.metadata.core.AstraPartitioningMetadataStore;
import com.slack.astra.proto.metadata.Metadata;
import java.util.List;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.zookeeper.CreateMode;

Expand All @@ -18,6 +19,16 @@ public CacheNodeAssignmentStore(AsyncCuratorFramework curator) {
CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH);
}

/** Restricts the cache node assignment store to only watching events for cacheNodeId */
public CacheNodeAssignmentStore(AsyncCuratorFramework curator, String cacheNodeId) {
super(
curator,
CreateMode.PERSISTENT,
new CacheNodeAssignmentSerializer().toModelSerializer(),
CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH,
List.of(cacheNodeId));
}

public ListenableFuture<?> updateAssignmentState(
final CacheNodeAssignment cacheNodeAssignment,
final Metadata.CacheNodeAssignment.CacheNodeAssignmentState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,28 @@ public class AstraPartitioningMetadataStore<T extends AstraPartitionedMetadata>
private final CreateMode createMode;
protected final ModelSerializer<T> modelSerializer;
private final Watcher watcher;
private final List<String> partitionFilters;

public AstraPartitioningMetadataStore(
AsyncCuratorFramework curator,
CreateMode createMode,
ModelSerializer<T> modelSerializer,
String storeFolder) {
this(curator, createMode, modelSerializer, storeFolder, List.of());
}

public AstraPartitioningMetadataStore(
AsyncCuratorFramework curator,
CreateMode createMode,
ModelSerializer<T> modelSerializer,
String storeFolder,
List<String> partitionFilters) {
this.curator = curator;
this.storeFolder = storeFolder;
this.createMode = createMode;
this.modelSerializer = modelSerializer;
this.watcher = buildWatcher();
this.partitionFilters = partitionFilters;

// register watchers for when partitions are added or removed
curator
Expand All @@ -88,15 +99,32 @@ public AstraPartitioningMetadataStore(
return CompletableFuture.failedFuture(throwable);
}
})
.thenAccept((children) -> children.forEach(this::getOrCreateMetadataStore))
.thenAccept(
(children) -> {
if (partitionFilters.isEmpty()) {
children.forEach(this::getOrCreateMetadataStore);
} else {
children.stream()
.filter(partitionFilters::contains)
.forEach(this::getOrCreateMetadataStore);
}
})
.toCompletableFuture()
// wait for all the stores to be initialized prior to exiting the constructor
.join();

LOG.info(
"The metadata store for folder '{}' was initialized with {} partitions",
storeFolder,
metadataStoreMap.size());
if (partitionFilters.isEmpty()) {
LOG.info(
"The metadata store for folder '{}' was initialized with {} partitions",
storeFolder,
metadataStoreMap.size());
} else {
LOG.info(
"The metadata store for folder '{}' was initialized with {} partitions (using partition filters: {})",
storeFolder,
metadataStoreMap.size(),
String.join(",", partitionFilters));
}
}

/**
Expand All @@ -118,8 +146,14 @@ private Watcher buildWatcher() {
.forPath(storeFolder)
.thenAcceptAsync(
(partitions) -> {
// create internal stores foreach partition that do not already exist
partitions.forEach(this::getOrCreateMetadataStore);
if (partitionFilters.isEmpty()) {
// create internal stores foreach partition that do not already exist
partitions.forEach(this::getOrCreateMetadataStore);
} else {
partitions.stream()
.filter(partitionFilters::contains)
.forEach(this::getOrCreateMetadataStore);
}

// remove metadata stores that exist in memory but no longer exist on ZK
Set<String> partitionsToRemove =
Expand Down Expand Up @@ -253,6 +287,15 @@ public List<T> listSync() {
}

private AstraMetadataStore<T> getOrCreateMetadataStore(String partition) {
if (!partitionFilters.isEmpty() && !partitionFilters.contains(partition)) {
LOG.error(
"Partitioning metadata store attempted to use partition {}, filters restricted to {}",
partition,
String.join(",", partitionFilters));
throw new InternalMetadataStoreException(
"Partitioning metadata store using filters that does not include provided partition");
}

return metadataStoreMap.computeIfAbsent(
partition,
(p1) -> {
Expand Down Expand Up @@ -289,12 +332,12 @@ public void addListener(AstraMetadataStoreChangeListener<T> watcher) {
// add this watcher to the list for new stores to add
listeners.add(watcher);
// add this watcher to existing stores
metadataStoreMap.forEach((partition, store) -> store.addListener(watcher));
metadataStoreMap.forEach((_, store) -> store.addListener(watcher));
}

public void removeListener(AstraMetadataStoreChangeListener<T> watcher) {
listeners.remove(watcher);
metadataStoreMap.forEach((partition, store) -> store.removeListener(watcher));
metadataStoreMap.forEach((_, store) -> store.removeListener(watcher));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -112,17 +113,22 @@ public ExampleMetadata fromJsonStr(String data) {

static class FixedPartitionExampleMetadata extends AstraPartitionedMetadata {

private final String partitionId;
private String extraField = null;

public FixedPartitionExampleMetadata(String name) {
public FixedPartitionExampleMetadata(String name, String partitionId) {
super(name);
this.partitionId = partitionId;
}

@JsonCreator
public FixedPartitionExampleMetadata(
@JsonProperty("name") String name, @JsonProperty("extraField") String extraField) {
@JsonProperty("name") String name,
@JsonProperty("extraField") String extraField,
@JsonProperty("partitionId") String partitionId) {
super(name);
this.extraField = extraField;
this.partitionId = partitionId;
}

public void setExtraField(String extraField) {
Expand All @@ -137,7 +143,7 @@ public String getExtraField() {
@JsonIgnore
public String getPartition() {
// use a fixed partition
return "1";
return partitionId;
}

@Override
Expand Down Expand Up @@ -640,16 +646,18 @@ public TestMetadataStore() {
store.addListener(beforeListener);

AtomicInteger afterCounter = new AtomicInteger(0);
FixedPartitionExampleMetadata metadata0 = new FixedPartitionExampleMetadata("foo0", "val1");
FixedPartitionExampleMetadata metadata0 =
new FixedPartitionExampleMetadata("foo0", "val1", "1");
store.createSync(metadata0);

FixedPartitionExampleMetadata metadata1 = new FixedPartitionExampleMetadata("foo1", "val1");
FixedPartitionExampleMetadata metadata1 =
new FixedPartitionExampleMetadata("foo1", "val1", "1");
store.createSync(metadata1);

// create metadata
for (int i = 2; i < 10; i++) {
FixedPartitionExampleMetadata otherMetadata =
new FixedPartitionExampleMetadata("foo" + i, "val1");
new FixedPartitionExampleMetadata("foo" + i, "val1", "1");
store.createSync(otherMetadata);
}

Expand Down Expand Up @@ -678,4 +686,56 @@ public TestMetadataStore() {
await().until(afterCounter::get, (value) -> value == 2);
}
}

@Test
void testPartitionFilters() throws Exception {
final String partitionStoreFolder = "/test_partition_filters";

class TestMetadataStore extends AstraPartitioningMetadataStore<FixedPartitionExampleMetadata> {
public TestMetadataStore() {
super(
curatorFramework,
CreateMode.PERSISTENT,
new FixedPartitionMetadataSerializer().toModelSerializer(),
partitionStoreFolder);
}
}

class FilteredTestMetadataStore
extends AstraPartitioningMetadataStore<FixedPartitionExampleMetadata> {
public FilteredTestMetadataStore() {
super(
curatorFramework,
CreateMode.PERSISTENT,
new FixedPartitionMetadataSerializer().toModelSerializer(),
partitionStoreFolder,
List.of("1", "2", "4"));
}
}

try (AstraPartitioningMetadataStore<FixedPartitionExampleMetadata> store =
new TestMetadataStore()) {
store.createSync(new FixedPartitionExampleMetadata("example1", "1"));
store.createSync(new FixedPartitionExampleMetadata("example2", "2"));
store.createSync(new FixedPartitionExampleMetadata("example3", "3"));
store.createSync(new FixedPartitionExampleMetadata("example4", "3"));
}

AtomicInteger counter = new AtomicInteger(0);
try (AstraPartitioningMetadataStore<FixedPartitionExampleMetadata> store =
new FilteredTestMetadataStore()) {
store.addListener(_ -> counter.incrementAndGet());

store.createSync(new FixedPartitionExampleMetadata("example5", "1"));
await().until(() -> counter.get() >= 1);

assertThatThrownBy(
() -> store.createSync(new FixedPartitionExampleMetadata("example6", "3")));

// example1, example2, and example 5
assertThat(store.listSync().size()).isEqualTo(3);

store.createSync(new FixedPartitionExampleMetadata("example7", "4"));
}
}
}

0 comments on commit f203207

Please sign in to comment.