Skip to content

Commit

Permalink
spark caching and perf improvement
Browse files (browse the repository at this point in the history)
  • Loading branch information
sonalgoyal committed Jul 22, 2024
1 parent a89f84d commit a077c3b
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 2 deletions.
4 changes: 4 additions & 0 deletions common/client/src/main/java/zingg/common/client/Samples.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@




package zingg.common.client;

import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ZFrame<D,R,C> getBlocked( ZFrame<D,R,C> testData) throws Exception, Zin
LOG.debug("Blocking model file location is " + args.getBlockFile());
Tree<Canopy<R>> tree = getBlockingTreeUtil().readBlockingTree(args);
ZFrame<D,R,C> blocked = getBlockingTreeUtil().getBlockHashes(testData, tree);
- ZFrame<D,R,C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)); //.cache();
+ ZFrame<D,R,C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache();
return blocked1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ZFrame<D, R, C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws
*/
//joinH.show();
joinH = joinH.filter(joinH.gt(ColName.ID_COL));
- LOG.warn("Num comparisons " + joinH.count());
+ if (LOG.isDebugEnabled()) LOG.debug("Num comparisons " + joinH.count());
joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.ID_COL));
bAll = bAll.repartition(args.getNumPartitions(), bAll.col(ColName.ID_COL));
joinH = joinH.joinOnCol(bAll, ColName.ID_COL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public ZFrame<Dataset<Row>, Row, Column> buildGraph(ZFrame<Dataset<Row>, Row, Co
// we need to transform the input here by using stop words
//rename id field which is a common field in data to another field as it
//clashes with graphframes :-(
+ vOrig = vOrig.cache();
Dataset<Row> vertices = vOrig.df();
Dataset<Row> edges = ed.df();
vertices = vertices.withColumnRenamed(ColName.ID_EXTERNAL_ORIG_COL, ColName.ID_EXTERNAL_COL);
Expand Down

0 comments on commit a077c3b

Please sign in to comment.