Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into feature/refactor-getLastIn…
Browse files Browse the repository at this point in the history
…gestionRun
  • Loading branch information
trialiya authored Feb 21, 2025
2 parents ee2e5ec + 3b4acf0 commit 8f6048c
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private Row buildNewUsersRow(@Nonnull final SearchEntity entity) {
private AnalyticsChart getNewUsersChart(OperationContext opContext) {
try {
final List<String> columns = ImmutableList.of("Name", "Title", "Email");
final String newUsersTitle = "Active Users (Last 30 Days)";
final String newUsersTitle = "New Users (Last 30 Days)";
final SearchResult result = searchForNewUsers(opContext);
final List<Row> newUserRows = new ArrayList<>();
for (SearchEntity entity : result.getEntities()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public class Constants {
public static final String EXECUTION_REQUEST_STATUS_CANCELLED = "CANCELLED";
public static final String EXECUTION_REQUEST_STATUS_ABORTED = "ABORTED";
public static final String EXECUTION_REQUEST_STATUS_DUPLICATE = "DUPLICATE";
public static final String EXECUTION_REQUEST_STATUS_ROLLING_BACK = "ROLLING_BACK";

// DataHub Access Token
public static final String ACCESS_TOKEN_KEY_ASPECT_NAME = "dataHubAccessTokenKey";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ABORTED;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ROLLING_BACK;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS;

import com.linkedin.execution.ExecutionRequestResult;
Expand Down Expand Up @@ -56,11 +57,22 @@ protected Stream<AspectValidationException> validatePreCommitAspects(

if (IMMUTABLE_STATUS.contains(existingResult.getStatus())) {
ExecutionRequestResult currentResult = item.getAspect(ExecutionRequestResult.class);
return AspectValidationException.forItem(
item,
String.format(
"Invalid update to immutable state for aspect dataHubExecutionRequestResult. Execution urn: %s previous status: %s. Denied status update: %s",
item.getUrn(), existingResult.getStatus(), currentResult.getStatus()));
if (currentResult != null
&& !EXECUTION_REQUEST_STATUS_ROLLING_BACK.equals(currentResult.getStatus())) {
if (!Objects.equals(existingResult.getStatus(), currentResult.getStatus())) {
return AspectValidationException.forItem(
item,
String.format(
"Invalid update to immutable state for aspect dataHubExecutionRequestResult. Execution urn: %s Requested status: %s => %s. Denied update.",
item.getUrn(), existingResult.getStatus(), currentResult.getStatus()));
} else {
return AspectValidationException.forFilter(
item,
String.format(
"Invalid update to immutable state for aspect dataHubExecutionRequestResult. Execution urn: %s Requested status: %s => %s. Ignored update.",
item.getUrn(), existingResult.getStatus(), currentResult.getStatus()));
}
}
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2549,7 +2549,7 @@ RollbackResult deleteAspectWithoutMCL(

// 1. Fetch the latest existing version of the aspect.
final SystemAspect latest =
aspectDao.getLatestAspect(opContext, urn, aspectName, true);
aspectDao.getLatestAspect(opContext, urn, aspectName, false);

// 1.1 If no latest exists, skip this aspect
if (latest == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_FAILURE;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ROLLING_BACK;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_RUNNING;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS;
import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_TIMEOUT;
Expand Down Expand Up @@ -163,4 +164,131 @@ public void testDenied() {
deniedUpdateStates.size() * destinationStates.size(),
"Expected ALL items to be denied.");
}

@Test
public void testRollingBackTransitionAllowed() {
ExecutionRequestResultValidator test = new ExecutionRequestResultValidator();
test.setConfig(TEST_PLUGIN_CONFIG);

Set<String> immutableStates =
Set.of(
EXECUTION_REQUEST_STATUS_ABORTED,
EXECUTION_REQUEST_STATUS_CANCELLED,
EXECUTION_REQUEST_STATUS_SUCCESS,
EXECUTION_REQUEST_STATUS_DUPLICATE);

List<ChangeMCP> testItems =
new ArrayList<>(
immutableStates.stream()
.map(
prevState -> {
SystemAspect prevData = mock(SystemAspect.class);
when(prevData.getRecordTemplate())
.thenReturn(new ExecutionRequestResult().setStatus(prevState));
return ChangeItemImpl.builder()
.changeType(ChangeType.UPSERT)
.urn(TEST_URN)
.aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME)
.recordTemplate(
new ExecutionRequestResult()
.setStatus(EXECUTION_REQUEST_STATUS_ROLLING_BACK))
.previousSystemAspect(prevData)
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.build(TEST_CONTEXT.getAspectRetriever());
})
.toList());

List<AspectValidationException> result =
test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList();

assertTrue(result.isEmpty(), "Expected all transitions to ROLLING_BACK to be allowed");
}

@Test
public void testRollingBackToOtherStatesDenied() {
ExecutionRequestResultValidator test = new ExecutionRequestResultValidator();
test.setConfig(TEST_PLUGIN_CONFIG);

Set<String> destinationStates =
new HashSet<>(
Set.of(
EXECUTION_REQUEST_STATUS_RUNNING,
EXECUTION_REQUEST_STATUS_FAILURE,
EXECUTION_REQUEST_STATUS_TIMEOUT,
EXECUTION_REQUEST_STATUS_ABORTED,
EXECUTION_REQUEST_STATUS_CANCELLED,
EXECUTION_REQUEST_STATUS_SUCCESS,
EXECUTION_REQUEST_STATUS_DUPLICATE));

List<ChangeMCP> testItems =
new ArrayList<>(
destinationStates.stream()
.map(
destState -> {
SystemAspect prevData = mock(SystemAspect.class);
when(prevData.getRecordTemplate())
.thenReturn(new ExecutionRequestResult().setStatus(destState));
return ChangeItemImpl.builder()
.changeType(ChangeType.UPSERT)
.urn(TEST_URN)
.aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME)
.recordTemplate(
new ExecutionRequestResult()
.setStatus(EXECUTION_REQUEST_STATUS_ROLLING_BACK))
.previousSystemAspect(prevData)
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.build(TEST_CONTEXT.getAspectRetriever());
})
.toList());

List<AspectValidationException> result =
test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList();

assertEquals(result.size(), 0, "Expected all transitions to ROLLING_BACK to be allowed");
}

@Test
public void testSameStatusUpdateFiltered() {
ExecutionRequestResultValidator test = new ExecutionRequestResultValidator();
test.setConfig(TEST_PLUGIN_CONFIG);

Set<String> immutableStates =
Set.of(
EXECUTION_REQUEST_STATUS_ABORTED,
EXECUTION_REQUEST_STATUS_CANCELLED,
EXECUTION_REQUEST_STATUS_SUCCESS,
EXECUTION_REQUEST_STATUS_DUPLICATE);

List<ChangeMCP> testItems =
new ArrayList<>(
immutableStates.stream()
.map(
state -> {
SystemAspect prevData = mock(SystemAspect.class);
when(prevData.getRecordTemplate())
.thenReturn(new ExecutionRequestResult().setStatus(state));
return ChangeItemImpl.builder()
.changeType(ChangeType.UPSERT)
.urn(TEST_URN)
.aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME)
.recordTemplate(new ExecutionRequestResult().setStatus(state))
.previousSystemAspect(prevData)
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.build(TEST_CONTEXT.getAspectRetriever());
})
.toList());

List<AspectValidationException> result =
test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList();

assertEquals(
result.size(), immutableStates.size(), "Expected all same-status updates to be filtered");

// Verify that all exceptions are of type "filter"
for (AspectValidationException exception : result) {
assertTrue(
exception.getMessage().contains("Ignored update"),
"Expected filter type exception with 'Ignored update' message");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.ebean.test.LoggedSql;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -73,6 +74,7 @@ public void setupTest() {
entityService =
new EntityServiceImpl(aspectDao, mock(EventProducer.class), false, preProcessHooks, true);
entityService.setUpdateIndicesService(mock(UpdateIndicesService.class));
entityService.setRetentionService(null);
}

@Test
Expand All @@ -83,13 +85,15 @@ public void testEmptyORMOptimization() {
0,
0,
0,
"empty");
"empty",
"");
}

@Test
public void testUpsertOptimization() {
Urn testUrn1 =
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,testUpsertOptimization,PROD)");
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:opt,testOptimization,PROD)");
final String mustInclude = "urn:li:dataPlatform:opt";

// single insert (non-existing)
assertSQL(
Expand All @@ -108,7 +112,8 @@ public void testUpsertOptimization() {
nonExistingBaseCount + 1,
1,
0,
"initial: single insert");
"initial: single insert",
mustInclude);

// single update (existing from previous - no-op)
// 1. nextVersion
Expand All @@ -129,7 +134,8 @@ public void testUpsertOptimization() {
existingBaseCount + 2,
0,
1,
"existing: single no-op");
"existing: single no-op",
mustInclude);

// multiple (existing from previous - multiple no-ops)
assertSQL(
Expand All @@ -155,7 +161,8 @@ public void testUpsertOptimization() {
existingBaseCount + 2,
0,
1,
"existing: multiple no-ops. expected no additional interactions vs single no-op");
"existing: multiple no-ops. expected no additional interactions vs single no-op",
mustInclude);

// single update (existing from previous - with actual change)
// 1. nextVersion
Expand All @@ -176,7 +183,8 @@ public void testUpsertOptimization() {
existingBaseCount + 2,
1,
1,
"existing: single change");
"existing: single change",
mustInclude);

// multiple update (existing from previous - with 2 actual changes)
// 1. nextVersion
Expand Down Expand Up @@ -204,15 +212,17 @@ public void testUpsertOptimization() {
existingBaseCount + 2,
1,
1,
"existing: multiple change. expected no additional statements over single change");
"existing: multiple change. expected no additional statements over single change",
mustInclude);
}

