Skip to content

Commit

Permalink
Add per-dataset timer metrics that time the active and total duration…
Browse files Browse the repository at this point in the history
… of dataset processing
  • Loading branch information
malessi committed Jan 21, 2025
1 parent 344b687 commit 41ebf37
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ public final class AppConfiguration extends BaseAppConfiguration {
Set.of(
"FissClaimRdaSink.change.latency.millis",
"McsClaimRdaSink.change.latency.millis",
CcwRifLoadJob.Metrics.DATASET_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.DATASET_PROCESSING_TOTAL_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_TOTAL_TIMER_NAME,
DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -223,6 +225,20 @@ public PipelineJobOutcome call() throws Exception {
final Set<Instant> finalManifestTimestamps =
getTimestampsFromManifestLists(finalManifestLists);

// This is a failsafe in the possible case where the final manifest of a dataset was loaded
// but the final manifest list had not yet arrived. In that case the timers started for these
// dataset(s) were never stopped, and so we should make sure they're stopped now
finalManifestLists.stream()
.filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
.map(FinalManifestList::getTimestampText)
.forEach(
dataset -> {
if (loadJobMetrics.activeTimerSampleExistsForDataset(dataset))
loadJobMetrics.stopActiveTimerSampleForDataset(dataset);
if (loadJobMetrics.totalTimerSampleExistsForDataset(dataset))
loadJobMetrics.stopTotalTimerSampleForDataset(dataset, false);
});

listener.noDataAvailable();
statusReporter.reportNothingToDo();
// Ensure all manifests from the manifest lists are accounted for and completed.
Expand Down Expand Up @@ -345,16 +361,50 @@ public PipelineJobOutcome call() throws Exception {
* processing multiple data sets in parallel (which would lead to data
* consistency problems).
*/
final var activeTimer =
if (!loadJobMetrics.activeTimerSampleExistsForDataset(manifestToProcess.getTimestampText()))
loadJobMetrics.startActiveTimerSampleForDataset(
manifestToProcess.getTimestampText(), manifestToProcess.isSyntheticData());
if (!loadJobMetrics.totalTimerSampleExistsForDataset(manifestToProcess.getTimestampText()))
loadJobMetrics.startTotalTimerSampleForDataset(manifestToProcess.getTimestampText());
final var activeManifestTimer =
loadJobMetrics.createActiveTimerForManifest(manifestToProcess).start();
final var totalTimer = Timer.start();
final var totalManifestTimer = Timer.start();
statusReporter.reportProcessingManifestData(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsStarted(manifestRecord);
listener.dataAvailable(rifFilesEvent);
statusReporter.reportCompletedManifest(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsProcessed(manifestRecord);
activeTimer.stop();
totalTimer.stop(loadJobMetrics.createTotalTimerForManifest(manifestToProcess));
totalManifestTimer.stop(loadJobMetrics.createTotalTimerForManifest(manifestToProcess));
activeManifestTimer.stop();
if (!manifestToProcess.isSyntheticData()) {
// Non-synthetic datasets are typically one manifest to one RIF, so we need to look for
// the final manifest list that corresponds to the just-loaded manifest and ensure, via the
// database, that the dataset associated with the manifest that was just loaded is
// fully complete before submitting dataset metrics. If there's no final manifest list, no
// corresponding list, or the database indicates not all manifests are loaded, the timers
// will not be stopped as the dataset has not completed loading. Note that there is an edge
// case if the current manifest was the last to be loaded for a dataset but the final
// manifest list has not yet arrived. There is a failsafe above for this possibility
dataSetQueue.readFinalManifestLists().stream()
.filter(l -> l.getTimestampText().equals(manifestToProcess.getTimestampText()))
.filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
.map(FinalManifestList::getTimestampText)
.forEach(
dataset -> {
if (loadJobMetrics.activeTimerSampleExistsForDataset(dataset))
loadJobMetrics.stopActiveTimerSampleForDataset(dataset);
if (loadJobMetrics.totalTimerSampleExistsForDataset(dataset))
loadJobMetrics.stopTotalTimerSampleForDataset(dataset, false);
});
} else {
// Synthetic datasets contain only a single manifest, so if the currently loading manifest
// is synthetic we can stop the dataset timers immediately after it has loaded
if (loadJobMetrics.activeTimerSampleExistsForDataset(manifestToProcess.getTimestampText()))
loadJobMetrics.stopActiveTimerSampleForDataset(manifestToProcess.getTimestampText());
if (loadJobMetrics.totalTimerSampleExistsForDataset(manifestToProcess.getTimestampText()))
loadJobMetrics.stopTotalTimerSampleForDataset(manifestToProcess.getTimestampText(), true);
}

LOGGER.info(LOG_MESSAGE_DATA_SET_COMPLETE);

/*
Expand Down Expand Up @@ -556,6 +606,19 @@ private boolean isProcessingRequired(S3DataFile dataFileRecord) {
/** Micrometer metrics and helpers for measuring {@link CcwRifLoadJob} operations. */
@RequiredArgsConstructor
public static final class Metrics {
/**
* Name of the per-dataset {@link LongTaskTimer}s that actively, at each Micrometer reporting
* interval, records and reports the duration of processing of a given dataset.
*/
public static final String DATASET_PROCESSING_ACTIVE_TIMER_NAME =
String.format("%s.dataset_processing.active", CcwRifLoadJob.class.getSimpleName());

/**
* Name of the per-dataset {@link Timer}s that report the final duration of processing once the
* dataset is processed.
*/
public static final String DATASET_PROCESSING_TOTAL_TIMER_NAME =
String.format("%s.dataset_processing.total", CcwRifLoadJob.class.getSimpleName());

/**
* Name of the per-{@link DataSetManifest} {@link LongTaskTimer}s that actively, at each
Expand Down Expand Up @@ -589,6 +652,22 @@ public static final class Metrics {
/** Micrometer {@link MeterRegistry} for the Pipeline application. */
private final MeterRegistry appMetrics;

/**
* Maps a dataset's timestamp text to an active, started {@link LongTaskTimer.Sample}.
*
* @implNote This is necessary as the {@link CcwRifLoadJob} does not have any concept of a
* "dataset" as a whole. We cannot sequentially create a timer, start it, and then close it
* in the same method call unlike the other timers; so, we need to store it.
*/
private final Map<String, LongTaskTimer.Sample> activeDatasetTimerSamplesMap = new HashMap<>();

/**
* Maps a dataset's timestamp text to a total, started {@link Timer.Sample}.
*
* @implNote See the implNote for {@link #activeDatasetTimerSamplesMap} for justification.
*/
private final Map<String, Timer.Sample> totalDatasetTimerSamplesMap = new HashMap<>();

/**
* Creates a {@link LongTaskTimer} for a given {@link DataSetManifest} so that the time it takes
* to process the manifest can be measured and recorded while processing is ongoing. Should be
Expand All @@ -600,7 +679,7 @@ public static final class Metrics {
*/
LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
return LongTaskTimer.builder(MANIFEST_PROCESSING_ACTIVE_TIMER_NAME)
.tags(getTags(manifest))
.tags(getTagsForManifestMetrics(manifest))
.register(appMetrics);
}

Expand All @@ -615,13 +694,90 @@ LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
*/
Timer createTotalTimerForManifest(DataSetManifest manifest) {
return Timer.builder(MANIFEST_PROCESSING_TOTAL_TIMER_NAME)
.tags(getTags(manifest))
.tags(getTagsForManifestMetrics(manifest))
.register(appMetrics);
}

/**
* Starts an active {@link LongTaskTimer.Sample} for the provided dataset given its timestamp
* text and whether it is synthetic or not.
*
* @param datasetTimestamp the timestamp text of the dataset to time
* @param isSynthetic whether the dataset is synthetic
*/
void startActiveTimerSampleForDataset(String datasetTimestamp, boolean isSynthetic) {
LongTaskTimer activeTimer =
LongTaskTimer.builder(DATASET_PROCESSING_ACTIVE_TIMER_NAME)
.tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
.register(appMetrics);
activeDatasetTimerSamplesMap.put(datasetTimestamp, activeTimer.start());
}

/**
* Stops an already existing, active {@link LongTaskTimer.Sample} for the given dataset provided
* its timestamp text.
*
* @param datasetTimestamp the timestamp text of a dataset for which an existing, active {@link
* LongTaskTimer.Sample} will be stopped
*/
void stopActiveTimerSampleForDataset(String datasetTimestamp) {
activeDatasetTimerSamplesMap.get(datasetTimestamp).stop();
activeDatasetTimerSamplesMap.remove(datasetTimestamp);
}

/**
* Returns whether a dataset has an existing, active {@link LongTaskTimer.Sample} actively
* recording the time it is taking to load the dataset.
*
* @param datasetTimestamp the timestamp text of the dataset
* @return {@code true} if a {@link LongTaskTimer.Sample} exists for the dataset, {@code false}
* otherwise
*/
boolean activeTimerSampleExistsForDataset(String datasetTimestamp) {
return activeDatasetTimerSamplesMap.containsKey(datasetTimestamp);
}

/**
* Starts a total {@link Timer.Sample} for the provided dataset given its timestamp text.
*
* @param datasetTimestamp the timestamp text of the dataset to time
*/
void startTotalTimerSampleForDataset(String datasetTimestamp) {
totalDatasetTimerSamplesMap.put(datasetTimestamp, Timer.start());
}

/**
* Stops an already existing, total {@link Timer.Sample} for the given dataset provided its
* timestamp text and whether it is a synthetic dataset.
*
* @param datasetTimestamp the timestamp text of a dataset for which an existing, active {@link
* LongTaskTimer.Sample} will be stopped
* @param isSynthetic whether the dataset is synthetic
*/
void stopTotalTimerSampleForDataset(String datasetTimestamp, boolean isSynthetic) {
Timer totalTimer =
Timer.builder(DATASET_PROCESSING_TOTAL_TIMER_NAME)
.tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
.register(appMetrics);
totalDatasetTimerSamplesMap.get(datasetTimestamp).stop(totalTimer);
totalDatasetTimerSamplesMap.remove(datasetTimestamp);
}

/**
* Returns whether a dataset has an existing, total {@link Timer.Sample} recording the total
* time it is taking to load the dataset.
*
* @param datasetTimestamp the timestamp text of the dataset
* @return {@code true} if a {@link Timer.Sample} exists for the dataset, {@code false}
* otherwise
*/
boolean totalTimerSampleExistsForDataset(String datasetTimestamp) {
return totalDatasetTimerSamplesMap.containsKey(datasetTimestamp);
}

/**
* Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
* based on its corresponding {@link DataSetManifest}.
* for a manifest based on its corresponding {@link DataSetManifest}.
*
* @param manifest {@link DataSetManifest} from which the values of {@link
* DataSetManifest#getTimestampText()}, {@link DataSetManifest#isSyntheticData()} and {@link
Expand All @@ -630,7 +786,7 @@ Timer createTotalTimerForManifest(DataSetManifest manifest) {
* Tag}s, respectively
* @return a {@link List} of {@link Tag}s including relevant information from {@code manifest}
*/
private List<Tag> getTags(DataSetManifest manifest) {
private List<Tag> getTagsForManifestMetrics(DataSetManifest manifest) {
final var manifestFullpath = manifest.getIncomingS3Key();
final var manifestFilename =
manifestFullpath.substring(manifestFullpath.lastIndexOf("/") + 1);
Expand All @@ -639,5 +795,20 @@ private List<Tag> getTags(DataSetManifest manifest) {
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(manifest.isSyntheticData())),
Tag.of(TAG_MANIFEST, manifestFilename));
}

/**
* Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
* for a dataset based on its corresponding dataset's timestamp text and whether its synthetic.
*
* @param datasetTimestamp the timestamp text of the dataset
* @param isSynthetic whether the dataset is synthetic
* @return a {@link List} of {@link Tag}s including the dataset's timestamp text and whether it
* is synthetic
*/
private List<Tag> getTagsForDatasetMetrics(String datasetTimestamp, boolean isSynthetic) {
return List.of(
Tag.of(TAG_DATA_SET_TIMESTAMP, datasetTimestamp),
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(isSynthetic)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ public class FinalManifestList {
/** Contained list of manifests. */
private final Set<String> manifests;

/** Timestamp text extracted from the S3 prefix. */
private final String timestampText;

/** Timestamp from the S3 prefix. */
private final Instant timestamp;

Expand All @@ -28,8 +31,9 @@ public FinalManifestList(byte[] downloadedFileContent, String key) {
String prefix = key.substring(0, key.lastIndexOf('/'));

String[] components = prefix.split("/");
timestamp = Instant.parse(components[components.length - 1]);
manifests =
this.timestampText = components[components.length - 1];
this.timestamp = Instant.parse(this.timestampText);
this.manifests =
Arrays.stream(fileString.split("\n"))
.filter(l -> !l.isBlank())
.map(l -> prefix + "/" + l)
Expand Down

0 comments on commit 41ebf37

Please sign in to comment.