Skip to content

Commit

Permalink
fix task_result_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Oct 15, 2024
1 parent 16a758d commit 3dca79d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 38 deletions.
12 changes: 7 additions & 5 deletions python/ppc_dev/result/model_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ class ModelResult(BaseResult):

FEATURE_BIN_FILE = "feature_bin.json"
MODEL_DATA_FILE = utils.XGB_TREE_PERFIX + '.json'
TEST_MODEL_OUTPUT_FILE = "xgb_output.csv"
TRAIN_MODEL_OUTPUT_FILE = "xgb_train_output.csv"
TEST_MODEL_OUTPUT_FILE = "test_output.csv"
TRAIN_MODEL_OUTPUT_FILE = "train_output.csv"

def __init__(self, dataset: DataContext, job_id: str, job_type: str):

super().__init__(dataset.ctx)
self.job_id = job_id

participant_id_list = []
for dataset in self.dataset.datasets:
participant_id_list.append(dataset.agency.agency_id)
Expand All @@ -32,8 +32,10 @@ def _xgb_train_result(self):

# train_praba, test_praba, train_y, test_y, feature_importance, split_xbin, trees, params
# 从hdfs读取结果文件信息,构造为属性
train_praba_path = os.path.join(self.job_id, self.TRAIN_MODEL_OUTPUT_FILE)
test_praba_path = os.path.join(self.job_id, self.TEST_MODEL_OUTPUT_FILE)
train_praba_path = os.path.join(
self.job_id, self.TRAIN_MODEL_OUTPUT_FILE)
test_praba_path = os.path.join(
self.job_id, self.TEST_MODEL_OUTPUT_FILE)
train_output = HDFSApi.download(train_praba_path)
test_output = HDFSApi.download(test_praba_path)
self.train_praba = train_output['class_pred'].values
Expand Down
4 changes: 2 additions & 2 deletions python/ppc_model/common/base_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ class BaseContext:
MODEL_DATA_FILE = utils.XGB_TREE_PERFIX + '.json'
TEST_MODEL_RESULT_FILE = "model_result.csv"
# TEST_MODEL_OUTPUT_FILE = "model_output.csv"
TEST_MODEL_OUTPUT_FILE = "xgb_output.csv"
TEST_MODEL_OUTPUT_FILE = "test_model_output.csv"
TRAIN_MODEL_RESULT_FILE = "train_model_result.csv"
# TRAIN_MODEL_OUTPUT_FILE = "train_model_output.csv"
TRAIN_MODEL_OUTPUT_FILE = "xgb_train_output.csv"
TRAIN_MODEL_OUTPUT_FILE = "train_model_output.csv"

MODEL_FILE = "model.kpl"
MODEL_ENC_FILE = "model_enc.kpl"
Expand Down
71 changes: 40 additions & 31 deletions python/ppc_model/model_result/task_result_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from ppc_common.ppc_utils.utils import PpcException, PpcErrorCode
from ppc_common.ppc_utils import utils
from ppc_model.common.protocol import ModelTask
from ppc_model.common.base_context import BaseContext
from ppc_common.ppc_ml.model.algorithm_info import ClassificationType
from ppc_model.common.model_result import ResultFileHandling
from ppc_common.ppc_ml.model.algorithm_info import EvaluationType
Expand Down Expand Up @@ -194,16 +195,17 @@ def to_dict(self):
return self.result


class XGBJobResult:
class ModelJobResult:
DEFAULT_PROPERTY_NAME = "outputModelResult"
MODEL_RESULT = "ModelResult"
MODEL_RESULT_PATH = "modelResultPath"
TRAIN_RESULT_PATH = "trainResultPath"
TEST_RESULT_PATH = "testResultPath"
WOE_RESULT_PATH = "woeIVResultPath"

def __init__(self, job_id, components, property_name=DEFAULT_PROPERTY_NAME):
def __init__(self, xgb_job, job_id, components, property_name=DEFAULT_PROPERTY_NAME):
self.job_id = job_id
self.xgb_job = xgb_job
self.components = components
self.logger = components.logger()
self.property_name = property_name
Expand All @@ -212,12 +214,14 @@ def __init__(self, job_id, components, property_name=DEFAULT_PROPERTY_NAME):
self.model_result_path = None
self.train_result_path = None
self.woe_iv_result_path = None
self.xgb_result_path = None
self.model_result_path_dict = None
self.evaluation_table = None
self.feature_importance_table = None
self.iteration_metrics = None

def fetch_model_result(self):
def fetch_xgb_model_result(self):
if not self.xgb_job:
return
self.model_result_list = []
i = 0
# while True:
Expand All @@ -240,25 +244,25 @@ def load_result(self, result_path, result_property):
self.job_result = job_result_object.to_dict()

def load_model_result_path(self, predict: bool):
self.xgb_result_path = dict()
self.model_result_path_dict = dict()
self.model_result_path = ResultFileHandling.get_remote_path(
self.components, self.job_id, BaseContext.MODEL_DATA_FILE)
self.xgb_result_path.update(
{XGBJobResult.MODEL_RESULT_PATH: self.model_result_path})
self.model_result_path_dict.update(
{ModelJobResult.MODEL_RESULT_PATH: self.model_result_path})

self.train_result_path = ResultFileHandling.get_remote_path(
self.components, self.job_id, BaseContext.TRAIN_MODEL_OUTPUT_FILE)
self.xgb_result_path.update(
{XGBJobResult.TRAIN_RESULT_PATH: self.train_result_path})
self.model_result_path_dict.update(
{ModelJobResult.TRAIN_RESULT_PATH: self.train_result_path})

