Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beta | Startup time #2801

Merged
merged 6 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions common/src/main/java/org/apache/atlas/service/Services.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,6 @@ public static void printHashMapInTableFormatDescendingOrder(Map<String, Long> ma
// Sort the list by values in descending order
list.sort((entry1, entry2) -> entry2.getValue().compareTo(entry1.getValue()));

// Find the longest key to determine column width
int maxKeyLength = list.stream().map(entry -> entry.getKey().length()).max(Integer::compare).orElse(0);

// Create format string for printing each row
String rowFormat = "| %-" + maxKeyLength + "s | %-10s |\n";

// Print table header
LOG.info(System.out.printf(rowFormat, "Key", value).toString());
LOG.info(new String(new char[maxKeyLength + 15]).replace('\0', '-'));

LOG.info("Capturing Service startup time {}", AtlasType.toJson(list));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ public static void printHashMapInTableFormatDescendingOrder(Map<String, Long> ma
// Sort the list by values in descending order
list.sort((entry1, entry2) -> entry2.getValue().compareTo(entry1.getValue()));

// Find the longest key to determine column width
int maxKeyLength = list.stream().map(entry -> entry.getKey().length()).max(Integer::compare).orElse(0);

// Create format string for printing each row
String rowFormat = "| %-" + maxKeyLength + "s | %-1s |\n";

// Print table header
LOG.info(System.out.printf(rowFormat, "Key", value).toString());
LOG.info(new String(new char[maxKeyLength + 1]).replace('\0', '-'));

LOG.info("Capturing Bean creation time {}", AtlasType.toJson(list));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
Expand All @@ -66,9 +67,9 @@ public AtlasPatchRegistry(AtlasGraph graph) {

LOG.info("AtlasPatchRegistry: found {} patches", patchNameStatusMap.size());

for (Map.Entry<String, PatchStatus> entry : patchNameStatusMap.entrySet()) {
LOG.info("AtlasPatchRegistry: patchId={}, status={}", entry.getKey(), entry.getValue());
}
// for (Map.Entry<String, PatchStatus> entry : patchNameStatusMap.entrySet()) {
// LOG.info("AtlasPatchRegistry: patchId={}, status={}", entry.getKey(), entry.getValue());
// }
}

public boolean isApplicable(String incomingId, String patchFile, int index) {
Expand Down Expand Up @@ -146,13 +147,12 @@ private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId, String
setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
} finally {
graph.commit();

patchNameStatusMap.put(patchId, patchStatus);
}
}

private static Map<String, PatchStatus> getPatchNameStatusForAllRegistered(AtlasGraph graph) {
Map<String, PatchStatus> ret = new HashMap<>();
Map<String, PatchStatus> ret = new ConcurrentHashMap<>();
AtlasPatches patches = getAllPatches(graph);

for (AtlasPatch patch : patches.getPatches()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAuthorizerFactory;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.featureflag.FeatureFlagStore;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.TypeCategory;
Expand Down Expand Up @@ -75,6 +74,8 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down Expand Up @@ -147,7 +148,7 @@ private void loadBootstrapTypeDefs() {
AtlasPatchRegistry patchRegistry = new AtlasPatchRegistry(graph);

if (modelsDirContents != null && modelsDirContents.length > 0) {
Arrays.sort(modelsDirContents);
Arrays.sort(modelsDirContents);

for (File folder : modelsDirContents) {
if (folder.isFile()) {
Expand Down Expand Up @@ -178,40 +179,46 @@ private void loadModelsInFolder(File typesDir, AtlasPatchRegistry patchRegistry)
File[] typeDefFiles = typesDir.exists() ? typesDir.listFiles() : null;

if (typeDefFiles == null || typeDefFiles.length == 0) {
LOG.info("Types directory {} does not exist or not readable or has no typedef files", typesDirName );
LOG.info("Types directory {} does not exist or not readable or has no typedef files", typesDirName);
} else {
// sort the files by filename
Arrays.sort(typeDefFiles);

List<CompletableFuture<Void>> futures = new ArrayList<>();

for (File typeDefFile : typeDefFiles) {
if (typeDefFile.isFile()) {
if (!typeDefFile.isFile()) {
continue;
}

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
String jsonStr = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8);
String jsonStr = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8);
AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class);

if (typesDef == null || typesDef.isEmpty()) {
LOG.info("No type in file {}", typeDefFile.getAbsolutePath());

continue;
return;
}

AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, typeRegistry);
AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, typeRegistry, true);

if (!typesToCreate.isEmpty() || !typesToUpdate.isEmpty()) {
typeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate);

LOG.info("Created/Updated types defined in file {}", typeDefFile.getAbsolutePath());
} else {
LOG.info("No new type in file {}", typeDefFile.getAbsolutePath());
}

} catch (Throwable t) {
LOG.error("error while registering types in file {}", typeDefFile.getAbsolutePath(), t);
}
}
}
});

