Skip to content

Commit

Permalink
Merge branch 'janusupgrade' into dev/janusupgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshi0301 committed Dec 11, 2024
2 parents 420fe12 + 8eef377 commit 18184d7
Show file tree
Hide file tree
Showing 17 changed files with 136 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,4 @@ public interface AtlasGraph<V, E> {
*/
AtlasGraphIndexClient getGraphIndexClient()throws AtlasException;


void setEnableCache(boolean enableCache);

Boolean isCacheEnabled();
}
4 changes: 4 additions & 0 deletions graphdb/janus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-driver</artifactId>
</exclusion>
<exclusion>
<groupId>cassandra-hadoop-util</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,14 +684,6 @@ private Edge getFirstActiveEdge(GraphTraversal gt) {
return null;
}

public void setEnableCache(boolean enableCache) {
this.janusGraph.setEnableCache(enableCache);
}

public Boolean isCacheEnabled() {
return this.janusGraph.isCacheEnabled();
}

public JanusGraphTransaction getTransaction() {
return this.janusGraph.newThreadBoundTransaction();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,

public AtlasJanusGraphDatabase() {
//update registry
GraphSONMapper.build().addRegistry(JanusGraphIoRegistry.getInstance()).create();
GraphSONMapper.build().addRegistry(JanusGraphIoRegistry.instance()).create();
}

public static Configuration getConfiguration() throws AtlasException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Map<String, Object> directIndexQuery(String query) throws AtlasBaseExcept

@Override
public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices() {
Iterator<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query.vertices().iterator();
Iterator<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query.vertexStream().iterator();

Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
Expand All @@ -77,7 +77,7 @@ public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices(int offset, i
Iterator<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query
.offset(offset)
.limit(limit)
.vertices().iterator();
.vertexStream().iterator();

Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
Expand All @@ -100,7 +100,7 @@ public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices(int offset, i
.orderBy(sortBy, sortOrder)
.offset(offset)
.limit(limit)
.vertices().iterator();
.vertexStream().iterator();

Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.janusgraph.graphdb.query.condition.Not;
import org.janusgraph.graphdb.query.condition.Or;
import org.janusgraph.graphdb.query.condition.PredicateCondition;
import org.janusgraph.graphdb.tinkerpop.optimize.step.Aggregation;
import org.janusgraph.graphdb.types.ParameterType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -672,7 +673,7 @@ public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever info
doc -> doc.getFieldValue(keyIdField).toString());
}

@Override
//@Override
public Long queryCount(IndexQuery query, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException {
try {
String collection = query.getStore();
Expand Down Expand Up @@ -1178,6 +1179,16 @@ public boolean exists() throws BackendException {
}
}

@Override
public Number queryAggregation(IndexQuery indexQuery, KeyInformation.IndexRetriever indexRetriever, BaseTransaction baseTransaction, Aggregation aggregation) throws BackendException {
return null;
}

@Override
public void clearStore(String s) throws BackendException {

}

/*
################# UTILITY METHODS #######################
*/
Expand Down
9 changes: 5 additions & 4 deletions intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,16 @@ public enum AtlasConfiguration {
HERACLES_API_SERVER_URL("atlas.heracles.api.service.url", "http://heracles-service.heracles.svc.cluster.local"),

INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300),

/**
* hits elastic search async API
*/
ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", false),
ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000),
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
FETCH_COLLAPSED_RESULT("atlas.indexsearch.fetch.collapsed.result", true),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),
ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION("atlas.indexsearch.enable.janus.optimization", true),
/***
* This configuration is used to enable fetching non primitive attributes in index search
*/
ATLAS_INDEXSEARCH_ENABLE_FETCHING_NON_PRIMITIVE_ATTRIBUTES("atlas.indexsearch.enable.fetching.non.primitive.attributes", true),

/**
* enables janus/cassandra optimization
Expand Down
18 changes: 4 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@
<curator.version>4.3.0</curator.version>
<doxia.version>1.8</doxia.version>
<dropwizard-metrics>3.2.2</dropwizard-metrics>
<elasticsearch.version>7.16.2</elasticsearch.version>
<elasticsearch.version>7.17.4</elasticsearch.version>
<entity.repository.impl>org.apache.atlas.repository.audit.InMemoryEntityAuditRepository</entity.repository.impl>
<enunciate-maven-plugin.version>2.13.2</enunciate-maven-plugin.version>
<failsafe.version>2.18.1</failsafe.version>
Expand All @@ -717,7 +717,7 @@
<httpcomponents-httpcore.version>4.4.13</httpcomponents-httpcore.version>
<jackson.databind.version>2.12.4</jackson.databind.version>
<jackson.version>2.12.4</jackson.version>
<janusgraph.version>0.6.03</janusgraph.version>
<janusgraph.version>1.0.0</janusgraph.version>
<janusgraph.cassandra.version>0.5.3</janusgraph.cassandra.version>
<javax-inject.version>1</javax-inject.version>
<javax.servlet.version>3.1.0</javax.servlet.version>
Expand Down Expand Up @@ -773,7 +773,7 @@
<surefire.forkCount>2C</surefire.forkCount>
<surefire.version>3.0.0-M5</surefire.version>
<testng.version>6.9.4</testng.version>
<tinkerpop.version>3.5.1</tinkerpop.version>
<tinkerpop.version>3.7.0</tinkerpop.version>
<woodstox-core.version>5.0.3</woodstox-core.version>
<zookeeper.version>3.5.5</zookeeper.version>
<redis.client.version>3.20.1</redis.client.version>
Expand Down Expand Up @@ -836,16 +836,6 @@
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>github</id>
<url>https://maven.pkg.github.com/atlanhq/atlan-janusgraph</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>hortonworks.repo</id>
<url>https://repo.hortonworks.com/content/repositories/releases</url>
Expand Down Expand Up @@ -1806,7 +1796,7 @@
<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
<version>1.4</version>
<version>1.11.0</version>
</plugin>

<plugin>
Expand Down
14 changes: 13 additions & 1 deletion repository/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@
<artifactId>atlas-graphdb-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-util</artifactId>
<version>${tinkerpop.version}</version>
</dependency>

<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>4.0.1</version>
</dependency>

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
Expand Down Expand Up @@ -111,7 +123,7 @@
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
<artifactId>netty-handler</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ public AtlasSearchResult directIndexSearch(SearchParams searchParams) throws Atl
return null;
}
RequestContext.get().endMetricRecord(elasticSearchQueryMetric);
prepareSearchResult(ret, indexQueryResult, resultAttributes, AtlasConfiguration.FETCH_COLLAPSED_RESULT.getBoolean());
prepareSearchResult(ret, indexQueryResult, resultAttributes, true);

ret.setAggregations(indexQueryResult.getAggregationMap());
ret.setApproximateCount(indexQuery.vertexTotals());
Expand Down Expand Up @@ -1071,7 +1071,7 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro
}
}

private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean collapseResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
try {
if(LOG.isDebugEnabled()){
Expand Down Expand Up @@ -1099,7 +1099,7 @@ private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResu
if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}
if (fetchCollapsedResults) {
if (collapseResults) {
Map<String, AtlasSearchResult> collapse = new HashMap<>();

Set<String> collapseKeys = result.getCollapseKeys();
Expand All @@ -1121,7 +1121,7 @@ private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResu

DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());
prepareSearchResultSync(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);
prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);

collapseRet.setSearchParameters(null);
collapse.put(collapseKey, collapseRet);
Expand All @@ -1145,8 +1145,33 @@ private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResu
scrubSearchResults(ret, searchParams.getSuppressLogs());
}

private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults);
// Non-recursive collapse processing
private Map<String, AtlasSearchResult> processCollapseResults(Result result, SearchParams searchParams, Set<String> resultAttributes) throws AtlasBaseException {
Map<String, AtlasSearchResult> collapse = new HashMap<>();
Set<String> collapseKeys = result.getCollapseKeys();

for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(searchParams);
Set<String> collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes));
DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());

// Directly iterate over collapse vertices
Iterator<Result> iterator = indexQueryCollapsedResult.getIterator();
while (iterator != null && iterator.hasNext()) {
Result collapseResult = iterator.next();
AtlasVertex collapseVertex = collapseResult.getVertex();
if (collapseVertex == null) continue;

AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes);
collapseRet.addEntity(collapseHeader);
}

collapse.put(collapseKey, collapseRet);
}

return collapse;
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,10 @@ public static Boolean isEntityIncomplete(AtlasElement element) {
return ret;
}

public static Boolean isEntityIncomplete(Integer value) {
return value != null && value.equals(INCOMPLETE_ENTITY_VALUE) ? Boolean.TRUE : Boolean.FALSE;
}

public static Boolean getEntityHasLineage(AtlasElement element) {
if (element.getPropertyKeys().contains(HAS_LINEAGE)) {
return element.getProperty(HAS_LINEAGE, Boolean.class);
Expand Down Expand Up @@ -1107,10 +1111,24 @@ public static Map getCustomAttributes(AtlasElement element) {
return ret;
}

public static Map getCustomAttributes(String customAttrsString) {
Map ret = null;

if (customAttrsString != null) {
ret = AtlasType.fromJson(customAttrsString, Map.class);
}

return ret;
}

public static Set<String> getLabels(AtlasElement element) {
return parseLabelsString(element.getProperty(LABELS_PROPERTY_KEY, String.class));
}

public static Set<String> getLabels(String labels) {
return parseLabelsString(labels);
}

public static Integer getProvenanceType(AtlasElement element) {
return element.getProperty(Constants.PROVENANCE_TYPE_KEY, Integer.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ private void startMonitoring() {
try {
startTime = recoveryInfoManagement.getStartTime();
Instant newStartTime = Instant.now();
this.graph.setEnableCache(false);
txRecoveryObject = this.graph.getManagementSystem().startIndexRecovery(startTime);
recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,6 @@ private void executePreProcessor(EntityMutationContext context) throws AtlasBase

private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("preCreateOrUpdate");
this.graph.setEnableCache(RequestContext.get().isCacheEnabled());
EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(graph, typeRegistry, entityStream, entityGraphMapper);
EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
EntityMutationContext context = new EntityMutationContext(discoveryContext);
Expand Down
Loading

0 comments on commit 18184d7

Please sign in to comment.