From dd2db6ad59826460c8d0a5b6ce5cedf2d4162280 Mon Sep 17 00:00:00 2001 From: Sunjeet Singh Date: Tue, 5 Nov 2024 09:33:46 -0800 Subject: [PATCH] Producer- write monotonically increasing delta chain version counter --- .../api/producer/AbstractHollowProducer.java | 15 +++++++++++++++ .../netflix/hollow/core/HollowStateEngine.java | 5 +++++ .../core/util/HollowWriteStateCreatorTest.java | 3 +++ .../core/write/HollowWriteStateEngineTest.java | 3 +++ 4 files changed, 26 insertions(+) diff --git a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java index 8b7b0e3e18..e3127b645b 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java +++ b/hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java @@ -17,6 +17,7 @@ package com.netflix.hollow.api.producer; import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED; import static java.lang.System.currentTimeMillis; import static java.util.stream.Collectors.toList; @@ -524,6 +525,20 @@ private void updateHeaderTags(HollowWriteStateEngine writeEngine, long toVersion } writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion)); writeEngine.getHeaderTags().remove(HEADER_TAG_TYPE_RESHARDING_INVOKED); + + long prevDeltaChainVersionCounter = 0l; + if (readStates.hasCurrent()) { + String str = readStates.current().getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER); + if (str != null) { + try { + prevDeltaChainVersionCounter = Long.valueOf(str); + } catch (NumberFormatException e) { + prevDeltaChainVersionCounter = 0; + } + } + } + long deltaChainVersionCounter = prevDeltaChainVersionCounter + 1; + writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(deltaChainVersionCounter)); } void populate( diff --git a/hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java b/hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java index 2b6bf016d3..f8c3b9af3a 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java +++ b/hollow/src/main/java/com/netflix/hollow/core/HollowStateEngine.java @@ -75,6 +75,11 @@ public interface HollowStateEngine extends HollowDataset { */ String HEADER_TAG_PRODUCER_TO_VERSION = "hollow.blob.to.version"; + /** + * A header tag indicating monotonically increasing version in the same delta chain + */ + String HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER = "hollow.delta.chain.version.counter"; + @Override List getSchemas(); diff --git a/hollow/src/test/java/com/netflix/hollow/core/util/HollowWriteStateCreatorTest.java b/hollow/src/test/java/com/netflix/hollow/core/util/HollowWriteStateCreatorTest.java index 9fe82e99db..7b17a59336 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/util/HollowWriteStateCreatorTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/util/HollowWriteStateCreatorTest.java @@ -16,6 +16,7 @@ */ package com.netflix.hollow.core.util; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; import static org.junit.Assert.assertEquals; @@ -46,6 +47,7 @@ public void recreatesUsingReadEngine() throws IOException { } writeEngine.addHeaderTag("CopyTag", "copied"); writeEngine.addHeaderTag(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(System.currentTimeMillis())); + writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, "1"); String toVersion = String.valueOf(System.currentTimeMillis()); writeEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, toVersion); @@ -55,6 +57,7 @@ public void recreatesUsingReadEngine() throws IOException { HollowWriteStateEngine recreatedWriteEngine = HollowWriteStateCreator.recreateAndPopulateUsingReadEngine(readEngine); assertEquals(cycleStartTime, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_METRIC_CYCLE_START)); assertEquals(readEngineToVersion, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_PRODUCER_TO_VERSION)); + assertEquals("1", recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER)); assertEquals(8, recreatedWriteEngine.getTypeState("Integer").getNumShards()); HollowReadStateEngine recreatedReadEngine = StateEngineRoundTripper.roundTripSnapshot(recreatedWriteEngine); diff --git a/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java b/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java index 0f39affe80..8d4829a24a 100644 --- a/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java +++ b/hollow/src/test/java/com/netflix/hollow/core/write/HollowWriteStateEngineTest.java @@ -1,5 +1,6 @@ package com.netflix.hollow.core.write; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED; import static org.junit.Assert.assertEquals; @@ -61,10 +62,12 @@ public void testHeaderTagsOnDeltaAndReverseDelta() { consumer.triggerRefreshTo(version3); // delta transition assertEquals("3", consumer.getStateEngine().getHeaderTag(TEST_TAG)); assertEquals(String.valueOf(version3), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION)); + assertEquals("3", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER)); consumer.triggerRefreshTo(version2); // reverse delta transition assertEquals("2", consumer.getStateEngine().getHeaderTag(TEST_TAG)); assertEquals(String.valueOf(version2), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION)); + assertEquals("2", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER)); }