diff --git a/python-api/README.md b/python-api/README.md index 98816d7cc..bde9e6842 100644 --- a/python-api/README.md +++ b/python-api/README.md @@ -121,7 +121,7 @@ rdd_op = gp.GetPySparkRDDOfAllElements(view=view) rdd = graph.execute(rdd_op, user) ``` -USe pyspark to calculate the distribution of counts +Use pyspark to calculate the distribution of counts ``` def getCount(edge): @@ -130,6 +130,39 @@ def getCount(edge): count_distribution = rdd.map(getCount).reduceByKey(lambda a, b: a + b).collect() ``` +Use pyspark to create a GraphFrame + +``` +from gafferpy_pyspark import gaffer_pyspark as gp +from graphframes import * + +edge=g.ElementDefinition(group="YOUR_EDGE_GROUP",group_by=[]) +entity=g.ElementDefinition(group="YOUR_ENTITY_GROUP",group_by=[]) +entityView=g.View( + entities=[entity] +) +edgeView=g.View( + edges=[edge] +) + +df_entity_op = gp.GetPysparkDataFrameOfElements(entityView, sampleRatio=0.1) +df_edge_op = gp.GetPysparkDataFrameOfElements(edgeView, sampleRatio=0.1) +df_entity = graph.execute(df_entity_op, user) +df_edge = graph.execute(df_edge_op, user) + +edges = df_edge.withColumnRenamed("destination", "dst").withColumnRenamed("source", "src") +entities = df_entity.withColumnRenamed("vertex", "id") + +gf = GraphFrame(entities, edges) +``` + +Use pyspark to run PageRank on a GraphFrame + +``` +results = gf.pageRank(resetProbability=0.15, maxIter=10) +results.vertices.show() +``` + ### Use with larger graphs ### If you already have a large Gaffer instance running and want to use the pyspark api with it, follow the same steps above except that you will need to point to the larger graph's schema, graphconfig and store-properties files when you create the python graph object. 
diff --git a/python-api/gafferpy-core/src/main/java/uk/gov/gchq/gaffer/python/data/serialiser/config/PythonSerialiserConfig.java b/python-api/gafferpy-core/src/main/java/uk/gov/gchq/gaffer/python/data/serialiser/config/PythonSerialiserConfig.java index daef0bef3..8a36f7580 100644 --- a/python-api/gafferpy-core/src/main/java/uk/gov/gchq/gaffer/python/data/serialiser/config/PythonSerialiserConfig.java +++ b/python-api/gafferpy-core/src/main/java/uk/gov/gchq/gaffer/python/data/serialiser/config/PythonSerialiserConfig.java @@ -51,17 +51,18 @@ public PythonSerialiserConfig(final FileInputStream fis) { } public PythonSerialiserConfig(final byte[] bytes) { - Map map = null; + Map map = null; + Map serialisersMap = null; this.serialisers = new HashMap<>(); try { map = JSONSerialiser.deserialise(bytes, Map.class); - } catch (final SerialisationException e) { - e.printStackTrace(); - } - - for (final String s : map.keySet()) { + serialisersMap = (Map) map.get("serialisers"); + } catch (final SerialisationException e) { + e.printStackTrace(); + } + for (final String s : serialisersMap.keySet()) { try { - this.serialisers.put(Class.forName(s), Class.forName(map.get(s))); + this.serialisers.put(Class.forName(s), Class.forName(serialisersMap.get(s))); } catch (final ClassNotFoundException e) { e.printStackTrace(); } diff --git a/python-api/gafferpy-core/src/main/python/gafferpy_core/gaffer_utils.py b/python-api/gafferpy-core/src/main/python/gafferpy_core/gaffer_utils.py index 6a3ff0fcf..8de46b953 100644 --- a/python-api/gafferpy-core/src/main/python/gafferpy_core/gaffer_utils.py +++ b/python-api/gafferpy-core/src/main/python/gafferpy_core/gaffer_utils.py @@ -97,9 +97,15 @@ def convertElement(input): class User(g.ToJson): CLASS='uk.gov.gchq.gaffer.user.User' - def __init__(self, user_id=None): + def __init__(self, user_id=None, data_auths=None): self._class_name=self.CLASS, self.user_id=user_id; + self.data_auths=data_auths; def to_json(self): - return {'userId': self.user_id} + 
json = {} + if self.user_id is not None: + json['userId'] = self.user_id + if self.data_auths is not None: + json['dataAuths'] = self.data_auths + return json