Skip to content

Commit

Permalink
Merge pull request #3876 from atlanhq/dev/janusupgrade
Browse files Browse the repository at this point in the history
Janusupgrade to 1.0.0 with a smart marker logic to reduce Cassandra Calls
  • Loading branch information
aarshi0301 authored Dec 11, 2024
2 parents 174d59e + 7630c8c commit 21e72d1
Show file tree
Hide file tree
Showing 20 changed files with 133 additions and 100 deletions.
4 changes: 2 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ unzip -o keycloak-15.0.2.1.zip -d ~/.m2/repository/org
echo "Maven Building"

if [ "$1" == "build_without_dashboard" ]; then
mvn -pl '!test-tools,!addons/hdfs-model,!addons/hive-bridge,!addons/hive-bridge-shim,!addons/falcon-bridge-shim,!addons/falcon-bridge,!addons/sqoop-bridge,!addons/sqoop-bridge-shim,!addons/hbase-bridge,!addons/hbase-bridge-shim,!addons/hbase-testing-util,!addons/kafka-bridge,!addons/impala-hook-api,!addons/impala-bridge-shim,!addons/impala-bridge,!dashboardv2,!dashboardv3' -Dmaven.test.skip -DskipTests -Drat.skip=true -DskipOverlay -DskipEnunciate=true package -Pdist
mvn -pl '!addons/hdfs-model,!addons/hive-bridge,!addons/hive-bridge-shim,!addons/falcon-bridge-shim,!addons/falcon-bridge,!addons/sqoop-bridge,!addons/sqoop-bridge-shim,!addons/hbase-bridge,!addons/hbase-bridge-shim,!addons/hbase-testing-util,!addons/kafka-bridge,!addons/impala-hook-api,!addons/impala-bridge-shim,!addons/impala-bridge,!dashboardv2,!dashboardv3' -Dmaven.test.skip -DskipTests -Drat.skip=true -DskipOverlay -DskipEnunciate=true package -Pdist
else
mvn -pl '!test-tools,!addons/hdfs-model,!addons/hive-bridge,!addons/hive-bridge-shim,!addons/falcon-bridge-shim,!addons/falcon-bridge,!addons/sqoop-bridge,!addons/sqoop-bridge-shim,!addons/hbase-bridge,!addons/hbase-bridge-shim,!addons/hbase-testing-util,!addons/kafka-bridge,!addons/impala-hook-api,!addons/impala-bridge-shim,!addons/impala-bridge' -Dmaven.test.skip -DskipTests -Drat.skip=true -DskipEnunciate=true package -Pdist
mvn -pl '!addons/hdfs-model,!addons/hive-bridge,!addons/hive-bridge-shim,!addons/falcon-bridge-shim,!addons/falcon-bridge,!addons/sqoop-bridge,!addons/sqoop-bridge-shim,!addons/hbase-bridge,!addons/hbase-bridge-shim,!addons/hbase-testing-util,!addons/kafka-bridge,!addons/impala-hook-api,!addons/impala-bridge-shim,!addons/impala-bridge' -Dmaven.test.skip -DskipTests -Drat.skip=true -DskipEnunciate=true package -Pdist
fi

echo "[DEBUG listing distro/target"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,4 @@ public interface AtlasGraph<V, E> {
*/
AtlasGraphIndexClient getGraphIndexClient()throws AtlasException;


void setEnableCache(boolean enableCache);

Boolean isCacheEnabled();
}
4 changes: 4 additions & 0 deletions graphdb/janus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-driver</artifactId>
</exclusion>
<exclusion>
<groupId>cassandra-hadoop-util</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,14 +684,6 @@ private Edge getFirstActiveEdge(GraphTraversal gt) {
return null;
}

public void setEnableCache(boolean enableCache) {
this.janusGraph.setEnableCache(enableCache);
}

public Boolean isCacheEnabled() {
return this.janusGraph.isCacheEnabled();
}

