diff --git a/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListener.java b/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListener.java index ec540044cd..a3302438ae 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListener.java +++ b/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/AbstractRefreshMetricsListener.java @@ -17,6 +17,7 @@ package com.netflix.hollow.api.consumer.metrics; import static com.netflix.hollow.core.HollowConstants.VERSION_NONE; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_ANNOUNCEMENT; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START; @@ -59,12 +60,15 @@ public abstract class AbstractRefreshMetricsListener extends AbstractRefreshList private final Map announcementTimestamps; private volatile boolean namespacePinnedPreviously; + private final Map cycleVersionDeltaCounters; // delta chain version counter for each cycle version + public AbstractRefreshMetricsListener() { lastRefreshTimeNanoOptional = OptionalLong.empty(); consecutiveFailures = 0l; cycleVersionStartTimes = new HashMap<>(); announcementTimestamps = new HashMap<>(); namespacePinnedPreviously = false; + cycleVersionDeltaCounters = new HashMap<>(); } public void refreshStarted(long currentVersion, long requestedVersion) { @@ -73,7 +77,9 @@ public void refreshStarted(long currentVersion, long requestedVersion) { refreshMetricsBuilder = new ConsumerRefreshMetrics.Builder(); refreshMetricsBuilder.setIsInitialLoad(currentVersion == VERSION_NONE); refreshMetricsBuilder.setUpdatePlanDetails(updatePlanDetails); - cycleVersionStartTimes.clear(); // clear map to avoid accumulation over time + // clear maps to avoid accumulation over time + cycleVersionStartTimes.clear(); + cycleVersionDeltaCounters.clear(); } @Override @@ -91,7 +97,7 @@ public void versionDetected(HollowConsumer.VersionInfo requestedVersionInfo) { // or for the newVersion). Don't record this metric when a namespace was pinned previously and gets unpinned // in the next cycle because this metric will record the refresh duration from the latest announced version. if (!(namespacePinnedPreviously || isPinned)) { - trackTimestampsFromHeaders(requestedVersionInfo.getVersion(), + trackHeaderTagInVersion(requestedVersionInfo.getVersion(), requestedVersionInfo.getAnnouncementMetadata().get(), HEADER_TAG_METRIC_ANNOUNCEMENT, announcementTimestamps); } namespacePinnedPreviously = isPinned; @@ -160,6 +166,9 @@ public void refreshSuccessful(long beforeVersion, long afterVersion, long reques if (cycleVersionStartTimes.containsKey(afterVersion)) { refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion)); } + if (cycleVersionDeltaCounters.containsKey(afterVersion)) { + refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion)); + } if (afterVersion == requestedVersion && announcementTimestamps.containsKey(afterVersion)) { refreshMetricsBuilder.setAnnouncementTimestamp(announcementTimestamps.get(afterVersion)); @@ -186,32 +195,37 @@ public void refreshFailed(long beforeVersion, long afterVersion, long requestedV if (cycleVersionStartTimes.containsKey(afterVersion)) { refreshMetricsBuilder.setCycleStartTimestamp(cycleVersionStartTimes.get(afterVersion)); } + if (cycleVersionDeltaCounters.containsKey(afterVersion)) { + refreshMetricsBuilder.setDeltaChainVersionCounter(cycleVersionDeltaCounters.get(afterVersion)); + } noFailRefreshEndMetricsReporting(refreshMetricsBuilder.build()); } @Override public void snapshotUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) { - trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes); + trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes); + trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters); } @Override public void deltaUpdateOccurred(HollowAPI refreshAPI, HollowReadStateEngine stateEngine, long version) { - trackTimestampsFromHeaders(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes); + trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_METRIC_CYCLE_START, cycleVersionStartTimes); + trackHeaderTagInVersion(version, stateEngine.getHeaderTags(), HEADER_TAG_DELTA_CHAIN_VERSION_COUNTER, cycleVersionDeltaCounters); } /** - * If the blob header contains the timestamps like producer cycle start and announcement then save those values in - * the maps tracking version to cycle start time and version to announcement respectively. + * If the blob header contains a value for the given header tag (like producer cycle start time) then save that value in + * a maps tracking the value per version in this refresh. */ - private void trackTimestampsFromHeaders(long version, Map headers, String headerTag, Map timestampsMap) { + private void trackHeaderTagInVersion(long version, Map headers, String headerTag, Map tracker) { if (headers != null) { String headerTagValue = headers.get(headerTag); if (headerTagValue != null && !headerTagValue.isEmpty()) { try { - Long timestamp = Long.valueOf(headerTagValue); - if (timestamp != null) { - timestampsMap.put(version, timestamp); + Long val = Long.valueOf(headerTagValue); + if (val != null) { + tracker.put(version, val); } } catch (NumberFormatException e) { log.log(Level.WARNING, "Blob header contained " + headerTag + " but its value could" diff --git a/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/ConsumerRefreshMetrics.java b/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/ConsumerRefreshMetrics.java index bc845fb0e5..d13c620990 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/ConsumerRefreshMetrics.java +++ b/hollow/src/main/java/com/netflix/hollow/api/consumer/metrics/ConsumerRefreshMetrics.java @@ -32,6 +32,7 @@ public class ConsumerRefreshMetrics { private long refreshEndTimeNano; // monotonic system time when refresh ended private OptionalLong cycleStartTimestamp; // timestamp in millis of when cycle started for the loaded data version private OptionalLong announcementTimestamp; // timestamp in milliseconds to mark announcement for the loaded data version + private OptionalLong deltaChainVersionCounter; // the sequence number of a version in a delta chain /** * A class that contains details of the consumer refresh update plan that may be useful to report as metrics or logs. @@ -84,8 +85,12 @@ public long getRefreshEndTimeNano() { public OptionalLong getCycleStartTimestamp() { return cycleStartTimestamp; } - - public OptionalLong getAnnouncementTimestamp() { return announcementTimestamp; } + public OptionalLong getAnnouncementTimestamp() { + return announcementTimestamp; + } + public OptionalLong getDeltaChainVersionCounter() { + return deltaChainVersionCounter; + } private ConsumerRefreshMetrics(Builder builder) { this.durationMillis = builder.durationMillis; @@ -98,6 +103,7 @@ private ConsumerRefreshMetrics(Builder builder) { this.refreshEndTimeNano = builder.refreshEndTimeNano; this.cycleStartTimestamp = builder.cycleStartTimestamp; this.announcementTimestamp = builder.announcementTimestamp; + this.deltaChainVersionCounter = builder.deltaChainVersionCounter; } public static final class Builder { @@ -111,11 +117,13 @@ public static final class Builder { private long refreshEndTimeNano; private OptionalLong cycleStartTimestamp; private OptionalLong announcementTimestamp; + private OptionalLong deltaChainVersionCounter; public Builder() { refreshSuccessAgeMillisOptional = OptionalLong.empty(); cycleStartTimestamp = OptionalLong.empty(); announcementTimestamp = OptionalLong.empty(); + deltaChainVersionCounter = OptionalLong.empty(); } public Builder setDurationMillis(long durationMillis) { @@ -160,6 +168,10 @@ public Builder setAnnouncementTimestamp(long announcementTimestamp) { this.announcementTimestamp = OptionalLong.of(announcementTimestamp); return this; } + public Builder setDeltaChainVersionCounter(long deltaChainVersionCounter) { + this.deltaChainVersionCounter = OptionalLong.of(deltaChainVersionCounter); + return this; + } public ConsumerRefreshMetrics build() { return new ConsumerRefreshMetrics(this);