Skip to content

Commit

Permalink
[feat] optimize stage function, with template stage.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Dec 30, 2024
1 parent ccbe0e4 commit 772818c
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 39 deletions.
1 change: 1 addition & 0 deletions src/GraphCtrl/GraphDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ static const char* CGRAPH_STR_ASPECT = "aspect";
static const char* CGRAPH_STR_EVENT = "event";
static const char* CGRAPH_STR_FENCE = "fence";
static const char* CGRAPH_STR_COORDINATOR = "coordinator";
static const char* CGRAPH_STR_STAGE = "stage";

CGRAPH_NAMESPACE_END

Expand Down
1 change: 1 addition & 0 deletions src/GraphCtrl/GraphParam/GPassedDefaultParam.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class GPassedDefaultParam final : public GPassedParam {
using GAspectDefaultParam = GPassedDefaultParam;
using GDaemonDefaultParam = GPassedDefaultParam;
using GEventDefaultParam = GPassedDefaultParam;
using GStageDefaultParam = GPassedDefaultParam;

CGRAPH_NAMESPACE_END

Expand Down
2 changes: 2 additions & 0 deletions src/GraphCtrl/GraphParam/GPassedParam.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ using GAspectParam = GPassedParam;
using GDaemonParam = GPassedParam;
using GElementParam = GPassedParam;
using GEventParam = GPassedParam;
using GStageParam = GPassedParam;
using GPassedParamPtr = GPassedParam *;
using GAspectParamPtr = GAspectParam *;
using GDaemonParamPtr = GDaemonParam *;
using GElementParamPtr = GElementParam *;
using GEventParamPtr = GEventParam *;
using GStageParamPtr = GStageParam *;

using GElementParamMap = std::unordered_map<std::string, GElementParamPtr>;

Expand Down
9 changes: 0 additions & 9 deletions src/GraphCtrl/GraphPipeline/GPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,6 @@ CStatus GPipeline::perf(std::ostream& oss) {
}


GPipelinePtr GPipeline::addGStage(const std::string& key, CInt threshold) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(stage_manager_)
CGRAPH_THROW_EXCEPTION_BY_CONDITION(threshold <= 0, "threshold value must bigger than 0")

stage_manager_->create(key, threshold);
return this;
}


GPipelinePtr GPipeline::setGEngineType(GEngineType type) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
Expand Down
8 changes: 7 additions & 1 deletion src/GraphCtrl/GraphPipeline/GPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,17 @@ class GPipeline : public GPipelineObject,

/**
* 添加一个阶段
* @tparam TStage
* @tparam TParam
* @param key
* @param threshold
* @param param
* @return
*/
GPipeline* addGStage(const std::string& key, CInt threshold);
template<typename TStage, typename TParam = GStageDefaultParam,
c_enable_if_t<std::is_base_of<GStage, TStage>::value, int> = 0,
c_enable_if_t<std::is_base_of<GStageParam, TParam>::value, int> = 0>
GPipeline* addGStage(const std::string& key, CInt threshold, TParam* param = nullptr);

/**
* 设置引擎策略
Expand Down
14 changes: 14 additions & 0 deletions src/GraphCtrl/GraphPipeline/GPipeline.inl
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,20 @@ GPipelinePtr GPipeline::addGEvent(const std::string& key, TParam* param) {
return this;
}


template<typename TStage, typename TParam,
c_enable_if_t<std::is_base_of<GStage, TStage>::value, int>,
c_enable_if_t<std::is_base_of<GStageParam, TParam>::value, int>>
GPipelinePtr GPipeline::addGStage(const std::string& key, CInt threshold, TParam* param) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(stage_manager_, param_manager_)
CGRAPH_THROW_EXCEPTION_BY_CONDITION(threshold <= 0, "threshold value must bigger than 0")

stage_manager_->setGParamManager(param_manager_);
stage_manager_->create<TStage, TParam>(key, threshold, param);
return this;
}

CGRAPH_NAMESPACE_END

#endif //CGRAPH_GPIPELINE_INL
52 changes: 52 additions & 0 deletions src/GraphCtrl/GraphStage/GStage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: GStage.cpp
@Time: 2024/12/28 23:02
@Desc:
***************************/

#include "GStage.h"

CGRAPH_NAMESPACE_BEGIN

CVoid GStage::launch(GStageParamPtr param) {
}


GStage::GStage() {
session_ = URandom<>::generateSession(CGRAPH_STR_STAGE);
}


GStage::~GStage() {
CGRAPH_DELETE_PTR(param_);
}


GStagePtr GStage::setThreshold(CInt threshold) {
threshold_ = threshold;
return this;
}


CVoid GStage::waiting() {
{
CGRAPH_LOCK_GUARD wm(waiting_mutex_);
cur_value_++;
if (cur_value_ >= threshold_) {
// 如果超过了 threshold,则打开全部
launch(param_);
cur_value_ = 0;
locker_.cv_.notify_all();
return;
}
}

CGRAPH_UNIQUE_LOCK lk(locker_.mtx_);
locker_.cv_.wait(lk, [this] {
return 0 == cur_value_ || cur_value_ >= threshold_;
});
}

CGRAPH_NAMESPACE_END
58 changes: 33 additions & 25 deletions src/GraphCtrl/GraphStage/GStage.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,50 @@
CGRAPH_NAMESPACE_BEGIN

