Skip to content

Commit

Permalink
fix(tracing): handle noop mcl
Browse files Browse the repository at this point in the history
* handle expected tracing output when no change to record template
  • Loading branch information
david-leifker committed Feb 24, 2025
1 parent 7bee19c commit 5a9c961
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

import com.datahub.util.RecordUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringMap;
import com.linkedin.mxe.SystemMetadata;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SystemMetadataUtils {
private static final String NO_OP_KEY = "isNoOp";

private SystemMetadataUtils() {}

Expand Down Expand Up @@ -42,4 +44,25 @@ public static SystemMetadata parseSystemMetadata(String jsonSystemMetadata) {
}
return RecordUtils.toRecordTemplate(SystemMetadata.class, jsonSystemMetadata);
}

public static boolean isNoOp(@Nullable SystemMetadata systemMetadata) {
if (systemMetadata != null
&& systemMetadata.hasProperties()
&& systemMetadata.getProperties() != null) {
return Boolean.parseBoolean(systemMetadata.getProperties().getOrDefault(NO_OP_KEY, "false"));
}

return false;
}

@Nullable
public static SystemMetadata setNoOp(@Nullable SystemMetadata systemMetadata, boolean isNoOp) {
if (systemMetadata != null) {
if (!systemMetadata.hasProperties() || systemMetadata.getProperties() == null) {
systemMetadata.setProperties(new StringMap());
}
systemMetadata.getProperties().put(NO_OP_KEY, String.valueOf(isNoOp));
}
return systemMetadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.linkedin.metadata.utils;

import static com.linkedin.metadata.Constants.DEFAULT_RUN_ID;
import static org.testng.Assert.*;

import com.linkedin.data.template.StringMap;
import com.linkedin.mxe.SystemMetadata;
import org.testng.annotations.Test;

public class SystemMetadataUtilsTest {

@Test
public void testCreateDefaultSystemMetadata() {
SystemMetadata metadata = SystemMetadataUtils.createDefaultSystemMetadata();

assertNotNull(metadata);
assertEquals(metadata.getRunId(), DEFAULT_RUN_ID);
assertTrue(metadata.hasLastObserved());
assertTrue(metadata.getLastObserved() > 0);
}

@Test
public void testCreateDefaultSystemMetadataWithRunId() {
String customRunId = "custom-run-id";
SystemMetadata metadata = SystemMetadataUtils.createDefaultSystemMetadata(customRunId);

assertNotNull(metadata);
assertEquals(metadata.getRunId(), customRunId);
assertTrue(metadata.hasLastObserved());
assertTrue(metadata.getLastObserved() > 0);
}

@Test
public void testGenerateSystemMetadataIfEmpty() {
// Test with null input
SystemMetadata nullMetadata = SystemMetadataUtils.generateSystemMetadataIfEmpty(null);
assertNotNull(nullMetadata);
assertEquals(nullMetadata.getRunId(), DEFAULT_RUN_ID);
assertTrue(nullMetadata.hasLastObserved());

// Test with existing metadata
SystemMetadata existingMetadata =
new SystemMetadata().setRunId("existing-run").setLastObserved(1234567890L);
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(existingMetadata);

assertEquals(result.getRunId(), "existing-run");
assertEquals(result.getLastObserved(), 1234567890L);
}

@Test
public void testParseSystemMetadata() {
// Test null input
SystemMetadata nullResult = SystemMetadataUtils.parseSystemMetadata(null);
assertNotNull(nullResult);
assertEquals(nullResult.getRunId(), DEFAULT_RUN_ID);

// Test empty string input
SystemMetadata emptyResult = SystemMetadataUtils.parseSystemMetadata("");
assertNotNull(emptyResult);
assertEquals(emptyResult.getRunId(), DEFAULT_RUN_ID);

// Test valid JSON input
String validJson = "{\"runId\":\"test-run\",\"lastObserved\":1234567890}";
SystemMetadata jsonResult = SystemMetadataUtils.parseSystemMetadata(validJson);
assertNotNull(jsonResult);
assertEquals(jsonResult.getRunId(), "test-run");
assertEquals(jsonResult.getLastObserved(), 1234567890L);
}

@Test
public void testIsNoOp() {
// Test null metadata
assertFalse(SystemMetadataUtils.isNoOp(null));

// Test metadata without properties
SystemMetadata emptyMetadata = new SystemMetadata();
assertFalse(SystemMetadataUtils.isNoOp(emptyMetadata));

// Test metadata with isNoOp=true
SystemMetadata noOpMetadata = new SystemMetadata();
StringMap properties = new StringMap();
properties.put("isNoOp", "true");
noOpMetadata.setProperties(properties);
assertTrue(SystemMetadataUtils.isNoOp(noOpMetadata));

// Test metadata with isNoOp=false
properties.put("isNoOp", "false");
assertFalse(SystemMetadataUtils.isNoOp(noOpMetadata));
}

@Test
public void testSetNoOp() {
// Test with null metadata
assertNull(SystemMetadataUtils.setNoOp(null, true));

// Test setting noOp to true
SystemMetadata metadata = new SystemMetadata();
SystemMetadata result = SystemMetadataUtils.setNoOp(metadata, true);
assertNotNull(result);
assertTrue(result.hasProperties());
assertNotNull(result.getProperties());
assertEquals(result.getProperties().get("isNoOp"), "true");

// Test setting noOp to false
result = SystemMetadataUtils.setNoOp(metadata, false);
assertNotNull(result);
assertTrue(result.hasProperties());
assertNotNull(result.getProperties());
assertEquals(result.getProperties().get("isNoOp"), "false");

// Test with existing properties
StringMap existingProps = new StringMap();
existingProps.put("otherKey", "value");
metadata.setProperties(existingProps);
result = SystemMetadataUtils.setNoOp(metadata, true);
assertNotNull(result);
assertEquals(result.getProperties().get("otherKey"), "value");
assertEquals(result.getProperties().get("isNoOp"), "true");
}
}
1 change: 1 addition & 0 deletions metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ dependencies {
testImplementation externalDependency.springBootTest
testImplementation spec.product.pegasus.restliServer
testImplementation externalDependency.ebeanTest
testImplementation externalDependency.opentelemetrySdk

// logback >=1.3 required due to `testcontainers` only
testImplementation 'ch.qos.logback:logback-classic:1.4.7'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
Expand Down Expand Up @@ -142,15 +143,19 @@ default Pair<Optional<EntityAspect>, Optional<EntityAspect>> saveLatestAspect(
.equals(currentVersion0.getSystemMetadataVersion())) {

inserted = insertAspect(txContext, latestAspect.getDatabaseAspect().get(), targetVersion);

// add trace - overwrite if version incremented
newAspect.setSystemMetadata(opContext.withTraceId(newAspect.getSystemMetadata(), true));
}

// update version 0
Optional<EntityAspect> updated = Optional.empty();
boolean isNoOp =
Objects.equals(currentVersion0.getRecordTemplate(), newAspect.getRecordTemplate());

if (!Objects.equals(currentVersion0.getSystemMetadata(), newAspect.getSystemMetadata())
|| !Objects.equals(currentVersion0.getRecordTemplate(), newAspect.getRecordTemplate())) {
|| !isNoOp) {
// update no-op used for tracing
SystemMetadataUtils.setNoOp(newAspect.getSystemMetadata(), isNoOp);
// add trace - overwrite if version incremented
newAspect.setSystemMetadata(opContext.withTraceId(newAspect.getSystemMetadata(), true));
updated = updateAspect(txContext, newAspect);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.linkedin.metadata.utils.EntityApiUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.MetadataChangeLog;
Expand Down Expand Up @@ -2050,7 +2051,8 @@ public Optional<Pair<Future<?>, Boolean>> conditionallyProduceMCLAsync(
Urn entityUrn,
AuditStamp auditStamp,
AspectSpec aspectSpec) {
boolean isNoOp = Objects.equals(oldAspect, newAspect);
boolean isNoOp =
SystemMetadataUtils.isNoOp(newSystemMetadata) || Objects.equals(oldAspect, newAspect);
if (!isNoOp || alwaysEmitChangeLog || shouldAspectEmitChangeLog(aspectSpec)) {
log.info("Producing MCL for ingested aspect {}, urn {}", aspectSpec.getName(), entityUrn);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.systemmetadata.TraceService;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.FailedMetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
Expand Down Expand Up @@ -168,7 +169,12 @@ private Map<Urn, LinkedHashMap<String, TraceStorageStatus>> tracePrimaryInParall
String aspectName = aspectEntry.getKey();

if (traceId.equals(systemTraceId)) {
aspectStatuses.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.ACTIVE_STATE));
if (SystemMetadataUtils.isNoOp(systemMetadata)) {
aspectStatuses.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.NO_OP));
} else {
aspectStatuses.put(
aspectName, TraceStorageStatus.ok(TraceWriteStatus.ACTIVE_STATE));
}
} else if (traceTimestampMillis <= extractTimestamp(systemTraceId, createdOnMillis)) {
aspectStatuses.put(
aspectName, TraceStorageStatus.ok(TraceWriteStatus.HISTORIC_STATE));
Expand Down Expand Up @@ -421,7 +427,9 @@ private static Map<String, TraceStatus> mergeStatus(
storageEntry -> {
String aspectName = storageEntry.getKey();
TraceStorageStatus primaryStatus = storageEntry.getValue();
TraceStorageStatus searchStatus = searchAspectStatus.get(aspectName);
TraceStorageStatus searchStatus =
searchAspectStatus.getOrDefault(
aspectName, TraceStorageStatus.ok(TraceWriteStatus.PENDING));
TraceStatus traceStatus =
TraceStatus.builder()
.primaryStorage(primaryStatus)
Expand All @@ -448,7 +456,7 @@ private static Map<String, TraceStatus> mergeStatus(
}

private static boolean isSuccess(
TraceStorageStatus primaryStatus, TraceStorageStatus searchStatus) {
@Nonnull TraceStorageStatus primaryStatus, @Nonnull TraceStorageStatus searchStatus) {
return !TraceWriteStatus.ERROR.equals(primaryStatus.getWriteStatus())
&& !TraceWriteStatus.ERROR.equals(searchStatus.getWriteStatus());
}
Expand Down
Loading

0 comments on commit 5a9c961

Please sign in to comment.