Merge pull request #13 from Shareong/feature-milestone2
add scheduler
Shareong authored Aug 29, 2024
2 parents b6cee04 + d34dece commit 328f2d5
Showing 75 changed files with 3,455 additions and 1,425 deletions.
10 changes: 10 additions & 0 deletions python/ppc_common/db_models/computing_node.py
@@ -0,0 +1,10 @@
+
+from ppc_common.db_models import db
+
+
+class ComputingNodeRecord(db.Model):
+    __tablename__ = 't_computing_node'
+    id = db.Column(db.String(255), primary_key=True)
+    url = db.Column(db.String(255))
+    type = db.Column(db.String(255))
+    loading = db.Column(db.Integer)
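A minimal usage sketch (hypothetical, not part of this commit): a scheduler could pick the least-loaded node of a given type and bump its counter. The acquire_node helper and the row-locking strategy are illustrative assumptions; only the model itself comes from the diff.

# Hypothetical helper, assuming the Flask-SQLAlchemy session from
# ppc_common.db_models and the ComputingNodeRecord model added above.
from ppc_common.db_models import db
from ppc_common.db_models.computing_node import ComputingNodeRecord


def acquire_node(node_type: str) -> str:
    node = (ComputingNodeRecord.query
            .filter_by(type=node_type)
            .order_by(ComputingNodeRecord.loading.asc())
            .with_for_update()  # lock the row so concurrent schedulers don't race
            .first())
    if node is None:
        raise RuntimeError(f"no computing node registered for type {node_type}")
    node.loading += 1  # one more task in flight on this node
    db.session.commit()
    return node.url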
26 changes: 26 additions & 0 deletions python/ppc_common/db_models/config.sql
@@ -0,0 +1,26 @@
+
+CREATE TABLE t_job_worker (
+    worker_id VARCHAR(100) PRIMARY KEY,
+    job_id VARCHAR(255), INDEX idx_job_id (job_id),
+    type VARCHAR(255),
+    status VARCHAR(255),
+    upstreams TEXT,
+    inputs_statement TEXT,
+    outputs TEXT,
+    create_time BIGINT,
+    update_time BIGINT
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 DEFAULT COLLATE=utf8mb4_unicode_ci;
+
+CREATE TABLE t_computing_node (
+    id VARCHAR(255) PRIMARY KEY,
+    url VARCHAR(255),
+    type VARCHAR(255),
+    loading INT
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 DEFAULT COLLATE=utf8mb4_unicode_ci;
+
+
+INSERT INTO t_computing_node (id, url, type, loading)
+VALUES
+    ('001', '127.0.0.1:10200', 'PSI', 0),
+    ('002', '127.0.0.1:10201', 'MPC', 0),
+    ('003', '127.0.0.1:10202', 'MODEL', 0);
14 changes: 0 additions & 14 deletions python/ppc_common/db_models/job_unit_record.py

This file was deleted.

15 changes: 15 additions & 0 deletions python/ppc_common/db_models/job_worker_record.py
@@ -0,0 +1,15 @@
+
+from ppc_common.db_models import db
+
+
+class JobWorkerRecord(db.Model):
+    __tablename__ = 't_job_worker'
+    worker_id = db.Column(db.String(100), primary_key=True)
+    job_id = db.Column(db.String(255), index=True)
+    type = db.Column(db.String(255))
+    status = db.Column(db.String(255))
+    upstreams = db.Column(db.Text)
+    inputs_statement = db.Column(db.Text)
+    outputs = db.Column(db.Text)
+    create_time = db.Column(db.BigInteger)
+    update_time = db.Column(db.BigInteger)
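The TEXT columns (upstreams, inputs_statement, outputs) presumably hold serialized structures; the encoding is not shown in this diff. A sketch assuming JSON, with a hypothetical helper and an assumed status vocabulary:

# Hypothetical sketch: persist a scheduler worker with its upstream DAG
# edges JSON-encoded into the TEXT column. JSON is an assumption here.
import json
import time

from ppc_common.db_models import db
from ppc_common.db_models.job_worker_record import JobWorkerRecord


def save_worker(job_id: str, worker_id: str, worker_type: str, upstreams: list) -> None:
    now_ms = int(time.time() * 1000)  # BIGINT epoch milliseconds assumed
    db.session.add(JobWorkerRecord(
        worker_id=worker_id,
        job_id=job_id,
        type=worker_type,
        status='PENDING',  # assumed status value
        upstreams=json.dumps(upstreams),
        create_time=now_ms,
        update_time=now_ms,
    ))
    db.session.commit()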
24 changes: 12 additions & 12 deletions python/ppc_common/ppc_async_executor/async_subprocess_executor.py
@@ -15,30 +15,30 @@ def __init__(self, logger):
         self._cleanup_thread.daemon = True
         self._cleanup_thread.start()
 
-    def execute(self, task_id: str, target: Callable, on_target_finish: Callable[[str, bool, Exception], None],
+    def execute(self, target_id: str, target: Callable, on_target_finish: Callable[[str, bool, Exception], None],
                 args=()):
         process = multiprocessing.Process(target=target, args=args)
         process.start()
         with self.lock:
-            self.processes[task_id] = process
+            self.processes[target_id] = process
 
-    def kill(self, task_id: str):
+    def kill(self, target_id: str):
         with self.lock:
-            if task_id not in self.processes:
+            if target_id not in self.processes:
                 return False
             else:
-                process = self.processes[task_id]
+                process = self.processes[target_id]
 
         process.terminate()
-        self.logger.info(f"Task {task_id} has been terminated!")
+        self.logger.info(f"Target {target_id} has been terminated!")
         return True
 
     def kill_all(self):
         with self.lock:
             keys = self.processes.keys()
 
-        for task_id in keys:
-            self.kill(task_id)
+        for target_id in keys:
+            self.kill(target_id)
 
     def _loop_cleanup(self):
         while True:
@@ -48,13 +48,13 @@ def _loop_cleanup(self):
     def _cleanup_finished_processes(self):
         with self.lock:
             finished_processes = [
-                (task_id, proc) for task_id, proc in self.processes.items() if not proc.is_alive()]
+                (target_id, proc) for target_id, proc in self.processes.items() if not proc.is_alive()]
 
-        for task_id, process in finished_processes:
+        for target_id, process in finished_processes:
             with self.lock:
                 process.join()  # ensure the process's resources are released
-                del self.processes[task_id]
-                self.logger.info(f"Cleanup finished task process {task_id}")
+                del self.processes[target_id]
+                self.logger.info(f"Cleanup finished process {target_id}")
 
     def __del__(self):
         self.kill_all()
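A usage sketch for the renamed subprocess API; the worker function and ids are hypothetical, and targets must be picklable module-level callables because multiprocessing runs them in a child process.

# Hypothetical usage of AsyncSubprocessExecutor after this rename.
import logging
import time

from ppc_common.ppc_async_executor.async_subprocess_executor import AsyncSubprocessExecutor


def crunch(seconds):
    time.sleep(seconds)  # stands in for real work


def on_finish(target_id, succeeded, error=None):
    # Matches the Callable[[str, bool, Exception], None] signature above.
    print(f"{target_id}: success={succeeded}, error={error}")


executor = AsyncSubprocessExecutor(logging.getLogger(__name__))
executor.execute("job-001#psi", crunch, on_finish, args=(60,))
executor.kill("job-001#psi")  # terminates the child process immediately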
32 changes: 16 additions & 16 deletions python/ppc_common/ppc_async_executor/async_thread_executor.py
@@ -17,44 +17,44 @@ def __init__(self, event_manager: ThreadEventManager, logger):
         self._cleanup_thread.daemon = True
         self._cleanup_thread.start()
 
-    def execute(self, task_id: str, target: Callable, on_target_finish: Callable[[str, bool, Exception], None],
+    def execute(self, target_id: str, target: Callable, on_target_finish: Callable[[str, bool, Exception], None],
                 args=()):
         def thread_target(logger, on_finish, *args):
             try:
                 target(*args)
-                on_finish(task_id, True)
+                on_finish(target_id, True)
             except Exception as e:
                 logger.warn(traceback.format_exc())
-                on_finish(task_id, False, e)
+                on_finish(target_id, False, e)
 
         thread = threading.Thread(target=thread_target, args=(
             self.logger, on_target_finish,) + args)
         thread.start()
 
         with self.lock:
-            self.threads[task_id] = thread
+            self.threads[target_id] = thread
 
         stop_event = threading.Event()
-        self.event_manager.add_event(task_id, stop_event)
+        self.event_manager.add_event(target_id, stop_event)
 
-    def kill(self, task_id: str):
+    def kill(self, target_id: str):
         with self.lock:
-            if task_id not in self.threads:
+            if target_id not in self.threads:
                 return False
             else:
-                thread = self.threads[task_id]
+                thread = self.threads[target_id]
 
-        self.event_manager.set_event(task_id)
+        self.event_manager.set_event(target_id)
         thread.join()
-        self.logger.info(f"Task {task_id} has been stopped!")
+        self.logger.info(f"Target {target_id} has been stopped!")
         return True
 
     def kill_all(self):
         with self.lock:
             keys = self.threads.keys()
 
-        for task_id in keys:
-            self.kill(task_id)
+        for target_id in keys:
+            self.kill(target_id)
 
     def _loop_cleanup(self):
         while True:
@@ -64,12 +64,12 @@ def _loop_cleanup(self):
     def _cleanup_finished_threads(self):
         with self.lock:
             finished_threads = [
-                task_id for task_id, thread in self.threads.items() if not thread.is_alive()]
+                target_id for target_id, thread in self.threads.items() if not thread.is_alive()]
 
-        for task_id in finished_threads:
+        for target_id in finished_threads:
             with self.lock:
-                del self.threads[task_id]
-                self.logger.info(f"Cleanup finished task thread {task_id}")
+                del self.threads[target_id]
+                self.logger.info(f"Cleanup finished thread {target_id}")
 
     def __del__(self):
         self.kill_all()
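Unlike processes, threads cannot be terminated from outside, so kill relies on the target polling its stop event. A cooperative usage sketch (the polling worker is hypothetical):

# Hypothetical cooperative target: checks the shared event manager so
# that kill() can stop it promptly.
import logging
import time

from ppc_common.ppc_async_executor.async_thread_executor import AsyncThreadExecutor
from ppc_common.ppc_async_executor.thread_event_manager import ThreadEventManager

event_manager = ThreadEventManager()
executor = AsyncThreadExecutor(event_manager, logging.getLogger(__name__))
TARGET_ID = "job-001#model"


def poll_until_stopped():
    while not event_manager.event_status(TARGET_ID):
        time.sleep(0.1)  # do a slice of work, then re-check the stop event


def on_finish(target_id, succeeded, error=None):
    print(f"{target_id}: success={succeeded}")


executor.execute(TARGET_ID, poll_until_stopped, on_finish)
executor.kill(TARGET_ID)  # sets the stop event, then joins the thread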
24 changes: 12 additions & 12 deletions python/ppc_common/ppc_async_executor/thread_event_manager.py
@@ -10,25 +10,25 @@ def __init__(self):
         self.events: Dict[str, threading.Event] = {}
         self.rw_lock = rwlock.RWLockWrite()
 
-    def add_event(self, task_id: str, event: threading.Event) -> None:
+    def add_event(self, target_id: str, event: threading.Event) -> None:
         with self.rw_lock.gen_wlock():
-            self.events[task_id] = event
+            self.events[target_id] = event
 
-    def remove_event(self, task_id: str):
+    def remove_event(self, target_id: str):
         with self.rw_lock.gen_wlock():
-            if task_id in self.events:
-                del self.events[task_id]
+            if target_id in self.events:
+                del self.events[target_id]
 
-    def set_event(self, task_id: str):
+    def set_event(self, target_id: str):
         with self.rw_lock.gen_wlock():
-            if task_id in self.events:
-                self.events[task_id].set()
+            if target_id in self.events:
+                self.events[target_id].set()
             else:
-                raise KeyError(f"Task ID {task_id} not found")
+                raise KeyError(f"Target id {target_id} not found")
 
-    def event_status(self, task_id: str) -> bool:
+    def event_status(self, target_id: str) -> bool:
         with self.rw_lock.gen_rlock():
-            if task_id in self.events:
-                return self.events[task_id].is_set()
+            if target_id in self.events:
+                return self.events[target_id].is_set()
             else:
                 return False
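The manager takes a write lock for every mutation and a read lock for event_status, so frequent status polling never blocks other readers. A minimal illustration of that pattern in isolation, assuming the readerwriterlock package (which the rwlock.RWLockWrite() call appears to come from):

# Minimal reader-writer lock illustration; assumes the readerwriterlock
# package (pip install readerwriterlock).
from readerwriterlock import rwlock

lock = rwlock.RWLockWrite()  # write-preferring reader-writer lock
shared = {}


def read_path(key):
    with lock.gen_rlock():  # many readers may hold this concurrently
        return shared.get(key)


def write_path(key, value):
    with lock.gen_wlock():  # writers get exclusive access
        shared[key] = value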
@@ -10,7 +10,7 @@ class DataSetHandlerInitialize:
     def __init__(self, config, logger):
         self._config = config
         self._logger = logger
-        self._init_sql_storage()
+        # self._init_sql_storage()
         self._init_remote_storage()
         self._init_dataset_factory()
58 changes: 13 additions & 45 deletions python/ppc_common/ppc_protos/generated/ppc_pb2.py

Some generated files are not rendered by default.

