Skip to content

Commit

Permalink
Consumer- report delta chain version counter
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Nov 6, 2024
1 parent 45197ba commit b4bf852
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.hollow.api.consumer.metrics;

import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_ANNOUNCEMENT;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START;

Expand Down Expand Up @@ -59,12 +60,15 @@ public abstract class AbstractRefreshMetricsListener extends AbstractRefreshList
private final Map<Long, Long> announcementTimestamps;
private volatile boolean namespacePinnedPreviously;

private final Map<Long, Long> cycleVersionDeltaCounters; // delta chain version counter for each cycle version

public AbstractRefreshMetricsListener() {
lastRefreshTimeNanoOptional = OptionalLong.empty();
consecutiveFailures = 0l;
cycleVersionStartTimes = new HashMap<>();
announcementTimestamps = new HashMap<>();
namespacePinnedPreviously = false;
cycleVersionDeltaCounters = new HashMap<>();
}

public void refreshStarted(long currentVersion, long requestedVersion) {
Expand All @@ -73,7 +77,9 @@ public void refreshStarted(long currentVersion, long requestedVersion) {
refreshMetricsBuilder = new ConsumerRefreshMetrics.Builder();
refreshMetricsBuilder.setIsInitialLoad(currentVersion == VERSION_NONE);
refreshMetricsBuilder.setUpdatePlanDetails(updatePlanDetails);
cycleVersionStartTimes.clear(); // clear map to avoid accumulation over time
// clear maps to avoid accumulation over time
cycleVersionStartTimes.clear();
cycleVersionDeltaCounters.clear();
}

@Override
Expand All @@ -91,7 +97,7 @@ public void versionDetected(HollowConsumer.VersionInfo requestedVersionInfo) {
// or for the newVersion). Don't record this metric when a namespace was pinned previously and gets unpinned
// in the next cycle because this metric will record the refresh duration from the latest announced version.
if (!(namespacePinnedPreviously || isPinned)) {
trackTimestampsFromHeaders(requestedVersionInfo.getVersion(),
trackHeaderTagInVersion(requestedVersionInfo.getVersion(),
requestedVersionInfo.getAnnouncementMetadata().get(), HEADER_TAG_METRIC_ANNOUNCEMENT, announcementTimestamps);
}
namespacePinnedPreviously = isPinned;
Expand Down Expand Up @@ -160,6 +166,9 @@ public void refreshSuccessful(long beforeVersion, long afterVersion, long reques
if (cycleVersionStartTimes.containsKey(afterVersion)) {
refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion));
}
if (cycleVersionDeltaCounters.containsKey(afterVersion)) {
refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion));
}

if (afterVersion == requestedVersion && announcementTimestamps.containsKey(afterVersion)) {
refreshMetricsBuilder.setAnnouncementTimestamp(announcementTimestamps.get(afterVersion));
Expand All @@ -186,32 +195,37 @@ public void refreshFailed(long beforeVersion, long afterVersion, long requestedV
if (cycleVersionStartTimes.containsKey(afterVersion)) {
refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion));
}
if (cycleVersionDeltaCounters.containsKey(afterVersion)) {
refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion));
}

noFailRefreshEndMetricsReporting(refreshMetricsBuilder.build());
}

@Override
public void snapshotUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) {
trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters);
}

@Override
public void deltaUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) {
trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes);
trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters);
}

/**
* If the blob header contains the timestamps like producer cycle start and announcement then save those values in
* the maps tracking version to cycle start time and version to announcement respectively.
* If the blob header contains a value for the given header tag (like producer cycle start time) then save that value in
* a maps tracking the value per version in this refresh.
*/
private void trackTimestampsFromHeaders(long version, Map<String, String> headers, String headerTag, Map<Long, Long> timestampsMap) {
private void trackHeaderTagInVersion(long version, Map<String, String> headers, String headerTag, Map<Long, Long> tracker) {
if (headers != null) {
String headerTagValue = headers.get(headerTag);
if (headerTagValue != null && !headerTagValue.isEmpty()) {
try {
Long timestamp = Long.valueOf(headerTagValue);
if (timestamp != null) {
timestampsMap.put(version, timestamp);
Long val = Long.valueOf(headerTagValue);
if (val != null) {
tracker.put(version, val);
}
} catch (NumberFormatException e) {
log.log(Level.WARNING, "Blob header contained " + headerTag + " but its value could"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ConsumerRefreshMetrics {
private long refreshEndTimeNano; // monotonic system time when refresh ended
private OptionalLong cycleStartTimestamp; // timestamp in millis of when cycle started for the loaded data version
private OptionalLong announcementTimestamp; // timestamp in milliseconds to mark announcement for the loaded data version
private OptionalLong deltaChainVersionCounter; // the sequence number of a version in a delta chain

/**
* A class that contains details of the consumer refresh update plan that may be useful to report as metrics or logs.
Expand Down Expand Up @@ -84,8 +85,12 @@ public long getRefreshEndTimeNano() {
public OptionalLong getCycleStartTimestamp() {
return cycleStartTimestamp;
}

public OptionalLong getAnnouncementTimestamp() { return announcementTimestamp; }
public OptionalLong getAnnouncementTimestamp() {
return announcementTimestamp;
}
public OptionalLong getDeltaChainVersionCounter() {
return deltaChainVersionCounter;
}

private ConsumerRefreshMetrics(Builder builder) {
this.durationMillis = builder.durationMillis;
Expand All @@ -98,6 +103,7 @@ private ConsumerRefreshMetrics(Builder builder) {
this.refreshEndTimeNano = builder.refreshEndTimeNano;
this.cycleStartTimestamp = builder.cycleStartTimestamp;
this.announcementTimestamp = builder.announcementTimestamp;
this.deltaChainVersionCounter = builder.deltaChainVersionCounter;
}

public static final class Builder {
Expand All @@ -111,11 +117,13 @@ public static final class Builder {
private long refreshEndTimeNano;
private OptionalLong cycleStartTimestamp;
private OptionalLong announcementTimestamp;
private OptionalLong deltaChainVersionCounter;

public Builder() {
refreshSuccessAgeMillisOptional = OptionalLong.empty();
cycleStartTimestamp = OptionalLong.empty();
announcementTimestamp = OptionalLong.empty();
deltaChainVersionCounter = OptionalLong.empty();
}

public Builder setDurationMillis(long durationMillis) {
Expand Down Expand Up @@ -160,6 +168,10 @@ public Builder setAnnouncementTimestamp(long announcementTimestamp) {
this.announcementTimestamp = OptionalLong.of(announcementTimestamp);
return this;
}
public Builder setDeltaChainVersionCounter(long deltaChainVersionCounter) {
this.deltaChainVersionCounter = OptionalLong.of(deltaChainVersionCounter);
return this;
}

public ConsumerRefreshMetrics build() {
return new ConsumerRefreshMetrics(this);
Expand Down

0 comments on commit b4bf852

Please sign in to comment.