Skip to content

Commit 142e709

Browse files
authored
Merge branch 'zinggAI:main' into main
2 parents f54530d + c6cb1c8 commit 142e709

File tree

181 files changed

+422
-153
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

181 files changed

+422
-153
lines changed

.github/workflows/load-test.yml

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
name: load-test
1+
name: performance-test
22

33
on:
44
schedule:
55
- cron: "0 0 */3 * *"
66

77
jobs:
8-
load-test:
9-
name: load-test-spark
8+
perf-test:
9+
name: performance-test
1010
runs-on: ubuntu-latest
1111
env:
1212
SPARK_MASTER: local[*]
@@ -26,7 +26,7 @@ jobs:
2626
- name: setup spark
2727
uses: vemonet/setup-spark@v1
2828
with:
29-
spark-version: '3.5.3'
29+
spark-version: '3.5.0'
3030
hadoop-version: '3'
3131
- name: check spark
3232
run: spark-submit --version
@@ -37,6 +37,9 @@ jobs:
3737
- name: execute py script
3838
run: |
3939
python perf_test/perfTestRunner.py
40+
- name: Commit test results
41+
if: always()
42+
run: |
4043
git config user.name nitish
4144
git config user.email [email protected]
4245
git add .

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
[![CodeQL](https://github.com/zinggAI/zingg/actions/workflows/codeql.yml/badge.svg)](https://github.com/zinggAI/zingg/actions/workflows/codeql.yml)
44
[![PMD](https://github.com/zinggAI/zingg/actions/workflows/pmd.yml/badge.svg)](https://github.com/zinggAI/zingg/actions/workflows/pmd.yml)
55
[![Junits](https://github.com/zinggAI/zingg/actions/workflows/junits.yml/badge.svg)](https://github.com/zinggAI/zingg/actions/workflows/junits.yml)
6-
6+
[![performance-test](https://github.com/zinggAI/zingg/actions/workflows/load-test.yml/badge.svg?branch=main)](https://github.com/zinggAI/zingg/actions/workflows/load-test.yml)
77
## 0.5.0 release of Zingg is coming soon!
88

99
## The Problem

common/client/src/main/java/zingg/common/client/util/ColName.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ public interface ColName {
3333
public static final String COL_SPLIT = COL_PREFIX + "split";
3434
public static final String HASH_COUNTS_COL = ColName.HASH_COL + "_count";
3535
public static final String BLOCK_SAMPLES = "blockSamples/";
36-
37-
36+
public static final String STOPWORD_COL = COL_PREFIX + "word";
37+
3838
}

common/core/src/main/java/zingg/common/core/executor/blockingverifier/IVerifyBlockingPipes.java

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public interface IVerifyBlockingPipes<S,D,R,C> {
2121

2222
public void setPipeUtil(PipeUtilBase<S,D,R,C> pipeUtil);
2323

24+
/** Accessor paired with {@code setPipeUtil(PipeUtilBase)}: exposes the pipe utility held by this pipes helper. */
public PipeUtilBase<S,D,R,C> getPipeUtil();
25+
2426
public IModelHelper<D,R, C> getModelHelper();
2527

2628
public void setModelHelper(IModelHelper<D,R,C> imh);

common/core/src/main/java/zingg/common/core/executor/blockingverifier/VerifyBlocking.java

+25-14
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,12 @@ public void execute() throws ZinggClientException {
3939
ZFrame<D,R,C> testDataOriginal = getTestData();
4040
testDataOriginal = getFieldDefColumnsDS(testDataOriginal).cache();
4141
ZFrame<D,R,C> blocked = getBlockedData(testDataOriginal);
42-
LOG.info("Blocked");
43-
44-
ZFrame<D,R,C> blockCounts = blocked.select(ColName.HASH_COL).groupByCount(ColName.HASH_COL, ColName.HASH_COUNTS_COL);
45-
46-
getPipeUtil().write(blockCounts,getVerifyBlockingPipeUtil().getCountsPipe(args));
47-
48-
ZFrame<D,R,C> blockTopRec = blockCounts.select(ColName.HASH_COL,ColName.HASH_COUNTS_COL).sortDescending(ColName.HASH_COUNTS_COL).limit(noOfBlocks);
49-
50-
getBlockSamples(blocked, blockTopRec,verifyBlockingPipeUtil);
42+
LOG.info("Blocked Data" + blocked.count());
43+
//get the no of counts per hash
44+
ZFrame<D,R,C> blockCounts = getBlockCounts(blocked, verifyBlockingPipeUtil);
45+
getPipeUtil().write(blockCounts,getVerifyBlockingPipeUtil().getCountsPipe(args));
46+
//get the records associated with the top 3 hashes
47+
getBlockSamples(blocked, blockCounts,verifyBlockingPipeUtil);
5148

5249
} catch (Exception e) {
5350
if (LOG.isDebugEnabled()){
@@ -58,21 +55,35 @@ public void execute() throws ZinggClientException {
5855

5956
}
6057

58+
/**
 * Builds the per-hash count frame for blocked data.
 * Selects {@code ColName.HASH_COL} from {@code blocked} and calls
 * {@code groupByCount(HASH_COL, HASH_COUNTS_COL)} on the result.
 *
 * @param blocked frame that carries a {@code ColName.HASH_COL} column
 * @param verifyBlockingPipeUtil not read anywhere in this body.
 *        NOTE(review): presumably kept so overriding classes can use it - confirm
 * @return the frame produced by {@code groupByCount}
 * @throws ZinggClientException declared on the signature; no statement in this body visibly throws it
 */
protected ZFrame<D, R, C> getBlockCounts(ZFrame<D, R, C> blocked, IVerifyBlockingPipes<S,D,R,C> verifyBlockingPipeUtil) throws ZinggClientException{
59+
ZFrame<D,R,C> blockCounts = blocked.select(ColName.HASH_COL).groupByCount(ColName.HASH_COL, ColName.HASH_COUNTS_COL);
60+
return blockCounts;
61+
}
6162

62-
public void getBlockSamples(ZFrame<D, R, C> blocked, ZFrame<D, R, C> blockTopRec, IVerifyBlockingPipes verifyBlockingPipeUtil) throws ZinggClientException {
63+
/**
 * Picks the blocks with the highest counts.
 * Keeps only the hash and count columns, sorts descending on
 * {@code ColName.HASH_COUNTS_COL} and applies {@code limit(noOfBlocks)}.
 *
 * @param blockCounts frame with {@code HASH_COL} and {@code HASH_COUNTS_COL} columns
 * @return the sorted, limited frame ({@code noOfBlocks} is a field declared outside this hunk)
 */
protected ZFrame<D,R,C> getTopRecordsDF(ZFrame<D,R,C> blockCounts){
64+
return blockCounts.select(ColName.HASH_COL,ColName.HASH_COUNTS_COL).sortDescending(ColName.HASH_COUNTS_COL).limit(noOfBlocks);
65+
}
66+
67+
/**
 * Writes one sample set per top block.
 * Derives the top-block frame with {@code getTopRecordsDF(blockCounts)}, materialises it as a
 * list, and for every row reads the hash, fetches the matching records and writes them through
 * {@code getPipeUtil()} to the pipe returned by
 * {@code getBlockSamplesPipe(args, ColName.BLOCK_SAMPLES + hash)}.
 *
 * @param blocked frame of blocked records to sample from
 * @param blockCounts per-hash counts, as produced by {@code getBlockCounts}
 * @param verifyBlockingPipeUtil not read in this body; the write below goes through
 *        {@code getVerifyBlockingPipeUtil()} instead.
 *        NOTE(review): confirm the two always refer to the same object
 * @throws ZinggClientException declared on the signature.
 *         NOTE(review): presumably raised by the pipe write - confirm
 */
protected void getBlockSamples(ZFrame<D, R, C> blocked, ZFrame<D, R, C> blockCounts, IVerifyBlockingPipes<S,D,R,C> verifyBlockingPipeUtil) throws ZinggClientException {
68+
ZFrame<D,R,C> blockTopRec = getTopRecordsDF(blockCounts);
6369
List<R> topRec = blockTopRec.collectAsList();
6470

6571
for(R row: topRec) {
6672
int hash = (int) blockTopRec.getAsInt(row, ColName.HASH_COL);
67-
long count = (long) blockTopRec.getAsLong(row, ColName.HASH_COUNTS_COL);
68-
int sampleSize = Math.max(1, (int) Math.ceil(count * percentageOfBlockedRecords));
69-
ZFrame<D,R,C> matchingRecords = null;
70-
matchingRecords = blocked.filter(blocked.equalTo(ColName.HASH_COL,String.valueOf(hash))).limit(sampleSize);
73+
ZFrame<D,R,C> matchingRecords = getMatchingRecords(row, blockTopRec, blocked, hash);
7174
getPipeUtil().write(matchingRecords, getVerifyBlockingPipeUtil().getBlockSamplesPipe(args, ColName.BLOCK_SAMPLES + hash));
7275
}
7376

7477
}
7578

79+
/**
 * Samples the records that belong to one block.
 * Reads the block's count from {@code row}, computes
 * {@code sampleSize = max(1, ceil(count * percentageOfBlockedRecords))}, then filters
 * {@code blocked} on {@code HASH_COL} equal to {@code String.valueOf(hash)} and limits the
 * result to {@code sampleSize} rows.
 *
 * @param row row of {@code blockTopRec} carrying {@code ColName.HASH_COUNTS_COL}
 * @param blockTopRec frame used to read the count out of {@code row}
 * @param blocked frame of blocked records to filter
 * @param hash hash value identifying the block
 * @return the filtered, limited frame
 */
protected ZFrame<D,R,C> getMatchingRecords(R row, ZFrame<D,R,C> blockTopRec, ZFrame<D,R,C> blocked, int hash){
80+
long count = (long) blockTopRec.getAsLong(row, ColName.HASH_COUNTS_COL);
81+
// Math.max(1, ...) guarantees at least one sampled record even when the product rounds to 0.
int sampleSize = Math.max(1, (int) Math.ceil(count * percentageOfBlockedRecords));
82+
// NOTE(review): the null initialisation is overwritten on the very next line.
ZFrame<D,R,C> matchingRecords = null;
83+
matchingRecords = blocked.filter(blocked.equalTo(ColName.HASH_COL,String.valueOf(hash))).limit(sampleSize);
84+
return matchingRecords;
85+
}
86+
7687
public ZFrame<D, R, C> getFieldDefColumnsDS(ZFrame<D, R, C> testDataOriginal) {
7788
ZidAndFieldDefSelector zidAndFieldDefSelector = new ZidAndFieldDefSelector(args.getFieldDefinition());
7889
return testDataOriginal.select(zidAndFieldDefSelector.getCols());

common/core/src/main/java/zingg/common/core/executor/blockingverifier/VerifyBlockingPipes.java

+15-12
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,6 @@ public abstract class VerifyBlockingPipes<S,D,R,C> implements IVerifyBlockingPip
1313
Pipe<D,R,C> blockSamplesPipe;
1414
IModelHelper<D,R,C> modelHelper;
1515

16-
public IModelHelper<D, R, C> getModelHelper() {
17-
return this.modelHelper;
18-
}
19-
20-
21-
public void setModelHelper(IModelHelper<D, R, C> mh) {
22-
this.modelHelper = mh;
23-
}
24-
2516

2617
public VerifyBlockingPipes(PipeUtilBase<S,D,R,C> pipeUtil, long timestamp, IModelHelper<D, R, C> mh) {
2718
setPipeUtil(pipeUtil);
@@ -55,19 +46,31 @@ public void setBlockSamplesPipe(Pipe<D, R, C> blockSamplesPipe) {
5546
this.blockSamplesPipe = blockSamplesPipe;
5647
}
5748

58-
59-
60-
6149
/** Stores the given timestamp in this object's {@code timestamp} field. */
@Override
6250
public void setTimestamp(long timestamp) {
6351
this.timestamp = timestamp;
6452
}
6553

54+
/** Returns the {@code pipeUtil} field, i.e. whatever was last assigned through {@code setPipeUtil}. */
@Override
55+
public PipeUtilBase<S,D,R,C> getPipeUtil(){
56+
return pipeUtil;
57+
}
58+
6659

6760
/** Stores the given pipe utility in this object's {@code pipeUtil} field. */
@Override
6861
public void setPipeUtil(PipeUtilBase<S, D, R, C> pipeUtil) {
6962
this.pipeUtil = pipeUtil;
7063
}
7164

65+
/** Returns the {@code modelHelper} field. */
@Override
66+
public IModelHelper<D, R, C> getModelHelper() {
67+
return this.modelHelper;
68+
}
69+
70+
/** Stores the given model helper in this object's {@code modelHelper} field. */
@Override
71+
public void setModelHelper(IModelHelper<D, R, C> mh) {
72+
this.modelHelper = mh;
73+
}
74+
7275

7376
}

common/core/src/test/java/zingg/common/core/documenter/TestDocumenterBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void initialize(Context<S, D, R, C, T> context) throws ZinggClientExcepti
4242
this.context = context;
4343
}
4444

45-
public abstract DocumenterBase<S,D,R,C,T> getDocumenter(IContext<S,D,R,C,T> context, IArguments args, ClientOptions options);
45+
/**
 * Hook implemented by subclasses to supply a {@code DocumenterBase} for the given context,
 * arguments and options.
 * NOTE(review): presumably the instance exercised by the tests in this class - confirm.
 */
protected abstract DocumenterBase<S,D,R,C,T> getDocumenter(IContext<S,D,R,C,T> context, IArguments args, ClientOptions options);
4646

4747
@DisplayName ("Test Column is a Z column or not")
4848
@Test

common/core/src/test/java/zingg/common/core/executor/TestExecutorsSingle.java

-11
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,11 @@
77
import org.apache.commons.logging.LogFactory;
88

99
import zingg.common.client.ZinggClientException;
10-
import zingg.common.core.executor.blockingverifier.IVerifyBlockingPipes;
11-
import zingg.common.core.executor.validate.BlockerValidator;
1210
import zingg.common.core.executor.validate.LabellerValidator;
1311
import zingg.common.core.executor.validate.LinkerValidator;
1412
import zingg.common.core.executor.validate.MatcherValidator;
1513
import zingg.common.core.executor.validate.TrainerValidator;
1614
import zingg.common.core.executor.validate.TrainingDataFinderValidator;
17-
import zingg.common.core.executor.blockingverifier.VerifyBlocking;
1815

1916
public abstract class TestExecutorsSingle<S, D, R, C, T> extends TestExecutorsGeneric<S, D, R, C, T> {
2017

@@ -45,11 +42,6 @@ public void getBaseExecutors() throws ZinggClientException, IOException{
4542
Trainer<S, D, R, C, T> trainer = getTrainer();
4643
executorTesterList.add(new ExecutorTester<S, D, R, C, T>(trainer,getTrainerValidator(trainer),getConfigFile(),getModelId(),getDFObjectUtil()));
4744

48-
VerifyBlocking<S, D, R, C, T> verifyBlocker = getVerifyBlocker();
49-
IVerifyBlockingPipes<S, D, R, C> verifyBlockingPipes = getVerifyBlockingPipes();
50-
verifyBlockingPipes.setTimestamp(verifyBlocker.getTimestamp());
51-
executorTesterList.add(new ExecutorTester<S, D, R, C, T>(verifyBlocker, new BlockerValidator<S, D, R, C, T>(verifyBlocker, verifyBlockingPipes),getConfigFile(),getModelId(),getDFObjectUtil()));
52-
5345
}
5446

5547
public void getAdditionalExecutors() throws ZinggClientException, IOException{
@@ -73,12 +65,9 @@ public void getAdditionalExecutors() throws ZinggClientException, IOException{
7365
protected abstract Trainer<S, D, R, C, T> getTrainer() throws ZinggClientException;
7466

7567
protected abstract TrainerValidator<S, D, R, C, T> getTrainerValidator(Trainer<S, D, R, C, T> trainer);
76-
77-
protected abstract VerifyBlocking<S, D, R, C, T> getVerifyBlocker() throws ZinggClientException;
7868

7969
protected abstract Matcher<S, D, R, C, T> getMatcher() throws ZinggClientException;
8070

8171
protected abstract Linker<S, D, R, C, T> getLinker() throws ZinggClientException;
8272

83-
protected abstract IVerifyBlockingPipes<S, D, R, C> getVerifyBlockingPipes() throws ZinggClientException;
8473
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package zingg.common.core.executor.blockingverifier;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
6+
import java.util.List;
7+
8+
import org.apache.commons.logging.Log;
9+
import org.apache.commons.logging.LogFactory;
10+
import org.junit.jupiter.api.Test;
11+
12+
import zingg.common.client.Arguments;
13+
import zingg.common.client.IArguments;
14+
import zingg.common.client.ZFrame;
15+
import zingg.common.client.ZinggClientException;
16+
import zingg.common.client.util.ColName;
17+
import zingg.common.client.util.DFObjectUtil;
18+
import zingg.common.core.context.Context;
19+
import zingg.common.core.executor.blockingverifier.data.BlockingVerifyData;
20+
import zingg.common.core.executor.blockingverifier.model.BlockCountsData;
21+
import zingg.common.core.executor.blockingverifier.model.BlockedData;
22+
23+
/**
 * Abstract base for tests of {@code VerifyBlocking}.
 * Subclasses supply the {@code VerifyBlocking} instance, the {@code IVerifyBlockingPipes}
 * implementation and the table-name massaging; the data-frame utility and context are
 * handed in through {@link #initialize}.
 */
public abstract class TestVerifyBlocking<S,D,R,C,T> {
24+
25+
public static final Log LOG = LogFactory.getLog(TestVerifyBlocking.class);
26+
// Context whose pipe util is used to write and read block samples in testGetBlockSamples.
protected Context<S, D, R, C, T> context;
27+
// Converts the fixture object lists into ZFrames.
protected DFObjectUtil<S, D, R, C> dfObjectUtil;
28+
// Assigned at the start of each test from getVerifyBlockingPipes().
protected IVerifyBlockingPipes<S,D,R,C> verifyBlockingPipes;
29+
// Arguments handed to VerifyBlocking in testGetBlockSamples (model id is set there).
IArguments arguments = new Arguments();
30+
31+
/** No-arg constructor; state is supplied later through {@link #initialize}. */
public TestVerifyBlocking(){
32+
33+
}
34+
35+
/**
 * Stores the collaborators the tests rely on.
 *
 * @param dfObjectUtil utility used to build frames from fixture objects
 * @param context context providing the pipe util
 */
public void initialize(DFObjectUtil<S, D, R, C> dfObjectUtil, Context<S, D, R, C, T> context) {
36+
this.dfObjectUtil = dfObjectUtil;
37+
this.context = context;
38+
}
39+
40+
/** Supplies the {@code VerifyBlocking} instance exercised by the tests. */
public abstract VerifyBlocking<S,D,R,C,T> getVerifyBlocker();
41+
42+
/** Supplies the pipes helper passed to {@code VerifyBlocking} and used to resolve sample pipes. */
public abstract IVerifyBlockingPipes<S,D,R,C> getVerifyBlockingPipes();
43+
44+
/**
 * Checks that {@code getBlockCounts} on the blocked fixture yields exactly the expected
 * counts; the two {@code except} calls assert set equality in both directions.
 */
@Test
45+
public void testGetBlockCounts() throws ZinggClientException, Exception{
46+
VerifyBlocking<S,D,R,C,T> vb = getVerifyBlocker();
47+
verifyBlockingPipes = getVerifyBlockingPipes();
48+
49+
ZFrame<D,R,C> blocked = dfObjectUtil.getDFFromObjectList(BlockingVerifyData.getBlockedDF1(), BlockedData.class);
50+
ZFrame<D,R,C> blockCounts = vb.getBlockCounts(blocked,verifyBlockingPipes);
51+
// NOTE(review): the except-based assertions below look order-insensitive, so this sort is presumably not needed - confirm.
blockCounts = blockCounts.sortDescending(ColName.HASH_COUNTS_COL);
52+
53+
ZFrame<D,R,C> expBlockCounts = dfObjectUtil.getDFFromObjectList(BlockingVerifyData.getExpectedBlockedDF1(), BlockCountsData.class);
54+
55+
assertTrue(expBlockCounts.except(blockCounts).isEmpty());
56+
assertTrue(blockCounts.except(expBlockCounts).isEmpty());
57+
}
58+
59+
/**
 * Exercises the top-block selection and sample writing: verifies three top blocks, that the
 * second one has hash "3930", then writes samples for hashes 3915 and -3910 and reads them
 * back expecting 3 and 1 rows respectively.
 */
@Test
60+
public void testGetBlockSamples() throws Exception, ZinggClientException{
61+
VerifyBlocking<S,D,R,C,T> vb = getVerifyBlocker();
62+
verifyBlockingPipes = getVerifyBlockingPipes();
63+
vb.setModelHelper(verifyBlockingPipes.getModelHelper());
64+
// Keep the pipes helper and the executor on the same timestamp.
verifyBlockingPipes.setTimestamp(vb.getTimestamp());
65+
arguments.setModelId("junit_vb");
66+
vb.setArgs(arguments);
67+
68+
ZFrame<D,R,C> blocked = dfObjectUtil.getDFFromObjectList(BlockingVerifyData.getBlockedDF1(), BlockedData.class);
69+
ZFrame<D,R,C> blockCounts = vb.getBlockCounts(blocked,verifyBlockingPipes);
70+
ZFrame<D,R,C> blockTopRec = vb.getTopRecordsDF(blockCounts);
71+
assertTrue(checkNoOfTopBlocks(blockTopRec));
72+
73+
List<R> topRec = blockTopRec.collectAsList();
74+
assertEquals("3930",blockTopRec.getAsString(topRec.get(1), ColName.HASH_COL));
75+
76+
// The hash literals below are hard-coded; presumably they match rows 0 and 2 of the fixture - confirm.
ZFrame<D,R,C> matchingRec1 = vb.getMatchingRecords(topRec.get(0), blockTopRec, blocked, 3915);
77+
context.getPipeUtil().write(matchingRec1, verifyBlockingPipes.getBlockSamplesPipe(arguments, ColName.BLOCK_SAMPLES + "3915"));
78+
79+
ZFrame<D,R,C> matchingRec2 = vb.getMatchingRecords(topRec.get(2), blockTopRec, blocked, -3910);
80+
context.getPipeUtil().write(matchingRec2, verifyBlockingPipes.getBlockSamplesPipe(arguments, ColName.BLOCK_SAMPLES + "-3910"));
81+
82+
ZFrame<D, R, C> df1 = context.getPipeUtil().read(false, false, verifyBlockingPipes.getBlockSamplesPipe(arguments, ColName.BLOCK_SAMPLES + "3915"));
83+
// NOTE(review): written under BLOCK_SAMPLES + "-3910" but read via getMassagedTableName("-3910");
// presumably the backing store renames negative hashes - confirm against the subclass.
ZFrame<D, R, C> df2 = context.getPipeUtil().read(false, false, verifyBlockingPipes.getBlockSamplesPipe(arguments, getMassagedTableName("-3910")));
84+
85+
// NOTE(review): assertEquals(3L, df1.count()) would report the actual value on failure.
assertTrue(df1.count() == 3L);
86+
assertTrue(df2.count() == 1L);
87+
}
88+
89+
/**
 * Returns {@code true} exactly when {@code blockTopRec} has 3 rows.
 * NOTE(review): equivalent to {@code return blockTopRec.count() == 3L;}.
 */
public boolean checkNoOfTopBlocks(ZFrame<D,R,C> blockTopRec){
90+
if(blockTopRec.count() == 3L){
91+
return true;
92+
}
93+
else{
94+
return false;
95+
}
96+
}
97+
98+
/**
 * Maps a hash string to the sample name used when reading it back.
 * NOTE(review): exact transformation is defined by subclasses and not visible here.
 */
public abstract String getMassagedTableName(String hash);
99+
100+
101+
}

0 commit comments

Comments
 (0)