public JanusGraphTransaction getTransaction() {
return this.janusGraph.newThreadBoundTransaction();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,

public AtlasJanusGraphDatabase() {
//update registry
GraphSONMapper.build().addRegistry(JanusGraphIoRegistry.getInstance()).create();
GraphSONMapper.build().addRegistry(JanusGraphIoRegistry.instance()).create();
}

public static Configuration getConfiguration() throws AtlasException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Map<String, Object> directIndexQuery(String query) throws AtlasBaseExcept

@Override
public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices() {
Iterator<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query.vertices().iterator();
Iterator<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query.vertexStream().iterator();

Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
Expand All @@ -77,7 +77,7 @@ public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices(int offset, i
Iterator<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query
.offset(offset)
.limit(limit)
.vertices().iterator();
.vertexStream().iterator();

Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
Expand All @@ -100,7 +100,7 @@ public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices(int offset, i
.orderBy(sortBy, sortOrder)
.offset(offset)
.limit(limit)
.vertices().iterator();
.vertexStream().iterator();

Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.janusgraph.graphdb.query.condition.Not;
import org.janusgraph.graphdb.query.condition.Or;
import org.janusgraph.graphdb.query.condition.PredicateCondition;
import org.janusgraph.graphdb.tinkerpop.optimize.step.Aggregation;
import org.janusgraph.graphdb.types.ParameterType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -672,7 +673,7 @@ public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever info
doc -> doc.getFieldValue(keyIdField).toString());
}

@Override
//@Override
public Long queryCount(IndexQuery query, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException {
try {
String collection = query.getStore();
Expand Down Expand Up @@ -1178,6 +1179,16 @@ public boolean exists() throws BackendException {
}
}

@Override
public Number queryAggregation(IndexQuery indexQuery, KeyInformation.IndexRetriever indexRetriever, BaseTransaction baseTransaction, Aggregation aggregation) throws BackendException {
return null;
}

@Override
public void clearStore(String s) throws BackendException {

}

/*
################# UTILITY METHODS #######################
*/
Expand Down
18 changes: 4 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@
<curator.version>4.3.0</curator.version>
<doxia.version>1.8</doxia.version>
<dropwizard-metrics>3.2.2</dropwizard-metrics>
<elasticsearch.version>7.16.2</elasticsearch.version>
<elasticsearch.version>7.17.4</elasticsearch.version>
<entity.repository.impl>org.apache.atlas.repository.audit.InMemoryEntityAuditRepository</entity.repository.impl>
<enunciate-maven-plugin.version>2.13.2</enunciate-maven-plugin.version>
<failsafe.version>2.18.1</failsafe.version>
Expand All @@ -717,7 +717,7 @@
<httpcomponents-httpcore.version>4.4.13</httpcomponents-httpcore.version>
<jackson.databind.version>2.12.4</jackson.databind.version>
<jackson.version>2.12.4</jackson.version>
<janusgraph.version>0.6.03</janusgraph.version>
<janusgraph.version>1.0.0</janusgraph.version>
<janusgraph.cassandra.version>0.5.3</janusgraph.cassandra.version>
<javax-inject.version>1</javax-inject.version>
<javax.servlet.version>3.1.0</javax.servlet.version>
Expand Down Expand Up @@ -773,7 +773,7 @@
<surefire.forkCount>2C</surefire.forkCount>
<surefire.version>3.0.0-M5</surefire.version>
<testng.version>6.9.4</testng.version>
<tinkerpop.version>3.5.1</tinkerpop.version>
<tinkerpop.version>3.7.0</tinkerpop.version>
<woodstox-core.version>5.0.3</woodstox-core.version>
<zookeeper.version>3.5.5</zookeeper.version>
<redis.client.version>3.20.1</redis.client.version>
Expand Down Expand Up @@ -836,16 +836,6 @@
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>github</id>
<url>https://maven.pkg.github.com/atlanhq/atlan-janusgraph</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>hortonworks.repo</id>
<url>https://repo.hortonworks.com/content/repositories/releases</url>
Expand Down Expand Up @@ -1806,7 +1796,7 @@
<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
<version>1.4</version>
<version>1.11.0</version>
</plugin>

<plugin>
Expand Down
14 changes: 13 additions & 1 deletion repository/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@
<artifactId>atlas-graphdb-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-util</artifactId>
<version>${tinkerpop.version}</version>
</dependency>

<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>4.0.1</version>
</dependency>

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
Expand Down Expand Up @@ -111,7 +123,7 @@
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
<artifactId>netty-handler</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,7 @@ public AtlasSearchResult directIndexSearch(SearchParams searchParams) throws Atl
return null;
}
RequestContext.get().endMetricRecord(elasticSearchQueryMetric);
prepareSearchResult(ret, indexQueryResult, resultAttributes, AtlasConfiguration.FETCH_COLLAPSED_RESULT.getBoolean());
prepareSearchResult(ret, indexQueryResult, resultAttributes, true);

ret.setAggregations(indexQueryResult.getAggregationMap());
ret.setApproximateCount(indexQuery.vertexTotals());
Expand Down Expand Up @@ -1071,7 +1071,7 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro
}
}

private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean collapseResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
try {
if(LOG.isDebugEnabled()){
Expand Down Expand Up @@ -1099,7 +1099,7 @@ private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResu
if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}
if (fetchCollapsedResults) {
if (collapseResults) {
Map<String, AtlasSearchResult> collapse = new HashMap<>();

Set<String> collapseKeys = result.getCollapseKeys();
Expand All @@ -1121,7 +1121,7 @@ private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResu

DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());
prepareSearchResultSync(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);
prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);

collapseRet.setSearchParameters(null);
collapse.put(collapseKey, collapseRet);
Expand All @@ -1145,8 +1145,33 @@ private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResu
scrubSearchResults(ret, searchParams.getSuppressLogs());
}

private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults);
// Non-recursive collapse processing
private Map<String, AtlasSearchResult> processCollapseResults(Result result, SearchParams searchParams, Set<String> resultAttributes) throws AtlasBaseException {
Map<String, AtlasSearchResult> collapse = new HashMap<>();
Set<String> collapseKeys = result.getCollapseKeys();

for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(searchParams);
Set<String> collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes));
DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());

