Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hollow planUpdate refactoring prototype (seeking early feedback, no tests yet) #691

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class HollowClientUpdater {

private volatile HollowDataHolder hollowDataHolderVolatile;

private final HollowUpdatePlanner planner;
private final IHollowUpdatePlanner planner;
private final CompletableFuture<Long> initialLoad;
private boolean forceDoubleSnapshot = false;
private final FailedTransitionTracker failedTransitionTracker;
Expand Down Expand Up @@ -73,7 +73,20 @@ public HollowClientUpdater(HollowConsumer.BlobRetriever transitionCreator,
HollowConsumer.ObjectLongevityDetector objectLongevityDetector,
HollowConsumerMetrics metrics,
HollowMetricsCollector<HollowConsumerMetrics> metricsCollector) {
this.planner = new HollowUpdatePlanner(transitionCreator, doubleSnapshotConfig);
this(refreshListeners, apiFactory, doubleSnapshotConfig, hashCodeFinder, memoryMode, objectLongevityConfig, objectLongevityDetector, metrics, metricsCollector, new HollowUpdatePlanner(transitionCreator, doubleSnapshotConfig));
}

public HollowClientUpdater(List<HollowConsumer.RefreshListener> refreshListeners,
HollowAPIFactory apiFactory,
HollowConsumer.DoubleSnapshotConfig doubleSnapshotConfig,
HollowObjectHashCodeFinder hashCodeFinder,
MemoryMode memoryMode,
HollowConsumer.ObjectLongevityConfig objectLongevityConfig,
HollowConsumer.ObjectLongevityDetector objectLongevityDetector,
HollowConsumerMetrics metrics,
HollowMetricsCollector<HollowConsumerMetrics> metricsCollector,
IHollowUpdatePlanner planner) {
this.planner = planner;
this.failedTransitionTracker = new FailedTransitionTracker();
this.staleReferenceDetector = new StaleHollowReferenceDetector(objectLongevityConfig, objectLongevityDetector);
// Create a copy of the listeners, removing any duplicates
Expand Down Expand Up @@ -117,7 +130,6 @@ public synchronized boolean updateTo(long requestedVersion) throws Throwable {
return updateTo(new HollowConsumer.VersionInfo(requestedVersion));
}
public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersionInfo) throws Throwable {
metrics.setLastRefreshStartNs(System.nanoTime());
long requestedVersion = requestedVersionInfo.getVersion();
if (requestedVersion == getCurrentVersionId()) {
if (requestedVersion == HollowConstants.VERSION_NONE && hollowDataHolderVolatile == null) {
Expand Down Expand Up @@ -146,9 +158,9 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion

try {
HollowUpdatePlan updatePlan = shouldCreateSnapshotPlan(requestedVersionInfo)
? planner.planInitializingUpdate(requestedVersion)
: planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion,
doubleSnapshotConfig.allowDoubleSnapshot());
? planner.planUpdate(HollowConstants.VERSION_NONE, requestedVersion, true)
: planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion,
doubleSnapshotConfig.allowDoubleSnapshot());

for (HollowConsumer.RefreshListener listener : localListeners)
if (listener instanceof HollowConsumer.TransitionAwareRefreshListener)
Expand Down Expand Up @@ -206,7 +218,6 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion
metricsCollector.collect(metrics);

initialLoad.complete(getCurrentVersionId()); // only set the first time
metrics.setLastRefreshEndNs(System.nanoTime());
return getCurrentVersionId() == requestedVersion;
} catch(Throwable th) {
forceDoubleSnapshotNextUpdate();
Expand All @@ -218,14 +229,13 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion

// intentionally omitting a call to initialLoad.completeExceptionally(th), for producers
// that write often a consumer has a chance to try another snapshot that might succeed
metrics.setLastRefreshEndNs(System.nanoTime());

throw th;
}
}

public synchronized void addRefreshListener(HollowConsumer.RefreshListener refreshListener,
HollowConsumer c) {
HollowConsumer c) {
if (refreshListener instanceof HollowConsumer.RefreshRegistrationListener) {
if (!refreshListeners.contains(refreshListener)) {
((HollowConsumer.RefreshRegistrationListener)refreshListener).onBeforeAddition(c);
Expand All @@ -237,7 +247,7 @@ public synchronized void addRefreshListener(HollowConsumer.RefreshListener refre
}

public synchronized void removeRefreshListener(HollowConsumer.RefreshListener refreshListener,
HollowConsumer c) {
HollowConsumer c) {
if (refreshListeners.remove(refreshListener)) {
if (refreshListener instanceof HollowConsumer.RefreshRegistrationListener) {
((HollowConsumer.RefreshRegistrationListener)refreshListener).onAfterRemoval(c);
Expand All @@ -248,7 +258,7 @@ public synchronized void removeRefreshListener(HollowConsumer.RefreshListener re
public long getCurrentVersionId() {
HollowDataHolder hollowDataHolderLocal = hollowDataHolderVolatile;
return hollowDataHolderLocal != null ? hollowDataHolderLocal.getCurrentVersion()
: HollowConstants.VERSION_NONE;
: HollowConstants.VERSION_NONE;
}

public void forceDoubleSnapshotNextUpdate() {
Expand All @@ -260,14 +270,14 @@ public void forceDoubleSnapshotNextUpdate() {
*/
boolean shouldCreateSnapshotPlan(HollowConsumer.VersionInfo incomingVersionInfo) {
if (getCurrentVersionId() == HollowConstants.VERSION_NONE
|| (forceDoubleSnapshot && doubleSnapshotConfig.allowDoubleSnapshot())) {
|| (forceDoubleSnapshot && doubleSnapshotConfig.allowDoubleSnapshot())) {
return true;
}

if (doubleSnapshotConfig.doubleSnapshotOnSchemaChange() == true) {
// double snapshot on schema change relies on presence of a header tag in incoming version metadata
if (incomingVersionInfo.getAnnouncementMetadata() == null
|| !incomingVersionInfo.getAnnouncementMetadata().isPresent()) {
|| !incomingVersionInfo.getAnnouncementMetadata().isPresent()) {
LOG.warning("Double snapshots on schema change are enabled and its functioning depends on " +
"visibility into incoming version's schema through metadata but NO metadata was available " +
"for version " + incomingVersionInfo.getVersion() + ". Check that the mechanism that triggered " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* The HollowUpdatePlanner defines the logic responsible for interacting with a {@link HollowBlobRetriever}
* to create a {@link HollowUpdatePlan}.
*/
public class HollowUpdatePlanner {
public class HollowUpdatePlanner implements IHollowUpdatePlanner {

private final HollowConsumer.BlobRetriever transitionCreator;
private final HollowConsumer.DoubleSnapshotConfig doubleSnapshotConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.netflix.hollow.api.client;

public interface IHollowUpdatePlanner {
public HollowUpdatePlan planUpdate(long currentVersion, long desiredVersion, boolean allowSnapshot) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.hollow.api.client.FailedTransitionTracker;
import com.netflix.hollow.api.client.HollowAPIFactory;
import com.netflix.hollow.api.client.HollowClientUpdater;
import com.netflix.hollow.api.client.IHollowUpdatePlanner;
import com.netflix.hollow.api.client.StaleHollowReferenceDetector;
import com.netflix.hollow.api.codegen.HollowAPIClassJavaGenerator;
import com.netflix.hollow.api.consumer.fs.HollowFilesystemBlobRetriever;
Expand Down Expand Up @@ -190,16 +191,30 @@ protected <B extends Builder<B>> HollowConsumer(B builder) {
// duplicated with HollowConsumer(...) constructor above. We cannot chain constructor calls because that
// constructor subscribes to the announcement watcher and we have more setup to do first
this.metrics = new HollowConsumerMetrics();
this.updater = new HollowClientUpdater(builder.blobRetriever,
builder.refreshListeners,
builder.apiFactory,
builder.doubleSnapshotConfig,
builder.hashCodeFinder,
builder.memoryMode,
builder.objectLongevityConfig,
builder.objectLongevityDetector,
metrics,
builder.metricsCollector);
if (builder.hollowUpdatePlanner != null) {
this.updater = new HollowClientUpdater(
builder.refreshListeners,
builder.apiFactory,
builder.doubleSnapshotConfig,
builder.hashCodeFinder,
builder.memoryMode,
builder.objectLongevityConfig,
builder.objectLongevityDetector,
metrics,
builder.metricsCollector,
builder.hollowUpdatePlanner);
} else {
this.updater = new HollowClientUpdater(builder.blobRetriever,
builder.refreshListeners,
builder.apiFactory,
builder.doubleSnapshotConfig,
builder.hashCodeFinder,
builder.memoryMode,
builder.objectLongevityConfig,
builder.objectLongevityDetector,
metrics,
builder.metricsCollector);
}
updater.setFilter(builder.typeFilter);
if(builder.skipTypeShardUpdateWithNoAdditions)
updater.setSkipShardUpdateWithNoAdditions(true);
Expand Down Expand Up @@ -1085,6 +1100,7 @@ public static class Builder<B extends HollowConsumer.Builder<B>> {
protected MemoryMode memoryMode = MemoryMode.ON_HEAP;
protected HollowMetricsCollector<HollowConsumerMetrics> metricsCollector;
protected boolean skipTypeShardUpdateWithNoAdditions = false;
protected IHollowUpdatePlanner hollowUpdatePlanner = null;

public B withBlobRetriever(HollowConsumer.BlobRetriever blobRetriever) {
this.blobRetriever = blobRetriever;
Expand Down Expand Up @@ -1156,6 +1172,11 @@ public B withRefreshListeners(HollowConsumer.RefreshListener... refreshListeners
return (B)this;
}

public B withHollowUpdatePlanner(IHollowUpdatePlanner hollowUpdatePlanner) {
this.hollowUpdatePlanner = hollowUpdatePlanner;
return (B)this;
}

/**
* Provide the code generated API class that extends {@link HollowAPI} with one or more types
* cached for direct field reads.
Expand Down
Loading