self.xgb_result_path.update(
{XGBJobResult.TEST_RESULT_PATH: ResultFileHandling.get_remote_path(
self.model_result_path_dict.update(
{ModelJobResult.TEST_RESULT_PATH: ResultFileHandling.get_remote_path(
self.components, self.job_id, BaseContext.TEST_MODEL_OUTPUT_FILE)})

self.woe_iv_result_path = ResultFileHandling.get_remote_path(
self.components, self.job_id, BaseContext.WOE_IV_FILE)
self.xgb_result_path.update(
{XGBJobResult.WOE_RESULT_PATH: self.woe_iv_result_path})
self.model_result_path_dict.update(
{ModelJobResult.WOE_RESULT_PATH: self.woe_iv_result_path})

def load_evaluation_table(self, evaluation_path, property):
evaluation_table_object = TableResult(self.components,
Expand All @@ -267,6 +271,8 @@ def load_evaluation_table(self, evaluation_path, property):
type=DataType.TABLE).to_dict()}

def load_feature_importance_table(self, feature_importance_path, property):
if not self.xgb_job:
return
feature_importance_table = TableResult(self.components,
self.job_id, ResultFileMeta(feature_importance_path))
self.feature_importance_table = {property: DataItem(name=property, data=feature_importance_table.to_dict(),
Expand Down Expand Up @@ -295,9 +301,9 @@ def to_dict(self):
result.update(self.feature_importance_table)
if self.iteration_metrics is not None:
result.update({self.iteration_property: self.iteration_metrics})
if self.xgb_result_path is not None:
if self.model_result_path_dict is not None:
result.update(
{XGBJobResult.MODEL_RESULT: self.xgb_result_path})
{ModelJobResult.MODEL_RESULT: self.model_result_path_dict})
return result


Expand All @@ -308,8 +314,11 @@ def __init__(self, task_result_request: TaskResultRequest, components):
self.logger = components.logger()
self.result_list = []
self.predict = False
if self.task_result_request.task_type == ModelTask.XGB_PREDICTING.name:
self.xgb_job = False
if self.task_result_request.task_type == ModelTask.XGB_PREDICTING.name or self.task_result_request.task_type == ModelTask.LR_PREDICTING.name:
self.predict = True
if self.task_result_request.task_type == ModelTask.XGB_PREDICTING.name or self.task_result_request.task_type == ModelTask.XGB_TRAINING.name:
self.xgb_job = True
self.logger.info(
f"Init jobResultHandler for: {self.task_result_request.job_id}")
self._get_evaluation_result()
Expand All @@ -323,7 +332,7 @@ def get_response(self):
return utils.make_response(PpcErrorCode.SUCCESS.get_code(), PpcErrorCode.SUCCESS.get_msg(), response)

def _get_evaluation_result(self):
if self.task_result_request.task_type == ModelTask.XGB_TRAINING.name:
if not self.predict:
# the train evaluation result
self.train_evaluation_result = JobEvaluationResult(
property_name="outputMetricsGraphs",
Expand All @@ -347,18 +356,18 @@ def _get_evaluation_result(self):
"mpc_metric_ks.csv", "KSTable")
self.result_list.append(self.validation_evaluation_result)

self.xgb_model = XGBJobResult(
self.task_result_request.job_id, self.components, XGBJobResult.DEFAULT_PROPERTY_NAME)
self.xgb_model.fetch_model_result()
self.model = ModelJobResult(self.xgb_job,
self.task_result_request.job_id, self.components, ModelJobResult.DEFAULT_PROPERTY_NAME)
self.model.fetch_xgb_model_result()
# the ks-auc table
self.xgb_model.load_evaluation_table(
self.model.load_evaluation_table(
utils.MPC_XGB_EVALUATION_TABLE, "EvaluationTable")
# the feature-importance table
self.xgb_model.load_feature_importance_table(
self.model.load_feature_importance_table(
utils.XGB_FEATURE_IMPORTANCE_TABLE, "FeatureImportance")
self.result_list.append(self.xgb_model)
self.result_list.append(self.model)
# the metrics iteration graph
self.xgb_model.load_iteration_metrics(
self.model.load_iteration_metrics(
utils.METRICS_OVER_ITERATION_FILE, "IterationGraph")

if self.predict:
Expand All @@ -374,13 +383,13 @@ def _get_evaluation_result(self):
"mpc_eval_metric_ks.csv", "KSTable")
self.result_list.append(self.predict_evaluation_result)

# load xgb_result
self.xgb_result = XGBJobResult(
self.task_result_request.job_id, self.components, XGBJobResult.DEFAULT_PROPERTY_NAME)
self.xgb_result.load_result(
"xgb_train_output.csv", "outputTrainPreview")
self.xgb_result.load_model_result_path(self.predict)
self.result_list.append(self.xgb_result)
# load model_result
self.model_result = ModelJobResult(self.xgb_job,
self.task_result_request.job_id, self.components, ModelJobResult.DEFAULT_PROPERTY_NAME)
self.model_result.load_result(
BaseContext.TRAIN_MODEL_OUTPUT_FILE, "outputTrainPreview")
self.model_result.load_model_result_path(self.predict)
self.result_list.append(self.model_result)

def _get_feature_processing_result(self):
self.feature_processing_result = FeatureProcessingResult(
Expand Down

0 comments on commit 3dca79d

Please sign in to comment.