Skip to content

Commit

Permalink
use T_MODEL insteadOf t_proprecessing
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Oct 15, 2024
1 parent 44512e3 commit 16a758d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 49 deletions.
38 changes: 19 additions & 19 deletions python/ppc_scheduler/demo/default_flow_config_sample.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
flow_dict = {

"SHELL": [
{
"index": 1,
Expand All @@ -24,7 +24,7 @@
]
}
],

"PSI": [
{
"index": 1,
Expand Down Expand Up @@ -61,18 +61,18 @@
"PREPROCESSING": [
{
"index": 1,
"type": "T_PREPROCESSING"
"type": "T_MODEL"
}
],

"FEATURE_ENGINEERING": [
{
"index": 1,
"type": "T_PREPROCESSING"
"type": "T_MODEL"
},
{
"index": 2,
"type": "T_FEATURE_ENGINEERING",
"type": "T_MODEL",
"upstreams": [
{
"index": 1
Expand All @@ -84,11 +84,11 @@
"TRAINING": [
{
"index": 1,
"type": "T_PREPROCESSING"
"type": "T_MODEL"
},
{
"index": 2,
"type": "T_TRAINING",
"type": "T_MODEL",
"upstreams": [
{
"index": 1
Expand All @@ -100,11 +100,11 @@
"PREDICTION": [
{
"index": 1,
"type": "T_PREPROCESSING"
"type": "T_MODEL"
},
{
"index": 2,
"type": "T_PREDICTION",
"type": "T_MODEL",
"upstreams": [
{
"index": 1
Expand All @@ -116,11 +116,11 @@
"FEATURE_ENGINEERING_TRAINING": [
{
"index": 1,
"type": "T_PREPROCESSING"
"type": "T_MODEL"
},
{
"index": 2,
"type": "T_FEATURE_ENGINEERING",
"type": "T_MODEL",
"upstreams": [
{
"index": 1
Expand All @@ -129,7 +129,7 @@
},
{
"index": 3,
"type": "T_TRAINING",
"type": "T_MODEL",
"upstreams": [
{
"index": 2
Expand All @@ -145,7 +145,7 @@
},
{
"index": 2,
"type": "T_PREPROCESSING",
"type": "T_MODEL",
"upstreams": [
{
"index": 1
Expand All @@ -154,7 +154,7 @@
},
{
"index": 3,
"type": "T_FEATURE_ENGINEERING",
"type": "T_MODEL",
"upstreams": [
{
"index": 2
Expand All @@ -170,7 +170,7 @@
},
{
"index": 2,
"type": "T_PREPROCESSING",
"type": "T_MODEL",
"upstreams": [
{
"index": 1
Expand All @@ -179,7 +179,7 @@
},
{
"index": 3,
"type": "T_TRAINING",
"type": "T_MODEL",
"upstreams": [
{
"index": 2
Expand All @@ -195,7 +195,7 @@
},
{
"index": 2,
"type": "T_PREPROCESSING",
"type": "T_MODEL",
"upstreams": [
{
"index": 1
Expand All @@ -204,7 +204,7 @@
},
{
"index": 3,
"type": "T_FEATURE_ENGINEERING",
"type": "T_MODEL",
"upstreams": [
{
"index": 2
Expand All @@ -213,7 +213,7 @@
},
{
"index": 4,
"type": "T_TRAINING",
"type": "T_MODEL",
"upstreams": [
{
"index": 3
Expand Down
12 changes: 5 additions & 7 deletions python/ppc_scheduler/node/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,22 @@
class ComputingNodeManager:
type_map = {
WorkerType.T_PSI: 'PSI',
WorkerType.T_ML_PSI: 'PSI',
WorkerType.T_MPC: 'MPC',
WorkerType.T_PREPROCESSING: 'MODEL',
WorkerType.T_FEATURE_ENGINEERING: 'MODEL',
WorkerType.T_TRAINING: 'MODEL',
WorkerType.T_PREDICTION: 'MODEL',
WorkerType.T_MODEL: 'MODEL'
}

def __init__(self, components):
self.components = components

def add_node(self, node_id: str, url: str, worker_type: str):
with self.components.create_sql_session() as session:
computing_node_mapper.insert_computing_node(session, node_id, url, self.type_map[worker_type], 0)
computing_node_mapper.insert_computing_node(
session, node_id, url, self.type_map[worker_type], 0)

def remove_node(self, url: str, worker_type: str):
with self.components.create_sql_session() as session:
computing_node_mapper.delete_computing_node(session, url, self.type_map[worker_type])
computing_node_mapper.delete_computing_node(
session, url, self.type_map[worker_type])

def get_node(self, worker_type: str):
with self.components.create_sql_session() as session:
Expand Down
6 changes: 1 addition & 5 deletions python/ppc_scheduler/workflow/common/worker_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ class WorkerType:

# specific job worker
T_PSI = 'PSI'
T_ML_PSI = 'ML_PSI'
T_MPC = 'MPC'
T_PREPROCESSING = 'PREPROCESSING'
T_FEATURE_ENGINEERING = 'FEATURE_ENGINEERING'
T_TRAINING = 'XGB_TRAINING'
T_PREDICTION = 'XGB_PREDICTING'
T_MODEL = "MODEL"

# finish job
T_ON_SUCCESS = 'T_ON_SUCCESS'
Expand Down
22 changes: 11 additions & 11 deletions python/ppc_scheduler/workflow/worker/engine/model_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ppc_scheduler.workflow.common.worker_type import WorkerType
from ppc_scheduler.workflow.worker.engine.work_engine import WorkerEngine


class ModelWorkerEngine(WorkerEngine):
def __init__(self, model_client, worker_type, worker_id, components, job_context: JobContext):
self.model_client = model_client
Expand All @@ -14,25 +15,24 @@ def __init__(self, model_client, worker_type, worker_id, components, job_context
self.logger = self.components.logger()

def run(self, *args) -> list:
if self.worker_type == WorkerType.T_PREPROCESSING\
or self.worker_type == WorkerType.T_FEATURE_ENGINEERING\
or self.worker_type == WorkerType.T_TRAINING\
or self.worker_type == WorkerType.T_PREDICTION:
if self.worker_type == WorkerType.T_MODEL:
pass
else:
raise ValueError(f"Unsupported worker type: {self.worker_type}")

job_id = self.job_context.job_id
start_time = time.time()

self.logger.info(f"## model engine run begin, job_id={job_id}, worker_id={self.worker_id}, args: {args}")

self.logger.info(
f"## model engine run begin, job_id={job_id}, worker_id={self.worker_id}, args: {args}")

# send job request to model node and wait for the job to finish
self.model_client.run(*args)

time_costs = time.time() - start_time
self.logger.info(f"## model engine run finished, job_id={job_id}, timecost: {time_costs}s")

self.logger.info(
f"## model engine run finished, job_id={job_id}, timecost: {time_costs}s")

# args = {
# 'job_id': job_id,
# 'task_id': task_id,
Expand All @@ -48,5 +48,5 @@ def run(self, *args) -> list:
# self.log.info(
# f"call compute_xgb_job service success, job: {job_id}, "
# f"task_id: {task_id}, timecost: {time.time() - start}")

return []
11 changes: 4 additions & 7 deletions python/ppc_scheduler/workflow/worker/worker_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@ def build_worker(job_context, worker_id, worker_type, worker_args, *args, **kwar
return PythonWorker(components, job_context, worker_id, worker_type, worker_args, *args, *kwargs)
elif worker_type == WorkerType.T_SHELL:
return ShellWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs)
elif worker_type == WorkerType.T_PSI or \
worker_type == WorkerType.T_ML_PSI:
elif worker_type == WorkerType.T_PSI:
return PsiWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs)
elif worker_type == WorkerType.T_MPC:
return MpcWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs)
elif worker_type == WorkerType.T_PREPROCESSING or \
worker_type == WorkerType.T_FEATURE_ENGINEERING or \
worker_type == WorkerType.T_TRAINING or \
worker_type == WorkerType.T_PREDICTION:
elif worker_type == WorkerType.T_MODEL:
return ModelWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs)
elif worker_type == WorkerType.T_ON_SUCCESS or \
worker_type == WorkerType.T_ON_FAILURE:
return DefaultWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs)
else:
raise PpcException(PpcErrorCode.UNSUPPORTED_WORK_TYPE, f"Unsupported worker type: {worker_type}")
raise PpcException(PpcErrorCode.UNSUPPORTED_WORK_TYPE,
f"Unsupported worker type: {worker_type}")

0 comments on commit 16a758d

Please sign in to comment.