Skip to content

Commit

Permalink
refactor: simplify getLastIngestionRun method
Browse files Browse the repository at this point in the history
  • Loading branch information
trialiya committed Feb 22, 2025
1 parent 9759a58 commit bbc8b05
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import static com.linkedin.metadata.Constants.DEFAULT_RUN_ID;

import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.mxe.SystemMetadata;
import java.util.ArrayList;
import java.util.List;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -26,40 +28,26 @@ public static String getLastIngestedRunId(@Nonnull EnvelopedAspectMap aspectMap)
}

/**
* Returns a sorted list of all of the most recent ingestion runs based on the most recent aspects
* present for the entity.
* Returns the most recent ingestion run based on the most recent aspects present for the entity.
*/
@Nonnull
public static List<RunInfo> getLastIngestionRuns(@Nonnull EnvelopedAspectMap aspectMap) {
final List<RunInfo> runs = new ArrayList<>();
for (String aspect : aspectMap.keySet()) {
if (aspectMap.get(aspect).hasSystemMetadata()) {
SystemMetadata systemMetadata = aspectMap.get(aspect).getSystemMetadata();
if (systemMetadata.hasLastRunId()
&& !systemMetadata.getLastRunId().equals(DEFAULT_RUN_ID)
&& systemMetadata.hasLastObserved()) {
Long lastObserved = systemMetadata.getLastObserved();
String runId = systemMetadata.getLastRunId();
RunInfo run = new RunInfo(runId, lastObserved);
runs.add(run);
} else if (systemMetadata.hasRunId()
&& !systemMetadata.getRunId().equals(DEFAULT_RUN_ID)
&& systemMetadata.hasLastObserved()) {
// Handle the legacy case: Check original run ids.
Long lastObserved = systemMetadata.getLastObserved();
String runId = systemMetadata.getRunId();
RunInfo run = new RunInfo(runId, lastObserved);
runs.add(run);
}
}
}
runs.sort((a, b) -> Long.compare(b.getTime(), a.getTime()));
return runs;
}

@Nullable
private static RunInfo getLastIngestionRun(@Nonnull EnvelopedAspectMap aspectMap) {
List<RunInfo> runs = getLastIngestionRuns(aspectMap);
return !runs.isEmpty() ? runs.get(0) : null; // Just take the first, to get the most recent run.
return aspectMap.values().stream()
.filter(EnvelopedAspect::hasSystemMetadata)
.map(EnvelopedAspect::getSystemMetadata)
.filter(SystemMetadata::hasLastObserved)
.map(
systemMetadata ->
Optional.ofNullable(systemMetadata.getLastRunId())
.filter(lastRunId -> !lastRunId.equals(DEFAULT_RUN_ID))
.or(
() ->
Optional.ofNullable(systemMetadata.getRunId())
.filter(runId -> !runId.equals(DEFAULT_RUN_ID)))
.map(runId -> new RunInfo(runId, systemMetadata.getLastObserved()))
.orElse(null))
.filter(Objects::nonNull)
.max(Comparator.comparingLong(RunInfo::getTime))
.orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import static com.linkedin.metadata.Constants.DEFAULT_RUN_ID;
import static org.testng.Assert.*;

import com.linkedin.datahub.graphql.types.common.mappers.util.RunInfo;
import com.linkedin.datahub.graphql.types.common.mappers.util.SystemMetadataUtils;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.mxe.SystemMetadata;
import java.util.List;
import org.testng.annotations.Test;

public class SystemMetadataUtilsTest {
Expand Down Expand Up @@ -63,32 +61,6 @@ public void testGetLastIngestedRunId() {
assertEquals(lastRunId, "real-id-1");
}

@Test
public void testGetLastIngestedRuns() {
EnvelopedAspectMap aspectMap = new EnvelopedAspectMap();
aspectMap.put(
"default-run-id",
new EnvelopedAspect()
.setSystemMetadata(
new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(recentLastObserved)));
aspectMap.put(
"real-run-id",
new EnvelopedAspect()
.setSystemMetadata(
new SystemMetadata().setRunId("real-id-1").setLastObserved(mediumLastObserved)));
aspectMap.put(
"real-run-id2",
new EnvelopedAspect()
.setSystemMetadata(
new SystemMetadata().setRunId("real-id-2").setLastObserved(distantLastObserved)));

List<RunInfo> runs = SystemMetadataUtils.getLastIngestionRuns(aspectMap);

assertEquals(runs.size(), 2);
assertEquals(runs.get(0), new RunInfo("real-id-1", mediumLastObserved));
assertEquals(runs.get(1), new RunInfo("real-id-2", distantLastObserved));
}

@Test
public void testGetLastIngestedTimeAllDefaultRunIds() {
EnvelopedAspectMap aspectMap = new EnvelopedAspectMap();
Expand Down

0 comments on commit bbc8b05

Please sign in to comment.