futures.add(future);
}
// Wait for all futures to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
applyTypePatches(typesDir.getPath(), patchRegistry);
}
LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir);
Expand Down Expand Up @@ -439,8 +446,8 @@ private static boolean isTypeUpdateApplicable(AtlasBaseTypeDef oldTypeDef, Atlas

private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegistry) {
String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME;
File typePatchesDir = new File(typePatchesDirName);
File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;
File typePatchesDir = new File(typePatchesDirName);
File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;

if (typePatchFiles == null || typePatchFiles.length == 0) {
LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName);
Expand All @@ -450,7 +457,7 @@ private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegis
// sort the files by filename
Arrays.sort(typePatchFiles);

PatchHandler[] patchHandlers = new PatchHandler[] {
PatchHandler[] patchHandlers = new PatchHandler[]{
new UpdateEnumDefPatchHandler(typeDefStore, typeRegistry),
new AddAttributePatchHandler(typeDefStore, typeRegistry),
new UpdateAttributePatchHandler(typeDefStore, typeRegistry),
Expand All @@ -462,62 +469,76 @@ private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegis
new AddMandatoryAttributePatchHandler(typeDefStore, typeRegistry)
};

Map<String, PatchHandler> patchHandlerRegistry = new HashMap<>();
Map<String, PatchHandler> patchHandlerRegistry = new ConcurrentHashMap<>();

for (PatchHandler patchHandler : patchHandlers) {
for (String supportedAction : patchHandler.getSupportedActions()) {
patchHandlerRegistry.put(supportedAction, patchHandler);
}
}

List<CompletableFuture<Void>> futures = new ArrayList<>();

for (File typePatchFile : typePatchFiles) {
if (typePatchFile.isFile()) {
String patchFile = typePatchFile.getAbsolutePath();
if (!typePatchFile.isFile()) {
continue;
}

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
String patchFile = typePatchFile.getAbsolutePath();
LOG.info("Applying patches in file {}", patchFile);

try {
String jsonStr = new String(Files.readAllBytes(typePatchFile.toPath()), StandardCharsets.UTF_8);
String jsonStr = new String(Files.readAllBytes(typePatchFile.toPath()), StandardCharsets.UTF_8);
TypeDefPatches patches = AtlasType.fromJson(jsonStr, TypeDefPatches.class);

if (patches == null || CollectionUtils.isEmpty(patches.getPatches())) {
if (patches == null || patches.getPatches().isEmpty()) {
LOG.info("No patches in file {}", patchFile);

continue;
return;
}

int patchIndex = 0;
for (TypeDefPatch patch : patches.getPatches()) {
PatchHandler patchHandler = patchHandlerRegistry.get(patch.getAction());
applyPatches(patchFile, patches, patchRegistry, patchHandlerRegistry);
} catch (Throwable t) {
LOG.error("Failed to apply patches in file {}. Ignored", patchFile, t);
}
});

if (patchHandler == null) {
LOG.error("Unknown patch action {} in file {}. Ignored", patch.getAction(), patchFile);
continue;
}
futures.add(future);
}

if (patchRegistry.isApplicable(patch.getId(), patchFile, patchIndex++)) {
PatchStatus status;
// Wait for all futures to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
}

try {
status = patchHandler.applyPatch(patch);
} catch (AtlasBaseException ex) {
status = FAILED;
private void applyPatches(String patchFile, TypeDefPatches patches, AtlasPatchRegistry patchRegistry, Map<String, PatchHandler> patchHandlerRegistry) {
int patchIndex = 0;
for (TypeDefPatch patch : patches.getPatches()) {
PatchHandler patchHandler = patchHandlerRegistry.get(patch.getAction());

LOG.error("Failed to apply {} (status: {}; action: {}) in file: {}. Ignored.",
patch.getId(), status.toString(), patch.getAction(), patchFile);
}
if (patchHandler == null) {
LOG.error("Unknown patch action {} in file {}. Ignored", patch.getAction(), patchFile);
continue;
}

patchRegistry.register(patch.id, patch.description, TYPEDEF_PATCH_TYPE, patch.action, status);
LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile);
} else {
LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString());
}
}
} catch (Throwable t) {
LOG.error("Failed to apply patches in file {}. Ignored", patchFile, t);
}
}
if (!patchRegistry.isApplicable(patch.getId(), patchFile, patchIndex++)) {
LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString());
continue;
}

