Skip to content

Commit

Permalink
Merge pull request #89 from ywy2090/feature-milestone2-gateway
Browse files Browse the repository at this point in the history
update mpc service
  • Loading branch information
ywy2090 authored Nov 15, 2024
2 parents 1540968 + 34f237d commit b258d80
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 62 deletions.
6 changes: 4 additions & 2 deletions cpp/wedpr-computing/ppc-mpc/src/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ struct JobInfo
int selfIndex;
bool isMalicious;
int bitLength;
std::string inputFileName;
std::string outputFileName;
std::string mpcFilePath;
std::string inputFilePath;
std::string outputFilePath;
std::string gatewayEngineEndpoint;
};

Expand All @@ -51,6 +52,7 @@ const std::string MPC_RELATIVE_PATH = "/Programs/Source/";
const std::string MPC_ALGORITHM_FILE_SUFFIX = ".mpc";
const std::string MPC_ALGORITHM_COMPILER = "compile.py";
const std::string MPC_PREPARE_FILE = "mpc_prepare.csv";
const std::string MPC_RESULT_FILE = "mpc_result.csv";

enum MpcBinaryType
{
Expand Down
170 changes: 113 additions & 57 deletions cpp/wedpr-computing/ppc-mpc/src/MPCService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/

#include "MPCService.h"
#include "Common.h"
#include "ppc-framework/io/DataResourceLoader.h"
#include "ppc-io/src/DataResourceLoaderImpl.h"
#include "ppc-io/src/FileLineReader.h"
Expand All @@ -44,95 +45,128 @@ using namespace ppc::tools;
using namespace ppc::storage;
using namespace ppc::rpc;

void MPCService::removeAllFiles(const std::vector<std::string> &files)
{
for (const auto &file : files)
{
if (file.empty())
{
continue;
}
try {
if (boost::filesystem::exists(file))
{
boost::filesystem::remove_all(file);

MPC_LOG(INFO) << LOG_DESC("[MPCService][removeAllFiles]")
<< LOG_KV("file", file);
}
} catch (...) {
MPC_LOG(INFO) << LOG_DESC("[MPCService][removeAllFiles]")
<< LOG_DESC("remove file exception")
<< LOG_KV("file", file);
}
}
}

void MPCService::doRun(Json::Value const& request, Json::Value& response)
{
auto startT = utcSteadyTime();

std::string localPathPrefix;
std::string mpcFileLocalPath;
try
{ // 0 get jobInfo and make command
auto jobInfo = paramsToJobInfo(request);

std::string jobId = jobInfo.jobId;
int participantCount = jobInfo.participantCount;
int selfIndex = jobInfo.selfIndex;

std::string mpcCmd;
makeCommand(mpcCmd, jobInfo);

std::string hdfsPathPrefix =
m_mpcConfig.datasetHDFSPath + PATH_SEPARATOR + jobInfo.jobId + PATH_SEPARATOR;
std::string localPathPrefix =
localPathPrefix =
m_mpcConfig.jobPath + PATH_SEPARATOR + jobInfo.jobId + PATH_SEPARATOR;

// 1 download algorithm file
std::string algorithmFileHdfsPath =
hdfsPathPrefix + jobInfo.jobId + MPC_ALGORITHM_FILE_SUFFIX;
// 1 download mpc algorithm file
std::string mpcFileHdfsPath = jobInfo.mpcFilePath;
std::string mpcRootPath = m_mpcConfig.mpcRootPathNoGateway;
if (jobInfo.mpcNodeUseGateway)
{
mpcRootPath = m_mpcConfig.mpcRootPath;
}
std::string algorithmFileLocalPath =

mpcFileLocalPath =
mpcRootPath + MPC_RELATIVE_PATH + jobInfo.jobId + MPC_ALGORITHM_FILE_SUFFIX;
if (!boost::filesystem::exists(algorithmFileLocalPath))
{
auto lineReader1 =
initialize_lineReader(jobInfo, algorithmFileHdfsPath, DataResourceType::HDFS);
auto lineWriter1 =
initialize_lineWriter(jobInfo, algorithmFileLocalPath, DataResourceType::FILE);
readAndSaveFile(lineReader1, lineWriter1);
}

// 2 download dataset file
std::string datasetFileHdfsPath = hdfsPathPrefix + jobInfo.inputFileName;
std::string datasetFileLocalPath = localPathPrefix + jobInfo.inputFileName;
if (!boost::filesystem::exists(datasetFileLocalPath))
{
auto lineReader2 =
initialize_lineReader(jobInfo, datasetFileHdfsPath, DataResourceType::HDFS);
auto lineWriter2 =
initialize_lineWriter(jobInfo, datasetFileLocalPath, DataResourceType::FILE);
readAndSaveFile(lineReader2, lineWriter2);
}
auto mpcFileReader =
initialize_lineReader(jobInfo, mpcFileHdfsPath, DataResourceType::HDFS);
auto mpcFileWriter =
initialize_lineWriter(jobInfo, mpcFileLocalPath, DataResourceType::FILE);
readAndSaveFile(mpcFileHdfsPath, mpcFileLocalPath, mpcFileReader, mpcFileWriter);


// 2 download mpc prepare file
std::string mpcPrepareFileHdfsPath = jobInfo.inputFilePath;

// std::string inputFilePath =
// m_mpcConfig.jobPath + PATH_SEPARATOR + jobInfo.jobId + PATH_SEPARATOR + MPC_PREPARE_FILE;

std::string mpcPrepareFileLocalPath = localPathPrefix + MPC_PREPARE_FILE + "-P" + std::to_string(selfIndex) + "-0";
auto datasetFileReader =
initialize_lineReader(jobInfo, mpcPrepareFileHdfsPath, DataResourceType::HDFS);
auto datasetFileWriter =
initialize_lineWriter(jobInfo, mpcPrepareFileLocalPath, DataResourceType::FILE);
readAndSaveFile(mpcPrepareFileHdfsPath, mpcPrepareFileLocalPath, datasetFileReader, datasetFileWriter);

// 3 run mpc job
int outExitStatus = MPC_SUCCESS;
std::string outResult;
execCommand(mpcCmd, outExitStatus, outResult);

if (outExitStatus != MPC_SUCCESS)
{
removeAllFiles(std::vector<std::string>{localPathPrefix, mpcFileLocalPath});
MPC_LOG(ERROR) << LOG_DESC("[MPCService][doRun]")
<< "run mpc job failed"
<< LOG_KV("jobId", jobId)
<< LOG_KV("outExitStatus", outExitStatus)
<< LOG_KV("outResult", outResult);
BOOST_THROW_EXCEPTION(RunMpcFailException() << errinfo_comment(outResult));
}

std::string message = "run mpc job successfully";
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_KV("jobId", jobId) << LOG_DESC(message);
// MPC_LOG(DEBUG) << LOG_DESC("[MPCService][doRun]") << LOG_KV("jobId", jobId) << LOG_KV("outResult", outResult);
response["code"] = MPC_SUCCESS;
response["message"] = "success";

// 4 upload result file
std::string resultFileHdfsPath = hdfsPathPrefix + jobInfo.outputFileName;
std::string resultFileLocalPath = localPathPrefix + jobInfo.outputFileName;
std::string resultFileHdfsPath = jobInfo.outputFilePath;
std::string resultFileLocalPath = localPathPrefix + MPC_RESULT_FILE;
writeStringToFile(outResult, resultFileLocalPath);

auto lineReader3 =
auto resultFileReader =
initialize_lineReader(jobInfo, resultFileLocalPath, DataResourceType::FILE);
auto lineWriter3 =
auto resultFileWriter =
initialize_lineWriter(jobInfo, resultFileHdfsPath, DataResourceType::HDFS);
readAndSaveFile(lineReader3, lineWriter3);
readAndSaveFile(resultFileLocalPath, resultFileHdfsPath, resultFileReader, resultFileWriter);

if (outExitStatus != MPC_SUCCESS)
{
message = "run mpc job failed";
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_DESC(message);
BOOST_THROW_EXCEPTION(RunMpcFailException() << errinfo_comment(message));
}
else
{
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_DESC(message);
response["code"] = MPC_SUCCESS;
response["message"] = "success";
}
if (boost::filesystem::exists(localPathPrefix))
{
boost::filesystem::remove_all(localPathPrefix);
}
removeAllFiles(std::vector<std::string>{localPathPrefix, mpcFileLocalPath});
}
catch (const std::exception& e)
{
removeAllFiles(std::vector<std::string>{localPathPrefix, mpcFileLocalPath});

const std::string diagnostic_information = std::string(boost::diagnostic_information(e));
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_DESC("run mpc job failed:")
MPC_LOG(INFO) << LOG_DESC("[MPCService][doRun]") << LOG_DESC("run mpc job failed")
<< LOG_DESC(diagnostic_information);
response["code"] = MPC_FAILED;
response["message"] = diagnostic_information;
}
MPC_LOG(INFO) << LOG_DESC("run mpc") << LOG_KV("timecost(ms)", utcSteadyTime() - startT);

MPC_LOG(INFO) << LOG_DESC("run mpc") << LOG_KV("request", request.toStyledString())<< LOG_KV("timecost(ms)", utcSteadyTime() - startT);
}

void MPCService::runMpcRpc(Json::Value const& request, RespFunc func)
Expand Down Expand Up @@ -200,7 +234,7 @@ void MPCService::writeStringToFile(const std::string& content, const std::string
file << buffer.str();
}

void MPCService::readAndSaveFile(LineReader::Ptr lineReader, LineWriter::Ptr lineWriter)
void MPCService::readAndSaveFile(const std::string &readerFilePath, const std::string &writerFilePath, LineReader::Ptr lineReader, LineWriter::Ptr lineWriter)
{
uint64_t lineSize = 0;
int64_t readPerBatchLines = m_mpcConfig.readPerBatchLines;
Expand All @@ -216,7 +250,10 @@ void MPCService::readAndSaveFile(LineReader::Ptr lineReader, LineWriter::Ptr lin
lineWriter->writeLine(dataBatch, DataSchema::String, "\n");
}
lineWriter->close();
MPC_LOG(INFO) << LOG_DESC("save file ok") << LOG_KV("file lines", lineSize);
MPC_LOG(INFO) << LOG_DESC("save file ok")
<< LOG_KV("readerFilePath", readerFilePath)
<< LOG_KV("writerFilePath", writerFilePath)
<< LOG_KV("file lines", lineSize);
}

LineReader::Ptr MPCService::initialize_lineReader(
Expand Down Expand Up @@ -282,8 +319,9 @@ JobInfo MPCService::paramsToJobInfo(const Json::Value& params)
jobInfo.selfIndex = params["selfIndex"].asInt();
jobInfo.isMalicious = params["isMalicious"].asBool();
jobInfo.bitLength = params["bitLength"].asInt();
jobInfo.inputFileName = params["inputFileName"].asString();
jobInfo.outputFileName = params["outputFileName"].asString();
jobInfo.mpcFilePath = params["mpcFilePath"].asString();
jobInfo.inputFilePath = params["inputFilePath"].asString();
jobInfo.outputFilePath = params["outputFilePath"].asString();
jobInfo.gatewayEngineEndpoint = params["gatewayEngineEndpoint"].asString();
return jobInfo;
}
Expand All @@ -297,30 +335,48 @@ JobInfo MPCService::paramsToJobInfo(const Json::Value& params)

void MPCService::makeCommand(std::string& cmd, const JobInfo& jobInfo)
{
std::string jobId = jobInfo.jobId;
std::string mpcRootPath = m_mpcConfig.mpcRootPath;
if (jobInfo.mpcNodeUseGateway)
{
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] use gateway to connect node");
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] use gateway to connect node")
<< LOG_KV("jobId", jobId);
}
else
{
mpcRootPath = m_mpcConfig.mpcRootPathNoGateway;
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] direct connect node");
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] direct connect node")
<< LOG_KV("jobId", jobId);
}
int r = chdir(mpcRootPath.c_str());
if (r == 0)
{
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] change dir ok");
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand] change dir ok")
<< LOG_KV("jobId", jobId);
}
else
{
MPC_LOG(ERROR) << LOG_DESC("[MPCService][makeCommand] change dir fail")
<< LOG_KV("mpcRootPath", mpcRootPath)
<< LOG_KV("ret", r)
<< LOG_KV("jobId", jobId);
;
}
std::string compileFilePath = mpcRootPath + PATH_SEPARATOR + MPC_ALGORITHM_COMPILER;
int participantCount = jobInfo.participantCount;
int selfIndex = jobInfo.selfIndex;
bool isMalicious = jobInfo.isMalicious;
std::string mpcBinFileName;
std::string compileOption;
getMpcProtocol(participantCount, isMalicious, mpcBinFileName, compileOption);

