Skip to content

Commit

Permalink
Merge pull request #152 from cirnoooo123/main
Browse files Browse the repository at this point in the history
implement distance-join
  • Loading branch information
SongY123 authored Apr 24, 2023
2 parents 751cbb4 + 2aebb55 commit 6d4701b
Show file tree
Hide file tree
Showing 14 changed files with 440 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,37 @@ public void testSqlRangeCount() throws SQLException {

@Test
public void testSqlRangeJoin() throws SQLException {
String sql = "select * from spatial s1 join spatial s2 on DWithin(s1.S_POINT, s2.S_POINT, 5)";
String sql = "select * from join_left s1 join spatial s2 on DWithin(s1.JL_POINT, s2.S_POINT, 500000)";
try (Statement stmt = user.createStatement()) {
ResultSet dataset = stmt.executeQuery(sql);
long count = 0;
while (dataset.next()) {
printLine(dataset);
++count;
}
assertEquals(1, count);
assertEquals(78, count);
dataset.close();
}
}

@Test
public void testSqlKNNQuery() throws SQLException {
String sql = "select * from spatial order by Distance(S_POINT, POINT(1404050, -4762163)) asc limit 10";
public void testSqlKNNQuery1() throws SQLException {
String sql = "select S_ID from spatial order by Distance(POINT(1404050, -4762163), S_POINT) asc limit 10";
try (Statement stmt = user.createStatement()) {
ResultSet dataset = stmt.executeQuery(sql);
long count = 0;
while (dataset.next()) {
printLine(dataset);
++count;
}
assertEquals(10, count);
dataset.close();
}
}

@Test
public void testSqlKNNQuery2() throws SQLException {
String sql = "select S_ID from spatial where KNN(POINT(1404050, -4762163), S_POINT, 10)";
try (Statement stmt = user.createStatement()) {
ResultSet dataset = stmt.executeQuery(sql);
long count = 0;
Expand All @@ -154,15 +169,15 @@ public void testSqlKNNQuery() throws SQLException {

@Test
public void testSqlKNNJOIN() throws SQLException {
String sql = "select * from spatial s1 join spatial s2 on KNN(s1.S_POINT, s2.S_POINT, 5)";
String sql = "select s1.JL_ID, s2.S_ID from join_left s1 join spatial s2 on KNN(s1.JL_POINT, s2.S_POINT, 5)";
try (Statement stmt = user.createStatement()) {
ResultSet dataset = stmt.executeQuery(sql);
long count = 0;
while (dataset.next()) {
printLine(dataset);
++count;
}
assertEquals(1, count);
assertEquals(50, count);
dataset.close();
}
}
Expand Down
9 changes: 9 additions & 0 deletions benchmark/src/test/resources/spatial-user-configs.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@
"localName": "spatial"
}
]
},
{
"tableName": "join_left",
"localTables": [
{
"endpoint": "localhost:12345",
"localName": "join_left"
}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum ErrorCode {
IMPLEMENTOR_CONFIG_FILE_PATH_NOT_SET(20037, "implementor config file path not set"),
IMPLEMENTOR_CONFIG_FILE_NOT_FOUND(20038, "implementor config file: {} not found"),

RANGE_JOIN_LEFT_TABLE_NOT_PUBLIC(20041, "left table in range join must be public"),

// udf error
UDF_LOAD_FAILED(30001, "udf: {} load failed"),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.hufudb.openhufu.core.implementor;

import com.hufudb.openhufu.common.exception.ErrorCode;
import com.hufudb.openhufu.common.exception.OpenHuFuException;
import com.hufudb.openhufu.core.client.OpenHuFuClient;
import com.hufudb.openhufu.core.client.OwnerClient;
import java.util.ArrayList;
Expand All @@ -8,11 +10,15 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import com.hufudb.openhufu.core.implementor.spatial.join.DistanceJoin;
import com.hufudb.openhufu.core.implementor.spatial.join.KNNJoin;
import com.hufudb.openhufu.core.implementor.spatial.knn.BinarySearchKNN;
import com.hufudb.openhufu.core.implementor.spatial.knn.KNNConverter;
import com.hufudb.openhufu.core.sql.plan.PlanUtils;
import com.hufudb.openhufu.data.schema.Schema;
import com.hufudb.openhufu.data.storage.*;
import com.hufudb.openhufu.data.storage.MultiSourceDataSet.Producer;
import com.hufudb.openhufu.expression.ExpressionUtils;
import com.hufudb.openhufu.implementor.PlanImplementor;
import com.hufudb.openhufu.interpreter.Interpreter;
import com.hufudb.openhufu.plan.BinaryPlan;
Expand Down Expand Up @@ -102,11 +108,111 @@ DataSet ownerSideQuery(Plan plan) {
return concurrentDataSet;
}

private boolean isPrivacyRangeJoin(BinaryPlan plan) {
if (plan.getJoinCond().getModifier().equals(Modifier.PUBLIC)) {
return false;
}
if (!plan.getJoinCond().getCondition().getIn(0).getModifier().equals(Modifier.PUBLIC)) {
throw new OpenHuFuException(ErrorCode.RANGE_JOIN_LEFT_TABLE_NOT_PUBLIC);
}
return plan.getJoinCond().getCondition().getStr().equals("dwithin");
}

private boolean isPrivacyKNNJoin(BinaryPlan plan) {
if (plan.getJoinCond().getModifier().equals(Modifier.PUBLIC)) {
return false;
}
if (!plan.getJoinCond().getCondition().getIn(0).getModifier().equals(Modifier.PUBLIC)) {
throw new OpenHuFuException(ErrorCode.RANGE_JOIN_LEFT_TABLE_NOT_PUBLIC);
}
return plan.getJoinCond().getCondition().getStr().equals("knn");
}

private DataSet privacySpatialJoin(BinaryPlan plan, boolean isDistanceJoin) {
return privacySpatialJoin(plan, isDistanceJoin, false);
}

private DataSet privacySpatialJoin(BinaryPlan plan, boolean isDistanceJoin, boolean isUsingKNNFunc) {
DataSet left = ownerSideQuery(plan.getChildren().get(0));
DataSetIterator leftIter = left.getIterator();
List<ArrayRow> arrayRows = new ArrayList<>();

boolean containsLeftKey = false;
int leftKey = -1;
for (OpenHuFuPlan.Expression expression: plan.getSelectExps()) {
if (expression.getOpType().equals(OpenHuFuPlan.OperatorType.REF)
&& expression.getI32() == plan.getJoinCond().getCondition().getIn(0).getI32()) {
containsLeftKey = true;
}
}
if (!containsLeftKey) {
for (int i = 0; i < plan.getChildren().get(0).getSelectExps().size(); i++) {
if (plan.getChildren().get(0).getSelectExps().get(i).getI32()
== plan.getJoinCond().getCondition().getIn(0).getI32()) {
leftKey = i;
break;
}
}
}

boolean containsRightKey = false;
int rightKey = -1;
for (OpenHuFuPlan.Expression expression: plan.getSelectExps()) {
if (expression.getOpType().equals(OpenHuFuPlan.OperatorType.REF)
&& expression.getI32() == plan.getJoinCond().getCondition().getIn(1).getI32()) {
containsRightKey = true;
}
}
if (!containsRightKey) {
for (int i = 0; i < plan.getChildren().get(1).getSelectExps().size(); i++) {
if (plan.getChildren().get(1).getSelectExps().get(i).getI32()
== plan.getJoinCond().getCondition().getIn(1).getI32() - plan.getChildren().get(0).getSelectExps().size()) {
rightKey = i;
break;
}
}
}
while (leftIter.next()) {
int leftRef = plan.getJoinCond().getCondition().getIn(0).getI32();
DataSet rightDataSet;
if (isDistanceJoin) {
rightDataSet = ownerSideQuery(DistanceJoin
.generateDistanceQueryPlan(plan, leftIter.get(leftRef).toString(), rightKey));
}
else {
rightDataSet = privacyKNN((UnaryPlan) KNNJoin.generateKNNQueryPlan(plan, leftIter.get(leftRef).toString(), rightKey), isUsingKNNFunc);
}
DataSetIterator rightIter = rightDataSet.getIterator();
while (rightIter.next()) {
arrayRows.add(ArrayRow.merge(leftIter, rightIter, leftKey));
LOG.info(ArrayRow.merge(leftIter, rightIter, leftKey).toString());
}
}
Schema schema;
schema = ExpressionUtils.createSchema(plan.getSelectExps());
LOG.info(schema.toString());
return new ArrayDataSet(schema, arrayRows);
}

@Override
public DataSet implement(Plan plan) {
LOG.info(plan.toString());
boolean isUsingKNNFuc = plan instanceof LeafPlan
&& !plan.getWhereExps().isEmpty()
&& plan.getWhereExps().get(0).getOpType().equals(OpenHuFuPlan.OperatorType.SCALAR_FUNC)
&& plan.getWhereExps().get(0).getStr().equals("knn");
if (isUsingKNNFuc) {
plan = KNNConverter.convertKNN((LeafPlan) plan);
}
if (isMultiParty(plan)) {
if (plan instanceof BinaryPlan && isPrivacyRangeJoin((BinaryPlan) plan)) {
return privacySpatialJoin((BinaryPlan) plan, true);
}
if (plan instanceof BinaryPlan && isPrivacyKNNJoin((BinaryPlan) plan)) {
return privacySpatialJoin((BinaryPlan) plan, false, isUsingKNNFuc);
}
if (plan instanceof UnaryPlan && isMultiPartySecureKNN((UnaryPlan) plan)) {
return privacyKNN((UnaryPlan) plan);
return privacyKNN((UnaryPlan) plan, isUsingKNNFuc);
}
// implement on owner side
DataSet dataset = ownerSideQuery(plan);
Expand All @@ -118,7 +224,7 @@ public DataSet implement(Plan plan) {
}
}

private DataSet privacyKNN(UnaryPlan plan) {
private DataSet privacyKNN(UnaryPlan plan, boolean isUsingKNNFunc) {
LOG.info("Using binary-search KNN.");
boolean USE_DP = false;
int k = plan.getFetch();
Expand Down Expand Up @@ -160,11 +266,12 @@ private DataSet privacyKNN(UnaryPlan plan) {
right = mid;
} else {
loop++;
return kNNCircleRangeQuery(plan, mid);
DataSet dataSet = ArrayDataSet.materialize(kNNCircleRangeQuery(plan, mid, isUsingKNNFunc));
return dataSet;
}
loop++;
}
return kNNCircleRangeQuery(plan, right);
return kNNCircleRangeQuery(plan, right, isUsingKNNFunc);
}
private double kNNRadiusQuery(UnaryPlan plan) {
//todo -sjz
Expand All @@ -191,8 +298,8 @@ private long privacyCompare(UnaryPlan plan, double range, long k) {
long res = (long) dataSet.get(0);
return res - k;
}
private DataSet kNNCircleRangeQuery(UnaryPlan plan, double range) {
return ownerSideQuery(BinarySearchKNN.generateKNNCircleRangeQueryPlan(plan, range));
private DataSet kNNCircleRangeQuery(UnaryPlan plan, double range, boolean isUsingKNNFunc) {
return ownerSideQuery(BinarySearchKNN.generateKNNCircleRangeQueryPlan(plan, range, isUsingKNNFunc));
}
private boolean isMultiPartySecureKNN(UnaryPlan unary) {
LeafPlan leaf = (LeafPlan) unary.getChildren().get(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.hufudb.openhufu.core.implementor.spatial.join;

import com.google.common.collect.ImmutableList;
import com.hufudb.openhufu.data.storage.utils.GeometryUtils;
import com.hufudb.openhufu.expression.ExpressionFactory;
import com.hufudb.openhufu.plan.BinaryPlan;
import com.hufudb.openhufu.plan.LeafPlan;
import com.hufudb.openhufu.plan.Plan;
import com.hufudb.openhufu.proto.OpenHuFuData;
import com.hufudb.openhufu.proto.OpenHuFuPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class DistanceJoin {
static final Logger LOG = LoggerFactory.getLogger(DistanceJoin.class);

public static Plan generateDistanceQueryPlan(BinaryPlan binaryPlan, String point, int rightKey) {
LOG.info(String.valueOf(rightKey));
LeafPlan originalLeaf = (LeafPlan) binaryPlan.getChildren().get(1);
LeafPlan leafPlan = new LeafPlan();
leafPlan.setTableName(originalLeaf.getTableName());
if (rightKey == -1) {
leafPlan.setSelectExps(originalLeaf.getSelectExps());
}
else {
List<OpenHuFuPlan.Expression> selects = new ArrayList<>();
int i = 0;
for (OpenHuFuPlan.Expression expression: originalLeaf.getSelectExps()) {
if (i != rightKey) {
selects.add(expression);
}
i++;
}
leafPlan.setSelectExps(selects);
}
OpenHuFuPlan.Expression oldDwithin = binaryPlan.getJoinCond().getCondition();
OpenHuFuPlan.Expression left = ExpressionFactory.createLiteral(OpenHuFuData.ColumnType.GEOMETRY, GeometryUtils.fromString(point));
OpenHuFuPlan.Expression right = ExpressionFactory.createInputRef(originalLeaf.getSelectExps().get(oldDwithin.getIn(1).getI32()
- binaryPlan.getChildren().get(0).getSelectExps().size()).getI32(), oldDwithin.getIn(1).getOutType(),
oldDwithin.getIn(1).getModifier());
OpenHuFuPlan.Expression dwithin = ExpressionFactory.createScalarFunc(OpenHuFuData.ColumnType.BOOLEAN, "dwithin",
ImmutableList.of(left, right, oldDwithin.getIn(2)));
LOG.info("rewriting DistanceQueryPlan");
List<OpenHuFuPlan.Expression> whereExps = ImmutableList.of(dwithin);
leafPlan.setWhereExps(whereExps);
leafPlan.setOrders(originalLeaf.getOrders());
LOG.info(leafPlan.toString());
return leafPlan;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.hufudb.openhufu.core.implementor.spatial.join;

import com.google.common.collect.ImmutableList;
import com.hufudb.openhufu.data.storage.utils.GeometryUtils;
import com.hufudb.openhufu.expression.ExpressionFactory;
import com.hufudb.openhufu.plan.BinaryPlan;
import com.hufudb.openhufu.plan.LeafPlan;
import com.hufudb.openhufu.plan.Plan;
import com.hufudb.openhufu.plan.UnaryPlan;
import com.hufudb.openhufu.proto.OpenHuFuData;
import com.hufudb.openhufu.proto.OpenHuFuPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class KNNJoin {
static final Logger LOG = LoggerFactory.getLogger(KNNJoin.class);

public static Plan generateKNNQueryPlan(BinaryPlan binaryPlan, String point, int rightKey) {
LOG.info(String.valueOf(rightKey));
LeafPlan originalLeaf = (LeafPlan) binaryPlan.getChildren().get(1);
LeafPlan leafPlan = new LeafPlan();
leafPlan.setTableName(originalLeaf.getTableName());
if (rightKey == -1) {
leafPlan.setSelectExps(originalLeaf.getSelectExps());
}
else {
List<OpenHuFuPlan.Expression> selects = new ArrayList<>();
int i = 0;
for (OpenHuFuPlan.Expression expression: originalLeaf.getSelectExps()) {
if (i != rightKey) {
selects.add(expression);
}
i++;
}
leafPlan.setSelectExps(selects);
}
OpenHuFuPlan.Expression oldDwithin = binaryPlan.getJoinCond().getCondition();
OpenHuFuPlan.Expression left = ExpressionFactory.createLiteral(OpenHuFuData.ColumnType.GEOMETRY, GeometryUtils.fromString(point));
OpenHuFuPlan.Expression right = ExpressionFactory.createInputRef(originalLeaf.getSelectExps().get(oldDwithin.getIn(1).getI32()
- binaryPlan.getChildren().get(0).getSelectExps().size()).getI32(), oldDwithin.getIn(1).getOutType(),
oldDwithin.getIn(1).getModifier());
OpenHuFuPlan.Expression dwithin = ExpressionFactory.createScalarFunc(OpenHuFuData.ColumnType.BOOLEAN, "knn",
ImmutableList.of(left, right, oldDwithin.getIn(2)));
LOG.info("rewriting DistanceQueryPlan");
List<OpenHuFuPlan.Expression> whereExps = ImmutableList.of(dwithin);
leafPlan.setWhereExps(whereExps);
leafPlan.setOrders(originalLeaf.getOrders());
LOG.info(leafPlan.toString());
return convertKNN(leafPlan);
}

public static UnaryPlan convertKNN(LeafPlan plan) {
LOG.info("converting KNN");
LeafPlan leafPlan = new LeafPlan();
leafPlan.setTableName(plan.getTableName());
List<OpenHuFuPlan.Expression> selects1 = new ArrayList<>(plan.getSelectExps());
OpenHuFuPlan.Expression knn = plan.getWhereExps().get(0);
OpenHuFuPlan.Expression distance = ExpressionFactory
.createScalarFunc(OpenHuFuData.ColumnType.DOUBLE, "distance",
ImmutableList.of(knn.getIn(0), knn.getIn(1)));
selects1.add(distance);
leafPlan.setSelectExps(selects1);
OpenHuFuPlan.Collation order1 = OpenHuFuPlan.Collation.newBuilder().setRef(plan.getSelectExps().size())
.setDirection(OpenHuFuPlan.Direction.ASC).build();
leafPlan.setOrders(ImmutableList.of(order1));
leafPlan.setFetch((int) knn.getIn(2).getF64());

UnaryPlan unaryPlan = new UnaryPlan(leafPlan);
List<OpenHuFuPlan.Expression> selects2 = new ArrayList<>(plan.getSelectExps());
selects2.add(ExpressionFactory.createInputRef(order1.getRef(), OpenHuFuData.ColumnType.DOUBLE, OpenHuFuData.Modifier.PROTECTED));
unaryPlan.setSelectExps(selects2);
OpenHuFuPlan.Collation order2 = OpenHuFuPlan.Collation.newBuilder().setRef(plan.getSelectExps().size())
.setDirection(OpenHuFuPlan.Direction.ASC).build();
unaryPlan.setOrders(ImmutableList.of(order2));
unaryPlan.setFetch(leafPlan.getFetch());
LOG.info(unaryPlan.toString());
return unaryPlan;
}
}
Loading

0 comments on commit 6d4701b

Please sign in to comment.