Skip to content

Commit

Permalink
Merge branch 'sania' of https://github.com/sania-16/zingg into sania
Browse files Browse the repository at this point in the history
  • Loading branch information
sania-16 committed Jul 23, 2024
2 parents fb4115d + a077c3b commit b29d324
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 3 deletions.
4 changes: 4 additions & 0 deletions common/client/src/main/java/zingg/common/client/Samples.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@




package zingg.common.client;

import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public interface ZFrame<D, R, C> {
public ZFrame<D, R, C> repartition(int num);
public ZFrame<D, R, C> repartition(int num, C c);
public ZFrame<D, R, C> repartition(int num,scala.collection.Seq<C> partitionExprs);
public ZFrame<D, R, C> repartition(scala.collection.Seq<C> partitionExprs);


public ZFrame<D, R, C> sample(boolean repartition, float num);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ZFrame<D,R,C> getBlocked( ZFrame<D,R,C> testData) throws Exception, Zin
LOG.debug("Blocking model file location is " + args.getBlockFile());
Tree<Canopy<R>> tree = getBlockingTreeUtil().readBlockingTree(args);
ZFrame<D,R,C> blocked = getBlockingTreeUtil().getBlockHashes(testData, tree);
ZFrame<D,R,C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)); //.cache();
ZFrame<D,R,C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache();
return blocked1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ZFrame<D, R, C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws
*/
//joinH.show();
joinH = joinH.filter(joinH.gt(ColName.ID_COL));
LOG.warn("Num comparisons " + joinH.count());
if (LOG.isDebugEnabled()) LOG.debug("Num comparisons " + joinH.count());
joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.ID_COL));
bAll = bAll.repartition(args.getNumPartitions(), bAll.col(ColName.ID_COL));
joinH = joinH.joinOnCol(bAll, ColName.ID_COL);
Expand Down
28 changes: 28 additions & 0 deletions common/core/src/test/java/zingg/common/core/util/CsvReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package zingg.common.core.util;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

/**
 * Reads a text file line by line and turns every line into a record by
 * delegating to the {@link IFromCsv} creator supplied at construction time.
 *
 * <p>Parsed records are kept in an instance-level list, so repeated calls to
 * {@link #getRecords(String, boolean)} on the same reader keep appending to
 * the records returned by earlier calls.
 */
public class CsvReader {
    /** Records parsed so far; created once in the constructor and appended to by getRecords. */
    protected List<? extends IFromCsv> records;
    /** Converts a single line of the input file into a record. */
    IFromCsv creator;

    /**
     * @param creator object whose {@code fromCsv} is invoked once per data line
     */
    public CsvReader(IFromCsv creator){
        records = new ArrayList<IFromCsv>();
        this.creator = creator;
    }

    /**
     * Parses the given file, one record per line.
     *
     * @param file       path of the file to read
     * @param skipHeader when {@code true}, the first line of the file is treated
     *                   as a header and is not passed to the creator
     * @return the reader's record list, including the newly parsed records
     * @throws FileNotFoundException if {@code file} cannot be opened
     */
    public List<? extends IFromCsv> getRecords(String file, boolean skipHeader) throws FileNotFoundException{
        try (Scanner scanner = new Scanner(new File(file))) {
            // Honour skipHeader: drop the first line instead of parsing it as data.
            if (skipHeader && scanner.hasNextLine()) {
                scanner.nextLine();
            }
            while (scanner.hasNextLine()) {
                records.add(creator.fromCsv(scanner.nextLine()));
            }
        }
        return records;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package zingg.common.core.util;

/**
 * Contract for objects that can build a record out of a single line of CSV text.
 */
public interface IFromCsv {
    /**
     * Builds a record from one line of text.
     *
     * @param s   the raw line to parse
     * @param <C> type the caller expects back; it is chosen at the call site,
     *            so implementations presumably need an unchecked cast to
     *            satisfy it (NOTE(review): confirm against implementers)
     * @return the object created from {@code s}
     */
    <C> C fromCsv(String s);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package zingg.common.infra.util;
package zingg.common.core.util;

import java.lang.reflect.*;
import java.security.NoSuchAlgorithmException;
Expand Down
4 changes: 4 additions & 0 deletions spark/client/src/main/java/zingg/spark/client/SparkFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ public ZFrame<Dataset<Row>, Row, Column> repartition(int num,scala.collection.Se
return new SparkFrame(df.repartition(num, partitionExprs));
}

/**
 * Repartitions this frame by the given column expressions without passing an
 * explicit partition count (compare the overloads that also take {@code num}).
 *
 * <p>Delegates to {@code df.repartition(partitionExprs)} and wraps the result
 * in a new {@link SparkFrame}.
 *
 * @param partitionExprs column expressions to partition by
 * @return a new frame wrapping the repartitioned dataset
 */
@Override
public ZFrame<Dataset<Row>, Row, Column> repartition(scala.collection.Seq<Column> partitionExprs){
    return new SparkFrame(df.repartition(partitionExprs));
}

@Override
public Column gt(String c) {
return gt(this,c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public ZFrame<Dataset<Row>, Row, Column> buildGraph(ZFrame<Dataset<Row>, Row, Co
// we need to transform the input here by using stop words
//rename id field which is a common field in data to another field as it
//clashes with graphframes :-(
vOrig = vOrig.cache();
Dataset<Row> vertices = vOrig.df();
Dataset<Row> edges = ed.df();
vertices = vertices.withColumnRenamed(ColName.ID_EXTERNAL_ORIG_COL, ColName.ID_EXTERNAL_COL);
Expand Down

0 comments on commit b29d324

Please sign in to comment.