Skip to content

Commit

Permalink
Producer- write monotonically increasing delta chain version counter
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Nov 5, 2024
1 parent 85f7038 commit dd2db6a
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.hollow.api.producer;

import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED;
import static java.lang.System.currentTimeMillis;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -524,6 +525,20 @@ private void updateHeaderTags(HollowWriteStateEngine writeEngine, long toVersion
}
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion));
writeEngine.getHeaderTags().remove(HEADER_TAG_TYPE_RESHARDING_INVOKED);

long prevDeltaChainVersionCounter = 0l;
if (readStates.hasCurrent()) {
String str = readStates.current().getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER);
if (str != null) {
try {
prevDeltaChainVersionCounter = Long.valueOf(str);
} catch (NumberFormatException e) {
prevDeltaChainVersionCounter = 0;
}
}
}
long deltaChainVersionCounter = prevDeltaChainVersionCounter + 1;
writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, String.valueOf(deltaChainVersionCounter));
}

void populate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public interface HollowStateEngine extends HollowDataset {
*/
String HEADER_TAG_PRODUCER_TO_VERSION = "hollow.blob.to.version";

/**
* A header tag indicating monotonically increasing version in the same delta chain
*/
String HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER = "hollow.delta.chain.version.counter";

@Override
List<HollowSchema> getSchemas();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.netflix.hollow.core.util;

import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -46,6 +47,7 @@ public void recreatesUsingReadEngine() throws IOException {
}
writeEngine.addHeaderTag("CopyTag", "copied");
writeEngine.addHeaderTag(HEADER_TAG_METRIC_CYCLE_START, String.valueOf(System.currentTimeMillis()));
writeEngine.addHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, "1");
String toVersion = String.valueOf(System.currentTimeMillis());
writeEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, toVersion);

Expand All @@ -55,6 +57,7 @@ public void recreatesUsingReadEngine() throws IOException {
HollowWriteStateEngine recreatedWriteEngine = HollowWriteStateCreator.recreateAndPopulateUsingReadEngine(readEngine);
assertEquals(cycleStartTime, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_METRIC_CYCLE_START));
assertEquals(readEngineToVersion, recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_PRODUCER_TO_VERSION));
assertEquals("1", recreatedWriteEngine.getPreviousHeaderTags().get(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));
assertEquals(8, recreatedWriteEngine.getTypeState("Integer").getNumShards());

HollowReadStateEngine recreatedReadEngine = StateEngineRoundTripper.roundTripSnapshot(recreatedWriteEngine);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.hollow.core.write;

import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -61,10 +62,12 @@ public void testHeaderTagsOnDeltaAndReverseDelta() {
consumer.triggerRefreshTo(version3); // delta transition
assertEquals("3", consumer.getStateEngine().getHeaderTag(TEST_TAG));
assertEquals(String.valueOf(version3), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION));
assertEquals("3", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));

consumer.triggerRefreshTo(version2); // reverse delta transition
assertEquals("2", consumer.getStateEngine().getHeaderTag(TEST_TAG));
assertEquals(String.valueOf(version2), consumer.getStateEngine().getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION));
assertEquals("2", consumer.getStateEngine().getHeaderTag(HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER));

}

Expand Down

0 comments on commit dd2db6a

Please sign in to comment.