Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,57 @@
<artifactId>commons-codec</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.janusgraph</groupId>
<artifactId>janusgraph-core</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-driver</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-core</artifactId>
<version>3.7.0</version>
<exclusions>
<exclusion>
<artifactId>javax.json</artifactId>
<groupId>org.glassfish</groupId>
</exclusion>
<exclusion>
<artifactId>snakeyaml</artifactId>
<groupId>org.yaml</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.openmetadata.service.exception;

public class GraphException extends RuntimeException {
public GraphException(Exception e) {
super(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.openmetadata.service.graph;

import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;

public interface GraphClient {
GraphTraversalSource getReadGraphTraversalSource();
GraphTraversalSource getWriteGraphTraversalSource();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.openmetadata.service.graph;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.janusgraph.util.system.ConfigurationUtil;
import org.openmetadata.service.exception.GraphException;

import java.util.stream.Stream;

import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;

/**
* docker network create --driver bridge mynet
* <p>
* docker run -it -p 8182:8182 --name janusgraph --network mynet janusgraph/janusgraph:latest
* <p>
* UI
* <ul>
* <li> docker run -d --rm -p 8081:8081 --name puppygraph -e PUPPYGRAPH_USERNAME=puppygraph -e PUPPYGRAPH_PASSWORD=puppygraph -e PORT=8081 -e USE_GREMLIN_AUTH=false -e GREMLINSERVER_HOST=janusgraph:8182 --network mynet puppygraph/puppygraph-query:latest
* <li> docker run --rm -d -p 3002:3002 -p 3001:3001 --name=janusgraph-visualizer --network=mynet janusgraph/janusgraph-visualizer:latest
* </ul>
*/
@Slf4j
public class JanusGraphClient implements GraphClient {

public static void main(String[] args) throws Exception {
// PropertiesConfiguration conf =
// ConfigurationUtil.loadPropertiesConfig("conf/remote-graph.properties");
// Cluster cluster = Cluster.open(conf.getString("gremlin.remote.driver.clusterFile"));
// Client client = cluster.connect();

buildVertexIndex();
GraphTraversalSource g = traversal().withRemote("conf/remote-graph.properties");
long numVertices = g.V().count().next();
long numEdges = g.E().count().next();
System.out.println("Number of vertices: " + numVertices);
System.out.println("Number of edges: " + numEdges);
System.out.println("V label: " + g.V().label().dedup().toList());
System.out.println("E label: " + g.E().label().dedup().toList());
System.out.println("V properties: " + g.V().properties().key().dedup().toList());
System.out.println("E properties: " + g.E().properties().key().dedup().toList());
g.close();
}

public static void buildVertexIndex() throws Exception {
PropertiesConfiguration conf = ConfigurationUtil.loadPropertiesConfig("conf/remote-graph.properties");
Cluster cluster = Cluster.open(conf.getString("gremlin.remote.driver.clusterFile"));
Client client = cluster.connect();
final StringBuilder req = new StringBuilder();
req.append("JanusGraphManagement mgmt = graph.openManagement();");
req.append("graph.tx().rollback();");


req.append("mgmt.makePropertyKey(\"roles\").dataType(String.class).make();");
req.append("mgmt.makePropertyKey(\"json\").dataType(String.class).make();");
req.append("mgmt.makePropertyKey(\"href\").dataType(String.class).make();");
req.append("mgmt.makePropertyKey(\"changeDescription\").dataType(String.class).make();");
req.append("mgmt.makePropertyKey(\"votes\").dataType(String.class).make();");
req.append("mgmt.makePropertyKey(\"lifeCycle\").dataType(String.class).make();");

req.append("PropertyKey uid = mgmt.makePropertyKey(\"uid\").dataType(String.class).make();");
req.append("PropertyKey name = mgmt.makePropertyKey(\"name\").dataType(String.class).make();");
req.append("PropertyKey fqnHash = mgmt.makePropertyKey(\"fqnHash\").dataType(String.class).make();");
req.append("PropertyKey nameHash = mgmt.makePropertyKey(\"nameHash\").dataType(String.class).make();");

req.append("mgmt.buildIndex(\"uid_idx\", Vertex.class).addKey(uid).buildCompositeIndex();");
req.append("mgmt.buildIndex(\"name_idx\", Vertex.class).addKey(name).buildCompositeIndex();");
req.append("mgmt.buildIndex(\"fqnHash_idx\", Vertex.class).addKey(fqnHash).buildCompositeIndex();");
req.append("mgmt.buildIndex(\"nameHash_idx\", Vertex.class).addKey(nameHash).buildCompositeIndex();");

req.append("mgmt.commit();");
req.append("graph.tx().commit();");
final ResultSet resultSet = client.submit(req.toString());
Stream<Result> futureList = resultSet.stream();
futureList.map(Result::toString).forEach(System.out::println);
}

@Override
public GraphTraversalSource getReadGraphTraversalSource() {
try {
return traversal().withRemote("conf/remote-graph.properties");
} catch (Exception e) {
throw new GraphException(e);
}
}

@Override
public GraphTraversalSource getWriteGraphTraversalSource() {
try {
return traversal().withRemote("conf/remote-graph.properties");
} catch (Exception e) {
throw new GraphException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.openmetadata.service.graph;

import lombok.extern.slf4j.Slf4j;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;

// TODO - Implement NuGraphClient
@Slf4j
public class NuGraphClient implements GraphClient {

@Override
public GraphTraversalSource getReadGraphTraversalSource() {
return null;
}

@Override
public GraphTraversalSource getWriteGraphTraversalSource() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.openmetadata.service.jdbi3;

import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inV;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.Relationship.CONTAINS;
import static org.openmetadata.schema.type.Relationship.MENTIONED_IN;
Expand All @@ -37,8 +38,11 @@
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.val;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.jdbi.v3.core.extension.ExtensionMethod;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.core.statement.StatementException;
Expand Down Expand Up @@ -123,6 +127,7 @@
import org.openmetadata.schema.util.ServicesCount;
import org.openmetadata.schema.utils.EntityInterfaceUtil;
import org.openmetadata.service.Entity;
import org.openmetadata.service.graph.JanusGraphClient;
import org.openmetadata.service.jdbi3.CollectionDAO.TagUsageDAO.TagLabelMapper;
import org.openmetadata.service.jdbi3.CollectionDAO.UsageDAO.UsageDetailsMapper;
import org.openmetadata.service.jdbi3.FeedRepository.FilterType;
Expand All @@ -137,6 +142,7 @@
import org.openmetadata.service.util.jdbi.BindUUID;

public interface CollectionDAO {
org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(CollectionDAO.class);
@CreateSqlObject
DatabaseDAO databaseDAO();

Expand Down Expand Up @@ -757,14 +763,53 @@ default void bulkInsertToRelationship(
+ "(:fromId, :toId, :fromEntity, :toEntity, :relation, (:json :: jsonb)) "
+ "ON CONFLICT (fromId, toId, relation) DO UPDATE SET json = EXCLUDED.json",
connectionType = POSTGRES)
void insert(
void insert0(
@BindUUID("fromId") UUID fromId,
@BindUUID("toId") UUID toId,
@Bind("fromEntity") String fromEntity,
@Bind("toEntity") String toEntity,
@Bind("relation") int relation,
@Bind("json") String json);

default Relationship fromOrdinal(int ordinal) {
for (Relationship relationship : Relationship.values()) {
if (ordinal == relationship.ordinal()) {
return relationship;
}
}
throw new IllegalArgumentException("Invalid ordinal[" + ordinal + "] for Relationship");
}

default void insert(UUID fromId, UUID toId, String fromEntity, String toEntity, int relation, String json) {
insert0(fromId, toId, fromEntity, toEntity, relation, json);
insertGraph(fromId, toId, fromEntity, toEntity, fromOrdinal(relation), json);
}

default void insertGraph(UUID fromId, UUID toId, String fromEntity, String toEntity, Relationship relationship, String json) {
try (GraphTraversalSource g = new JanusGraphClient().getWriteGraphTraversalSource()) {
val fromOpt = g.V().has("uid", fromId.toString()).tryNext();
val toOpt = g.V().has("uid", toId.toString()).tryNext();
if (fromOpt.isEmpty() || toOpt.isEmpty()) {
return;
}
val e = g.V().has("uid", fromId.toString())
.outE(relationship.value())
.filter(inV().has("uid", toId.toString()))
.tryNext()
.orElseGet(() ->
g.addE(relationship.value())
.from(fromOpt.get())
.to(toOpt.get())
.next());
g.E(e).property("json", json)
.property("fromEntity", fromEntity)
.property("toEntity", toEntity)
.iterate();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

@ConnectionAwareSqlUpdate(
value =
"INSERT IGNORE INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation) VALUES <values>",
Expand Down Expand Up @@ -881,13 +926,30 @@ String getRelation(
"DELETE from entity_relationship WHERE fromId = :fromId "
+ "AND fromEntity = :fromEntity AND toId = :toId AND toEntity = :toEntity "
+ "AND relation = :relation")
int delete(
int delete0(
@BindUUID("fromId") UUID fromId,
@Bind("fromEntity") String fromEntity,
@BindUUID("toId") UUID toId,
@Bind("toEntity") String toEntity,
@Bind("relation") int relation);

default int delete(UUID fromId, String fromEntity, UUID toId, String toEntity, int relation) {
deleteGraph(fromId, fromEntity, toId, toEntity, fromOrdinal(relation));
return delete0(fromId, fromEntity, toId, toEntity, relation);
}

default void deleteGraph(UUID fromId, String fromEntity, UUID toId, String toEntity, Relationship relationship) {
try (GraphTraversalSource g = new JanusGraphClient().getWriteGraphTraversalSource()) {
g.V().has("uid", fromId.toString())
.outE(relationship.value())
.filter(inV().has("uid", toId.toString()))
.drop()
.iterate();
} catch (Exception e) {
e.printStackTrace();
}
}

// Delete all the entity relationship fromID --- relation --> entity of type toEntity
@SqlUpdate(
"DELETE from entity_relationship WHERE fromId = :fromId AND fromEntity = :fromEntity "
Expand Down Expand Up @@ -1370,7 +1432,7 @@ interface FieldRelationshipDAO {
+ "VALUES (:fromFQNHash, :toFQNHash, :fromFQN, :toFQN, :fromType, :toType, :relation, (:json :: jsonb)) "
+ "ON CONFLICT (fromFQNHash, toFQNHash, relation) DO NOTHING",
connectionType = POSTGRES)
void insert(
void insert0(
@BindFQN("fromFQNHash") String fromFQNHash,
@BindFQN("toFQNHash") String toFQNHash,
@Bind("fromFQN") String fromFQN,
Expand All @@ -1380,6 +1442,10 @@ void insert(
@Bind("relation") int relation,
@Bind("json") String json);

default void insert(String fromFQNHash, String toFQNHash, String fromFQN, String toFQN, String fromType, String toType, int relation, String json) {
insert0(fromFQNHash, toFQNHash, fromFQN, toFQN, fromType, toType, relation, json);
}

@ConnectionAwareSqlUpdate(
value =
"INSERT INTO field_relationship(fromFQNHash, toFQNHash, fromFQN, toFQN, fromType, toType, relation, jsonSchema, json) "
Expand Down Expand Up @@ -2417,6 +2483,36 @@ void applyTag(
@Bind("labelType") int labelType,
@Bind("state") int state);

default void applyTagGraph(
TagLabel.TagSource source,
String nameHashColumn, String tagFQN,
String targetNameHashColumn, String targetFQN,
TagLabel.LabelType labelType, TagLabel.State state) {
try (GraphTraversalSource g = new JanusGraphClient().getWriteGraphTraversalSource()) {
val fromOpt = g.V().has(nameHashColumn, FullyQualifiedName.buildHash(tagFQN)).tryNext();
val toOpt = g.V().has(targetNameHashColumn, FullyQualifiedName.buildHash(targetFQN)).tryNext();
if (fromOpt.isPresent() && toOpt.isPresent()) {
val e = g.V(fromOpt.get())
.outE(Relationship.TAG_TO.value())
.filter(inV().hasId(toOpt.get().id()))
.tryNext()
.orElseGet(() ->
g.addE(Relationship.TAG_TO.value())
.from(fromOpt.get())
.to(toOpt.get())
.next());
g.E(e)
.property("source", source.name())
.property("labelType", labelType.name())
.property("state", state.name())
.iterate();
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}

}

default List<TagLabel> getTags(String targetFQN) {
List<TagLabel> tags = getTagsInternal(targetFQN);
tags.forEach(TagLabelUtil::applyTagCommonFields);
Expand Down Expand Up @@ -2499,7 +2595,16 @@ List<Pair<String, TagLabel>> getTagsInternalByPrefix(
int getTagCount(@Bind("source") int source, @BindFQN("tagFqnHash") String tagFqnHash);

@SqlUpdate("DELETE FROM tag_usage where targetFQNHash = :targetFQNHash")
void deleteTagsByTarget(@BindFQN("targetFQNHash") String targetFQNHash);
void deleteTagsByTarget0(@BindFQN("targetFQNHash") String targetFQNHash);

default void deleteTagsByTarget(EntityDAO<?> dao, String targetFQN) {
deleteTagsByTarget0(targetFQN);
try (GraphTraversalSource g = new JanusGraphClient().getWriteGraphTraversalSource()) {
g.V().has(dao.getNameHashColumn(), FullyQualifiedName.buildHash(targetFQN)).inE(Relationship.TAG_TO.value()).drop().iterate();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

@SqlUpdate(
"DELETE FROM tag_usage where tagFQNHash = :tagFqnHash AND targetFQNHash LIKE CONCAT(:targetFQNHash, '%')")
Expand Down
Loading
Loading