PatchStatus status = applyPatch(patchHandler, patch, patchFile);
patchRegistry.register(patch.id, patch.description, TYPEDEF_PATCH_TYPE, patch.action, status);
LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile);
}
}

/**
 * Applies a single typedef patch using the given handler.
 *
 * @param patchHandler handler matched to the patch's action
 * @param patch        the typedef patch to apply
 * @param patchFile    absolute path of the patch file (used only for logging)
 * @return the status reported by the handler, or {@code FAILED} if the handler
 *         threw an {@link AtlasBaseException}; the exception is logged and not
 *         propagated, matching the caller's "apply and ignore failures" contract
 */
private PatchStatus applyPatch(PatchHandler patchHandler, TypeDefPatch patch, String patchFile) {
    try {
        return patchHandler.applyPatch(patch);
    } catch (AtlasBaseException ex) {
        // Pass the exception as the last argument so SLF4J records the stack
        // trace; previously it was swallowed, making failures hard to diagnose.
        LOG.error("Failed to apply {} (status: FAILED; action: {}) in file: {}. Ignored.",
                patch.getId(), patch.getAction(), patchFile, ex);
        return FAILED;
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Arrays;
import java.util.*;

import static org.apache.atlas.model.discovery.SearchParameters.ALL_ENTITY_TYPES;
import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATION_TYPES;
Expand Down Expand Up @@ -878,42 +873,31 @@ private void rectifyTypeErrorsIfAny(AtlasTypesDef typesDef) {
}

private <T extends AtlasBaseTypeDef> void removeDuplicateTypeIfAny(List<T> defList) {
if (defList == null || defList.isEmpty()) {
return;
}

final Set<String> entityDefNames = new HashSet<>();
Iterator<T> iterator = defList.iterator();

for (int i = 0; i < defList.size(); i++) {
if (!entityDefNames.add((defList.get(i)).getName())) {
LOG.warn(" Found Duplicate Type => " + defList.get(i).getName());
defList.remove(i);
i--;
while (iterator.hasNext()) {
T def = iterator.next();
if (!entityDefNames.add(def.getName())) {
LOG.warn("Found Duplicate Type => " + def.getName());
iterator.remove();
}
}
}


private void rectifyAttributesIfNeeded(final Set<String> entityNames, AtlasStructDef structDef) {
List<AtlasAttributeDef> attributeDefs = structDef.getAttributeDefs();

if (CollectionUtils.isNotEmpty(attributeDefs)) {
for (AtlasAttributeDef attributeDef : attributeDefs) {
if (!hasOwnedReferenceConstraint(attributeDef.getConstraints())) {
continue;
}

Set<String> referencedTypeNames = AtlasTypeUtil.getReferencedTypeNames(attributeDef.getTypeName());

boolean valid = false;

for (String referencedTypeName : referencedTypeNames) {
if (entityNames.contains(referencedTypeName)) {
valid = true;
break;
}
}

if (!valid) {
rectifyOwnedReferenceError(structDef, attributeDef);
}
}
if (attributeDefs != null) {
attributeDefs.stream()
.filter(attributeDef -> hasOwnedReferenceConstraint(attributeDef.getConstraints()))
.filter(attributeDef -> AtlasTypeUtil.getReferencedTypeNames(attributeDef.getTypeName()).stream()
.noneMatch(entityNames::contains))
.forEach(attributeDef -> rectifyOwnedReferenceError(structDef, attributeDef));
}
}

Expand All @@ -932,16 +916,18 @@ private boolean hasOwnedReferenceConstraint(List<AtlasConstraintDef> constraints
private void rectifyOwnedReferenceError(AtlasStructDef structDef, AtlasAttributeDef attributeDef) {
List<AtlasConstraintDef> constraints = attributeDef.getConstraints();

if (CollectionUtils.isNotEmpty(constraints)) {
for (int i = 0; i < constraints.size(); i++) {
AtlasConstraintDef constraint = constraints.get(i);
if (constraints == null || constraints.isEmpty()) {
return;
}

if (constraint.isConstraintType(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)) {
LOG.warn("Invalid constraint ownedRef for attribute {}.{}", structDef.getName(), attributeDef.getName());
Iterator<AtlasConstraintDef> iterator = constraints.iterator();

constraints.remove(i);
i--;
}
while (iterator.hasNext()) {
AtlasConstraintDef constraint = iterator.next();

if (constraint.isConstraintType(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)) {
LOG.warn("Invalid constraint ownedRef for attribute {}.{}", structDef.getName(), attributeDef.getName());
iterator.remove();
}
}
}
Expand Down
Loading