diff --git a/data/pom.xml b/data/pom.xml index 9b3d8ed8..6ccecce3 100644 --- a/data/pom.xml +++ b/data/pom.xml @@ -46,6 +46,17 @@ org.locationtech.jts jts-core + + org.apache.commons + commons-math3 + compile + + + com.hufudb.openhufu + openhufu-common + ${project.version} + compile + diff --git a/data/src/main/java/com/hufudb/openhufu/data/storage/ArrayDataSet.java b/data/src/main/java/com/hufudb/openhufu/data/storage/ArrayDataSet.java index 81fcd7ed..5b458970 100644 --- a/data/src/main/java/com/hufudb/openhufu/data/storage/ArrayDataSet.java +++ b/data/src/main/java/com/hufudb/openhufu/data/storage/ArrayDataSet.java @@ -9,7 +9,7 @@ public class ArrayDataSet implements MaterializedDataSet { final List rows; final int rowCount; - ArrayDataSet(Schema schema, List rows) { + public ArrayDataSet(Schema schema, List rows) { this.schema = schema; this.rows = rows; this.rowCount = rows.size(); @@ -50,6 +50,10 @@ public int rowCount() { return rowCount; } + public List getRows() { + return rows; + } + class Iterator implements DataSetIterator { int pointer; diff --git a/data/src/main/java/com/hufudb/openhufu/data/storage/RandomDataSet.java b/data/src/main/java/com/hufudb/openhufu/data/storage/RandomDataSet.java new file mode 100644 index 00000000..9012f320 --- /dev/null +++ b/data/src/main/java/com/hufudb/openhufu/data/storage/RandomDataSet.java @@ -0,0 +1,171 @@ +package com.hufudb.openhufu.data.storage; + +import com.hufudb.openhufu.common.exception.ErrorCode; +import com.hufudb.openhufu.common.exception.OpenHuFuException; +import com.hufudb.openhufu.data.schema.Schema; +import com.hufudb.openhufu.proto.OpenHuFuData; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.math3.distribution.LaplaceDistribution; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Point; + +import java.util.*; +import java.util.function.Function; + +public class RandomDataSet { + public final static GeometryFactory geoFactory = new GeometryFactory(); + private final static double RANDOM_SET_SCALE = 0.5; + private final static double EPS = 1.0; + public final static int RANDOM_SET_OFFSET = 10; + private final static LaplaceDistribution lap = new LaplaceDistribution(0, 1 / EPS); + private final static Random random = new Random(); + private final Schema schema; + private final DataSet source; + private final List originRows; + private final int originSize; + private final int resultSize; + private final List randomRows; + private final Map> randomRowMap; + + public RandomDataSet(DataSet source) { + this.schema = source.getSchema(); + this.source = source; + this.originRows = ArrayDataSet.materialize(source).rows; + this.originSize = originRows.size(); + this.randomRows = new ArrayList<>(); + this.randomRowMap = new HashMap<>(); + int size = (int) Math.ceil((double) originSize * RANDOM_SET_SCALE + lap.sample()); + this.resultSize = size > 0 ? size : (int) Math.abs(lap.sample()); + if (originSize == 0) { + init(this::getRandomValue); + } else { + init(this::getRandomValueFromData); + } + this.mix(); + } + + private void init(Function randomFunc) { + final int headerSize = schema.size(); + if (headerSize == 0) { + return; + } + for (int i = 0; i < resultSize; ++i) { + Object[] objects = new Object[headerSize]; + Object key = randomFunc.apply(0); + objects[0] = key; + for (int j = 1; j < headerSize; ++j) { + Object value = randomFunc.apply(j); + objects[j] = value; + } + ArrayRow row = new ArrayRow(objects); + randomRows.add(row); + recordRandomRow(key, row); + } + } + + private void recordRandomRow(Object key, ArrayRow value) { + if (randomRowMap.containsKey(key)) { + randomRowMap.get(key).add(value); + } else { + randomRowMap.put(key, new LinkedList<>(Arrays.asList(value))); + } + } + + private void mix() { + //todo index insert for ArrayList may be slow + for (ArrayRow row : originRows) { + int idx = (int) Math.ceil(random.nextDouble() * randomRows.size()); + randomRows.add(idx, row); + } + } + + public ArrayDataSet getRandomSet() { + return new ArrayDataSet(schema, randomRows); + } + + public ArrayDataSet removeRandom(DataSet dataSet) { + List newRows = new ArrayList<>(); + for (ArrayRow row : ArrayDataSet.materialize(dataSet).rows) { + Object key = row.get(0); + List rows = randomRowMap.get(key); + if (rows == null || rows.isEmpty()) { + newRows.add(row); + continue; + } + int idx = rows.indexOf(row); + if (idx == -1) { + newRows.add(row); + } else { + rows.remove(idx); + } + } + return new ArrayDataSet(schema, newRows); + } + + private Object getRandomValueFromData(int columnIndex) { + OpenHuFuData.ColumnType type = schema.getType((columnIndex)); + int r = (int) (random.nextDouble() * originSize); + switch (type) { + case BYTE: + return (byte) originRows.get(r).get(columnIndex) + (byte) lap.sample(); + case SHORT: + return (short) originRows.get(r).get(columnIndex) + (short) lap.sample(); + case INT: + return (int) originRows.get(r).get(columnIndex) + (int) lap.sample(); + case LONG: + case DATE: + case TIME: + case TIMESTAMP: + return (long) originRows.get(r).get(columnIndex) + (long) lap.sample(); + case FLOAT: + return (float) originRows.get(r).get(columnIndex) + (float) lap.sample(); + case DOUBLE: + return (double) originRows.get(r).get(columnIndex) + lap.sample(); + case BOOLEAN: + return lap.sample() > 0.0; + case GEOMETRY: + Geometry geometry = (Geometry) originRows.get(r).get(columnIndex); + if (geometry instanceof Point) { + Point p = (Point) geometry; + return geoFactory.createPoint(new Coordinate(p.getX() + lap.sample(), p.getX() + lap.sample())); + } else { + throw new OpenHuFuException(ErrorCode.DATA_TYPE_NOT_SUPPORT, type); + } + case STRING: + return originRows.get(r).get(columnIndex); + default: + throw new OpenHuFuException(ErrorCode.DATA_TYPE_NOT_SUPPORT, type); + } + } + + private Object getRandomValue(int columnIndex) { + OpenHuFuData.ColumnType type = schema.getType((columnIndex)); + switch (type) { + case BYTE: + return (byte) lap.sample(); + case SHORT: + return (short) lap.sample(); + case INT: + return (int) lap.sample(); + case LONG: + case DATE: + case TIME: + case TIMESTAMP: + return (long) lap.sample(); + case FLOAT: + return (float) lap.sample(); + case DOUBLE: + return lap.sample(); + case BOOLEAN: + return lap.sample() > 0.0; + case GEOMETRY: + return geoFactory.createPoint(new Coordinate(lap.sample(), lap.sample())); + case STRING: + return RandomStringUtils.randomAlphanumeric(RANDOM_SET_OFFSET); + default: + throw new OpenHuFuException(ErrorCode.DATA_TYPE_NOT_SUPPORT, type); + } + } +} diff --git a/data/src/test/java/com/hufudb/openhufu/data/storage/DataSetTest.java b/data/src/test/java/com/hufudb/openhufu/data/storage/DataSetTest.java index 56508210..3dc2ff79 100644 --- a/data/src/test/java/com/hufudb/openhufu/data/storage/DataSetTest.java +++ b/data/src/test/java/com/hufudb/openhufu/data/storage/DataSetTest.java @@ -11,6 +11,7 @@ import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; +import java.time.Year; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -533,4 +534,47 @@ public void testDataSetWithPoint() { } assertFalse(it.next()); } + + @Test + public void testRandomDataSet() { + Schema schema = Schema.newBuilder().add("A", ColumnType.GEOMETRY, Modifier.PUBLIC).build(); + Random random = new Random(); + List ps = new ArrayList<>(); + ProtoDataSet.Builder dBuilder = ProtoDataSet.newBuilder(schema); + for (int i = 0; i < 10; ++i) { + Geometry p = GeometryUtils.fromString(String.format("POINT(%f %f)", random.nextDouble(), random.nextDouble())); + ArrayRow.Builder rBuilder = ArrayRow.newBuilder(1); + rBuilder.set(0, p); + dBuilder.addRow(rBuilder.build()); + ps.add(p); + } + ProtoDataSet dataset = dBuilder.build(); + RandomDataSet randomDataSet = new RandomDataSet(dataset); + DataSet newDataSet = randomDataSet.getRandomSet(); + DataSetIterator it = newDataSet.getIterator(); + for (int i = 0; i < 10; ++i) { + assertTrue(it.next()); + } + assertTrue(it.next()); + + it = randomDataSet.removeRandom(newDataSet).getIterator(); + for (int i = 0; i < 10; ++i) { + assertTrue(it.next()); + boolean has = false; + for (int j = 0; j < 10; ++j) { + if (ps.get(j).equals(it.get(0))) { + has = true; + break; + } + } + assertTrue(has); + } + assertFalse(it.next()); + } + void printAll(DataSet dataSet) { + DataSetIterator it = dataSet.getIterator(); + while (it.next()) { + System.out.println(it.get(0)); + } + } } diff --git a/mpc/src/main/java/com/hufudb/openhufu/mpc/ProtocolType.java b/mpc/src/main/java/com/hufudb/openhufu/mpc/ProtocolType.java index f79c506d..263c6c8a 100644 --- a/mpc/src/main/java/com/hufudb/openhufu/mpc/ProtocolType.java +++ b/mpc/src/main/java/com/hufudb/openhufu/mpc/ProtocolType.java @@ -16,8 +16,8 @@ public enum ProtocolType { GMW("GMW", 100, true), SS("SS", 101, true), HASH_PSI("PSI", 200, true), - ABY("ABY", 300, true); - + ABY("ABY", 300, true), + SECRET_UNION("SECRET_UNION", 400, true); private static final ImmutableMap MAP; static { diff --git a/mpc/src/main/java/com/hufudb/openhufu/mpc/union/SecretUnion.java b/mpc/src/main/java/com/hufudb/openhufu/mpc/union/SecretUnion.java new file mode 100644 index 00000000..77f32813 --- /dev/null +++ b/mpc/src/main/java/com/hufudb/openhufu/mpc/union/SecretUnion.java @@ -0,0 +1,187 @@ +package com.hufudb.openhufu.mpc.union; + +import com.google.common.collect.ImmutableList; +import com.hufudb.openhufu.data.schema.Schema; +import com.hufudb.openhufu.data.storage.*; +import com.hufudb.openhufu.mpc.ProtocolException; +import com.hufudb.openhufu.mpc.ProtocolType; +import com.hufudb.openhufu.mpc.RpcProtocolExecutor; +import com.hufudb.openhufu.rpc.Rpc; +import com.hufudb.openhufu.rpc.utils.DataPacket; +import com.hufudb.openhufu.rpc.utils.DataPacketHeader; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; +//Pawel Jurczyk and Li Xiong. 2011. Information Sharing across Private Databases: Secure Union Revisited. In SocialCom/PASSAT. 996–1003. +public class SecretUnion extends RpcProtocolExecutor { + + private List parties; + private long taskId; + + public SecretUnion(Rpc rpc) { + super(rpc, ProtocolType.SECRET_UNION); + } + + private void printAll(DataSet dataSet) { + DataSetIterator it = dataSet.getIterator(); + while (it.next()) { + System.out.println(it.get(0)); + } + } + + private DataSet leaderProcedure(DataSet dataSet) { + //phase 1 + RandomDataSet randomDataSet = new RandomDataSet(dataSet); + sendRowsToSuccessor(randomDataSet.getRandomSet()); + ArrayDataSet receives = receiveRowsFromPredecessor(dataSet.getSchema()); + + //phase 2 + sendRowsToSuccessor(randomDataSet.removeRandom(receives)); + return receiveRowsFromPredecessor(dataSet.getSchema()); + } + + private void followerProcedure(DataSet dataSet) { + //phase 1 + RandomDataSet randomDataSet = new RandomDataSet(dataSet); + ArrayDataSet receives = receiveRowsFromPredecessor(dataSet.getSchema()); + sendRowsToSuccessor(mergeDataSet(randomDataSet.getRandomSet(), receives)); + + //phase 2 + ArrayDataSet receives2 = receiveRowsFromPredecessor(dataSet.getSchema()); + sendRowsToSuccessor(randomDataSet.removeRandom(receives2)); + } + + private void sendRowsToSuccessor(ArrayDataSet dataSet) { + LOG.info("sending rows from {} to {}", ownId, getSuccessorID()); + final DataPacketHeader header = + new DataPacketHeader(taskId, getProtocolTypeId(), 0, ownId, getSuccessorID()); + rpc.send(DataPacket.fromByteArrayList(header, + dataSet2BytesList(dataSet))); + } + + private ArrayDataSet receiveRowsFromPredecessor(Schema schema) { + final DataPacketHeader expect = + new DataPacketHeader(taskId, getProtocolTypeId(), 0, getPredecessorID(), ownId); + DataPacket packet = rpc.receive(expect); + return bytesList2DataSet(packet.getPayload(), schema); + } + + private ArrayDataSet mergeDataSet(ArrayDataSet left, ArrayDataSet right) { + ArrayList arrayRows = new ArrayList<>(); + arrayRows.addAll(left.getRows()); + arrayRows.addAll(right.getRows()); + return new ArrayDataSet(left.getSchema(), arrayRows); + } + + private List dataSet2BytesList(ArrayDataSet dataSet) { + byte[] dd = objToByteArray(dataSet.getRows()); + return ImmutableList.of(dd); + } + + private ArrayDataSet bytesList2DataSet(List bytesList, Schema schema) { + return new ArrayDataSet(schema, (List) byteArrayToObj(bytesList.get(0))); + } + + /** + * 对象转Byte数组 + * + * @param obj + * @return + */ + public byte[] objToByteArray(Object obj) { + byte[] bytes = null; + ByteArrayOutputStream byteArrayOutputStream = null; + ObjectOutputStream objectOutputStream = null; + try { + byteArrayOutputStream = new ByteArrayOutputStream(); + objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); + objectOutputStream.writeObject(obj); + objectOutputStream.flush(); + bytes = byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + System.err.println("objectToByteArray failed, " + e); + } finally { + if (objectOutputStream != null) { + try { + objectOutputStream.close(); + } catch (IOException e) { + System.err.println("close objectOutputStream failed, " + e); + } + } + if (byteArrayOutputStream != null) { + try { + byteArrayOutputStream.close(); + } catch (IOException e) { + System.err.println("close byteArrayOutputStream failed, " + e); + } + } + } + return bytes; + } + + /** + * Byte数组转对象 + * + * @param bytes + * @return + */ + public Object byteArrayToObj(byte[] bytes) { + Object obj = null; + ByteArrayInputStream byteArrayInputStream = null; + ObjectInputStream objectInputStream = null; + try { + byteArrayInputStream = new ByteArrayInputStream(bytes); + objectInputStream = new ObjectInputStream(byteArrayInputStream); + obj = objectInputStream.readObject(); + } catch (Exception e) { + System.err.println("byteArrayToObject failed, " + e); + } finally { + if (byteArrayInputStream != null) { + try { + byteArrayInputStream.close(); + } catch (IOException e) { + System.err.println("close byteArrayInputStream failed, " + e); + } + } + if (objectInputStream != null) { + try { + objectInputStream.close(); + } catch (IOException e) { + System.err.println("close objectInputStream failed, " + e); + } + } + } + return obj; + } + + private int getSuccessorID() { + int index = parties.indexOf(ownId); + int next = (index + 1) % parties.size(); + return parties.get(next); + } + + private int getPredecessorID() { + int index = parties.indexOf(ownId); + int previous = (index + parties.size() - 1) % parties.size(); + return parties.get(previous); + } + + /** + * @param args[0] DataSet inputdata + */ + @Override + public Object run(long taskId, List parties, Object... args) throws ProtocolException { + // todo: check this + DataSet localDataSet = (DataSet) args[0]; + this.taskId = taskId; + this.parties = parties; + boolean isLeader = ownId == parties.get(0); + if (isLeader) { + return leaderProcedure(localDataSet); + } else { + followerProcedure(localDataSet); + } + return null; + } +} diff --git a/mpc/src/test/java/com/hufudb/openhufu/mpc/secretUnion/SecretUnionTest.java b/mpc/src/test/java/com/hufudb/openhufu/mpc/secretUnion/SecretUnionTest.java new file mode 100644 index 00000000..ef2e23e6 --- /dev/null +++ b/mpc/src/test/java/com/hufudb/openhufu/mpc/secretUnion/SecretUnionTest.java @@ -0,0 +1,182 @@ +package com.hufudb.openhufu.mpc.secretUnion; + +import com.google.common.collect.ImmutableList; +import com.hufudb.openhufu.data.schema.Schema; +import com.hufudb.openhufu.data.storage.ArrayDataSet; +import com.hufudb.openhufu.data.storage.ArrayRow; +import com.hufudb.openhufu.data.storage.DataSet; +import com.hufudb.openhufu.data.storage.DataSetIterator; +import com.hufudb.openhufu.mpc.union.SecretUnion; +import com.hufudb.openhufu.proto.OpenHuFuData.ColumnType; +import com.hufudb.openhufu.rpc.Party; +import com.hufudb.openhufu.rpc.grpc.OpenHuFuOwnerInfo; +import com.hufudb.openhufu.rpc.grpc.OpenHuFuRpc; +import com.hufudb.openhufu.rpc.grpc.OpenHuFuRpcManager; +import io.grpc.Channel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Point; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +@RunWith(JUnit4.class) +public class SecretUnionTest { + public final static GeometryFactory geoFactory = new GeometryFactory(); + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + OpenHuFuRpcManager manager; + ExecutorService threadpool = Executors.newFixedThreadPool(5); + + @Before + public void setUp() throws IOException { + String ownerName0 = InProcessServerBuilder.generateName(); + String ownerName1 = InProcessServerBuilder.generateName(); + String ownerName2 = InProcessServerBuilder.generateName(); + String ownerName3 = InProcessServerBuilder.generateName(); + String ownerName4 = InProcessServerBuilder.generateName(); + Party owner0 = new OpenHuFuOwnerInfo(0, ownerName0); + Party owner1 = new OpenHuFuOwnerInfo(1, ownerName1); + Party owner2 = new OpenHuFuOwnerInfo(2, ownerName2); + Party owner3 = new OpenHuFuOwnerInfo(3, ownerName3); + Party owner4 = new OpenHuFuOwnerInfo(4, ownerName4); + List parties = ImmutableList.of(owner0, owner1, owner2, owner3, owner4); + List channels = Arrays.asList( + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName0).directExecutor().build()), + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName1).directExecutor().build()), + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName2).directExecutor().build()), + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName3).directExecutor().build()), + grpcCleanup.register(InProcessChannelBuilder.forName(ownerName4).directExecutor().build())); + manager = new OpenHuFuRpcManager(parties, channels); + OpenHuFuRpc rpc0 = (OpenHuFuRpc) manager.getRpc(0); + OpenHuFuRpc rpc1 = (OpenHuFuRpc) manager.getRpc(1); + OpenHuFuRpc rpc2 = (OpenHuFuRpc) manager.getRpc(2); + OpenHuFuRpc rpc3 = (OpenHuFuRpc) manager.getRpc(3); + OpenHuFuRpc rpc4 = (OpenHuFuRpc) manager.getRpc(4); + rpc0.connect(); + rpc1.connect(); + rpc2.connect(); + rpc3.connect(); + rpc4.connect(); + Server server0 = InProcessServerBuilder.forName(ownerName0).directExecutor() + .addService(rpc0.getgRpcService()).build().start(); + Server server1 = InProcessServerBuilder.forName(ownerName1).directExecutor() + .addService(rpc1.getgRpcService()).build().start(); + Server server2 = InProcessServerBuilder.forName(ownerName2).directExecutor() + .addService(rpc2.getgRpcService()).build().start(); + Server server3 = InProcessServerBuilder.forName(ownerName3).directExecutor() + .addService(rpc3.getgRpcService()).build().start(); + Server server4 = InProcessServerBuilder.forName(ownerName4).directExecutor() + .addService(rpc4.getgRpcService()).build().start(); + grpcCleanup.register(server0); + grpcCleanup.register(server1); + grpcCleanup.register(server2); + grpcCleanup.register(server3); + grpcCleanup.register(server4); + } + + void testUnion(long taskId, List executors, List dataSets, List> ans) + throws InterruptedException, ExecutionException { + List parties = executors.stream().map(e -> e.getOwnId()).collect(Collectors.toList()); + List> futures = new ArrayList<>(); + for (int i = 0; i < executors.size(); ++i) { + final SecretUnion s = executors.get(i); + final DataSet dataSet = dataSets.get(i); + futures.add(threadpool.submit(new Callable() { + @Override + public Object call() throws Exception { + return s.run(taskId, parties, dataSet); + } + })); + } + List> resList = new ArrayList<>(); + DataSet res = (DataSet) futures.get(0).get(); + DataSetIterator it = res.getIterator(); + while (it.next()) { + resList.add(ImmutableList.of(it.get(0), it.get(1), it.get(2))); + } + sort(resList); + assertEquals(resList.size(), ans.size()); + for (int i = 0; i < resList.size(); i++) { + assertEquals(resList.get(i).get(0), ans.get(i).get(0)); + assertEquals(resList.get(i).get(1), ans.get(i).get(1)); + assertEquals(resList.get(i).get(2), ans.get(i).get(2)); + } + } + + private void sort(List> lists) { + lists.sort(new Comparator>() { + @Override + public int compare(List o1, List o2) { + Double double1 = (Double) o1.get(0); + Double double2 = (Double) o2.get(0); + Integer int1 = (Integer) o1.get(1); + Integer int2 = (Integer) o2.get(1); + Point point1 = (Point) o1.get(2); + Point point2 = (Point) o2.get(2); + if (double1 < double2) { + return -1; + } + else if (double1 > double2) { + return 1; + } + if (!Objects.equals(int1, int2)) { + return int1 - int2; + } + return point1.compareTo(point2); + } + }); + } + + @Ignore + @Test + public void testSecretUnion() throws InterruptedException, ExecutionException { + Random random = new Random(); + OpenHuFuRpc rpc0 = (OpenHuFuRpc) manager.getRpc(0); + OpenHuFuRpc rpc1 = (OpenHuFuRpc) manager.getRpc(1); + OpenHuFuRpc rpc2 = (OpenHuFuRpc) manager.getRpc(2); + OpenHuFuRpc rpc3 = (OpenHuFuRpc) manager.getRpc(3); + OpenHuFuRpc rpc4 = (OpenHuFuRpc) manager.getRpc(4); + List rpcs = ImmutableList.of(rpc0, rpc1, rpc2, rpc3, rpc4); + List executors = + rpcs.stream().map(rpc -> new SecretUnion(rpc)).collect(Collectors.toList()); + List dataSets = new ArrayList<>(); + List> ans = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + List arrayRows = new ArrayList<>(); + for (int j = 0; j < random.nextInt(20); j++) { + ArrayRow.Builder builder = ArrayRow.newBuilder(3); + Double randDouble = random.nextDouble(); + Integer randInt = random.nextInt(); + Point randPoint = geoFactory.createPoint(new Coordinate(random.nextDouble(), random.nextDouble())); + builder.set(0, randDouble); + builder.set(1, randInt); + builder.set(2, randPoint); + ans.add(ImmutableList.of(randDouble, randInt, randPoint)); + arrayRows.add(builder.build()); + } + dataSets.add(new ArrayDataSet(Schema.newBuilder() + .add("DOUBLE", ColumnType.DOUBLE) + .add("INT", ColumnType.INT) + .add("POINT", ColumnType.GEOMETRY) + .build(), arrayRows)); + } + sort(ans); + testUnion(0, executors, dataSets, ans); + } +} diff --git a/pom.xml b/pom.xml index 80104eb1..6f68b37e 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,13 @@ 1.7.32 + + org.apache.commons + commons-math3 + 3.6.1 + compile + + junit