// Directly iterate over collapse vertices
Iterator<Result> iterator = indexQueryCollapsedResult.getIterator();
while (iterator != null && iterator.hasNext()) {
Result collapseResult = iterator.next();
AtlasVertex collapseVertex = collapseResult.getVertex();
if (collapseVertex == null) continue;

AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes);
collapseRet.addEntity(collapseHeader);
}

collapse.put(collapseKey, collapseRet);
}

return collapse;
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,10 @@ public static Boolean isEntityIncomplete(AtlasElement element) {
return ret;
}

public static Boolean isEntityIncomplete(Integer value) {
return value != null && value.equals(INCOMPLETE_ENTITY_VALUE) ? Boolean.TRUE : Boolean.FALSE;
}

public static Boolean getEntityHasLineage(AtlasElement element) {
if (element.getPropertyKeys().contains(HAS_LINEAGE)) {
return element.getProperty(HAS_LINEAGE, Boolean.class);
Expand Down Expand Up @@ -1107,10 +1111,24 @@ public static Map getCustomAttributes(AtlasElement element) {
return ret;
}

public static Map getCustomAttributes(String customAttrsString) {
Map ret = null;

if (customAttrsString != null) {
ret = AtlasType.fromJson(customAttrsString, Map.class);
}

return ret;
}

public static Set<String> getLabels(AtlasElement element) {
return parseLabelsString(element.getProperty(LABELS_PROPERTY_KEY, String.class));
}

public static Set<String> getLabels(String labels) {
return parseLabelsString(labels);
}

public static Integer getProvenanceType(AtlasElement element) {
return element.getProperty(Constants.PROVENANCE_TYPE_KEY, Integer.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ private void startMonitoring() {
try {
startTime = recoveryInfoManagement.getStartTime();
Instant newStartTime = Instant.now();
this.graph.setEnableCache(false);
txRecoveryObject = this.graph.getManagementSystem().startIndexRecovery(startTime);
recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,6 @@ private void executePreProcessor(EntityMutationContext context) throws AtlasBase

private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("preCreateOrUpdate");
this.graph.setEnableCache(RequestContext.get().isCacheEnabled());
EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(graph, typeRegistry, entityStream, entityGraphMapper);
EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
EntityMutationContext context = new EntityMutationContext(discoveryContext);
Expand Down
Loading

0 comments on commit 21e72d1

Please sign in to comment.