Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve atlas startup time #2813

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.NAME;
Expand Down Expand Up @@ -148,7 +151,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
servicePolicies.setPolicyUpdateTime(new Date());

if (service != null) {
List<RangerPolicy> allPolicies = getServicePolicies(service);
List<RangerPolicy> allPolicies = getServicePolicies(service, 250);
servicePolicies.setServiceName(serviceName);
servicePolicies.setServiceId(service.getGuid());

Expand All @@ -162,7 +165,7 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
AtlasEntityHeader tagService = getServiceEntity(tagServiceName);

if (tagService != null) {
allPolicies.addAll(getServicePolicies(tagService));
allPolicies.addAll(getServicePolicies(tagService, 0));

TagPolicies tagPolicies = new TagPolicies();

Expand Down Expand Up @@ -200,13 +203,13 @@ public ServicePolicies getPolicies(String serviceName, String pluginId, Long las
return servicePolicies;
}

private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service) throws AtlasBaseException, IOException {
private List<RangerPolicy> getServicePolicies(AtlasEntityHeader service, int batchSize) throws AtlasBaseException, IOException {

List<RangerPolicy> servicePolicies = new ArrayList<>();

String serviceName = (String) service.getAttribute("name");
String serviceType = (String) service.getAttribute("authServiceType");
List<AtlasEntityHeader> atlasPolicies = getAtlasPolicies(serviceName);
List<AtlasEntityHeader> atlasPolicies = getAtlasPolicies(serviceName, batchSize);

if (CollectionUtils.isNotEmpty(atlasPolicies)) {
//transform policies
Expand Down Expand Up @@ -417,9 +420,8 @@ private List<RangerValiditySchedule> getPolicyValiditySchedule(AtlasEntityHeader
return ret;
}

private List<AtlasEntityHeader> getAtlasPolicies(String serviceName) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl."+service+".getAtlasPolicies");

private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl." + serviceName + ".getAtlasPolicies");
List<AtlasEntityHeader> ret = new ArrayList<>();
try {
IndexSearchParams indexSearchParams = new IndexSearchParams();
Expand Down Expand Up @@ -462,6 +464,10 @@ private List<AtlasEntityHeader> getAtlasPolicies(String serviceName) throws Atla
int size = 100;
boolean found = true;

if (batchSize > 0) {
size = batchSize;
}

do {
dsl.put("from", from);
dsl.put("size", size);
Expand All @@ -485,6 +491,93 @@ private List<AtlasEntityHeader> getAtlasPolicies(String serviceName) throws Atla
return ret;
}

// private List<AtlasEntityHeader> getAtlasPolicies(String serviceName, int batchSize) throws AtlasBaseException {
// AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("CachePolicyTransformerImpl." + serviceName + ".getAtlasPolicies");
//
// AtomicReference<CompletableFuture<List<AtlasEntityHeader>>> futureResultRef = new AtomicReference<>(CompletableFuture.completedFuture(new ArrayList<>()));
// try {
// IndexSearchParams indexSearchParams = new IndexSearchParams();
// Set<String> attributes = new HashSet<>();
// attributes.add(NAME);
// attributes.add(ATTR_POLICY_CATEGORY);
// attributes.add(ATTR_POLICY_SUB_CATEGORY);
// attributes.add(ATTR_POLICY_TYPE);
// attributes.add(ATTR_POLICY_SERVICE_NAME);
// attributes.add(ATTR_POLICY_USERS);
// attributes.add(ATTR_POLICY_GROUPS);
// attributes.add(ATTR_POLICY_ROLES);
// attributes.add(ATTR_POLICY_ACTIONS);
// attributes.add(ATTR_POLICY_RESOURCES);
// attributes.add(ATTR_POLICY_RESOURCES_CATEGORY);
// attributes.add(ATTR_POLICY_MASK_TYPE);
// attributes.add(ATTR_POLICY_PRIORITY);
// attributes.add(ATTR_POLICY_VALIDITY);
// attributes.add(ATTR_POLICY_CONDITIONS);
// attributes.add(ATTR_POLICY_IS_ENABLED);
// attributes.add(ATTR_POLICY_CONNECTION_QN);
//
// Map<String, Object> dsl = getMap("size", 0);
//
// List<Map<String, Object>> mustClauseList = new ArrayList<>();
// mustClauseList.add(getMap("term", getMap(ATTR_POLICY_SERVICE_NAME, serviceName)));
// mustClauseList.add(getMap("match", getMap("__state", Id.EntityState.ACTIVE)));
//
// dsl.put("query", getMap("bool", getMap("must", mustClauseList)));
//
// List<Map> sortList = new ArrayList<>(0);
// sortList.add(getMap("__timestamp", getMap("order", "asc")));
// sortList.add(getMap("__guid", getMap("order", "asc")));
// dsl.put("sort", sortList);
//
// indexSearchParams.setDsl(dsl);
// indexSearchParams.setAttributes(attributes);
//
// final int size = batchSize > 0 ? batchSize : 100;
//
// CompletableFuture<Void> loopFuture = CompletableFuture.completedFuture(null);
//
// loopFuture = loopFuture.thenCompose(v -> CompletableFuture.runAsync(() -> {
// boolean found = true;
// int currentFrom = 0;
// while (found) {
// final int finalFrom = currentFrom; // Capture the current 'from' value
// Map<String, Object> currentDsl = new HashMap<>(dsl); // Clone the dsl for the current iteration
// currentDsl.put("from", finalFrom);
// currentDsl.put("size", size);
// indexSearchParams.setDsl(currentDsl);
//
// CompletableFuture<List<AtlasEntityHeader>> headersFuture = CompletableFuture.supplyAsync(() -> {
// try {
// return discoveryService.directIndexSearch(indexSearchParams).getEntities();
// } catch (AtlasBaseException e) {
// throw new RuntimeException(e);
// }
// });
//
// futureResultRef.set(futureResultRef.get().thenCombine(headersFuture, (ret, headers) -> {
// if (headers != null) {
// ret.addAll(headers);
// }
// return ret;
// }));
//
// currentFrom += size;
// found = !headersFuture.join().isEmpty() && futureResultRef.get().join().size() % size == 0;
// }
// }));
//
// loopFuture.join(); // Wait for the loop to complete
// } finally {
// RequestContext.get().endMetricRecord(recorder);
// }
//
// try {
// return futureResultRef.get().get(); // Wait for the result to be ready and return it
// } catch (InterruptedException | ExecutionException e) {
// throw new AtlasBaseException(e);
// }
// }

private AtlasEntityHeader getServiceEntity(String serviceName) throws AtlasBaseException {
IndexSearchParams indexSearchParams = new IndexSearchParams();
Set<String> attributes = new HashSet<>();
Expand Down
37 changes: 36 additions & 1 deletion common/src/main/java/org/apache/atlas/service/Services.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@
package org.apache.atlas.service;

import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
import static org.apache.atlas.AtlasConstants.ATLAS_SERVICES_ENABLED;
Expand All @@ -48,29 +57,45 @@ public class Services {
private final String migrationDirName;
private final boolean migrationEnabled;

private Map<String,Long> durationMap;
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("Services");

@Inject
public Services(List<Service> services, Configuration configuration) {
    // Resolve all configuration-driven settings up front; the service list is
    // injected fully constructed by the container.
    this.services               = services;
    this.servicesEnabled        = configuration.getBoolean(ATLAS_SERVICES_ENABLED, true);
    this.dataMigrationClassName = configuration.getString("atlas.migration.class.name", DATA_MIGRATION_CLASS_NAME_DEFAULT);
    this.migrationDirName       = configuration.getString(ATLAS_MIGRATION_MODE_FILENAME);
    this.migrationEnabled       = StringUtils.isNotEmpty(migrationDirName);

    // Per-service startup durations, populated by start().
    this.durationMap            = new HashMap<>();
}

@PostConstruct
public void start() {
    try {
        for (Service svc : services) {
            if (!isServiceUsed(svc)) {
                continue;
            }

            AtlasPerfTracer perf = null;

            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "Service.start(" + svc.getClass().getName() + ")");
            }

            Instant start = Instant.now();

            LOG.info("Starting service {}", svc.getClass().getName());

            try {
                svc.start();
            } finally {
                // Log each service's tracer as soon as it finishes. The previous
                // version kept one tracer variable and logged it once in an outer
                // finally, so only the LAST service's perf trace was ever emitted.
                AtlasPerfTracer.log(perf);
            }

            // putIfAbsent: if the same service class somehow appears twice,
            // keep the first recorded duration.
            durationMap.putIfAbsent(svc.getClass().getName(), Duration.between(start, Instant.now()).toMillis());
        }

        // Emit the collected per-service startup durations, slowest first.
        printHashMapInTableFormatDescendingOrder(durationMap, "startupTime");
    } catch (Exception e) {
        // Startup is all-or-nothing: any service failure aborts Atlas startup.
        throw new RuntimeException(e);
    }
}

Expand Down Expand Up @@ -115,4 +140,14 @@ private boolean isNeededForZipFileMigration(Service svc) {
/**
 * True when the given service is the configured data-migration service,
 * matched by simple class name against {@code atlas.migration.class.name}.
 */
private boolean isDataMigrationService(Service svc) {
    String simpleName = svc.getClass().getSimpleName();

    return simpleName.equals(dataMigrationClassName);
}

/**
 * Logs the entries of {@code map} sorted by value, largest first, as JSON.
 *
 * @param map   service class name -> duration in milliseconds
 * @param value label describing what the durations measure (e.g. "startupTime");
 *              the previous implementation accepted this but never used it, so
 *              the label was silently dropped from the log line
 */
public static void printHashMapInTableFormatDescendingOrder(Map<String, Long> map, String value) {
    // Snapshot the entries so sorting does not touch the caller's map.
    List<Map.Entry<String, Long>> list = new ArrayList<>(map.entrySet());

    // Sort by duration, descending (slowest first).
    list.sort((entry1, entry2) -> entry2.getValue().compareTo(entry1.getValue()));

    LOG.info("Capturing Service {} {}", value, AtlasType.toJson(list));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ public class GraphTransactionAdvisor extends AbstractPointcutAdvisor {
private final StaticMethodMatcherPointcut pointcut = new StaticMethodMatcherPointcut() {
    @Override
    public boolean matches(Method method, Class<?> targetClass) {
        // Match only methods annotated with @GraphTransaction. The per-match
        // INFO logging was removed: this matcher runs for every candidate
        // method during context startup, and logging each hit dominated
        // advisor setup time.
        return method.isAnnotationPresent(GraphTransaction.class);
    }
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.apache.atlas.repository.audit;

import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.logging.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Component
public class StartupTimeLogger implements ApplicationListener<ContextRefreshedEvent> {
    private final StartupTimeLoggerBeanPostProcessor beanPostProcessor;

    private static final Logger LOG = LoggerFactory.getLogger(StartupTimeLogger.class);

    public StartupTimeLogger(StartupTimeLoggerBeanPostProcessor beanPostProcessor) {
        this.beanPostProcessor = beanPostProcessor;
    }

    /**
     * Once the Spring context is fully refreshed (all beans created), dump the
     * per-bean creation times collected by the post-processor.
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        printHashMapInTableFormatDescendingOrder(beanPostProcessor.getDurationTimeMap(), "creationTime");
    }

    /**
     * Logs the entries of {@code map} sorted by value, largest first, as JSON.
     *
     * @param map   bean class name -> duration in milliseconds
     * @param value label describing what the durations measure (e.g. "creationTime");
     *              the previous implementation accepted this but never used it, so
     *              the label was silently dropped from the log line
     */
    public static void printHashMapInTableFormatDescendingOrder(Map<String, Long> map, String value) {
        // Snapshot the entries so sorting does not touch the caller's map.
        List<Map.Entry<String, Long>> list = new ArrayList<>(map.entrySet());

        // Sort by duration, descending (slowest first).
        list.sort((entry1, entry2) -> entry2.getValue().compareTo(entry1.getValue()));

        LOG.info("Capturing Bean {} {}", value, AtlasType.toJson(list));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.apache.atlas.repository.audit;

import org.apache.atlas.utils.AtlasPerfTracer;
import org.slf4j.Logger;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class StartupTimeLoggerBeanPostProcessor implements BeanPostProcessor {
    // Bean class name -> wall-clock start of initialization, written in
    // postProcessBeforeInitialization and read in postProcessAfterInitialization.
    private final Map<String, Long> startTimeMap = new ConcurrentHashMap<>();

    // Bean class name -> initialization duration in millis; exposed for reporting.
    private final Map<String, Long> durationTimeMap = new ConcurrentHashMap<>();

    // Per-bean perf tracers, keyed by bean name. The previous version kept a
    // single mutable `perf` field, which is clobbered when bean creation nests
    // (a bean instantiated while another is still initializing) or when beans
    // initialize concurrently, losing or mis-attributing traces.
    private final Map<String, AtlasPerfTracer> perfTracers = new ConcurrentHashMap<>();

    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("Beans");

    public Map<String, Long> getDurationTimeMap() {
        return durationTimeMap;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        // Record the start time for this bean's initialization phase.
        startTimeMap.put(bean.getClass().getName(), System.currentTimeMillis());

        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
            perfTracers.put(beanName, AtlasPerfTracer.getPerfTracer(PERF_LOG, "Beans.create(" + beanName + ")"));
        }

        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        // Close out this bean's tracer (if tracing was enabled when it started).
        AtlasPerfTracer perf = perfTracers.remove(beanName);

        if (perf != null) {
            AtlasPerfTracer.log(perf);
        }

        // Compute and record the initialization duration; -1 sentinel means the
        // before-callback never ran for this class, so skip it.
        long startTime = startTimeMap.getOrDefault(bean.getClass().getName(), -1L);

        if (startTime != -1L) {
            durationTimeMap.put(bean.getClass().getName(), System.currentTimeMillis() - startTime);
        }

        return bean;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,9 @@ private void resolveIndexFieldName(AtlasGraphManagement managementSystem, AtlasA
}

typeRegistry.addIndexFieldName(attribute.getVertexPropertyName(), indexFieldName);

LOG.info("Property {} is mapped to index field name {}", attribute.getQualifiedName(), attribute.getIndexFieldName());
if (LOG.isDebugEnabled()){
LOG.debug("Property {} is mapped to index field name {}", attribute.getQualifiedName(), attribute.getIndexFieldName());
}
} else {
LOG.warn("resolveIndexFieldName(attribute={}): propertyKey is null for vertextPropertyName={}", attribute.getQualifiedName(), attribute.getVertexPropertyName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
Expand All @@ -66,9 +67,9 @@ public AtlasPatchRegistry(AtlasGraph graph) {

LOG.info("AtlasPatchRegistry: found {} patches", patchNameStatusMap.size());

for (Map.Entry<String, PatchStatus> entry : patchNameStatusMap.entrySet()) {
LOG.info("AtlasPatchRegistry: patchId={}, status={}", entry.getKey(), entry.getValue());
}
// for (Map.Entry<String, PatchStatus> entry : patchNameStatusMap.entrySet()) {
// LOG.info("AtlasPatchRegistry: patchId={}, status={}", entry.getKey(), entry.getValue());
// }
}

public boolean isApplicable(String incomingId, String patchFile, int index) {
Expand Down Expand Up @@ -146,13 +147,12 @@ private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId, String
setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
} finally {
graph.commit();

patchNameStatusMap.put(patchId, patchStatus);
}
}

private static Map<String, PatchStatus> getPatchNameStatusForAllRegistered(AtlasGraph graph) {
Map<String, PatchStatus> ret = new HashMap<>();
Map<String, PatchStatus> ret = new ConcurrentHashMap<>();
AtlasPatches patches = getAllPatches(graph);

for (AtlasPatch patch : patches.getPatches()) {
Expand Down
Loading
Loading