Skip to content

Commit

Permalink
unregister front taskInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Nov 25, 2024
1 parent ee53770 commit f7616af
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 3 deletions.
3 changes: 2 additions & 1 deletion cpp/wedpr-main/air-node/AirNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ void AirNodeInitializer::init(std::string const& _configPath)

INIT_LOG(INFO) << LOG_DESC("init the rpc");
// init RpcStatusInterface
RpcStatusInterface::Ptr rpcStatusInterface = std::make_shared<ppc::rpc::RpcMemory>();
RpcStatusInterface::Ptr rpcStatusInterface =
std::make_shared<ppc::rpc::RpcMemory>(m_nodeInitializer->ppcFront());


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
Expand Down
3 changes: 2 additions & 1 deletion cpp/wedpr-main/pro-node/ProNodeInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ void ProNodeInitializer::init(std::string const& _configPath)

INIT_LOG(INFO) << LOG_DESC("init the rpc");
// init RpcStatusInterface
RpcStatusInterface::Ptr rpcStatusInterface = std::make_shared<ppc::rpc::RpcMemory>();
RpcStatusInterface::Ptr rpcStatusInterface =
std::make_shared<ppc::rpc::RpcMemory>(m_nodeInitializer->ppcFront());


auto rpcFactory = std::make_shared<RpcFactory>(m_nodeInitializer->config()->agencyID());
Expand Down
11 changes: 11 additions & 0 deletions cpp/wedpr-transport/ppc-rpc/src/RpcMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ bcos::Error::Ptr RpcMemory::insertTask(protocol::Task::Ptr _task)
{
return std::make_shared<bcos::Error>(PPCRetCode::WRITE_RPC_STATUS_ERROR, "task exists");
}
if (taskResult)
{
RPC_STATUS_LOG(INFO) << LOG_DESC("find the existed not running-task")
<< LOG_KV("task", _task->id())
<< LOG_KV("status", taskResult->status());
if (taskResult->status() != toString(TaskState::COMPLETED))
{
// erase the task_id
m_front->eraseTaskInfo(_task->id());
}
}
}
auto taskResult = std::make_shared<TaskResult>(_task->id());
taskResult->setStatus(toString(TaskStatus::RUNNING));
Expand Down
8 changes: 7 additions & 1 deletion cpp/wedpr-transport/ppc-rpc/src/RpcMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

#pragma once
#include "ppc-framework/front/FrontInterface.h"
#include "ppc-framework/rpc/RpcStatusInterface.h"
#include <bcos-utilities/Common.h>
#include <bcos-utilities/Timer.h>
Expand All @@ -31,7 +32,10 @@ class RpcMemory : public RpcStatusInterface
public:
using Ptr = std::shared_ptr<RpcMemory>;

RpcMemory() : m_taskCleaner(std::make_shared<bcos::Timer>(60 * 60 * 1000, "taskCleaner")) {}
RpcMemory(ppc::front::FrontInterface::Ptr front)
: m_front(std::move(front)),
m_taskCleaner(std::make_shared<bcos::Timer>(60 * 60 * 1000, "taskCleaner"))
{}
~RpcMemory() override = default;

void start() override;
Expand All @@ -45,6 +49,8 @@ class RpcMemory : public RpcStatusInterface
void cleanTask();

private:
ppc::front::FrontInterface::Ptr m_front;

mutable bcos::SharedMutex x_tasks;
std::unordered_map<std::string, std::pair<uint64_t, protocol::TaskResult::Ptr>> m_tasks;
std::shared_ptr<bcos::Timer> m_taskCleaner;
Expand Down

0 comments on commit f7616af

Please sign in to comment.