diff --git a/common/src/main/java/org/apache/atlas/service/Services.java b/common/src/main/java/org/apache/atlas/service/Services.java index d1c5d37ccc..1f1a18e0b5 100644 --- a/common/src/main/java/org/apache/atlas/service/Services.java +++ b/common/src/main/java/org/apache/atlas/service/Services.java @@ -148,16 +148,6 @@ public static void printHashMapInTableFormatDescendingOrder(Map ma // Sort the list by values in descending order list.sort((entry1, entry2) -> entry2.getValue().compareTo(entry1.getValue())); - // Find the longest key to determine column width - int maxKeyLength = list.stream().map(entry -> entry.getKey().length()).max(Integer::compare).orElse(0); - - // Create format string for printing each row - String rowFormat = "| %-" + maxKeyLength + "s | %-10s |\n"; - - // Print table header - LOG.info(System.out.printf(rowFormat, "Key", value).toString()); - LOG.info(new String(new char[maxKeyLength + 15]).replace('\0', '-')); - LOG.info("Capturing Service startup time {}", AtlasType.toJson(list)); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/StartupTimeLogger.java b/repository/src/main/java/org/apache/atlas/repository/audit/StartupTimeLogger.java index ebe0a0b50f..dc816d3ed0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/StartupTimeLogger.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/StartupTimeLogger.java @@ -36,16 +36,6 @@ public static void printHashMapInTableFormatDescendingOrder(Map ma // Sort the list by values in descending order list.sort((entry1, entry2) -> entry2.getValue().compareTo(entry1.getValue())); - // Find the longest key to determine column width - int maxKeyLength = list.stream().map(entry -> entry.getKey().length()).max(Integer::compare).orElse(0); - - // Create format string for printing each row - String rowFormat = "| %-" + maxKeyLength + "s | %-1s |\n"; - - // Print table header - LOG.info(System.out.printf(rowFormat, "Key", value).toString()); - LOG.info(new String(new char[maxKeyLength + 1]).replace('\0', '-')); - LOG.info("Capturing Bean creation time {}", AtlasType.toJson(list)); } } \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java index d9ae5800e4..1a90968b83 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java @@ -41,6 +41,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED; import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; @@ -66,9 +67,9 @@ public AtlasPatchRegistry(AtlasGraph graph) { LOG.info("AtlasPatchRegistry: found {} patches", patchNameStatusMap.size()); - for (Map.Entry entry : patchNameStatusMap.entrySet()) { - LOG.info("AtlasPatchRegistry: patchId={}, status={}", entry.getKey(), entry.getValue()); - } +// for (Map.Entry entry : patchNameStatusMap.entrySet()) { +// LOG.info("AtlasPatchRegistry: patchId={}, status={}", entry.getKey(), entry.getValue()); +// } } public boolean isApplicable(String incomingId, String patchFile, int index) { @@ -146,13 +147,12 @@ private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId, String setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser()); } finally { graph.commit(); - patchNameStatusMap.put(patchId, patchStatus); } } private static Map getPatchNameStatusForAllRegistered(AtlasGraph graph) { - Map ret = new HashMap<>(); + Map ret = new ConcurrentHashMap<>(); AtlasPatches patches = getAllPatches(graph); for (AtlasPatch patch : patches.getPatches()) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index 2a867452b6..f15156e152 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -26,7 +26,6 @@ import org.apache.atlas.RequestContext; import org.apache.atlas.authorize.AtlasAuthorizerFactory; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.featureflag.FeatureFlagStore; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.model.TypeCategory; @@ -75,6 +74,8 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; @@ -147,7 +148,7 @@ private void loadBootstrapTypeDefs() { AtlasPatchRegistry patchRegistry = new AtlasPatchRegistry(graph); if (modelsDirContents != null && modelsDirContents.length > 0) { - Arrays.sort(modelsDirContents); + Arrays.sort(modelsDirContents); for (File folder : modelsDirContents) { if (folder.isFile()) { @@ -178,21 +179,26 @@ private void loadModelsInFolder(File typesDir, AtlasPatchRegistry patchRegistry) File[] typeDefFiles = typesDir.exists() ? typesDir.listFiles() : null; if (typeDefFiles == null || typeDefFiles.length == 0) { - LOG.info("Types directory {} does not exist or not readable or has no typedef files", typesDirName ); + LOG.info("Types directory {} does not exist or not readable or has no typedef files", typesDirName); } else { // sort the files by filename Arrays.sort(typeDefFiles); + List> futures = new ArrayList<>(); + for (File typeDefFile : typeDefFiles) { - if (typeDefFile.isFile()) { + if (!typeDefFile.isFile()) { + continue; + } + + CompletableFuture future = CompletableFuture.runAsync(() -> { try { - String jsonStr = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8); + String jsonStr = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8); AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class); if (typesDef == null || typesDef.isEmpty()) { LOG.info("No type in file {}", typeDefFile.getAbsolutePath()); - - continue; + return; } AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, typeRegistry); @@ -200,18 +206,19 @@ private void loadModelsInFolder(File typesDir, AtlasPatchRegistry patchRegistry) if (!typesToCreate.isEmpty() || !typesToUpdate.isEmpty()) { typeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate); - LOG.info("Created/Updated types defined in file {}", typeDefFile.getAbsolutePath()); } else { LOG.info("No new type in file {}", typeDefFile.getAbsolutePath()); } - } catch (Throwable t) { LOG.error("error while registering types in file {}", typeDefFile.getAbsolutePath(), t); } - } - } + }); + futures.add(future); + } + // Wait for all futures to complete + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); applyTypePatches(typesDir.getPath(), patchRegistry); } LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir); @@ -439,8 +446,8 @@ private static boolean isTypeUpdateApplicable(AtlasBaseTypeDef oldTypeDef, Atlas private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegistry) { String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME; - File typePatchesDir = new File(typePatchesDirName); - File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null; + File typePatchesDir = new File(typePatchesDirName); + File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null; if (typePatchFiles == null || typePatchFiles.length == 0) { LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName); @@ -450,7 +457,7 @@ private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegis // sort the files by filename Arrays.sort(typePatchFiles); - PatchHandler[] patchHandlers = new PatchHandler[] { + PatchHandler[] patchHandlers = new PatchHandler[]{ new UpdateEnumDefPatchHandler(typeDefStore, typeRegistry), new AddAttributePatchHandler(typeDefStore, typeRegistry), new UpdateAttributePatchHandler(typeDefStore, typeRegistry), @@ -462,7 +469,7 @@ private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegis new AddMandatoryAttributePatchHandler(typeDefStore, typeRegistry) }; - Map patchHandlerRegistry = new HashMap<>(); + Map patchHandlerRegistry = new ConcurrentHashMap<>(); for (PatchHandler patchHandler : patchHandlers) { for (String supportedAction : patchHandler.getSupportedActions()) { @@ -470,54 +477,68 @@ private void applyTypePatches(String typesDirName, AtlasPatchRegistry patchRegis } } + List> futures = new ArrayList<>(); + for (File typePatchFile : typePatchFiles) { - if (typePatchFile.isFile()) { - String patchFile = typePatchFile.getAbsolutePath(); + if (!typePatchFile.isFile()) { + continue; + } + CompletableFuture future = CompletableFuture.runAsync(() -> { + String patchFile = typePatchFile.getAbsolutePath(); LOG.info("Applying patches in file {}", patchFile); try { - String jsonStr = new String(Files.readAllBytes(typePatchFile.toPath()), StandardCharsets.UTF_8); + String jsonStr = new String(Files.readAllBytes(typePatchFile.toPath()), StandardCharsets.UTF_8); TypeDefPatches patches = AtlasType.fromJson(jsonStr, TypeDefPatches.class); - if (patches == null || CollectionUtils.isEmpty(patches.getPatches())) { + if (patches == null || patches.getPatches().isEmpty()) { LOG.info("No patches in file {}", patchFile); - - continue; + return; } - int patchIndex = 0; - for (TypeDefPatch patch : patches.getPatches()) { - PatchHandler patchHandler = patchHandlerRegistry.get(patch.getAction()); + applyPatches(patchFile, patches, patchRegistry, patchHandlerRegistry); + } catch (Throwable t) { + LOG.error("Failed to apply patches in file {}. Ignored", patchFile, t); + } + }); - if (patchHandler == null) { - LOG.error("Unknown patch action {} in file {}. Ignored", patch.getAction(), patchFile); - continue; - } + futures.add(future); + } - if (patchRegistry.isApplicable(patch.getId(), patchFile, patchIndex++)) { - PatchStatus status; + // Wait for all futures to complete + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } + } - try { - status = patchHandler.applyPatch(patch); - } catch (AtlasBaseException ex) { - status = FAILED; + private void applyPatches(String patchFile, TypeDefPatches patches, AtlasPatchRegistry patchRegistry, Map patchHandlerRegistry) { + int patchIndex = 0; + for (TypeDefPatch patch : patches.getPatches()) { + PatchHandler patchHandler = patchHandlerRegistry.get(patch.getAction()); - LOG.error("Failed to apply {} (status: {}; action: {}) in file: {}. Ignored.", - patch.getId(), status.toString(), patch.getAction(), patchFile); - } + if (patchHandler == null) { + LOG.error("Unknown patch action {} in file {}. Ignored", patch.getAction(), patchFile); + continue; + } - patchRegistry.register(patch.id, patch.description, TYPEDEF_PATCH_TYPE, patch.action, status); - LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile); - } else { - LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString()); - } - } - } catch (Throwable t) { - LOG.error("Failed to apply patches in file {}. Ignored", patchFile, t); - } - } + if (!patchRegistry.isApplicable(patch.getId(), patchFile, patchIndex++)) { + LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString()); + continue; } + + PatchStatus status = applyPatch(patchHandler, patch, patchFile); + patchRegistry.register(patch.id, patch.description, TYPEDEF_PATCH_TYPE, patch.action, status); + LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile); + } + } + + private PatchStatus applyPatch(PatchHandler patchHandler, TypeDefPatch patch, String patchFile) { + try { + return patchHandler.applyPatch(patch); + } catch (AtlasBaseException ex) { + LOG.error("Failed to apply {} (status: FAILED; action: {}) in file: {}. Ignored.", + patch.getId(), patch.getAction(), patchFile); + return FAILED; } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index a4f818980f..f359aab553 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -43,12 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.Arrays; +import java.util.*; import static org.apache.atlas.model.discovery.SearchParameters.ALL_ENTITY_TYPES; import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATION_TYPES; @@ -878,42 +873,31 @@ private void rectifyTypeErrorsIfAny(AtlasTypesDef typesDef) { } private void removeDuplicateTypeIfAny(List defList) { + if (defList == null || defList.isEmpty()) { + return; + } + final Set entityDefNames = new HashSet<>(); + Iterator iterator = defList.iterator(); - for (int i = 0; i < defList.size(); i++) { - if (!entityDefNames.add((defList.get(i)).getName())) { - LOG.warn(" Found Duplicate Type => " + defList.get(i).getName()); - defList.remove(i); - i--; + while (iterator.hasNext()) { + T def = iterator.next(); + if (!entityDefNames.add(def.getName())) { + LOG.warn("Found Duplicate Type => " + def.getName()); + iterator.remove(); } } } - private void rectifyAttributesIfNeeded(final Set entityNames, AtlasStructDef structDef) { List attributeDefs = structDef.getAttributeDefs(); - if (CollectionUtils.isNotEmpty(attributeDefs)) { - for (AtlasAttributeDef attributeDef : attributeDefs) { - if (!hasOwnedReferenceConstraint(attributeDef.getConstraints())) { - continue; - } - - Set referencedTypeNames = AtlasTypeUtil.getReferencedTypeNames(attributeDef.getTypeName()); - - boolean valid = false; - - for (String referencedTypeName : referencedTypeNames) { - if (entityNames.contains(referencedTypeName)) { - valid = true; - break; - } - } - - if (!valid) { - rectifyOwnedReferenceError(structDef, attributeDef); - } - } + if (attributeDefs != null) { + attributeDefs.stream() + .filter(attributeDef -> hasOwnedReferenceConstraint(attributeDef.getConstraints())) + .filter(attributeDef -> AtlasTypeUtil.getReferencedTypeNames(attributeDef.getTypeName()).stream() + .noneMatch(entityNames::contains)) + .forEach(attributeDef -> rectifyOwnedReferenceError(structDef, attributeDef)); } } @@ -932,16 +916,18 @@ private boolean hasOwnedReferenceConstraint(List constraints private void rectifyOwnedReferenceError(AtlasStructDef structDef, AtlasAttributeDef attributeDef) { List constraints = attributeDef.getConstraints(); - if (CollectionUtils.isNotEmpty(constraints)) { - for (int i = 0; i < constraints.size(); i++) { - AtlasConstraintDef constraint = constraints.get(i); + if (constraints == null || constraints.isEmpty()) { + return; + } - if (constraint.isConstraintType(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)) { - LOG.warn("Invalid constraint ownedRef for attribute {}.{}", structDef.getName(), attributeDef.getName()); + Iterator iterator = constraints.iterator(); - constraints.remove(i); - i--; - } + while (iterator.hasNext()) { + AtlasConstraintDef constraint = iterator.next(); + + if (constraint.isConstraintType(AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF)) { + LOG.warn("Invalid constraint ownedRef for attribute {}.{}", structDef.getName(), attributeDef.getName()); + iterator.remove(); } } }