class GStage : public GStageObject {
private:
GStage() = default;
protected:
/**
* stage 解开的时候进入的函数
* @param param
* @return
*/
virtual CVoid launch(GStageParamPtr param);

protected:
GStage();
~GStage() override;

private:
/**
* 设置阈值信息
* @param threshold
* @return
*/
GStage* setThreshold(CInt threshold) {
threshold_ = threshold;
return this;
}
GStage* setThreshold(CInt threshold);

/**
* 设置参数信息
* @tparam T
* @param param
* @return
*/
template <typename T,
c_enable_if_t<std::is_base_of<GStageParam, T>::value, int> = 0>
GStage* setSParam(T* param);

/**
* 进入等待区域
* @return
*/
CVoid waiting() {
{
CGRAPH_LOCK_GUARD wm(waiting_mutex_);
cur_value_++;
if (cur_value_ >= threshold_) {
// 如果超过了 threshold,则打开全部
cur_value_ = 0;
locker_.cv_.notify_all();
return;
}
}

CGRAPH_UNIQUE_LOCK lk(locker_.mtx_);
locker_.cv_.wait(lk, [this] {
return 0 == cur_value_ || cur_value_ >= threshold_;
});
}
CVoid waiting();

CGRAPH_DECLARE_GPARAM_MANAGER_WRAPPER_WITH_MEMBER

CGRAPH_NO_ALLOWED_COPY(GStage)

private:
CInt threshold_ { 0 }; // 阈值信息
CInt cur_value_ { 0 }; // 当前值
CInt threshold_ { 0 }; // 阈值信息
CInt cur_value_ { 0 }; // 当前值
GStageParamPtr param_ { nullptr }; // 参数信息
UCvMutex locker_;
std::mutex waiting_mutex_;

Expand All @@ -67,4 +73,6 @@ using GStagePtr = GStage *;

CGRAPH_NAMESPACE_END

#include "GStage.inl"

#endif //CGRAPH_GSTAGE_H
30 changes: 30 additions & 0 deletions src/GraphCtrl/GraphStage/GStage.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: GStage.inl
@Time: 2024/12/30 9:46 下午
@Desc:
***************************/

#ifndef CGRAPH_GSTAGE_INL
#define CGRAPH_GSTAGE_INL

#include "GStage.h"

CGRAPH_NAMESPACE_BEGIN

template <typename T,
c_enable_if_t<std::is_base_of<GStageParam, T>::value, int>>
GStagePtr GStage::setSParam(T* param) {
if (param) {
CGRAPH_DELETE_PTR(param_);
param_ = CGRAPH_SAFE_MALLOC_COBJECT(T);
param_->clone(static_cast<T *>(param));
}

return this;
}

CGRAPH_NAMESPACE_END

#endif //CGRAPH_GSTAGE_INL
14 changes: 12 additions & 2 deletions src/GraphCtrl/GraphStage/GStageManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,22 @@ class GStageManager : public GStageObject,

/**
* 创建 stage 信息
* @tparam TStage
* @tparam TParam
* @param key
* @param threshold
* @param param
* @return
*/
CStatus create(const std::string& key, CInt threshold) {
template<typename TStage, typename TParam = GStageDefaultParam,
c_enable_if_t<std::is_base_of<GStage, TStage>::value, int> = 0,
c_enable_if_t<std::is_base_of<GStageParam, TParam>::value, int> = 0>
CStatus create(const std::string& key, CInt threshold, TParam* param) {
CGRAPH_FUNCTION_BEGIN
auto stage = CGRAPH_SAFE_MALLOC_COBJECT(GStage);
GStagePtr stage = CGRAPH_SAFE_MALLOC_COBJECT(TStage);
stage->setThreshold(threshold);
stage->setSParam(param);
stage->setGParamManager(param_manager_);
stage_map_.insert(std::pair<std::string, GStagePtr>(key, stage));
CGRAPH_FUNCTION_END
}
Expand All @@ -78,6 +86,8 @@ class GStageManager : public GStageObject,
CGRAPH_FUNCTION_END
}

CGRAPH_DECLARE_GPARAM_MANAGER_WRAPPER_WITH_MEMBER

private:
std::unordered_map<std::string, GStagePtr> stage_map_ {}; // stage 集合

Expand Down
4 changes: 3 additions & 1 deletion src/GraphCtrl/GraphStage/GStageObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
#include <atomic>

#include "../GraphObject.h"
#include "../GraphParam/GParamInclude.h"

CGRAPH_NAMESPACE_BEGIN

class GStageObject : public GraphObject {
class GStageObject : public GraphObject,
public CDescInfo {
private:
CStatus run() final {
CGRAPH_NO_SUPPORT
Expand Down
2 changes: 1 addition & 1 deletion tutorial/T28-Stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void tutorial_stage() {
* 添加一个 stage 信息,遇到 3次的时候,会通知触发结束
* 正好对应上面三个 MyStageNode 的逻辑
*/
pipeline->addGStage(kStageKey, 3);
pipeline->addGStage<GStage>(kStageKey, 3);

pipeline->process();
GPipelineFactory::remove(pipeline);
Expand Down

0 comments on commit 772818c

Please sign in to comment.