private void assertSQL(
@Nonnull AspectsBatch batch,
int expectedSelectCount,
int expectedInsertCount,
int expectedUpdateCount,
@Nullable String description) {
@Nullable String description,
@Nonnull String mustInclude) {

// Clear any existing logged statements
LoggedSql.stop();
Expand All @@ -222,11 +232,26 @@ private void assertSQL(

try {
entityService.ingestProposal(opContext, batch, false);
List<String> txnLog =
LoggedSql.collect().stream()
.filter(sql -> sql.startsWith("txn[]"))
.collect(
ArrayList::new,
(ArrayList<String> list, String sql) -> {
// fold into previous line
if (sql.startsWith("txn[] -- ")) {
String current = list.get(list.size() - 1);
list.set(list.size() - 1, current + "\n" + sql);
} else {
list.add(sql);
}
},
ArrayList::addAll);

// Get the captured SQL statements
Map<String, List<String>> statementMap =
LoggedSql.stop().stream()
// only consider transaction statements
.filter(sql -> sql.startsWith("txn[]") && !sql.startsWith("txn[] -- "))
txnLog.stream()
.filter(sql -> sql.contains(mustInclude))
.collect(
Collectors.groupingBy(
sql -> {
Expand All @@ -251,20 +276,20 @@ private void assertSQL(
statementMap.getOrDefault("SELECT", List.of()).size(),
expectedSelectCount,
String.format(
"(%s) Expected SELECT SQL count mismatch: %s",
description, statementMap.get("SELECT")));
"(%s) Expected SELECT SQL count mismatch filtering for (%s): %s",
description, mustInclude, statementMap.get("SELECT")));
assertEquals(
statementMap.getOrDefault("INSERT", List.of()).size(),
expectedInsertCount,
String.format(
"(%s) Expected INSERT SQL count mismatch: %s",
description, statementMap.get("INSERT")));
"(%s) Expected INSERT SQL count mismatch filtering for (%s): %s",
description, mustInclude, statementMap.get("INSERT")));
assertEquals(
statementMap.getOrDefault("UPDATE", List.of()).size(),
expectedUpdateCount,
String.format(
"(%s), Expected UPDATE SQL count mismatch: %s",
description, statementMap.get("UPDATE")));
"(%s), Expected UPDATE SQL count mismatch filtering for (%s): %s",
description, mustInclude, statementMap.get("UPDATE")));
} finally {
// Ensure logging is stopped even if assertions fail
LoggedSql.stop();
Expand Down

0 comments on commit 8f6048c

Please sign in to comment.