diff --git a/common/client/src/main/java/zingg/common/client/Samples.java b/common/client/src/main/java/zingg/common/client/Samples.java index 1a74c3874..c93fa249a 100644 --- a/common/client/src/main/java/zingg/common/client/Samples.java +++ b/common/client/src/main/java/zingg/common/client/Samples.java @@ -1,3 +1,7 @@ + + + + package zingg.common.client; import java.io.Serializable; diff --git a/common/client/src/main/java/zingg/common/client/ZFrame.java b/common/client/src/main/java/zingg/common/client/ZFrame.java index 37de2b0db..b07a264c0 100644 --- a/common/client/src/main/java/zingg/common/client/ZFrame.java +++ b/common/client/src/main/java/zingg/common/client/ZFrame.java @@ -82,6 +82,7 @@ public interface ZFrame { public ZFrame repartition(int num); public ZFrame repartition(int num, C c); public ZFrame repartition(int num,scala.collection.Seq partitionExprs); + public ZFrame repartition(scala.collection.Seq partitionExprs); public ZFrame sample(boolean repartition, float num); diff --git a/common/core/src/main/java/zingg/common/core/executor/Matcher.java b/common/core/src/main/java/zingg/common/core/executor/Matcher.java index 2e976eae9..88a16cd10 100644 --- a/common/core/src/main/java/zingg/common/core/executor/Matcher.java +++ b/common/core/src/main/java/zingg/common/core/executor/Matcher.java @@ -49,7 +49,7 @@ public ZFrame getBlocked( ZFrame testData) throws Exception, Zin LOG.debug("Blocking model file location is " + args.getBlockFile()); Tree> tree = getBlockingTreeUtil().readBlockingTree(args); ZFrame blocked = getBlockingTreeUtil().getBlockHashes(testData, tree); - ZFrame blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)); //.cache(); + ZFrame blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache(); return blocked1; } diff --git a/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java b/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java index 4d0fff71d..2e9e261db 100644 --- a/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java +++ b/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java @@ -27,7 +27,7 @@ public ZFrame getPairs(ZFrameblocked, ZFramebAll) throws */ //joinH.show(); joinH = joinH.filter(joinH.gt(ColName.ID_COL)); - LOG.warn("Num comparisons " + joinH.count()); + if (LOG.isDebugEnabled()) LOG.debug("Num comparisons " + joinH.count()); joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.ID_COL)); bAll = bAll.repartition(args.getNumPartitions(), bAll.col(ColName.ID_COL)); joinH = joinH.joinOnCol(bAll, ColName.ID_COL); diff --git a/common/core/src/test/java/zingg/common/core/util/CsvReader.java b/common/core/src/test/java/zingg/common/core/util/CsvReader.java new file mode 100644 index 000000000..c700d6fe2 --- /dev/null +++ b/common/core/src/test/java/zingg/common/core/util/CsvReader.java @@ -0,0 +1,28 @@ +package zingg.common.core.util; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.ArrayList; +import java.util.List; +import java.util.Scanner; + +public class CsvReader { + protected List records; + IFromCsv creator; + + public CsvReader(IFromCsv creator){ + records = new ArrayList(); + this.creator = creator; + } + + public List getRecords(String file, boolean skipHeader) throws FileNotFoundException{ + int lineno = 0; + try (Scanner scanner = new Scanner(new File(file))) { + while (scanner.hasNextLine()) { + records.add(creator.fromCsv(scanner.nextLine())); + } + } + return records; + } + +} diff --git a/common/core/src/test/java/zingg/common/core/util/IFromCsv.java b/common/core/src/test/java/zingg/common/core/util/IFromCsv.java new file mode 100644 index 000000000..574da836b --- /dev/null +++ b/common/core/src/test/java/zingg/common/core/util/IFromCsv.java @@ -0,0 +1,7 @@ +package zingg.common.core.util; + +public interface IFromCsv { + + C fromCsv(String s); + +} diff --git a/common/infra/src/main/java/zingg/common/infra/util/PojoToArrayConverter.java b/common/core/src/test/java/zingg/common/core/util/PojoToArrayConverter.java similarity index 96% rename from common/infra/src/main/java/zingg/common/infra/util/PojoToArrayConverter.java rename to common/core/src/test/java/zingg/common/core/util/PojoToArrayConverter.java index a519cfe1f..e1a0ccf80 100644 --- a/common/infra/src/main/java/zingg/common/infra/util/PojoToArrayConverter.java +++ b/common/core/src/test/java/zingg/common/core/util/PojoToArrayConverter.java @@ -1,4 +1,4 @@ -package zingg.common.infra.util; +package zingg.common.core.util; import java.lang.reflect.*; import java.security.NoSuchAlgorithmException; diff --git a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java index c2984868a..7add25352 100644 --- a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java +++ b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java @@ -223,6 +223,10 @@ public ZFrame, Row, Column> repartition(int num,scala.collection.Se return new SparkFrame(df.repartition(num, partitionExprs)); } + public ZFrame, Row, Column> repartition(scala.collection.Seq partitionExprs){ + return new SparkFrame(df.repartition(partitionExprs)); + } + @Override public Column gt(String c) { return gt(this,c); diff --git a/spark/core/src/main/java/zingg/spark/core/util/SparkGraphUtil.java b/spark/core/src/main/java/zingg/spark/core/util/SparkGraphUtil.java index 44a8ac240..8a885c751 100644 --- a/spark/core/src/main/java/zingg/spark/core/util/SparkGraphUtil.java +++ b/spark/core/src/main/java/zingg/spark/core/util/SparkGraphUtil.java @@ -20,6 +20,7 @@ public ZFrame, Row, Column> buildGraph(ZFrame, Row, Co // we need to transform the input here by using stop words //rename id field which is a common field in data to another field as it //clashes with graphframes :-( + vOrig = vOrig.cache(); Dataset vertices = vOrig.df(); Dataset edges = ed.df(); vertices = vertices.withColumnRenamed(ColName.ID_EXTERNAL_ORIG_COL, ColName.ID_EXTERNAL_COL);