if (!boost::filesystem::exists(compileFilePath))
{
MPC_LOG(ERROR) << LOG_DESC("[MPCService] compile file not exist")
<< LOG_KV("compileFilePath", compileFilePath)
<< LOG_KV("jobId", jobId);
;

BOOST_THROW_EXCEPTION(MpcCompilerNotExistException()
<< errinfo_comment("compile file not exist:" + compileFilePath));
}
Expand Down Expand Up @@ -359,7 +415,7 @@ void MPCService::makeCommand(std::string& cmd, const JobInfo& jobInfo)
{
cmd += "-N " + std::to_string(jobInfo.participantCount) + " ";
}
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand]") << LOG_KV("mpcCmd", cmd);
MPC_LOG(INFO) << LOG_DESC("[MPCService][makeCommand]") << LOG_KV("jobId", jobId) << LOG_KV("mpcCmd", cmd);
}

void MPCService::getMpcProtocol(const int participantCount, const bool isMalicious,
Expand Down
4 changes: 3 additions & 1 deletion cpp/wedpr-computing/ppc-mpc/src/MPCService.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ class MPCService
void execCommand(const std::string cmd, int& outExitStatus, std::string& outResult);

void writeStringToFile(const std::string& content, const std::string& filePath);
void readAndSaveFile(ppc::io::LineReader::Ptr lineReader, ppc::io::LineWriter::Ptr lineWriter);
void readAndSaveFile(const std::string &readerFilePath, const std::string &writerFilePath,ppc::io::LineReader::Ptr lineReader, ppc::io::LineWriter::Ptr lineWriter);
ppc::io::LineReader::Ptr initialize_lineReader(const JobInfo& jobInfo,
const std::string& readerFilePath, ppc::protocol::DataResourceType type);
ppc::io::LineWriter::Ptr initialize_lineWriter(const JobInfo& jobInfo,
const std::string& writerFilePath, ppc::protocol::DataResourceType type);

void removeAllFiles(const std::vector<std::string> &files);

private:
ppc::tools::MPCConfig m_mpcConfig;
ppc::tools::StorageConfig m_storageConfig;
Expand Down
4 changes: 2 additions & 2 deletions cpp/wedpr-computing/ppc-mpc/tests/TestMPCService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ BOOST_AUTO_TEST_CASE(testMPCService)
jobInfo.selfIndex = 0;
jobInfo.isMalicious = false;
jobInfo.bitLength = 128;
jobInfo.inputFileName = "mpc_prepare.csv";
jobInfo.outputFileName = "mpc_output.txt";
jobInfo.inputFilePath = "mpc_prepare.csv";
jobInfo.outputFilePath = "mpc_output.txt";
jobInfo.gatewayEngineEndpoint = "127.0.0.1:6789";

mpcService->makeCommand(cmd, jobInfo);
Expand Down

0 comments on commit b258d80

Please sign in to comment.