From 424e4afae9d2ff3602ea340f3eee217f8e7e1742 Mon Sep 17 00:00:00 2001 From: Mengqing Wang Date: Fri, 16 Aug 2024 15:52:13 -0700 Subject: [PATCH] IHollowUpdatePlanner --- .../api/client/HollowClientUpdater.java | 36 ++++++++++------ .../api/client/HollowUpdatePlanner.java | 2 +- .../api/client/IHollowUpdatePlanner.java | 6 +++ .../hollow/api/consumer/HollowConsumer.java | 41 ++++++++++++++----- 4 files changed, 61 insertions(+), 24 deletions(-) create mode 100644 hollow/src/main/java/com/netflix/hollow/api/client/IHollowUpdatePlanner.java diff --git a/hollow/src/main/java/com/netflix/hollow/api/client/HollowClientUpdater.java b/hollow/src/main/java/com/netflix/hollow/api/client/HollowClientUpdater.java index 82590dd3f4..fa15e42d4c 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/client/HollowClientUpdater.java +++ b/hollow/src/main/java/com/netflix/hollow/api/client/HollowClientUpdater.java @@ -44,7 +44,7 @@ public class HollowClientUpdater { private volatile HollowDataHolder hollowDataHolderVolatile; - private final HollowUpdatePlanner planner; + private final IHollowUpdatePlanner planner; private final CompletableFuture initialLoad; private boolean forceDoubleSnapshot = false; private final FailedTransitionTracker failedTransitionTracker; @@ -73,7 +73,20 @@ public HollowClientUpdater(HollowConsumer.BlobRetriever transitionCreator, HollowConsumer.ObjectLongevityDetector objectLongevityDetector, HollowConsumerMetrics metrics, HollowMetricsCollector metricsCollector) { - this.planner = new HollowUpdatePlanner(transitionCreator, doubleSnapshotConfig); + this(refreshListeners, apiFactory, doubleSnapshotConfig, hashCodeFinder, memoryMode, objectLongevityConfig, objectLongevityDetector, metrics, metricsCollector, new HollowUpdatePlanner(transitionCreator, doubleSnapshotConfig)); + } + + public HollowClientUpdater(List refreshListeners, + HollowAPIFactory apiFactory, + HollowConsumer.DoubleSnapshotConfig doubleSnapshotConfig, + HollowObjectHashCodeFinder hashCodeFinder, + MemoryMode memoryMode, + HollowConsumer.ObjectLongevityConfig objectLongevityConfig, + HollowConsumer.ObjectLongevityDetector objectLongevityDetector, + HollowConsumerMetrics metrics, + HollowMetricsCollector metricsCollector, + IHollowUpdatePlanner planner) { + this.planner = planner; this.failedTransitionTracker = new FailedTransitionTracker(); this.staleReferenceDetector = new StaleHollowReferenceDetector(objectLongevityConfig, objectLongevityDetector); // Create a copy of the listeners, removing any duplicates @@ -117,7 +130,6 @@ public synchronized boolean updateTo(long requestedVersion) throws Throwable { return updateTo(new HollowConsumer.VersionInfo(requestedVersion)); } public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersionInfo) throws Throwable { - metrics.setLastRefreshStartNs(System.nanoTime()); long requestedVersion = requestedVersionInfo.getVersion(); if (requestedVersion == getCurrentVersionId()) { if (requestedVersion == HollowConstants.VERSION_NONE && hollowDataHolderVolatile == null) { @@ -146,9 +158,9 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion try { HollowUpdatePlan updatePlan = shouldCreateSnapshotPlan(requestedVersionInfo) - ? planner.planInitializingUpdate(requestedVersion) - : planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion, - doubleSnapshotConfig.allowDoubleSnapshot()); + ? planner.planUpdate(HollowConstants.VERSION_NONE, requestedVersion, true) + : planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion, + doubleSnapshotConfig.allowDoubleSnapshot()); for (HollowConsumer.RefreshListener listener : localListeners) if (listener instanceof HollowConsumer.TransitionAwareRefreshListener) @@ -206,7 +218,6 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion metricsCollector.collect(metrics); initialLoad.complete(getCurrentVersionId()); // only set the first time - metrics.setLastRefreshEndNs(System.nanoTime()); return getCurrentVersionId() == requestedVersion; } catch(Throwable th) { forceDoubleSnapshotNextUpdate(); @@ -218,14 +229,13 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion // intentionally omitting a call to initialLoad.completeExceptionally(th), for producers // that write often a consumer has a chance to try another snapshot that might succeed - metrics.setLastRefreshEndNs(System.nanoTime()); throw th; } } public synchronized void addRefreshListener(HollowConsumer.RefreshListener refreshListener, - HollowConsumer c) { + HollowConsumer c) { if (refreshListener instanceof HollowConsumer.RefreshRegistrationListener) { if (!refreshListeners.contains(refreshListener)) { ((HollowConsumer.RefreshRegistrationListener)refreshListener).onBeforeAddition(c); @@ -237,7 +247,7 @@ public synchronized void addRefreshListener(HollowConsumer.RefreshListener refre } public synchronized void removeRefreshListener(HollowConsumer.RefreshListener refreshListener, - HollowConsumer c) { + HollowConsumer c) { if (refreshListeners.remove(refreshListener)) { if (refreshListener instanceof HollowConsumer.RefreshRegistrationListener) { ((HollowConsumer.RefreshRegistrationListener)refreshListener).onAfterRemoval(c); @@ -248,7 +258,7 @@ public synchronized void removeRefreshListener(HollowConsumer.RefreshListener re public long getCurrentVersionId() { HollowDataHolder hollowDataHolderLocal = hollowDataHolderVolatile; return hollowDataHolderLocal != null ? hollowDataHolderLocal.getCurrentVersion() - : HollowConstants.VERSION_NONE; + : HollowConstants.VERSION_NONE; } public void forceDoubleSnapshotNextUpdate() { @@ -260,14 +270,14 @@ public void forceDoubleSnapshotNextUpdate() { */ boolean shouldCreateSnapshotPlan(HollowConsumer.VersionInfo incomingVersionInfo) { if (getCurrentVersionId() == HollowConstants.VERSION_NONE - || (forceDoubleSnapshot && doubleSnapshotConfig.allowDoubleSnapshot())) { + || (forceDoubleSnapshot && doubleSnapshotConfig.allowDoubleSnapshot())) { return true; } if (doubleSnapshotConfig.doubleSnapshotOnSchemaChange() == true) { // double snapshot on schema change relies on presence of a header tag in incoming version metadata if (incomingVersionInfo.getAnnouncementMetadata() == null - || !incomingVersionInfo.getAnnouncementMetadata().isPresent()) { + || !incomingVersionInfo.getAnnouncementMetadata().isPresent()) { LOG.warning("Double snapshots on schema change are enabled and its functioning depends on " + "visibility into incoming version's schema through metadata but NO metadata was available " + "for version " + incomingVersionInfo.getVersion() + ". Check that the mechanism that triggered " + diff --git a/hollow/src/main/java/com/netflix/hollow/api/client/HollowUpdatePlanner.java b/hollow/src/main/java/com/netflix/hollow/api/client/HollowUpdatePlanner.java index 4fa9c8862a..b86e6023b4 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/client/HollowUpdatePlanner.java +++ b/hollow/src/main/java/com/netflix/hollow/api/client/HollowUpdatePlanner.java @@ -23,7 +23,7 @@ * The HollowUpdatePlanner defines the logic responsible for interacting with a {@link HollowBlobRetriever} * to create a {@link HollowUpdatePlan}. */ -public class HollowUpdatePlanner { +public class HollowUpdatePlanner implements IHollowUpdatePlanner { private final HollowConsumer.BlobRetriever transitionCreator; private final HollowConsumer.DoubleSnapshotConfig doubleSnapshotConfig; diff --git a/hollow/src/main/java/com/netflix/hollow/api/client/IHollowUpdatePlanner.java b/hollow/src/main/java/com/netflix/hollow/api/client/IHollowUpdatePlanner.java new file mode 100644 index 0000000000..b7ba1c5c7d --- /dev/null +++ b/hollow/src/main/java/com/netflix/hollow/api/client/IHollowUpdatePlanner.java @@ -0,0 +1,6 @@ +package com.netflix.hollow.api.client; + +public interface IHollowUpdatePlanner { + public HollowUpdatePlan planUpdate(long currentVersion, long desiredVersion, boolean allowSnapshot) throws Exception; + +} diff --git a/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java b/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java index 327e4902c1..b20b6c0a5c 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/consumer/HollowConsumer.java @@ -24,6 +24,7 @@ import com.netflix.hollow.api.client.FailedTransitionTracker; import com.netflix.hollow.api.client.HollowAPIFactory; import com.netflix.hollow.api.client.HollowClientUpdater; +import com.netflix.hollow.api.client.IHollowUpdatePlanner; import com.netflix.hollow.api.client.StaleHollowReferenceDetector; import com.netflix.hollow.api.codegen.HollowAPIClassJavaGenerator; import com.netflix.hollow.api.consumer.fs.HollowFilesystemBlobRetriever; @@ -190,16 +191,30 @@ protected > HollowConsumer(B builder) { // duplicated with HollowConsumer(...) constructor above. We cannot chain constructor calls because that // constructor subscribes to the announcement watcher and we have more setup to do first this.metrics = new HollowConsumerMetrics(); - this.updater = new HollowClientUpdater(builder.blobRetriever, - builder.refreshListeners, - builder.apiFactory, - builder.doubleSnapshotConfig, - builder.hashCodeFinder, - builder.memoryMode, - builder.objectLongevityConfig, - builder.objectLongevityDetector, - metrics, - builder.metricsCollector); + if (builder.hollowUpdatePlanner != null) { + this.updater = new HollowClientUpdater( + builder.refreshListeners, + builder.apiFactory, + builder.doubleSnapshotConfig, + builder.hashCodeFinder, + builder.memoryMode, + builder.objectLongevityConfig, + builder.objectLongevityDetector, + metrics, + builder.metricsCollector, + builder.hollowUpdatePlanner); + } else { + this.updater = new HollowClientUpdater(builder.blobRetriever, + builder.refreshListeners, + builder.apiFactory, + builder.doubleSnapshotConfig, + builder.hashCodeFinder, + builder.memoryMode, + builder.objectLongevityConfig, + builder.objectLongevityDetector, + metrics, + builder.metricsCollector); + } updater.setFilter(builder.typeFilter); if(builder.skipTypeShardUpdateWithNoAdditions) updater.setSkipShardUpdateWithNoAdditions(true); @@ -1085,6 +1100,7 @@ public static class Builder> { protected MemoryMode memoryMode = MemoryMode.ON_HEAP; protected HollowMetricsCollector metricsCollector; protected boolean skipTypeShardUpdateWithNoAdditions = false; + protected IHollowUpdatePlanner hollowUpdatePlanner = null; public B withBlobRetriever(HollowConsumer.BlobRetriever blobRetriever) { this.blobRetriever = blobRetriever; @@ -1156,6 +1172,11 @@ public B withRefreshListeners(HollowConsumer.RefreshListener... refreshListeners return (B)this; } + public B withHollowUpdatePlanner(IHollowUpdatePlanner hollowUpdatePlanner) { + this.hollowUpdatePlanner = hollowUpdatePlanner; + return (B)this; + } + /** * Provide the code generated API class that extends {@link HollowAPI} with one or more types * cached for direct field reads.