Skip to content

Commit

Permalink
fix cm2020 check hostResource
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Nov 25, 2024
1 parent 89b8a6b commit 1a0970c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 10 deletions.
10 changes: 5 additions & 5 deletions cpp/wedpr-computing/ppc-psi/src/cm2020-psi/CM2020PSIImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,14 @@ void CM2020PSIImpl::asyncRunTask()
{
return;
}

CM2020_PSI_LOG(INFO) << LOG_DESC("noticePeerToFinish") << printTaskInfo(task);
psi->noticePeerToFinish(task);
});
// check the memory
checkHostResource(m_config->minNeededMemoryGB());
addPendingTask(taskState);

try
{
addPendingTask(taskState);
// check the memory
checkHostResource(m_config->minNeededMemoryGB());
// prepare reader and writer
auto dataResource = task->selfParty()->dataResource();
auto reader = loadReader(task->id(), dataResource, DataSchema::Bytes);
Expand Down Expand Up @@ -321,6 +320,7 @@ void CM2020PSIImpl::stop()

void CM2020PSIImpl::onReceivedErrorNotification(const std::string& _taskID)
{
CM2020_PSI_LOG(INFO) << LOG_DESC("onReceivedErrorNotification") << LOG_KV("taskID", _taskID);
// finish the task while the peer is failed
auto taskState = findPendingTask(_taskID);
if (taskState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ void EcdhMultiPSIImpl::asyncRunTask(
psi->removePartner(taskID);
psi->removePendingTask(taskID);
});
addPendingTask(taskState);
// check the memory
checkHostResource(m_config->minNeededMemoryGB());
addPendingTask(taskState);
// over the peer limit
if (_task->getAllPeerParties().size() > c_max_peer_size)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ void LabeledPSIImpl::asyncRunTask(

psi->noticePeerToFinish(_task);
});
addPendingTask(taskState);
// check the memory
checkHostResource(m_config->minNeededMemoryGB());
addPendingTask(taskState);

auto oprfClient = std::make_shared<EcdhOprfClient>(
sizeof(apsi::Item::value_type) + sizeof(apsi::LabelKey), m_config->hash(),
Expand Down
11 changes: 8 additions & 3 deletions cpp/wedpr-computing/ppc-psi/src/psi-framework/TaskState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ void TaskState::removeGeneratedOutputFile()
{
return;
}
if (!m_writer)
{
return;
}
auto outputDataResource = m_task->selfParty()->dataResource();
if (!outputDataResource->desc())
{
Expand Down Expand Up @@ -271,10 +275,11 @@ void TaskState::onTaskFinished(TaskResult::Ptr _result, bool _noticePeer)
void TaskState::onPeerNotifyFinish()
{
PSI_LOG(WARNING) << LOG_BADGE("onReceivePeerError") << LOG_KV("taskID", m_task->id());
auto tesult = std::make_shared<TaskResult>(task()->id());
tesult->setError(std::make_shared<bcos::Error>(
auto result = std::make_shared<TaskResult>(task()->id());
result->setError(std::make_shared<bcos::Error>(
(int)PSIRetCode::PeerNotifyFinish, "job participant sent an error"));
onTaskFinished(std::move(tesult), false);
onTaskFinished(std::move(result), false);
removeGeneratedOutputFile();
}

// Note: must store the result serially
Expand Down

0 comments on commit 1a0970c

Please sign in to comment.