Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BFD-3808: Pipeline submits its own per-dataset metrics to CloudWatch #2524

Merged
merged 10 commits into from
Jan 27, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ public final class AppConfiguration extends BaseAppConfiguration {
Set.of(
"FissClaimRdaSink.change.latency.millis",
"McsClaimRdaSink.change.latency.millis",
CcwRifLoadJob.Metrics.DATASET_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.DATASET_PROCESSING_TOTAL_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_TOTAL_TIMER_NAME,
DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ private PipelineOutcome createJobsAndRunPipeline(

PipelineOutcome pipelineOutcome = pipelineManager.awaitCompletion();

// Ensures that any CloudWatch metrics are published prior to the stop of the Pipeline
appMeters.close();

if (pipelineManager.getError() != null) {
throw new FatalAppException(
"Pipeline job threw exception", pipelineManager.getError(), EXIT_CODE_JOB_FAILED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -223,6 +225,14 @@ public PipelineJobOutcome call() throws Exception {
final Set<Instant> finalManifestTimestamps =
getTimestampsFromManifestLists(finalManifestLists);

// This is a failsafe in the possible case where the final manifest of a dataset was loaded
// but the final manifest list had not yet arrived. In that case the timers started for these
// dataset(s) were never stopped, and so we should make sure they're stopped now
finalManifestLists.stream()
.filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
.map(FinalManifestList::getTimestampText)
.forEach(dataset -> loadJobMetrics.stopTimersForDataset(dataset, false));

listener.noDataAvailable();
statusReporter.reportNothingToDo();
// Ensure all manifests from the manifest lists are accounted for and completed.
Expand Down Expand Up @@ -345,16 +355,35 @@ public PipelineJobOutcome call() throws Exception {
* processing multiple data sets in parallel (which would lead to data
* consistency problems).
*/
final var activeTimer =
loadJobMetrics.createActiveTimerForManifest(manifestToProcess).start();
final var totalTimer = Timer.start();
loadJobMetrics.startTimersForDataset(
aschey-forpeople marked this conversation as resolved.
Show resolved Hide resolved
manifestToProcess.getTimestampText(), manifestToProcess.isSyntheticData());
loadJobMetrics.startTimersForManifest(manifestToProcess);
statusReporter.reportProcessingManifestData(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsStarted(manifestRecord);
listener.dataAvailable(rifFilesEvent);
statusReporter.reportCompletedManifest(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsProcessed(manifestRecord);
activeTimer.stop();
totalTimer.stop(loadJobMetrics.createTotalTimerForManifest(manifestToProcess));
loadJobMetrics.stopTimersForManifest(manifestToProcess);
if (!manifestToProcess.isSyntheticData()) {
// Non-synthetic datasets are typically one manifest to one RIF, so we need to look for
// the final manifest list that corresponds to the just-loaded manifest and ensure, via the
// database, that the dataset associated with the manifest that was just loaded is
// fully complete before submitting dataset metrics. If there's no final manifest list, no
// corresponding list, or the database indicates not all manifests are loaded, the timers
// will not be stopped as the dataset has not completed loading. Note that there is an edge
// case if the current manifest was the last to be loaded for a dataset but the final
// manifest list has not yet arrived. There is a failsafe above for this possibility
dataSetQueue.readFinalManifestLists().stream()
.filter(l -> l.getTimestampText().equals(manifestToProcess.getTimestampText()))
.filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
.map(FinalManifestList::getTimestampText)
.forEach(dataset -> loadJobMetrics.stopTimersForDataset(dataset, false));
} else {
// Synthetic datasets contain only a single manifest, so if the currently loading manifest
// is synthetic we can stop the dataset timers immediately after it has loaded
loadJobMetrics.stopTimersForDataset(manifestToProcess.getTimestampText(), true);
}

LOGGER.info(LOG_MESSAGE_DATA_SET_COMPLETE);

/*
Expand Down Expand Up @@ -556,6 +585,19 @@ private boolean isProcessingRequired(S3DataFile dataFileRecord) {
/** Micrometer metrics and helpers for measuring {@link CcwRifLoadJob} operations. */
@RequiredArgsConstructor
public static final class Metrics {
/**
* Name of the per-dataset {@link LongTaskTimer}s that actively, at each Micrometer reporting
* interval, records and reports the duration of processing of a given dataset.
*/
public static final String DATASET_PROCESSING_ACTIVE_TIMER_NAME =
String.format("%s.dataset_processing.active", CcwRifLoadJob.class.getSimpleName());

/**
* Name of the per-dataset {@link Timer}s that report the final duration of processing once the
* dataset is processed.
*/
public static final String DATASET_PROCESSING_TOTAL_TIMER_NAME =
String.format("%s.dataset_processing.total", CcwRifLoadJob.class.getSimpleName());

/**
* Name of the per-{@link DataSetManifest} {@link LongTaskTimer}s that actively, at each
Expand Down Expand Up @@ -589,39 +631,131 @@ public static final class Metrics {
/** Micrometer {@link MeterRegistry} for the Pipeline application. */
private final MeterRegistry appMetrics;

/** Map of a {@link DataSetManifest} to its active {@link ManifestTimerSet} timer metrics. */
private final Map<DataSetManifest, ManifestTimerSet> activeManifestTimersMap = new HashMap<>();

/** Map of a dataset to its active {@link DatasetTimerSet} timer metrics. */
private final Map<String, DatasetTimerSet> activeDatasetTimersMap = new HashMap<>();

/**
* Starts the active and total processing time timers for a {@link DataSetManifest} that is
* beginning to be processed. Will not start new timers if the {@link DataSetManifest} is
* already being timed.
*
* @param manifest the {@link DataSetManifest} to measure processing time for
*/
void startTimersForManifest(DataSetManifest manifest) {
activeManifestTimersMap.putIfAbsent(
manifest,
new ManifestTimerSet(createActiveTimerForManifest(manifest).start(), Timer.start()));
}

/**
* Stops the active and total processing time timers for a {@link DataSetManifest}, if they
* exist.
*
* @param manifest the {@link DataSetManifest} for which its started timers will be stopped
*/
void stopTimersForManifest(DataSetManifest manifest) {
if (!activeManifestTimersMap.containsKey(manifest)) return;

final var manifestTimers = activeManifestTimersMap.get(manifest);
manifestTimers.activeTimer.stop();
manifestTimers.totalTimer.stop(createTotalTimerForManifest(manifest));
}

/**
* Starts the active and total processing time timers for a dataset that is beginning to be
* processed. Will not start new timers if the dataset is already being timed.
*
* @param datasetTimestampText the dataset to measure processing time for
* @param isSynthetic whether the dataset is synthetic
*/
void startTimersForDataset(String datasetTimestampText, boolean isSynthetic) {
activeDatasetTimersMap.putIfAbsent(
datasetTimestampText,
new DatasetTimerSet(
createActiveTimerForDataset(datasetTimestampText, isSynthetic).start(),
Timer.start()));
}

/**
* Stops the active and total processing time timers for a {@link DataSetManifest}, if they
* exist.
*
* @param datasetTimestampText the dataset for which its processing time timers will be stopped
* @param isSynthetic whether the dataset is synthetic
*/
void stopTimersForDataset(String datasetTimestampText, boolean isSynthetic) {
if (!activeDatasetTimersMap.containsKey(datasetTimestampText)) return;

final var datasetTimers = activeDatasetTimersMap.get(datasetTimestampText);
datasetTimers.activeTimer.stop();
datasetTimers.totalTimer.stop(createTotalTimerForDataset(datasetTimestampText, isSynthetic));
}

/**
* Creates a {@link LongTaskTimer} for a given {@link DataSetManifest} so that the time it takes
* to process the manifest can be measured and recorded while processing is ongoing. Should be
* called prior to processing a {@link DataSetManifest}.
* to process the manifest can be measured and recorded while processing is ongoing.
*
* @param manifest the {@link DataSetManifest} to time
* @return the {@link LongTaskTimer} that will be used to actively measure and record the time
* taken to load the {@link DataSetManifest}
*/
LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
private LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
return LongTaskTimer.builder(MANIFEST_PROCESSING_ACTIVE_TIMER_NAME)
.tags(getTags(manifest))
.tags(getTagsForManifestMetrics(manifest))
.register(appMetrics);
}

/**
* Creates a {@link Timer} for a given {@link DataSetManifest} so that the total time it takes
* to process the manifest can be recorded. Should be used with {@link Timer.Sample#stop(Timer)}
* after processing a {@link DataSetManifest} to record the total duration.
* to process the manifest can be recorded.
*
* @param manifest the {@link DataSetManifest} to time
* @return the {@link LongTaskTimer} that will be used to record the total time taken to load
* the {@link DataSetManifest}
*/
Timer createTotalTimerForManifest(DataSetManifest manifest) {
private Timer createTotalTimerForManifest(DataSetManifest manifest) {
return Timer.builder(MANIFEST_PROCESSING_TOTAL_TIMER_NAME)
.tags(getTags(manifest))
.tags(getTagsForManifestMetrics(manifest))
.register(appMetrics);
}

/**
* Creates an "active" {@link LongTaskTimer} for the provided dataset so that the running time
* it takes to process the dataset can be recorded.
*
* @param datasetTimestamp the timestamp text of the dataset to time
* @param isSynthetic whether the dataset is synthetic
* @return the {@link LongTaskTimer} that will be used to actively record the time it is taking
* to processing the dataset
*/
private LongTaskTimer createActiveTimerForDataset(
String datasetTimestamp, boolean isSynthetic) {
return LongTaskTimer.builder(DATASET_PROCESSING_ACTIVE_TIMER_NAME)
.tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
.register(appMetrics);
}

/**
* Creates a {@link Timer} for a given dataset so that the total time it takes to process the
* dataset can be recorded.
*
* @param datasetTimestamp the dataset to record the total processing time for
* @param isSynthetic whether the dataset is synthetic
* @return the {@link Timer} that will be used to record the total time taken to load the
* dataset
*/
private Timer createTotalTimerForDataset(String datasetTimestamp, boolean isSynthetic) {
return Timer.builder(DATASET_PROCESSING_TOTAL_TIMER_NAME)
.tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
.register(appMetrics);
}

/**
* Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
* based on its corresponding {@link DataSetManifest}.
* for a manifest based on its corresponding {@link DataSetManifest}.
*
* @param manifest {@link DataSetManifest} from which the values of {@link
* DataSetManifest#getTimestampText()}, {@link DataSetManifest#isSyntheticData()} and {@link
Expand All @@ -630,7 +764,7 @@ Timer createTotalTimerForManifest(DataSetManifest manifest) {
* Tag}s, respectively
* @return a {@link List} of {@link Tag}s including relevant information from {@code manifest}
*/
private List<Tag> getTags(DataSetManifest manifest) {
private List<Tag> getTagsForManifestMetrics(DataSetManifest manifest) {
final var manifestFullpath = manifest.getIncomingS3Key();
final var manifestFilename =
manifestFullpath.substring(manifestFullpath.lastIndexOf("/") + 1);
Expand All @@ -639,5 +773,40 @@ private List<Tag> getTags(DataSetManifest manifest) {
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(manifest.isSyntheticData())),
Tag.of(TAG_MANIFEST, manifestFilename));
}

/**
* Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
* for a dataset based on its corresponding dataset's timestamp text and whether its synthetic.
*
* @param datasetTimestamp the timestamp text of the dataset
* @param isSynthetic whether the dataset is synthetic
* @return a {@link List} of {@link Tag}s including the dataset's timestamp text and whether it
* is synthetic
*/
private List<Tag> getTagsForDatasetMetrics(String datasetTimestamp, boolean isSynthetic) {
return List.of(
Tag.of(TAG_DATA_SET_TIMESTAMP, datasetTimestamp),
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(isSynthetic)));
}

/**
* A set of started timer metrics for a {@link DataSetManifest}.
*
* @param activeTimer a {@link LongTaskTimer.Sample} that is actively timing the processing time
* of the manifest
* @param totalTimer a {@link Timer.Sample} that will time the total time it takes to process
* the manifest
*/
private record ManifestTimerSet(LongTaskTimer.Sample activeTimer, Timer.Sample totalTimer) {}

/**
* A set of started timer metrics for a dataset.
*
* @param activeTimer a {@link LongTaskTimer.Sample} that is actively timing the processing time
* of the dataset
* @param totalTimer a {@link Timer.Sample} that will time the total time it takes to process
* the dataset
*/
private record DatasetTimerSet(LongTaskTimer.Sample activeTimer, Timer.Sample totalTimer) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ public class FinalManifestList {
/** Contained list of manifests. */
private final Set<String> manifests;

/** Timestamp text extracted from the S3 prefix. */
private final String timestampText;

/** Timestamp from the S3 prefix. */
private final Instant timestamp;

Expand All @@ -28,8 +31,9 @@ public FinalManifestList(byte[] downloadedFileContent, String key) {
String prefix = key.substring(0, key.lastIndexOf('/'));

String[] components = prefix.split("/");
timestamp = Instant.parse(components[components.length - 1]);
manifests =
this.timestampText = components[components.length - 1];
this.timestamp = Instant.parse(this.timestampText);
this.manifests =
Arrays.stream(fileString.split("\n"))
.filter(l -> !l.isBlank())
.map(l -> prefix + "/" + l)
Expand Down