diff --git a/.gitignore b/.gitignore index 15029804..190391e3 100644 --- a/.gitignore +++ b/.gitignore @@ -36,6 +36,7 @@ cmake-build-debug *.exe *.out *.app +**/*.class build/** GPATH @@ -43,6 +44,7 @@ GRTAGS GTAGS deps deps/** +**/bin d1-encrypted d2-encrypted @@ -137,6 +139,7 @@ ENV/ *.crt */bin */logs +*/bin *.csv */*/logs */start.out diff --git a/java/ppc-pir-services/build.gradle b/java/ppc-pir-services/build.gradle index ce80f537..095c7cd8 100644 --- a/java/ppc-pir-services/build.gradle +++ b/java/ppc-pir-services/build.gradle @@ -26,8 +26,16 @@ dependencies { implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.2.2' implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.19.0' implementation 'commons-codec:commons-codec:1.15' + implementation("org.apache.hadoop:hadoop-common:2.7.5"){ + exclude group: 'javax.servlet', module: 'servlet-api' + } + implementation("org.apache.hadoop:hadoop-client:2.7.5"){ + exclude group: 'javax.servlet', module: 'servlet-api' + } + implementation 'com.opencsv:opencsv:5.9' implementation 'com.squareup.okhttp3:okhttp:4.9.1' implementation project(":ppc-pir-sdk") + implementation 'com.google.code.gson:gson:2.8.8' compileOnly 'org.projectlombok:lombok:1.18.24' annotationProcessor 'org.projectlombok:lombok:1.18.24' diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/demo/Demo.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/demo/Demo.java index f88a121b..fd1ad363 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/demo/Demo.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/demo/Demo.java @@ -19,14 +19,12 @@ public static void main(String[] args) throws WedprException { private static void testIdFilter() throws WedprException { PirJobParam pirJobParam = new PirJobParam(); - pirJobParam.setJobId("j-12345678912"); + pirJobParam.setServiceId("s-123456789"); + pirJobParam.setAccessKeyId("a-123456789"); pirJobParam.setJobType(ParamEnum.JobType.SearchValue.getValue()); pirJobParam.setJobAlgorithmType(ParamEnum.AlgorithmType.idFilter.getValue()); - pirJobParam.setParticipateAgencyId("1002"); - pirJobParam.setDatasetId("t_login_token"); - pirJobParam.setJobCreator("1001"); - pirJobParam.setGatewayUrl("http://localhost:5831/api/pir/v3/server"); - pirJobParam.setSearchIdList(Arrays.asList("1", "20", "3", "456")); + pirJobParam.setGatewayUrl("localhost:5831"); + pirJobParam.setSearchIdList(Arrays.asList("1", "2", "3")); PirClient pirClient = new PirClient(pirJobParam); PirResultResponse pirResultResponse = pirClient.executePirJob(); @@ -35,14 +33,12 @@ private static void testIdFilter() throws WedprException { private static void testIdObfuscation() throws WedprException { PirJobParam pirJobParam = new PirJobParam(); - pirJobParam.setJobId("j-12345678912"); + pirJobParam.setServiceId("s-123456789"); + pirJobParam.setAccessKeyId("a-123456789"); pirJobParam.setJobType(ParamEnum.JobType.SearchValue.getValue()); pirJobParam.setJobAlgorithmType(ParamEnum.AlgorithmType.idObfuscation.getValue()); - pirJobParam.setParticipateAgencyId("1002"); - pirJobParam.setDatasetId("t_login_token"); - pirJobParam.setJobCreator("1001"); - pirJobParam.setGatewayUrl("http://localhost:5831/api/pir/v3/server"); - pirJobParam.setSearchIdList(Arrays.asList("1", "20", "3", "456")); + pirJobParam.setGatewayUrl("localhost:5831"); + pirJobParam.setSearchIdList(Arrays.asList("3", "2", "1")); PirClient pirClient = new PirClient(pirJobParam); PirResultResponse pirResultResponse = pirClient.executePirJob(); diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/PirClient.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/PirClient.java index ecf6ba75..41da2bd7 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/PirClient.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/PirClient.java @@ -1,14 +1,19 @@ package com.wedpr.pir.sdk; import com.fasterxml.jackson.databind.ObjectMapper; +import com.wedpr.pir.sdk.entity.body.ServiceConfigBody; import com.wedpr.pir.sdk.entity.body.SimpleEntity; import com.wedpr.pir.sdk.entity.param.PirJobParam; +import com.wedpr.pir.sdk.entity.request.ClientAuthRequest; import com.wedpr.pir.sdk.entity.request.ServerJobRequest; +import com.wedpr.pir.sdk.entity.response.ClientAuthResponse; import com.wedpr.pir.sdk.entity.response.ClientOTResponse; import com.wedpr.pir.sdk.entity.response.PirResultResponse; +import com.wedpr.pir.sdk.enums.ParamEnum; import com.wedpr.pir.sdk.exception.WedprException; import com.wedpr.pir.sdk.exception.WedprStatusEnum; import com.wedpr.pir.sdk.gateway.GatewayClient; +import com.wedpr.pir.sdk.helper.Constant; import com.wedpr.pir.sdk.service.PirJobService; import java.util.Objects; @@ -26,13 +31,33 @@ public PirClient(PirJobParam pirJobParam) throws WedprException { public PirResultResponse executePirJob() throws WedprException { try { - ClientOTResponse otParamResponse = pirJobService.requesterOtCipher(); + ObjectMapper objectMapper = new ObjectMapper(); + // 认证 PirJobParam pirJobParam = pirJobService.getPirJobParam(); + ServiceConfigBody serviceConfigBody = pirJobParam.getServiceConfigBody(); + Integer invokeType = pirJobParam.getPirInvokeType(); + if (invokeType.equals(ParamEnum.JobMode.SDKMode.getValue())) { + ClientAuthRequest clientAuthRequest = new ClientAuthRequest(); + clientAuthRequest.setAccessKeyId(pirJobParam.getAccessKeyId()); + clientAuthRequest.setServiceId(pirJobParam.getServiceId()); + String authUrl = pirJobParam.getGatewayUrl() + Constant.PERMISSION; + String body = objectMapper.writeValueAsString(clientAuthRequest); + ClientAuthResponse authResult = + GatewayClient.sendPostRequestWithRetry( + authUrl, body, ClientAuthResponse.class); + if (!authResult.getCode().equals(WedprStatusEnum.SUCCESS.getCode())) { + throw new WedprException( + WedprStatusEnum.CALL_PSI_ERROR, "认证失败:" + authResult.getMessage()); + } + pirJobParam.setServiceConfigBody(authResult.getData()); + } + + // 执行任务 + ClientOTResponse otParamResponse = pirJobService.requesterOtCipher(); ServerJobRequest serverJobRequest = generatePirJobRequestFromJobAndOTParam(pirJobParam, otParamResponse); - String pirUrl = pirJobParam.getGatewayUrl(); - ObjectMapper objectMapper = new ObjectMapper(); + String pirUrl = pirJobParam.getGatewayUrl() + Constant.SERVER; String body = objectMapper.writeValueAsString(serverJobRequest); SimpleEntity otResult = GatewayClient.sendPostRequestWithRetry(pirUrl, body, SimpleEntity.class); @@ -42,7 +67,7 @@ public PirResultResponse executePirJob() throws WedprException { } return pirJobService.requesterOtRecover(otParamResponse, pirJobParam, otResult); } catch (Exception e) { - throw new WedprException(WedprStatusEnum.CALL_PSI_ERROR); + throw new WedprException(WedprStatusEnum.CALL_PSI_ERROR, e.getMessage()); } } @@ -50,10 +75,9 @@ public PirResultResponse executePirJob() throws WedprException { private ServerJobRequest generatePirJobRequestFromJobAndOTParam( PirJobParam pirJobParam, ClientOTResponse otParamResponse) { ServerJobRequest serverJobRequest = new ServerJobRequest(); - serverJobRequest.setJobId(pirJobParam.getJobId()); - serverJobRequest.setParticipateAgencyId(pirJobParam.getParticipateAgencyId()); - serverJobRequest.setDatasetId(pirJobParam.getDatasetId()); - serverJobRequest.setJobCreator(pirJobParam.getJobCreator()); + serverJobRequest.setServiceId(pirJobParam.getServiceId()); + serverJobRequest.setServiceConfigBody(pirJobParam.getServiceConfigBody()); + serverJobRequest.setAccessKeyId(pirJobParam.getAccessKeyId()); serverJobRequest.setJobAlgorithmType(pirJobParam.getJobAlgorithmType()); serverJobRequest.setX(otParamResponse.getX()); serverJobRequest.setY(otParamResponse.getY()); diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/body/ServiceConfigBody.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/body/ServiceConfigBody.java new file mode 100644 index 00000000..5ffbd332 --- /dev/null +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/body/ServiceConfigBody.java @@ -0,0 +1,14 @@ +package com.wedpr.pir.sdk.entity.body; + +import lombok.Data; + +/** + * @author zachma + * @date 2024/9/3 + */ +@Data +public class ServiceConfigBody { + private String datasetId; + private String[] exists; + private String[] values; +} diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/param/PirJobParam.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/param/PirJobParam.java index 1812d9c4..4095dafc 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/param/PirJobParam.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/param/PirJobParam.java @@ -1,5 +1,6 @@ package com.wedpr.pir.sdk.entity.param; +import com.wedpr.pir.sdk.entity.body.ServiceConfigBody; import com.wedpr.pir.sdk.entity.request.PirBaseRequest; import com.wedpr.pir.sdk.enums.ParamEnum; import com.wedpr.pir.sdk.exception.WedprException; @@ -7,6 +8,7 @@ import com.wedpr.pir.sdk.helper.BasicTypeHelper; import java.util.Arrays; import java.util.List; +import java.util.Objects; import lombok.Data; import lombok.EqualsAndHashCode; @@ -24,13 +26,21 @@ public class PirJobParam extends PirBaseRequest { private List searchIdList; + private ServiceConfigBody serviceConfigBody; + public void check() throws WedprException { if (BasicTypeHelper.isStringEmpty(gatewayUrl)) { throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "访问gatewayUrl不能为空"); } - if (BasicTypeHelper.isStringEmpty(getJobId())) { - throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "jobId 不能为空"); + if (getPirInvokeType().equals(ParamEnum.JobMode.SDKMode.getValue()) + && BasicTypeHelper.isStringEmpty(getServiceId())) { + throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "serviceId 不能为空"); + } + + if (getPirInvokeType().equals(ParamEnum.JobMode.SDKMode.getValue()) + && BasicTypeHelper.isStringEmpty(getAccessKeyId())) { + throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "accessKeyId 不能为空"); } if (Arrays.stream(ParamEnum.JobType.values()) @@ -38,28 +48,20 @@ public void check() throws WedprException { throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "jobType输入错误"); } - if (BasicTypeHelper.isStringEmpty(getParticipateAgencyId())) { - throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "数据机构Id 不能为空"); - } - - if (BasicTypeHelper.isStringEmpty(getDatasetId())) { - throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "datasetId 不能为空"); - } - if (Arrays.stream(ParamEnum.AlgorithmType.values()) .noneMatch(enumValue -> enumValue.getValue().equals(getJobAlgorithmType()))) { throw new WedprException( WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "jobAlgorithmType输入错误"); } - if (searchIdList == null || searchIdList.size() == 0) { + if (Objects.isNull(searchIdList) || searchIdList.size() == 0) { throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "searchId列表不能为空"); } - if (getJobAlgorithmType().equals(ParamEnum.AlgorithmType.idObfuscation.getValue()) - && getObfuscationOrder() == -1) { + if (getPirInvokeType().equals(ParamEnum.JobMode.DirectorMode.getValue()) + && Objects.isNull(serviceConfigBody)) { throw new WedprException( - WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "哈希披露算法中, obfuscationOrder未设置"); + WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "向导模式下serviceConfigBody不能为空"); } } } diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ClientAuthRequest.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ClientAuthRequest.java new file mode 100644 index 00000000..0841ba6c --- /dev/null +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ClientAuthRequest.java @@ -0,0 +1,13 @@ +package com.wedpr.pir.sdk.entity.request; + +import lombok.Data; + +/** + * @author zachma + * @date 2024/9/3 + */ +@Data +public class ClientAuthRequest { + String serviceId; + String accessKeyId; +} diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ClientDirectRequest.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ClientDirectRequest.java new file mode 100644 index 00000000..9079ceb5 --- /dev/null +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ClientDirectRequest.java @@ -0,0 +1,16 @@ +package com.wedpr.pir.sdk.entity.request; + +import com.wedpr.pir.sdk.entity.body.ServiceConfigBody; +import java.util.List; +import lombok.Data; + +/** + * @author zachma + * @date 2024/9/5 + */ +@Data +public class ClientDirectRequest { + private String algorithmType; + private List searchIds; + private ServiceConfigBody serviceConfigBody; +} diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/PirBaseRequest.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/PirBaseRequest.java index 028db290..f38200b0 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/PirBaseRequest.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/PirBaseRequest.java @@ -1,21 +1,22 @@ package com.wedpr.pir.sdk.entity.request; +import com.wedpr.pir.sdk.enums.ParamEnum; import lombok.Data; @Data public class PirBaseRequest { // 任务ID - String jobId; // 任务类型(0: 查询存在性, 1: 查询对应值) + String serviceId; + + String accessKeyId; + String jobType; // 数据方机构id - String participateAgencyId; - // 数据集id - String datasetId; - // 发起方用户名 - String jobCreator; // 匿踪算法类型(0: hash披露算法, 1: hash混淆算法) String jobAlgorithmType; // 查询范围 Integer obfuscationOrder = 9; + + Integer pirInvokeType = ParamEnum.JobMode.SDKMode.getValue(); } diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ServerJobRequest.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ServerJobRequest.java index 0f91aa94..296b00c1 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ServerJobRequest.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ServerJobRequest.java @@ -1,6 +1,7 @@ package com.wedpr.pir.sdk.entity.request; import com.wedpr.pir.sdk.entity.body.PirDataBody; +import com.wedpr.pir.sdk.entity.body.ServiceConfigBody; import java.math.BigInteger; import java.util.List; import lombok.Data; @@ -16,4 +17,5 @@ public class ServerJobRequest extends PirBaseRequest { BigInteger x; BigInteger y; List dataBodyList; + ServiceConfigBody serviceConfigBody; } diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ServerOTRequest.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ServerOTRequest.java index a1ad74de..99ebce12 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ServerOTRequest.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/request/ServerOTRequest.java @@ -13,6 +13,7 @@ public class ServerOTRequest { String jobType; String datasetId; + String[] params; BigInteger x; BigInteger y; List dataBodyList; diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/response/ClientAuthResponse.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/response/ClientAuthResponse.java new file mode 100644 index 00000000..b2743f9a --- /dev/null +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/response/ClientAuthResponse.java @@ -0,0 +1,15 @@ +package com.wedpr.pir.sdk.entity.response; + +import com.wedpr.pir.sdk.entity.body.ServiceConfigBody; +import lombok.Data; + +/** + * @author zachma + * @date 2024/9/3 + */ +@Data +public class ClientAuthResponse { + String code; + String message; + ServiceConfigBody data; +} diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/response/PirResultResponse.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/response/PirResultResponse.java index fd78b0df..1b250263 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/response/PirResultResponse.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/entity/response/PirResultResponse.java @@ -8,7 +8,6 @@ */ @Data public class PirResultResponse { - String jobId; String jobType; // List detail; ClientDecryptResponse detail; diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/enums/ParamEnum.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/enums/ParamEnum.java index f3a2b321..27cdfa5f 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/enums/ParamEnum.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/enums/ParamEnum.java @@ -29,7 +29,15 @@ public enum AlgorithmType { @AllArgsConstructor public enum HttpType { HttpTimeout(180), - HttpMaxRetries(4); + HttpMaxRetries(1); + private Integer value; + } + + @Getter + @AllArgsConstructor + public enum JobMode { + SDKMode(0), + DirectorMode(1); private Integer value; } } diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/exception/WedprStatusEnum.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/exception/WedprStatusEnum.java index f1070a74..2313402d 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/exception/WedprStatusEnum.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/exception/WedprStatusEnum.java @@ -34,6 +34,7 @@ public enum WedprStatusEnum { INVALID_TRANS_TYPE("2009", "不合法的交易类型"), INVALID_FILE_PATH("2010", "不合法交集文件路径"), INVALID_ROLE_VALUE("2011", "pir.Role设置错误"), + CSV_ERROR("2012", "csv解析是错误"), // 3000 db错误 DB_ERROR("3000", "DB查询失败"), diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/gateway/GatewayClient.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/gateway/GatewayClient.java index ce58e8e2..c9cfaceb 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/gateway/GatewayClient.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/gateway/GatewayClient.java @@ -47,6 +47,9 @@ private static String sendGetRequest(String url) throws IOException, WedprExcept } private static Response sendPostRequest(String url, String jsonBody) throws IOException { + if (!url.startsWith("http")) { + url = "http://" + url; + } OkHttpClient httpClient = new OkHttpClient().newBuilder().build(); RequestBody body = RequestBody.create(jsonBody, JSON_MEDIA_TYPE); Request request = new Request.Builder().post(body).url(url).build(); @@ -69,6 +72,7 @@ public static T sendPostRequestWithRetry(String url, String body, Class r } } catch (Exception e) { + e.printStackTrace(); retries++; if (retries > maxRetries) { throw new WedprException(WedprStatusEnum.HTTP_CALL_ERROR); diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/helper/Constant.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/helper/Constant.java new file mode 100644 index 00000000..976252fd --- /dev/null +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/helper/Constant.java @@ -0,0 +1,17 @@ +package com.wedpr.pir.sdk.helper; +/** + * @author zachma + * @date 2024/9/3 + */ +public class Constant { + + public static final String SERVER = "/api/pir/v3/server"; + + public static final String PERMISSION = "/api/pir/v3/auth"; + + public static String PIR_TEMP_TABLE_PREFIX = "wedpr_pir_"; + + public static String datasetId2tableId(String datasetId) { + return PIR_TEMP_TABLE_PREFIX + datasetId.substring(2); + } +} diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/helper/CryptoOperatorHelper.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/helper/CryptoOperatorHelper.java index 51c89168..2bec5edf 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/helper/CryptoOperatorHelper.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/helper/CryptoOperatorHelper.java @@ -2,7 +2,6 @@ import com.wedpr.pir.sdk.exception.WedprException; import com.wedpr.pir.sdk.exception.WedprStatusEnum; - import java.math.BigInteger; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -57,9 +56,7 @@ public static BigInteger OTMul(BigInteger a, BigInteger b) { return a.multiply(b).mod(DEFAULT_N); } - /** - * 批量获取searchID的Hash值 - * */ + /** 批量获取searchID的Hash值 */ public static List getIdHashVec(int obfuscationOrder, Integer idIndex, String searchId) throws WedprException { diff --git a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/service/PirJobService.java b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/service/PirJobService.java index 5d6c0e1d..bdd97265 100644 --- a/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/service/PirJobService.java +++ b/java/ppc-pir-services/sdk/src/main/java/com/wedpr/pir/sdk/service/PirJobService.java @@ -56,7 +56,6 @@ public PirResultResponse requesterOtRecover( clientDecryptService.runDecryptOTparam(clientDecryptRequest); PirResultResponse pirResultResponse = new PirResultResponse(); - pirResultResponse.setJobId(clientJobRequest.getJobId()); pirResultResponse.setJobType(clientJobRequest.getJobType()); pirResultResponse.setDetail(clientDecryptResponse); return pirResultResponse; diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/WedprPirApplication.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/WedprPirApplication.java index 9d016653..f2c23ed7 100644 --- a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/WedprPirApplication.java +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/WedprPirApplication.java @@ -1,8 +1,10 @@ package com.webank.wedpr.pir; +import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.domain.EntityScan; /** * @author zachma @@ -10,6 +12,8 @@ */ @SpringBootApplication @EnableAutoConfiguration +@MapperScan("com.webank.wedpr.pir.http.mapper") +@EntityScan(basePackages = {"cn.webank.wedpr.http", "cn.webank.wedpr.pir"}) public class WedprPirApplication { public static void main(String[] args) { SpringApplication.run(WedprPirApplication.class, args); diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/entity/Dataset.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/entity/Dataset.java new file mode 100644 index 00000000..4cb4f463 --- /dev/null +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/entity/Dataset.java @@ -0,0 +1,16 @@ +package com.webank.wedpr.pir.crypto.entity; + +import lombok.Data; + +/** + * @author zachma + * @date 2024/9/4 + */ +@Data +public class Dataset { + private String datasetId; + + private String datasetFields; + + private String datasetStoragePath; +} diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/entity/DatasetStoragePath.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/entity/DatasetStoragePath.java new file mode 100644 index 00000000..b6173658 --- /dev/null +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/entity/DatasetStoragePath.java @@ -0,0 +1,13 @@ +package com.webank.wedpr.pir.crypto.entity; + +import lombok.Data; + +/** + * @author zachma + * @date 2024/9/4 + */ +@Data +public class DatasetStoragePath { + private String storageType; + private String filePath; +} diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/entity/WedprAuthTable.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/entity/WedprAuthTable.java new file mode 100644 index 00000000..85a347ec --- /dev/null +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/entity/WedprAuthTable.java @@ -0,0 +1,17 @@ +package com.webank.wedpr.pir.crypto.entity; + +import java.time.LocalDateTime; +import javax.persistence.Entity; +import lombok.Data; + +/** + * @author zachma + * @date 2024/9/3 + */ +@Entity +@Data +public class WedprAuthTable { + private String serviceId; + private String accessKeyId; + private LocalDateTime expireTime; +} diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/PirTableService.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/PirTableService.java index a70db535..a337c02d 100644 --- a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/PirTableService.java +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/PirTableService.java @@ -9,11 +9,7 @@ */ public interface PirTableService { /** 根据id查询 */ - List idFilterTable(String datasetId, String filter); - - List idFilterTable(String datasetId, String filter, String[] params); - - List idObfuscationTable(String datasetId, String searchId); + List idFilterTable(String datasetId, String filter, String[] param); List idObfuscationTable(String datasetId, String searchId, String[] params); diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/impl/PirOTServiceImpl.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/impl/PirOTServiceImpl.java index a61fa4c3..56a54e82 100644 --- a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/impl/PirOTServiceImpl.java +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/impl/PirOTServiceImpl.java @@ -9,6 +9,7 @@ import com.wedpr.pir.sdk.entity.response.ServerOTResponse; import com.wedpr.pir.sdk.exception.WedprException; import com.wedpr.pir.sdk.helper.AESHelper; +import com.wedpr.pir.sdk.helper.Constant; import com.wedpr.pir.sdk.helper.ConvertTypeHelper; import com.wedpr.pir.sdk.helper.CryptoOperatorHelper; import java.math.BigInteger; @@ -33,13 +34,15 @@ public ServerOTResponse runServerOTParam(ServerOTRequest serverOTRequest) throws WedprException { logger.info("Server start runServerOTParam."); String datasetId = serverOTRequest.getDatasetId(); + String tableId = Constant.datasetId2tableId(datasetId); BigInteger x = serverOTRequest.getX(); BigInteger y = serverOTRequest.getY(); + String[] params = serverOTRequest.getParams(); List serverResultlist = new ArrayList<>(); for (int i = 0; i < serverOTRequest.getDataBodyList().size(); i++) { BigInteger z0 = serverOTRequest.getDataBodyList().get(i).getZ0(); String filter = serverOTRequest.getDataBodyList().get(i).getFilter(); - List objects = pirTableService.idFilterTable(datasetId, filter); + List objects = pirTableService.idFilterTable(tableId, filter, params); List pirTables = pirTableService.objectsToPirTableList(objects); List serverResultBodyTempList = new ArrayList<>(); for (PirTable pirTable : pirTables) { @@ -59,8 +62,9 @@ public ServerOTResponse runServerOTCipher(ServerOTRequest serverOTRequest) String datasetId = serverOTRequest.getDatasetId(); BigInteger x = serverOTRequest.getX(); BigInteger y = serverOTRequest.getY(); - + String tableId = Constant.datasetId2tableId(datasetId); List serverResultlist = new ArrayList<>(); + String[] params = serverOTRequest.getParams(); for (int i = 0; i < serverOTRequest.getDataBodyList().size(); i++) { BigInteger z0 = serverOTRequest.getDataBodyList().get(i).getZ0(); List idHashList = serverOTRequest.getDataBodyList().get(i).getIdHashList(); @@ -68,7 +72,7 @@ public ServerOTResponse runServerOTCipher(ServerOTRequest serverOTRequest) for (int j = 0; j < idHashList.size(); j++) { List objects = - pirTableService.idObfuscationTable(datasetId, idHashList.get(j)); + pirTableService.idObfuscationTable(tableId, idHashList.get(j), params); List pirTables = pirTableService.objectsToPirTableList(objects); for (PirTable pirTable : pirTables) { ServerResultBody serverResultBody = diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/impl/PirTableServiceImpl.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/impl/PirTableServiceImpl.java index 7d319b5f..ef38830a 100644 --- a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/impl/PirTableServiceImpl.java +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/crypto/service/impl/PirTableServiceImpl.java @@ -19,42 +19,28 @@ public class PirTableServiceImpl implements PirTableService { @PersistenceContext private EntityManager entityManager; - @Override - public List idFilterTable(String datasetId, String filter) { - String sqlFormat = "SELECT * FROM %s WHERE id LIKE CONCAT(%s, '%%')"; - String sql = String.format(sqlFormat, datasetId, filter); - Query query = entityManager.createNativeQuery(sql); - return query.getResultList(); - } - @Override public List idFilterTable(String datasetId, String filter, String[] params) { - String sqlFormat = "SELECT %s FROM %s WHERE id LIKE CONCAT(%s, '%%')"; - String sql = String.format(sqlFormat, String.join(", ", params), datasetId, filter); - Query query = entityManager.createNativeQuery(sql); - return query.getResultList(); - } - - @Override - public List idObfuscationTable(String datasetId, String searchId) { - String sqlFormat = "SELECT * FROM %s WHERE id = %s"; - String sql = String.format(sqlFormat, datasetId, searchId); + String param = processParams(params); + String sqlFormat = "SELECT t.id as t_id, %s FROM %s t WHERE t.id LIKE CONCAT(%s, '%%')"; + String sql = String.format(sqlFormat, param, datasetId, filter); Query query = entityManager.createNativeQuery(sql); return query.getResultList(); } @Override public List idObfuscationTable(String datasetId, String searchId, String[] params) { - String sqlFormat = "SELECT %s FROM %s WHERE id = %s"; - String sql = String.format(sqlFormat, String.join(", ", params), datasetId, searchId); + String param = processParams(params); + String sqlFormat = "SELECT t.id as t_id, %s FROM %s t WHERE t.id = %s"; + String sql = String.format(sqlFormat, param, datasetId, searchId); Query query = entityManager.createNativeQuery(sql); return query.getResultList(); } - public List objectsToPirTableList(List params) { + public List objectsToPirTableList(List values) { List result = new LinkedList<>(); - for (int i = 0; i < params.size(); i++) { - Object[] temp = (Object[]) params.get(i); + for (int i = 0; i < values.size(); i++) { + Object[] temp = (Object[]) values.get(i); PirTable pirTable = new PirTable(); pirTable.setId(i + 1); pirTable.setPirKey(String.valueOf(temp[0])); @@ -64,4 +50,9 @@ public List objectsToPirTableList(List params) { } return result; } + + private String processParams(String[] params) { + String[] temp = Arrays.stream(params).map(s -> "t." + s).toArray(String[]::new); + return String.join(",", temp); + } } diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/common/CSVFileParser.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/common/CSVFileParser.java new file mode 100644 index 00000000..4fd4c8e4 --- /dev/null +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/common/CSVFileParser.java @@ -0,0 +1,220 @@ +package com.webank.wedpr.pir.http.common; + +import com.opencsv.CSVReaderHeaderAware; +import com.wedpr.pir.sdk.exception.WedprException; +import com.wedpr.pir.sdk.exception.WedprStatusEnum; +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CSVFileParser { + private static final Logger logger = LoggerFactory.getLogger(CSVFileParser.class); + + private interface ParseHandler { + Object call(CSVReaderHeaderAware reader) throws Exception; + } + + private static Object loadCSVFile(String filePath, int chunkSize, ParseHandler handler) + throws Exception { + try (Reader fileReader = new BufferedReader(new FileReader(filePath), chunkSize); + CSVReaderHeaderAware reader = new CSVReaderHeaderAware(fileReader)) { + if (handler != null) { + return handler.call(reader); + } + return null; + } catch (Exception e) { + logger.warn("CSVFileParser exception, filePath: {}, error: ", filePath, e); + throw new WedprException( + WedprStatusEnum.CSV_ERROR, "loadCSVFile exception for " + e.getMessage()); + } + } + + public static List> processCsv2SqlMap(String[] tableFields, String csvFilePath) + throws Exception { + return (List>) + loadCSVFile( + csvFilePath, + Constant.DEFAULT_READ_TRUNK_SIZE, + reader -> { + List> resultValue = new ArrayList<>(); + Map row; + while ((row = reader.readMap()) != null) { + List rowValue = new ArrayList<>(); + for (String field : tableFields) { + if (!row.keySet().contains(field.trim())) { + String errorMsg = + "extractFields failed for the field " + + field + + " not existed in the file " + + tableFields.toString(); + logger.warn(errorMsg); + throw new WedprException( + WedprStatusEnum.CSV_ERROR, errorMsg); + } + rowValue.add(row.get(field)); + } + resultValue.add(rowValue); + } + return resultValue; + }); + } + + public static Set getFields(String filePath) throws Exception { + return (Set) + (loadCSVFile( + filePath, + Constant.DEFAULT_READ_TRUNK_SIZE, + new ParseHandler() { + @Override + public Object call(CSVReaderHeaderAware reader) throws Exception { + return reader.readMap().keySet(); + } + })); + } + + public static class ExtractConfig { + private String originalFilePath; + private List extractFields; + private String extractFilePath; + private String fieldSplitter = ","; + private Integer writeChunkSize = Constant.DEFAULT_WRITE_TRUNK_SIZE; + private Integer readChunkSize = Constant.DEFAULT_READ_TRUNK_SIZE; + + public ExtractConfig() {} + + public ExtractConfig( + String originalFilePath, List extractFields, String extractFilePath) { + this.originalFilePath = originalFilePath; + this.extractFields = extractFields; + this.extractFilePath = extractFilePath; + } + + public String getOriginalFilePath() { + return originalFilePath; + } + + public void setOriginalFilePath(String originalFilePath) { + this.originalFilePath = originalFilePath; + } + + public List getExtractFields() { + return extractFields; + } + + public void setExtractFields(List extractFields) { + this.extractFields = extractFields; + } + + public String getExtractFilePath() { + return extractFilePath; + } + + public void setExtractFilePath(String extractFilePath) { + this.extractFilePath = extractFilePath; + } + + public String getFieldSplitter() { + return fieldSplitter; + } + + public void setFieldSplitter(String fieldSplitter) { + this.fieldSplitter = fieldSplitter; + } + + public Integer getWriteChunkSize() { + return writeChunkSize; + } + + public void setWriteChunkSize(Integer writeChunkSize) { + this.writeChunkSize = writeChunkSize; + } + + public Integer getReadChunkSize() { + return readChunkSize; + } + + public void setReadChunkSize(Integer readChunkSize) { + this.readChunkSize = readChunkSize; + } + + @Override + public String toString() { + return "ExtractConfig{" + + "originalFilePath='" + + originalFilePath + + '\'' + + ", extractFields=" + + extractFields + + ", extractFilePath='" + + extractFilePath + + '\'' + + ", fieldSplitter='" + + fieldSplitter + + '\'' + + ", writeChunkSize=" + + writeChunkSize + + ", readChunkSize=" + + readChunkSize + + '}'; + } + } + + public static void extractFields(ExtractConfig extractConfig) throws Exception { + loadCSVFile( + extractConfig.getOriginalFilePath(), + extractConfig.getReadChunkSize(), + new ParseHandler() { + @Override + public Object call(CSVReaderHeaderAware reader) throws Exception { + // check the fields + Map headerInfo = reader.readMap(); + Set fields = headerInfo.keySet(); + for (String field : extractConfig.getExtractFields()) { + if (!fields.contains(field.trim())) { + String errorMsg = + "extractFields failed for the field " + + field + + " not existed in the file " + + extractConfig.getOriginalFilePath(); + logger.warn(errorMsg); + throw new WedprException(WedprStatusEnum.CSV_ERROR, errorMsg); + } + } + Map row; + try (Writer writer = + new BufferedWriter( + new FileWriter(extractConfig.getExtractFilePath()), + extractConfig.getWriteChunkSize())) { + // write the data(Note: here no need to write the header) + while ((row = reader.readMap()) != null) { + int column = 0; + for (String field : extractConfig.getExtractFields()) { + writer.write(row.get(field)); + if (column < extractConfig.getExtractFields().size() - 1) { + writer.write(extractConfig.getFieldSplitter()); + } + column++; + } + writer.write(Constant.DEFAULT_LINE_SPLITTER); + } + } catch (Exception e) { + logger.warn( + "extractFields exception, config: {}, error", + extractConfig.toString(), + e); + throw new WedprException( + WedprStatusEnum.CSV_ERROR, + "extractFields exception, detail: " + + extractConfig.toString() + + ", error: " + + e.getMessage()); + } + return null; + } + }); + } +} diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/common/Constant.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/common/Constant.java index 9ddfd9bb..f967d323 100644 --- a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/common/Constant.java +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/common/Constant.java @@ -7,4 +7,9 @@ public class Constant { public static final String PIR_API_PREFIX = "/api/pir/v3"; + + public static final Integer DEFAULT_READ_TRUNK_SIZE = 1024 * 1024; + public static final Integer DEFAULT_WRITE_TRUNK_SIZE = 1024 * 1024; + + public static final String DEFAULT_LINE_SPLITTER = "\n"; } diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/config/HadoopConfig.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/config/HadoopConfig.java new file mode 100644 index 00000000..def233a0 --- /dev/null +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/config/HadoopConfig.java @@ -0,0 +1,33 @@ +package com.webank.wedpr.pir.http.config; + +import java.io.IOException; +import java.net.URI; +import org.apache.hadoop.fs.FileSystem; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author zachma + * @date 2024/9/4 + */ +@Configuration +public class HadoopConfig { + @Value("${hdfs.uri}") + private String hdfsUri; + + @Value("${hdfs.user}") + private String hdfsUser; + + @Value("${hdfs.local}") + private String hdfsLocal; + + @Bean + public FileSystem fileSystem() throws IOException, InterruptedException { + org.apache.hadoop.conf.Configuration configuration = + new org.apache.hadoop.conf.Configuration(); + configuration.set("fs.defaultFS", hdfsUri); + FileSystem fileSystem = FileSystem.get(URI.create(hdfsUri), configuration, hdfsUser); + return fileSystem; + } +} diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/controller/PirController.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/controller/PirController.java index 361c63b0..e876b9d2 100644 --- a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/controller/PirController.java +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/controller/PirController.java @@ -3,9 +3,18 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.webank.wedpr.pir.http.common.Constant; import com.webank.wedpr.pir.http.service.PirAppService; +import com.webank.wedpr.pir.http.service.PirAuthService; +import com.webank.wedpr.pir.http.service.PirDataService; +import com.wedpr.pir.sdk.PirClient; +import com.wedpr.pir.sdk.entity.body.ServiceConfigBody; +import com.wedpr.pir.sdk.entity.param.PirJobParam; +import com.wedpr.pir.sdk.entity.request.ClientAuthRequest; +import com.wedpr.pir.sdk.entity.request.ClientDirectRequest; import com.wedpr.pir.sdk.entity.request.ServerJobRequest; import com.wedpr.pir.sdk.entity.response.ClientJobResponse; +import com.wedpr.pir.sdk.entity.response.PirResultResponse; import com.wedpr.pir.sdk.entity.response.ServerOTResponse; +import com.wedpr.pir.sdk.enums.ParamEnum; import com.wedpr.pir.sdk.exception.WedprStatusEnum; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; @@ -26,10 +35,58 @@ public class PirController { @Autowired private ObjectMapper objectMapper; @Autowired private PirAppService pirAppService; + @Autowired private PirAuthService pirAuthService; + @Autowired private PirDataService pirDataService; + + @PostMapping(Constant.PIR_API_PREFIX + "/auth") + public ClientJobResponse pirAuthController(@RequestBody ClientAuthRequest clientAuthRequest) { + try { + logger.info( + "ClientAuthRequest: clientAuthRequest: {}.", + objectMapper.writeValueAsString(clientAuthRequest)); + // 1. 发起认证auth + ServiceConfigBody serviceConfigBody = + pirAuthService.checkServiceAuth(clientAuthRequest); + return new ClientJobResponse(WedprStatusEnum.SUCCESS, serviceConfigBody); + } catch (Exception e) { + logger.warn( + "匿踪任务认证失败, serviceId: {}, response: {}", + clientAuthRequest.getServiceId(), + WedprStatusEnum.SYSTEM_EXCEPTION.getMessage()); + return new ClientJobResponse(WedprStatusEnum.SYSTEM_EXCEPTION, null); + } + } + + @PostMapping(Constant.PIR_API_PREFIX + "/client") + public PirResultResponse pirClientController( + @RequestBody ClientDirectRequest clientDirectRequest) { + try { + logger.info( + "ClientDirectRequest: clientDirectRequest: {}.", + objectMapper.writeValueAsString(clientDirectRequest)); + PirJobParam pirJobParam = new PirJobParam(); + pirJobParam.setPirInvokeType(ParamEnum.JobMode.DirectorMode.getValue()); + pirJobParam.setJobType(ParamEnum.JobType.SearchValue.getValue()); + pirJobParam.setJobAlgorithmType(clientDirectRequest.getAlgorithmType()); + + // todo:对接网关 + pirJobParam.setGatewayUrl("localhost:5831"); + + pirJobParam.setSearchIdList(clientDirectRequest.getSearchIds()); + pirJobParam.setServiceConfigBody(clientDirectRequest.getServiceConfigBody()); + + PirClient pirClient = new PirClient(pirJobParam); + return pirClient.executePirJob(); + } catch (Exception e) { + logger.warn("匿踪任务失败, response: {}", WedprStatusEnum.SYSTEM_EXCEPTION.getMessage()); + return null; + } + } @PostMapping(Constant.PIR_API_PREFIX + "/server") public ClientJobResponse pirServerController(@RequestBody ServerJobRequest serverJobRequest) { try { + pirDataService.processPirDataset(serverJobRequest.getServiceConfigBody()); logger.info( "WedprServerjob: serverJobRequest: {}.", objectMapper.writeValueAsString(serverJobRequest)); @@ -37,9 +94,10 @@ public ClientJobResponse pirServerController(@RequestBody ServerJobRequest serve ServerOTResponse serverOTResponse = pirAppService.providerOtCipher(serverJobRequest); return new ClientJobResponse(WedprStatusEnum.SUCCESS, serverOTResponse); } catch (Exception e) { + e.printStackTrace(); logger.warn( - "匿踪任务失败, jobID: {}, response: {}", - serverJobRequest.getJobId(), + "匿踪任务失败, serviceId: {}, response: {}", + serverJobRequest.getServiceId(), WedprStatusEnum.SYSTEM_EXCEPTION.getMessage()); return new ClientJobResponse(WedprStatusEnum.SYSTEM_EXCEPTION, e.getMessage()); } diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/mapper/WedprServiceAuthTableMapper.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/mapper/WedprServiceAuthTableMapper.java new file mode 100644 index 00000000..061fbe60 --- /dev/null +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/mapper/WedprServiceAuthTableMapper.java @@ -0,0 +1,47 @@ +package com.webank.wedpr.pir.http.mapper; + +import com.webank.wedpr.pir.crypto.entity.Dataset; +import com.webank.wedpr.pir.crypto.entity.WedprAuthTable; +import java.util.List; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Result; +import org.apache.ibatis.annotations.Results; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +/** + * @author zachma + * @date 2024/9/3 + */ +@Mapper +public interface WedprServiceAuthTableMapper { + + @Select( + "SELECT service_id, access_key_id, expire_time FROM wedpr_service_auth_table WHERE service_id = #{serviceId} AND access_key_id = #{accessKeyId}") + @Results({ + @Result(property = "serviceId", column = "service_id"), + @Result(property = "accessKeyId", column = "access_key_id"), + @Result(property = "expireTime", column = "expire_time") + }) + WedprAuthTable searchWedprAuthTable( + @Param("serviceId") String serviceId, @Param("accessKeyId") String accessKeyId); + + @Select("SELECT service_config FROM wedpr_published_service WHERE service_id = #{serviceId}") + String searchWedprServicePublishTable(@Param("serviceId") String serviceId); + + @Select("SHOW TABLES") + List showAllTables(); + + @Select( + "SELECT dataset_id, dataset_fields, dataset_storage_path FROM wedpr_dataset WHERE dataset_id = #{datasetId}") + @Results({ + @Result(property = "datasetId", column = "dataset_id"), + @Result(property = "datasetFields", column = "dataset_fields"), + @Result(property = "datasetStoragePath", column = "dataset_storage_path") + }) + Dataset searchWedprDatasetId(@Param("datasetId") String datasetId); + + @Update(value = "${nativeSql}") + void executeTableByNativeSql(@Param("nativeSql") String nativeSql); +} diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirAppService.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirAppService.java index 4d0db22f..1852c523 100644 --- a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirAppService.java +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirAppService.java @@ -24,7 +24,7 @@ public ServerOTResponse providerOtCipher(ServerJobRequest serverJobRequest) // 1. 根据请求,筛选数据,加密密钥,返回筛选结果及AES消息密文 ServerOTRequest serverOTRequest = new ServerOTRequest(); serverOTRequest.setJobType(serverJobRequest.getJobType()); - serverOTRequest.setDatasetId(serverJobRequest.getDatasetId()); + serverOTRequest.setDatasetId(serverJobRequest.getServiceConfigBody().getDatasetId()); serverOTRequest.setX(serverJobRequest.getX()); serverOTRequest.setY(serverJobRequest.getY()); serverOTRequest.setDataBodyList(serverJobRequest.getDataBodyList()); @@ -33,10 +33,12 @@ public ServerOTResponse providerOtCipher(ServerJobRequest serverJobRequest) if (serverJobRequest .getJobAlgorithmType() .equals(ParamEnum.AlgorithmType.idFilter.getValue())) { + serverOTRequest.setParams(serverJobRequest.getServiceConfigBody().getValues()); otResultResponse = pirOTService.runServerOTParam(serverOTRequest); } else if (serverJobRequest .getJobAlgorithmType() .equals(ParamEnum.AlgorithmType.idObfuscation.getValue())) { + serverOTRequest.setParams(serverJobRequest.getServiceConfigBody().getExists()); otResultResponse = pirOTService.runServerOTCipher(serverOTRequest); } else { throw new WedprException(WedprStatusEnum.INVALID_INPUT_VALUE); diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirAuthService.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirAuthService.java new file mode 100644 index 00000000..ca0d979e --- /dev/null +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirAuthService.java @@ -0,0 +1,50 @@ +package com.webank.wedpr.pir.http.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.webank.wedpr.pir.crypto.entity.WedprAuthTable; +import com.webank.wedpr.pir.http.mapper.WedprServiceAuthTableMapper; +import com.wedpr.pir.sdk.entity.body.ServiceConfigBody; +import com.wedpr.pir.sdk.entity.request.ClientAuthRequest; +import com.wedpr.pir.sdk.exception.WedprException; +import com.wedpr.pir.sdk.exception.WedprStatusEnum; +import java.time.LocalDateTime; +import java.util.Objects; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @author zachma + * @date 2024/9/3 + */ +@Service +public class PirAuthService { + + @Autowired private WedprServiceAuthTableMapper wedprServiceAuthTableMapper; + + public ServiceConfigBody checkServiceAuth(ClientAuthRequest clientAuthRequest) + throws WedprException { + try { + WedprAuthTable wedprAuthTable = + wedprServiceAuthTableMapper.searchWedprAuthTable( + clientAuthRequest.getServiceId(), clientAuthRequest.getAccessKeyId()); + if (Objects.isNull(wedprAuthTable)) { + throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "用户未申请认证"); + } + + if (LocalDateTime.now().isAfter(wedprAuthTable.getExpireTime())) { + throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, "认证已经过期"); + } + + String serviceConfigStr = + wedprServiceAuthTableMapper.searchWedprServicePublishTable( + clientAuthRequest.getServiceId()); + ObjectMapper objectMapper = new ObjectMapper(); + ServiceConfigBody serviceConfigBody = + objectMapper.readValue(serviceConfigStr, ServiceConfigBody.class); + + return serviceConfigBody; + } catch (Exception e) { + throw new WedprException(WedprStatusEnum.CLIENT_PARAM_EXCEPTION, e.getMessage()); + } + } +} diff --git a/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirDataService.java b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirDataService.java new file mode 100644 index 00000000..25946eb8 --- /dev/null +++ b/java/ppc-pir-services/src/main/java/com/webank/wedpr/pir/http/service/PirDataService.java @@ -0,0 +1,90 @@ +package com.webank.wedpr.pir.http.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.webank.wedpr.pir.crypto.entity.Dataset; +import com.webank.wedpr.pir.crypto.entity.DatasetStoragePath; +import com.webank.wedpr.pir.http.common.CSVFileParser; +import com.webank.wedpr.pir.http.mapper.WedprServiceAuthTableMapper; +import com.wedpr.pir.sdk.entity.body.ServiceConfigBody; +import com.wedpr.pir.sdk.helper.Constant; +import java.util.Arrays; +import java.util.List; +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @author zachma + * @date 2024/9/4 + */ +@Service +public class PirDataService { + + @Autowired private WedprServiceAuthTableMapper wedprServiceAuthTableMapper; + + @Autowired private FileSystem fileSystem; + + @PersistenceContext private EntityManager entityManager; + + public void processPirDataset(ServiceConfigBody serviceConfigBody) throws Exception { + // 数据预处理 + List allTables = wedprServiceAuthTableMapper.showAllTables(); + String tableId = Constant.datasetId2tableId(serviceConfigBody.getDatasetId()); + if (allTables.contains(tableId)) { + return; + } + Dataset dataset = + wedprServiceAuthTableMapper.searchWedprDatasetId(serviceConfigBody.getDatasetId()); + ObjectMapper objectMapper = new ObjectMapper(); + DatasetStoragePath datasetStoragePath = + objectMapper.readValue(dataset.getDatasetStoragePath(), DatasetStoragePath.class); + + String csvFilePath = "./" + serviceConfigBody.getDatasetId(); + fileSystem.copyToLocalFile( + new Path(datasetStoragePath.getFilePath()), new Path(csvFilePath)); + + String[] datasetFields = + Arrays.stream(dataset.getDatasetFields().trim().split(",")) + .map(String::trim) + .toArray(String[]::new); + processCsv2Db(serviceConfigBody.getDatasetId(), datasetFields, csvFilePath); + } + + private void processCsv2Db(String datasetId, String[] datasetFields, String csvFilePath) + throws Exception { + List> sqlValues = CSVFileParser.processCsv2SqlMap(datasetFields, csvFilePath); + if (sqlValues.size() == 0) { + throw new Exception("数据集为空,请退回"); + } + + String tableId = Constant.datasetId2tableId(datasetId); + String createSqlFormat = "CREATE TABLE %s ( %s , PRIMARY KEY (`id`) USING BTREE )"; + + String[] fieldsWithType = new String[datasetFields.length]; + for (int i = 0; i < datasetFields.length; i++) { + if ("id".equalsIgnoreCase(datasetFields[i])) { + fieldsWithType[i] = datasetFields[i] + " VARCHAR(64)"; + } else { + fieldsWithType[i] = datasetFields[i] + " TEXT"; + } + } + String sql = String.format(createSqlFormat, tableId, String.join(",", fieldsWithType)); + wedprServiceAuthTableMapper.executeTableByNativeSql(sql); + + StringBuilder sb = new StringBuilder(); + for (List values : sqlValues) { + sb.append("(").append(String.join(",", values)).append("), "); + } + String insertValues = sb.toString(); + insertValues = insertValues.substring(0, insertValues.length() - 2); + + String insertSqlFormat = "INSERT INTO %s (%s) VALUES %s "; + sql = + String.format( + insertSqlFormat, tableId, String.join(",", datasetFields), insertValues); + wedprServiceAuthTableMapper.executeTableByNativeSql(sql); + } +} diff --git a/java/ppc-pir-services/src/main/resources/application-dev.properties b/java/ppc-pir-services/src/main/resources/application-dev.properties index 86233b92..ac03ea13 100644 --- a/java/ppc-pir-services/src/main/resources/application-dev.properties +++ b/java/ppc-pir-services/src/main/resources/application-dev.properties @@ -6,4 +6,9 @@ banner.charset=UTF-8 spring.jackson.local-date-time-format=yyyy-MM-dd HH:mm:ss spring.jackson.local-date-format=yyyy-MM-dd spring.jackson.local-time-format=HH:mm:ss -spring.jackson.time-zone=GMT+8 \ No newline at end of file +spring.jackson.time-zone=GMT+8 +spring.jackson.open-in-view=false + +hdfs.uri=hdfs://139.159.202.235:9900 +hdfs.user=ppc +hdfs.local=./temp \ No newline at end of file diff --git a/java/ppc-pir-services/src/main/resources/application-mysql.properties b/java/ppc-pir-services/src/main/resources/application-mysql.properties index 4c82652f..3f2227a1 100644 --- a/java/ppc-pir-services/src/main/resources/application-mysql.properties +++ b/java/ppc-pir-services/src/main/resources/application-mysql.properties @@ -1,6 +1,6 @@ -spring.datasource.url=jdbc:mysql://139.159.202.235:3306/ppc1?serverTimezone=Asia/Shanghai&useUnicode=true&autoReconnect=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useFractionalSeconds=true&useUnbufferedInput=false&useSSL=false -spring.datasource.username=root -spring.datasource.password=Wedpr2023 +spring.datasource.url=jdbc:mysql://xxxxxxx:3306/xxxxx?serverTimezone=Asia/Shanghai&useUnicode=true&autoReconnect=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useFractionalSeconds=true&useUnbufferedInput=false&useSSL=false +spring.datasource.username=xxxx +spring.datasource.password=xxxx spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5InnoDBDialect spring.jpa.database=mysql