From 0ebbbf11668b65370790cafbf5432cac6ce85db2 Mon Sep 17 00:00:00 2001
From: cyjseagull
Date: Fri, 25 Oct 2024 11:51:02 +0800
Subject: [PATCH] remove unused ppc-scheduler

---
 .github/workflows/cpp_full_node_workflow.yml | 40 +-
 .github/workflows/cpp_sdk_workflow.yml | 2 +
 .github/workflows/cpp_toolkit_workflow.yml | 2 +
 cpp/cmake/grpc.cmake | 25 +-
 cpp/tools/install_depends.sh | 2 +-
 .../ppc-pir/tests/CMakeLists.txt | 2 +-
 .../ppc-pir/tests/data/AysPreDataset.csv | 3 +
 cpp/wedpr-protocol/grpc/CMakeLists.txt | 2 +-
 .../protocol/src/JsonTaskImpl.cpp | 2 +-
 cpp/wedpr-protocol/tars/TarsStruct.h | 15 +-
 python/ppc_scheduler/__init__.py | 0
 python/ppc_scheduler/common/__init__.py | 0
 python/ppc_scheduler/common/global_context.py | 3 -
 .../common/global_job_manager.py | 15 -
 python/ppc_scheduler/common/initializer.py | 72 ---
 python/ppc_scheduler/common/log_utils.py | 78 ---
 .../ppc_scheduler/conf/application-sample.yml | 13 -
 python/ppc_scheduler/conf/logging.conf | 40 --
 python/ppc_scheduler/database/__init__.py | 0
 .../database/computing_node_mapper.py | 74 ---
 .../database/job_worker_mapper.py | 52 --
 .../demo/default_flow_config_sample.py | 224 ---------
 .../demo/prefect_workflow_sample.py | 79 ---
 python/ppc_scheduler/demo/web_submit_job.json | 134 -----
 .../demo/workflow_args_sample.json | 73 ---
 python/ppc_scheduler/endpoints/__init__.py | 0
 python/ppc_scheduler/endpoints/body_schema.py | 18 -
 .../ppc_scheduler/endpoints/job_controller.py | 70 ---
 python/ppc_scheduler/endpoints/restx.py | 35 --
 python/ppc_scheduler/job/__init__.py | 0
 python/ppc_scheduler/job/job_manager.py | 154 ------
 python/ppc_scheduler/job/job_status.py | 5 -
 python/ppc_scheduler/job/job_type.py | 12 -
 .../ppc_scheduler/mpc_generator/__init__.py | 0
 .../ppc_scheduler/mpc_generator/generator.py | 468 ------
 .../mpc_generator/mpc_func_str.py | 64 ---
 .../mpc_sample/aggr_func_only.mpc | 136 -----
 .../mpc_sample/aggr_func_with_group_by.mpc | 234 ---------
 .../mpc_sample/basic_arith_ope.mpc | 78 ---
 .../mpc_generator/test_generator.py | 101 ----
 python/ppc_scheduler/node/__init__.py | 0
 .../node/computing_node_client/__init__.py | 0
 .../model_node_client.py | 95 ----
 .../computing_node_client/mpc_node_client.py | 32 --
 .../node/computing_node_client/psi_client.py | 74 ---
 .../node/computing_node_client/utils.py | 8 -
 python/ppc_scheduler/node/node_manager.py | 31 --
 python/ppc_scheduler/ppc_scheduler_app.py | 125 -----
 python/ppc_scheduler/scripts/start.sh | 69 ---
 python/ppc_scheduler/scripts/stop.sh | 42 --
 python/ppc_scheduler/workflow/__init__.py | 0
 .../workflow/builder/__init__.py | 0
 .../workflow/builder/flow_builder.py | 86 ----
 .../ppc_scheduler/workflow/common/__init__.py | 0
 python/ppc_scheduler/workflow/common/codec.py | 19 -
 .../workflow/common/flow_utils.py | 34 --
 .../workflow/common/job_context.py | 74 ---
 .../workflow/common/worker_status.py | 7 -
 .../workflow/common/worker_type.py | 14 -
 .../workflow/scheduler/__init__.py | 0
 .../workflow/scheduler/scheduler.py | 100 ----
 .../workflow/scheduler/scheduler_api.py | 8 -
 .../ppc_scheduler/workflow/worker/__init__.py | 0
 .../workflow/worker/api_worker.py | 10 -
 .../workflow/worker/default_worker.py | 37 --
 .../workflow/worker/engine/__init__.py | 0
 .../workflow/worker/engine/model_engine.py | 52 --
 .../workflow/worker/engine/mpc_engine.py | 31 --
 .../workflow/worker/engine/psi_engine.py | 31 --
 .../workflow/worker/engine/shell_engine.py | 16 -
 .../workflow/worker/engine/work_engine.py | 6 -
 .../workflow/worker/model_worker.py | 19 -
 .../workflow/worker/mpc_worker.py | 19 -
 .../workflow/worker/psi_worker.py | 21 -
 .../workflow/worker/python_worker.py | 10 -
 .../workflow/worker/shell_worker.py | 16 -
 .../ppc_scheduler/workflow/worker/worker.py | 117 -----
 .../workflow/worker/worker_factory.py | 34 --
 python/requirements.txt | 1 -
 79 files changed, 61 insertions(+), 3504 deletions(-)
 create mode 100644 cpp/wedpr-computing/ppc-pir/tests/data/AysPreDataset.csv
 delete mode 100644 python/ppc_scheduler/__init__.py
 delete mode 100644 python/ppc_scheduler/common/__init__.py
 delete mode 100644 python/ppc_scheduler/common/global_context.py
 delete mode 100644 python/ppc_scheduler/common/global_job_manager.py
 delete mode 100644 python/ppc_scheduler/common/initializer.py
 delete mode 100644 python/ppc_scheduler/common/log_utils.py
 delete mode 100644 python/ppc_scheduler/conf/application-sample.yml
 delete mode 100644 python/ppc_scheduler/conf/logging.conf
 delete mode 100644 python/ppc_scheduler/database/__init__.py
 delete mode 100644 python/ppc_scheduler/database/computing_node_mapper.py
 delete mode 100644 python/ppc_scheduler/database/job_worker_mapper.py
 delete mode 100644 python/ppc_scheduler/demo/default_flow_config_sample.py
 delete mode 100644 python/ppc_scheduler/demo/prefect_workflow_sample.py
 delete mode 100644 python/ppc_scheduler/demo/web_submit_job.json
 delete mode 100644 python/ppc_scheduler/demo/workflow_args_sample.json
 delete mode 100644 python/ppc_scheduler/endpoints/__init__.py
 delete mode 100644 python/ppc_scheduler/endpoints/body_schema.py
 delete mode 100644 python/ppc_scheduler/endpoints/job_controller.py
 delete mode 100644 python/ppc_scheduler/endpoints/restx.py
 delete mode 100644 python/ppc_scheduler/job/__init__.py
 delete mode 100644 python/ppc_scheduler/job/job_manager.py
 delete mode 100644 python/ppc_scheduler/job/job_status.py
 delete mode 100644 python/ppc_scheduler/job/job_type.py
 delete mode 100644 python/ppc_scheduler/mpc_generator/__init__.py
 delete mode 100644 python/ppc_scheduler/mpc_generator/generator.py
 delete mode 100644 python/ppc_scheduler/mpc_generator/mpc_func_str.py
 delete mode 100644 python/ppc_scheduler/mpc_generator/mpc_sample/aggr_func_only.mpc
 delete mode 100644 python/ppc_scheduler/mpc_generator/mpc_sample/aggr_func_with_group_by.mpc
 delete mode 100644 python/ppc_scheduler/mpc_generator/mpc_sample/basic_arith_ope.mpc
 delete mode 100644 python/ppc_scheduler/mpc_generator/test_generator.py
 delete mode 100644 python/ppc_scheduler/node/__init__.py
 delete mode 100644 python/ppc_scheduler/node/computing_node_client/__init__.py
 delete mode 100644 python/ppc_scheduler/node/computing_node_client/model_node_client.py
 delete mode 100644 python/ppc_scheduler/node/computing_node_client/mpc_node_client.py
 delete mode 100644 python/ppc_scheduler/node/computing_node_client/psi_client.py
 delete mode 100644 python/ppc_scheduler/node/computing_node_client/utils.py
 delete mode 100644 python/ppc_scheduler/node/node_manager.py
 delete mode 100644 python/ppc_scheduler/ppc_scheduler_app.py
 delete mode 100755 python/ppc_scheduler/scripts/start.sh
 delete mode 100755 python/ppc_scheduler/scripts/stop.sh
 delete mode 100644 python/ppc_scheduler/workflow/__init__.py
 delete mode 100644 python/ppc_scheduler/workflow/builder/__init__.py
 delete mode 100644 python/ppc_scheduler/workflow/builder/flow_builder.py
 delete mode 100644 python/ppc_scheduler/workflow/common/__init__.py
 delete mode 100644 python/ppc_scheduler/workflow/common/codec.py
 delete mode 100644 python/ppc_scheduler/workflow/common/flow_utils.py
 delete mode
100644 python/ppc_scheduler/workflow/common/job_context.py delete mode 100644 python/ppc_scheduler/workflow/common/worker_status.py delete mode 100644 python/ppc_scheduler/workflow/common/worker_type.py delete mode 100644 python/ppc_scheduler/workflow/scheduler/__init__.py delete mode 100644 python/ppc_scheduler/workflow/scheduler/scheduler.py delete mode 100644 python/ppc_scheduler/workflow/scheduler/scheduler_api.py delete mode 100644 python/ppc_scheduler/workflow/worker/__init__.py delete mode 100644 python/ppc_scheduler/workflow/worker/api_worker.py delete mode 100644 python/ppc_scheduler/workflow/worker/default_worker.py delete mode 100644 python/ppc_scheduler/workflow/worker/engine/__init__.py delete mode 100644 python/ppc_scheduler/workflow/worker/engine/model_engine.py delete mode 100644 python/ppc_scheduler/workflow/worker/engine/mpc_engine.py delete mode 100644 python/ppc_scheduler/workflow/worker/engine/psi_engine.py delete mode 100644 python/ppc_scheduler/workflow/worker/engine/shell_engine.py delete mode 100644 python/ppc_scheduler/workflow/worker/engine/work_engine.py delete mode 100644 python/ppc_scheduler/workflow/worker/model_worker.py delete mode 100644 python/ppc_scheduler/workflow/worker/mpc_worker.py delete mode 100644 python/ppc_scheduler/workflow/worker/psi_worker.py delete mode 100644 python/ppc_scheduler/workflow/worker/python_worker.py delete mode 100644 python/ppc_scheduler/workflow/worker/shell_worker.py delete mode 100644 python/ppc_scheduler/workflow/worker/worker.py delete mode 100644 python/ppc_scheduler/workflow/worker/worker_factory.py diff --git a/.github/workflows/cpp_full_node_workflow.yml b/.github/workflows/cpp_full_node_workflow.yml index 8c4cc9e2..7779352c 100644 --- a/.github/workflows/cpp_full_node_workflow.yml +++ b/.github/workflows/cpp_full_node_workflow.yml @@ -3,11 +3,13 @@ on: push: paths-ignore: - "docs/**" + - "python/**" - "Changelog.md" - "README.md" pull_request: paths-ignore: - "docs/**" + - "python/**" - "Changelog.md" - "README.md" release: @@ -62,7 +64,7 @@ jobs: sudo apt install -y lcov ccache wget libgmp-dev python3-dev export GCC='gcc-10' export CXX='g++-10' - bash cpp/tools/install_depends.sh -o ubuntu + bash -x cpp/tools/install_depends.sh -o ubuntu mkdir -p cpp/build && cd cpp/build && cmake -DTESTS=ON -DCOVERAGE=ON -DCMAKE_TOOLCHAIN_FILE=${{ env.VCPKG_ROOT }}/scripts/buildsystems/vcpkg.cmake ../ || cat /Users/runner/work/WeDPR-Component/WeDPR-Component/vcpkg/buildtrees/libhdfs3/config-x64-osx-dbg-err.log - name: Build for macOS if: runner.os == 'macOS' @@ -70,16 +72,16 @@ jobs: bash -x cpp/tools/install_depends.sh -o macos mkdir -p cpp/build && cd cpp/build && cmake -DTESTS=ON -DCOVERAGE=ON -DCMAKE_TOOLCHAIN_FILE=${{ env.VCPKG_ROOT }}/scripts/buildsystems/vcpkg.cmake ../ make -j3 - - name: Test - if: runner.os != 'Windows' - run: | - cd cpp/build && CTEST_OUTPUT_ON_FAILURE=TRUE ctest - name: Publish Error if: always() uses: actions/upload-artifact@v4 with: - name: config-x64-osx-dbg-err.log - path: /Users/runner/work/WeDPR-Component/WeDPR-Component/vcpkg/buildtrees/libhdfs3/config-x64-osx-dbg-err.log + name: config-x64-linux-dbg-err.log + path: /home/runner/work/WeDPR-Component/WeDPR-Component/vcpkg/buildtrees/libhdfs3/config-x64-linux-dbg-err.log + #- name: Test + # if: runner.os != 'Windows' + # run: | + # cd cpp/build && CTEST_OUTPUT_ON_FAILURE=TRUE ctest build_centos: name: build_centos full node @@ -135,28 +137,34 @@ jobs: run: | bash cpp/tools/install_depends.sh -o centos alias cmake='cmake3' - . 
/opt/rh/devtoolset-11/enable + . /opt/rh/devtoolset-11/enable mkdir -p cpp/build cd cpp/build cmake3 -DCMAKE_BUILD_TYPE=Release -DTESTS=ON -DCMAKE_TOOLCHAIN_FILE=/usr/local/share/vcpkg/scripts/buildsystems/vcpkg.cmake ../ cmake3 --build . --parallel 3 - - name: Test - run: | - export OMP_NUM_THREADS=1 - cd build && CTEST_OUTPUT_ON_FAILURE=TRUE make test - - uses: actions/upload-artifact@v4 + - name: Publish Error + if: always() + uses: actions/upload-artifact@v3 + with: + name: vcpkg-manifest-install.log + path: /__w/WeDPR-Component/WeDPR-Component/cpp/build/vcpkg-manifest-install.log + #- name: Test + # run: | + # export OMP_NUM_THREADS=1 + # cd build && CTEST_OUTPUT_ON_FAILURE=TRUE make test + - uses: actions/upload-artifact@v3 with: name: ppc-air-node-centos-x64 path: ./cpp/build/bin/ppc-air-node - - uses: actions/upload-artifact@v4 + - uses: actions/upload-artifact@v3 with: name: ppc-pro-node-centos-x64 path: ./cpp/build/bin/ppc-pro-node - - uses: actions/upload-artifact@v4 + - uses: actions/upload-artifact@v3 with: name: ppc-gateway-service-centos-x64 path: ./cpp/build/bin/ppc-gateway-service - - uses: actions/upload-artifact@v4 + - uses: actions/upload-artifact@v3 with: name: libppc-crypto-sdk-jni.so path: ./cpp/wedpr-component-sdk/bindings/java/src/main/resources/META-INF/native/libppc-crypto-sdk-jni.so diff --git a/.github/workflows/cpp_sdk_workflow.yml b/.github/workflows/cpp_sdk_workflow.yml index 93091ea8..bc4252d1 100644 --- a/.github/workflows/cpp_sdk_workflow.yml +++ b/.github/workflows/cpp_sdk_workflow.yml @@ -3,11 +3,13 @@ on: push: paths-ignore: - "docs/**" + - "python/**" - "Changelog.md" - "README.md" pull_request: paths-ignore: - "docs/**" + - "python/**" - "Changelog.md" - "README.md" release: diff --git a/.github/workflows/cpp_toolkit_workflow.yml b/.github/workflows/cpp_toolkit_workflow.yml index a63040c0..fdcdc9a4 100644 --- a/.github/workflows/cpp_toolkit_workflow.yml +++ b/.github/workflows/cpp_toolkit_workflow.yml @@ -3,11 +3,13 @@ on: push: paths-ignore: - "docs/**" + - "python/**" - "Changelog.md" - "README.md" pull_request: paths-ignore: - "docs/**" + - "python/**" - "Changelog.md" - "README.md" release: diff --git a/cpp/cmake/grpc.cmake b/cpp/cmake/grpc.cmake index f02b80b5..b88d316f 100644 --- a/cpp/cmake/grpc.cmake +++ b/cpp/cmake/grpc.cmake @@ -19,6 +19,29 @@ if(NOT GRPC_CPP_PLUGIN AND TARGET gRPC::grpc_cpp_plugin) endif() endif() + +if(NOT GRPC_CPP_REFLECTION AND TARGET gRPC::grpc++_reflection) + get_target_property(GRPC_CPP_REFLECTION gRPC::grpc++_reflection + IMPORTED_LOCATION_RELEASE) + if(NOT EXISTS "${GRPC_CPP_REFLECTION}") + get_target_property(GRPC_CPP_REFLECTION gRPC::grpc++_reflection + IMPORTED_LOCATION_RELWITHDEBINFO) + endif() + if(NOT EXISTS "${GRPC_CPP_REFLECTION}") + get_target_property(GRPC_CPP_REFLECTION gRPC::grpc++_reflection + IMPORTED_LOCATION_MINSIZEREL) + endif() + if(NOT EXISTS "${GRPC_CPP_REFLECTION}") + get_target_property(GRPC_CPP_REFLECTION gRPC::grpc++_reflection + IMPORTED_LOCATION_DEBUG) + endif() + if(NOT EXISTS "${GRPC_CPP_REFLECTION}") + get_target_property(GRPC_CPP_REFLECTION gRPC::grpc++_reflection + IMPORTED_LOCATION_NOCONFIG) + endif() +endif() + set(PROTOC_BINARY ${Protobuf_PROTOC_EXECUTABLE}) message("# PROTOC_BINARY: ${Protobuf_PROTOC_EXECUTABLE}") -message("# GRPC_CPP_PLUGIN: ${GRPC_CPP_PLUGIN}") \ No newline at end of file +message("# GRPC_CPP_PLUGIN: ${GRPC_CPP_PLUGIN}") +message("# GRPC_CPP_REFLECTION: ${GRPC_CPP_REFLECTION}") \ No newline at end of file diff --git a/cpp/tools/install_depends.sh 
b/cpp/tools/install_depends.sh index 8c2b9caa..108a6aae 100644 --- a/cpp/tools/install_depends.sh +++ b/cpp/tools/install_depends.sh @@ -38,7 +38,7 @@ install_gsasl_depend() fi # ubuntu if [[ "${os_type}" == "ubuntu" ]];then - cd libgsasl-1.8.0 && ./configure --with-pic --disable-ntlm --disable-gs2 --disable-gssapi --without-stringprep && make -j4 && make install + cd libgsasl-1.8.0 && ./configure --with-pic && make -j4 && make install fi LOG_INFO "download and install gsasl success..." } diff --git a/cpp/wedpr-computing/ppc-pir/tests/CMakeLists.txt b/cpp/wedpr-computing/ppc-pir/tests/CMakeLists.txt index 6a969962..8af1b5f9 100644 --- a/cpp/wedpr-computing/ppc-pir/tests/CMakeLists.txt +++ b/cpp/wedpr-computing/ppc-pir/tests/CMakeLists.txt @@ -9,4 +9,4 @@ target_include_directories(${TEST_BINARY_NAME} PRIVATE .) # target_link_libraries(${TEST_BINARY_NAME} ${PIR_TARGET} ${RPC_TARGET} ${CRYPTO_TARGET} ${BOOST_UNIT_TEST}) target_link_libraries(${TEST_BINARY_NAME} PUBLIC ${IO_TARGET} ${FRONT_TARGET} ${BCOS_UTILITIES_TARGET} ${TARS_PROTOCOL_TARGET} ${PIR_TARGET} ${RPC_TARGET} ${CRYPTO_TARGET} ${PROTOCOL_TARGET} ${BOOST_UNIT_TEST}) -add_test(NAME test-psi WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND ${TEST_BINARY_NAME}) \ No newline at end of file +add_test(NAME test-pir WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND ${TEST_BINARY_NAME}) \ No newline at end of file diff --git a/cpp/wedpr-computing/ppc-pir/tests/data/AysPreDataset.csv b/cpp/wedpr-computing/ppc-pir/tests/data/AysPreDataset.csv new file mode 100644 index 00000000..590b61f2 --- /dev/null +++ b/cpp/wedpr-computing/ppc-pir/tests/data/AysPreDataset.csv @@ -0,0 +1,3 @@ +id,x1 +1,test +2,test2 diff --git a/cpp/wedpr-protocol/grpc/CMakeLists.txt b/cpp/wedpr-protocol/grpc/CMakeLists.txt index b72e308b..fbd1fbc6 100644 --- a/cpp/wedpr-protocol/grpc/CMakeLists.txt +++ b/cpp/wedpr-protocol/grpc/CMakeLists.txt @@ -23,7 +23,7 @@ foreach(proto_file ${MESSAGES_PROTOS}) endforeach() add_library(${SERVICE_PB_TARGET} ${GRPC_MESSAGES_SRCS}) -target_link_libraries(${SERVICE_PB_TARGET} PUBLIC ${PB_PROTOCOL_TARGET} gRPC::grpc++_unsecure gRPC::grpc++_reflection) +target_link_libraries(${SERVICE_PB_TARGET} PUBLIC ${PB_PROTOCOL_TARGET} gRPC::grpc++_unsecure ${GRPC_CPP_REFLECTION}) add_subdirectory(client) add_subdirectory(server) diff --git a/cpp/wedpr-protocol/protocol/src/JsonTaskImpl.cpp b/cpp/wedpr-protocol/protocol/src/JsonTaskImpl.cpp index f71e47ce..6e7af353 100644 --- a/cpp/wedpr-protocol/protocol/src/JsonTaskImpl.cpp +++ b/cpp/wedpr-protocol/protocol/src/JsonTaskImpl.cpp @@ -211,7 +211,7 @@ void JsonTaskImpl::decodeDataResourceDesc(DataResourceDesc::Ptr _desc, Json::Val BOOST_THROW_EXCEPTION( InvalidParameter() << errinfo_comment("The \"..\" cannot be in the path")); } - if (path.starts_with("/")) + if (path.rfind("/", 0) == 0) { _desc->setPath(path); } diff --git a/cpp/wedpr-protocol/tars/TarsStruct.h b/cpp/wedpr-protocol/tars/TarsStruct.h index 395725cd..4818237d 100644 --- a/cpp/wedpr-protocol/tars/TarsStruct.h +++ b/cpp/wedpr-protocol/tars/TarsStruct.h @@ -19,22 +19,11 @@ */ #pragma once +#include #include -#include namespace ppctars::serialize { template -concept TarsStruct = requires(TarsStructType tarsStruct) -{ - { - tarsStruct.className() - } - ->std::same_as; - { - tarsStruct.MD5() - } - ->std::same_as; - tarsStruct.resetDefautlt(); -}; +concept TarsStruct = std::derived_from; } // namespace ppctars::serialize diff --git a/python/ppc_scheduler/__init__.py b/python/ppc_scheduler/__init__.py deleted file mode 
100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/common/__init__.py b/python/ppc_scheduler/common/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/common/global_context.py b/python/ppc_scheduler/common/global_context.py deleted file mode 100644 index 0d96ed01..00000000 --- a/python/ppc_scheduler/common/global_context.py +++ /dev/null @@ -1,3 +0,0 @@ -from ppc_scheduler.common.initializer import Initializer - -components = Initializer() \ No newline at end of file diff --git a/python/ppc_scheduler/common/global_job_manager.py b/python/ppc_scheduler/common/global_job_manager.py deleted file mode 100644 index 14d91556..00000000 --- a/python/ppc_scheduler/common/global_job_manager.py +++ /dev/null @@ -1,15 +0,0 @@ - - -from ppc_scheduler.job.job_manager import JobManager - - -global_job_manager: JobManager = None - -def update_job_manager(job_manager): - global global_job_manager - global_job_manager = job_manager - -def get_job_manager(): - global global_job_manager - return global_job_manager - diff --git a/python/ppc_scheduler/common/initializer.py b/python/ppc_scheduler/common/initializer.py deleted file mode 100644 index 162feebf..00000000 --- a/python/ppc_scheduler/common/initializer.py +++ /dev/null @@ -1,72 +0,0 @@ -import logging -import logging.config -import os -from contextlib import contextmanager - -import yaml -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker - -from ppc_common.deps_services import storage_loader -from ppc_common.ppc_utils import common_func - -class Initializer: - def __init__(self): - self.config_data = None - self.job_cache_dir = None - self.config_data = None - self.sql_session = None - self.sql_engine = None - self.storage_client = None - - def init_all(self, config_data): - self.config_data = config_data - - # self.init_log() - # self.init_config() - self.init_cache() - self.init_sql_client() - self.init_storage_client() - - @staticmethod - def init_config(config_path: str): - with open(config_path, 'rb') as f: - config_data = yaml.safe_load(f.read()) - return config_data - - @staticmethod - def init_log(log_config_path: str): - logging.config.fileConfig(log_config_path) - - def init_cache(self): - self.job_cache_dir = common_func.get_config_value( - "JOB_TEMP_DIR", "/tmp", self.config_data, False) - if not os.path.exists(self.job_cache_dir): - os.makedirs(self.job_cache_dir) - - def init_sql_client(self): - self.sql_engine = create_engine(self.config_data['SQLALCHEMY_DATABASE_URI'], pool_pre_ping=True) - self.sql_session = sessionmaker(bind=self.sql_engine) - - @contextmanager - def create_sql_session(self): - session = self.sql_session() - try: - yield session - session.commit() - except: - session.rollback() - raise - finally: - session.close() - - def init_storage_client(self): - self.storage_client = storage_loader.load( - self.config_data, self.logger()) - - def update_thread_event_manager(self, thread_event_manager): - self.thread_event_manager = thread_event_manager - - @staticmethod - def logger(name=None): - return logging.getLogger(name) diff --git a/python/ppc_scheduler/common/log_utils.py b/python/ppc_scheduler/common/log_utils.py deleted file mode 100644 index 159b9e5c..00000000 --- a/python/ppc_scheduler/common/log_utils.py +++ /dev/null @@ -1,78 +0,0 @@ -# -*- coding: utf-8 -*- - -import os - -from ppc_common.ppc_utils import utils, path - - -def job_start_log_info(job_id): - return f"=====================start_job_{job_id}=====================" - 
-def job_end_log_info(job_id): - return f"======================end_job_{job_id}======================" - -def worker_start_log_info(work_id): - return f"=====================start_work_{work_id}=====================" - -def worker_end_log_info(work_id): - return f"======================end_work_{work_id}======================" - -def upload_job_log(storage_client, job_id, extra=None): - job_log_path = None - try: - file_path = path.get_path() - job_log_path = utils.get_log_temp_file_path(file_path, job_id) - origin_log_path = utils.get_log_file_path(file_path) - filter_job_log(job_id, origin_log_path, job_log_path) - if extra is not None: - job_log = open(job_log_path, 'a+') - job_log.write('\n' * 3) - job_log.write(extra) - job_log.close() - storage_client.upload_file(job_log_path, job_id + os.sep + utils.LOG_NAME) - except (Exception) as e: - print(e) - finally: - if os.path.exists(job_log_path): - os.remove(job_log_path) - - -def read_line_inverse(file): - file.seek(0, 2) - current_position = file.tell() - position = 0 - while position + current_position >= 0: - while True: - position -= 1 - try: - file.seek(position, 2) - if file.read(1) == b'\n': - break - except: - # point at file header - file.seek(0, 0) - break - line = file.readline() - yield line - - -def filter_job_log(job_id, origin_log_path, job_log_path): - origin_log_file = open(origin_log_path, 'rb') - line_list = [] - need_record = False - - # search job log - for line in read_line_inverse(origin_log_file): - if need_record: - line_list.append(line) - if not need_record and str(line).__contains__(job_end_log_info(job_id)): - need_record = True - line_list.append(line) - elif str(line).__contains__(job_start_log_info(job_id)): - break - origin_log_file.close() - - # save log lines into temp file - job_log_file = open(job_log_path, 'wb+') - job_log_file.writelines(list(reversed(line_list))) - job_log_file.close() diff --git a/python/ppc_scheduler/conf/application-sample.yml b/python/ppc_scheduler/conf/application-sample.yml deleted file mode 100644 index 6d43e9b3..00000000 --- a/python/ppc_scheduler/conf/application-sample.yml +++ /dev/null @@ -1,13 +0,0 @@ -HTTP_HOST: "0.0.0.0" -HTTP_PORT: 43471 -HTTP_THREAD_NUM: 8 - -JOB_TIMEOUT_H: 1800 -WORKSPACE: "/data/app/files/job" -STORAGE_TYPE: "HDFS" -HDFS_URL: "http://127.0.0.1:50070" - -# mysql or dm -DB_TYPE: "mysql" -SQLALCHEMY_DATABASE_URI: "mysql://[*user_ppcsmodeladm]:[*pass_ppcsmodeladm]@[@4346-TDSQL_VIP]:[@4346-TDSQL_PORT]/ppcsmodeladm?autocommit=true&charset=utf8mb4" -# SQLALCHEMY_DATABASE_URI: "dm+dmPython://ppcv16:ppc12345678@127.0.0.1:5236" diff --git a/python/ppc_scheduler/conf/logging.conf b/python/ppc_scheduler/conf/logging.conf deleted file mode 100644 index b472e2ab..00000000 --- a/python/ppc_scheduler/conf/logging.conf +++ /dev/null @@ -1,40 +0,0 @@ -[loggers] -keys=root,wsgi - -[logger_root] -level=INFO -handlers=consoleHandler,fileHandler - -[logger_wsgi] -level = INFO -handlers = accessHandler -qualname = wsgi -propagate = 0 - -[handlers] -keys=fileHandler,consoleHandler,accessHandler - -[handler_accessHandler] -class=handlers.TimedRotatingFileHandler -args=('./logs/appmonitor.log', 'D', 1, 30, 'utf-8') -level=INFO -formatter=simpleFormatter - -[handler_fileHandler] -class=handlers.TimedRotatingFileHandler -args=('./logs/scheduler.log', 'D', 1, 30, 'utf-8') -level=INFO -formatter=simpleFormatter - -[handler_consoleHandler] -class=StreamHandler -args=(sys.stdout,) -level=ERROR -formatter=simpleFormatter - -[formatters] -keys=simpleFormatter - 
-[formatter_simpleFormatter] -format=[%(levelname)s][%(asctime)s %(msecs)03d][%(process)d][%(filename)s:%(lineno)d] %(message)s -datefmt=%Y-%m-%d %H:%M:%S diff --git a/python/ppc_scheduler/database/__init__.py b/python/ppc_scheduler/database/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/database/computing_node_mapper.py b/python/ppc_scheduler/database/computing_node_mapper.py deleted file mode 100644 index 5b6417a3..00000000 --- a/python/ppc_scheduler/database/computing_node_mapper.py +++ /dev/null @@ -1,74 +0,0 @@ -from sqlalchemy import update, and_, select, delete - -from ppc_common.db_models.computing_node import ComputingNodeRecord - - -# def insert_computing_node(session, node_id: str, url: str, node_type: str, loading: int): -# new_node = ComputingNodeRecord( -# id=node_id, -# url=url, -# type=node_type, -# loading=loading -# ) - -# session.add(new_node) - - -# def delete_computing_node(session, url: str, node_type: str): -# stmt = ( -# delete(ComputingNodeRecord).where( -# and_( -# ComputingNodeRecord.url == url, -# ComputingNodeRecord.type == node_type -# ) -# ) -# ) - -# result = session.execute(stmt) - -# return result.rowcount > 0 - - -def get_and_update_min_loading_url(session, node_type): - - # select min_loading node - min_loading_node_id = session.query(ComputingNodeRecord.id).filter( - ComputingNodeRecord.type == node_type - ).order_by(ComputingNodeRecord.loading.asc()).first() - - - # update loading - stmt = ( - update(ComputingNodeRecord).where( - and_( - ComputingNodeRecord.id == min_loading_node_id.id, - ComputingNodeRecord.type == node_type - ) - ).values( - loading=ComputingNodeRecord.loading + 1 - ) - ) - session.execute(stmt) - - # select min_loading node - record = session.query(ComputingNodeRecord.url, ComputingNodeRecord.token).filter( - ComputingNodeRecord.id == min_loading_node_id.id - ).order_by(ComputingNodeRecord.loading.asc()).first() - - return record - -def release_loading(session, url: str, node_type: str): - stmt = ( - update(ComputingNodeRecord).where( - and_( - ComputingNodeRecord.url == url, - ComputingNodeRecord.type == node_type, - ComputingNodeRecord.loading > 0 - ) - ).values( - loading=ComputingNodeRecord.loading - 1 - ) - ) - result = session.execute(stmt) - - return result.rowcount > 0 diff --git a/python/ppc_scheduler/database/job_worker_mapper.py b/python/ppc_scheduler/database/job_worker_mapper.py deleted file mode 100644 index 0d8a7e8b..00000000 --- a/python/ppc_scheduler/database/job_worker_mapper.py +++ /dev/null @@ -1,52 +0,0 @@ -import datetime -import json -# from sqlalchemy.dialects.mysql import insert -from sqlalchemy import and_, update, insert -from sqlalchemy.exc import NoResultFound - -from ppc_common.db_models.job_worker_record import JobWorkerRecord -from ppc_common.ppc_utils import utils -from ppc_scheduler.workflow.common import codec -from ppc_scheduler.workflow.common.worker_status import WorkerStatus - -def insert_job_worker(session, worker): - upstreams_str = json.dumps(worker['upstreams']) - inputs_statement_str = json.dumps(worker['inputs_statement']) - args_str=json.dumps(worker['args']) - - stmt = insert(JobWorkerRecord).prefix_with(" IGNORE").values( - worker_id=worker['worker_id'], - job_id=worker['job_id'], - type=worker['type'], - status=WorkerStatus.PENDING, - args=args_str, - upstreams=upstreams_str, - inputs_statement=inputs_statement_str - ) - - result = session.execute(stmt) - return result.rowcount > 0 - -def query_job_worker(session, job_id, worker_id): - 
try: - return session.query(JobWorkerRecord).filter( - and_(JobWorkerRecord.worker_id == worker_id, - JobWorkerRecord.job_id == job_id)).one() - except NoResultFound: - return None - - -def update_job_worker(session, job_id, worker_id, status, outputs): - stmt = ( - update(JobWorkerRecord).where( - and_( - JobWorkerRecord.job_id == job_id, - JobWorkerRecord.worker_id == worker_id - ) - ).values( - status=status, - outputs=codec.serialize_worker_outputs(outputs) - ) - ) - result = session.execute(stmt) - return result.rowcount > 0 diff --git a/python/ppc_scheduler/demo/default_flow_config_sample.py b/python/ppc_scheduler/demo/default_flow_config_sample.py deleted file mode 100644 index 123df9c2..00000000 --- a/python/ppc_scheduler/demo/default_flow_config_sample.py +++ /dev/null @@ -1,224 +0,0 @@ -flow_dict = { - - "SHELL": [ - { - "index": 1, - "type": "T_SHELL" - }, - { - "index": 2, - "type": "T_SHELL", - "upstreams": [ - { - "index": 1 - } - ] - }, - { - "index": 3, - "type": "T_SHELL", - "upstreams": [ - { - "index": 2 - } - ] - } - ], - - "PSI": [ - { - "index": 1, - "type": "T_PSI" - } - ], - - "MPC": [ - { - "index": 1, - "type": "T_MPC" - } - ], - - "PSI_MPC": [ - { - "index": 1, - "type": "T_PSI" - }, - { - "index": 2, - "type": "T_MPC", - "upstreams": [ - { - "index": 1, - "output_input_map": [ - "0:0" - ] - } - ] - } - ], - - "PREPROCESSING": [ - { - "index": 1, - "type": "T_MODEL" - } - ], - - "FEATURE_ENGINEERING": [ - { - "index": 1, - "type": "T_MODEL" - }, - { - "index": 2, - "type": "T_MODEL", - "upstreams": [ - { - "index": 1 - } - ] - } - ], - - "TRAINING": [ - { - "index": 1, - "type": "T_MODEL" - }, - { - "index": 2, - "type": "T_MODEL", - "upstreams": [ - { - "index": 1 - } - ] - } - ], - - "PREDICTION": [ - { - "index": 1, - "type": "T_MODEL" - }, - { - "index": 2, - "type": "T_MODEL", - "upstreams": [ - { - "index": 1 - } - ] - } - ], - - "FEATURE_ENGINEERING_TRAINING": [ - { - "index": 1, - "type": "T_MODEL" - }, - { - "index": 2, - "type": "T_MODEL", - "upstreams": [ - { - "index": 1 - } - ] - }, - { - "index": 3, - "type": "T_MODEL", - "upstreams": [ - { - "index": 2 - } - ] - } - ], - - "PSI_FEATURE_ENGINEERING": [ - { - "index": 1, - "type": "T_PSI" - }, - { - "index": 2, - "type": "T_MODEL", - "upstreams": [ - { - "index": 1 - } - ] - }, - { - "index": 3, - "type": "T_MODEL", - "upstreams": [ - { - "index": 2 - } - ] - } - ], - - "PSI_TRAINING": [ - { - "index": 1, - "type": "T_PSI" - }, - { - "index": 2, - "type": "T_MODEL", - "upstreams": [ - { - "index": 1 - } - ] - }, - { - "index": 3, - "type": "T_MODEL", - "upstreams": [ - { - "index": 2 - } - ] - } - ], - - "PSI_FEATURE_ENGINEERING_TRAINING": [ - { - "index": 1, - "type": "T_PSI" - }, - { - "index": 2, - "type": "T_MODEL", - "upstreams": [ - { - "index": 1 - } - ] - }, - { - "index": 3, - "type": "T_MODEL", - "upstreams": [ - { - "index": 2 - } - ] - }, - { - "index": 4, - "type": "T_MODEL", - "upstreams": [ - { - "index": 3 - } - ] - } - ] -} diff --git a/python/ppc_scheduler/demo/prefect_workflow_sample.py b/python/ppc_scheduler/demo/prefect_workflow_sample.py deleted file mode 100644 index 9a146260..00000000 --- a/python/ppc_scheduler/demo/prefect_workflow_sample.py +++ /dev/null @@ -1,79 +0,0 @@ -import time -from prefect import Task, Flow -from prefect.triggers import all_successful, any_failed -from prefect import Flow -from prefect.executors import LocalDaskExecutor - -class SuccessTask(Task): - def run(self): - print(" ===>>>> Success Task Ran") - -class FailTask(Task): - def run(self): - 
print(" ===>>> Fail Task Ran") - -class JobTask(Task): - def __init__(self, name): - super().__init__(name=name) - self.name = name - - def run(self, x, y): - print(" ==> " + self.name + " is running, " + str(x) + ", " + str(y)) - return x + y - -# with Flow("My Flow") as f: -# t1 = a(1, 2) # t1 != a -# t2 = a(5, 7) # t2 != a - -with Flow("example_flow") as flow: - - task1 = JobTask("t1") - task2 = JobTask("t2") - task3 = JobTask("t3") - task4 = JobTask("t4") - - # task1=t1(1,2) - # task2=t2(3,4) - # task3=t3(4,5) - # task4=t4(6,7) - task1.bind(x = 1, y = 2) - task2.bind(x = 3, y = 4) - task3.bind(x = 5, y = 6) - task4.bind(x = 7, y = 8) - - task3.set_upstream(task1, flow=flow) - task3.set_upstream(task2, flow=flow) - task4.set_upstream(task3, flow=flow) - - # 定义成功任务,仅当所有上游任务成功时才运行 - success_task = SuccessTask(name="Success Task") - success_task.trigger = all_successful - - # 定义失败任务,只要任何一个上游任务失败就运行 - fail_task = FailTask(name="Fail Task") - fail_task.trigger = any_failed - - # 设置依赖关系 - task1.set_downstream(success_task, flow=flow) - task2.set_downstream(success_task, flow=flow) - task3.set_downstream(success_task, flow=flow) - task4.set_downstream(success_task, flow=flow) - - task1.set_downstream(fail_task, flow=flow) - task2.set_downstream(fail_task, flow=flow) - task3.set_downstream(fail_task, flow=flow) - task4.set_downstream(fail_task, flow=flow) - - -# flow.executor = LocalDaskExecutor() - -start_time = time.time() -# 运行工作流 -flow_state=flow.run() - -end_time = time.time() -print(f" ## costs: {end_time - start_time}") - -print(flow_state.result) - -flow.visualize(flow_state, "./my_flow", 'svg') \ No newline at end of file diff --git a/python/ppc_scheduler/demo/web_submit_job.json b/python/ppc_scheduler/demo/web_submit_job.json deleted file mode 100644 index f5b6d9cd..00000000 --- a/python/ppc_scheduler/demo/web_submit_job.json +++ /dev/null @@ -1,134 +0,0 @@ -[ - { - "job": { - "jobType": "PSI", - "projectName": "octo-test", - "param": "{\"dataSetList\":[{\"idFields\":[\"x200\"],\"dataset\":{\"owner\":\"octo1\",\"ownerAgency\":\"WeBank\",\"path\":\"/user/ppc/webank/octo1/d-9323009025845253\",\"storageTypeStr\":\"HDFS\",\"datasetID\":\"d-9323009025845253\"},\"receiveResult\":false},{\"idFields\":[\" x106\"],\"dataset\":{\"owner\":\"octopuswang1\",\"ownerAgency\":\"SGD\",\"path\":\"/user/ppc/sgd/octopuswang1/d-9322966927878149\",\"storageTypeStr\":\"HDFS\",\"datasetID\":\"d-9322966927878149\"},\"receiveResult\":true}]}" - }, - "taskParties": [ - { - "userName": "octo1", - "agency": "WeBank" - }, - { - "userName": "octopuswang1", - "agency": "SGD" - } - ], - "datasetList": [ - "d-xxx", - "d-yyy" - ] - }, - { - "dataSetList": [ - { - "idFields": [ - "x200" - ], - "dataset": { - "owner": "octo1", - "ownerAgency": "WeBank", - "path": "/user/ppc/webank/octo1/d-9323009025845253", - "storageTypeStr": "HDFS", - "datasetID": "d-9323009025845253" - }, - "receiveResult": false - }, - { - "idFields": [ - " x106" - ], - "dataset": { - "owner": "octopuswang1", - "ownerAgency": "SGD", - "path": "/user/ppc/sgd/octopuswang1/d-9322966927878149", - "storageTypeStr": "HDFS", - "datasetID": "d-9322966927878149" - }, - "receiveResult": true - } - ] - } - ] - - { - "jsonrpc": "1", - "method": "asyncRunTask", - "token": "", - "id": 61480, - "params": { - "taskID": "9441012273129477-PSI", - "type": 0, - "algorithm": 0, - "syncResult": true, - "lowBandwidth": false, - "parties": [ - { - "id": "WeBank", - "partyIndex": 0, - "data": { - "id": "9441012273129477", - "input": { - "datasetID": "d-9431392664627205", - 
"type": 2, - "storageTypeStr": "HDFS", - "path": "/user/ppc/webank/octo1/d-9323009025845253", - "owner": "octo1", - "ownerAgency": "WeBank", - "storagePath": { - "storageType": "HDFS", - "filePath": "/user/ppc/webank/octo1/d-9323009025845253" - } - }, - "output": { - "datasetID": null, - "type": 2, - "storageTypeStr": "HDFS", - "path": "/user/wedpr//home/cache/octo1/9441012273129477/psi_result.csv", - "owner": "octo1", - "ownerAgency": "WeBank", - "storagePath": { - "storageType": "HDFS", - "filePath": "/user/wedpr/home/cache/octo1/9441012273129477/psi_result.csv" - } - } - } - }, - { - "id": "SGD", - "partyIndex": 0, - "data": { - "id": "9441012273129477", - "input": { - "datasetID": "d-9431392714827781", - "type": 2, - "storageTypeStr": "HDFS", - "path": "/user/ppc/webank/octo1/d-9323009025845253", - "owner": "octopuswang1", - "ownerAgency": "SGD", - "storagePath": { - "storageType": "HDFS", - "filePath": "/user/ppc/webank/octo1/d-9323009025845253" - } - }, - "output": { - "datasetID": null, - "type": 2, - "storageTypeStr": "HDFS", - "path": "/user/wedpr//home/cache/octopuswang1/9441012273129477/psi_result.csv", - "owner": "octopuswang1", - "ownerAgency": "SGD", - "storagePath": { - "storageType": "HDFS", - "filePath": "/user/wedpr/home/cache/octopuswang1/9441012273129477/psi_result.csv" - } - } - } - } - ], - "receiverList": [ - "SGD" - ] - } - } \ No newline at end of file diff --git a/python/ppc_scheduler/demo/workflow_args_sample.json b/python/ppc_scheduler/demo/workflow_args_sample.json deleted file mode 100644 index d77a9c1c..00000000 --- a/python/ppc_scheduler/demo/workflow_args_sample.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "jobId": "x12345678", - "agency": "WeBank", - "workflow": [ - { - "index": 1, - "type": "SHELL", - "args": [ - "sleep 10" - ] - }, - { - "index": 2, - "type": "SHELL", - "args": [ - "sleep 10" - ], - "upstreams": [ - { - "index": 1 - } - ] - }, - { - "index": 3, - "type": "SHELL", - "args": [ - "sleep 10" - ], - "upstreams": [ - { - "index": 2 - } - ] - }, - { - "index": 4, - "type": "SHELL", - "args": [ - "sleep 10" - ], - "upstreams": [ - { - "index": 3 - } - ] - }, - { - "index": 5, - "type": "SHELL", - "args": [ - "sleep 10" - ], - "upstreams": [ - { - "index": 4 - } - ] - }, - { - "index": 6, - "type": "SHELL", - "args": [ - "sleep 10" - ], - "upstreams": [ - { - "index": 5 - } - ] - } - ] -} \ No newline at end of file diff --git a/python/ppc_scheduler/endpoints/__init__.py b/python/ppc_scheduler/endpoints/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/endpoints/body_schema.py b/python/ppc_scheduler/endpoints/body_schema.py deleted file mode 100644 index 01ad24b8..00000000 --- a/python/ppc_scheduler/endpoints/body_schema.py +++ /dev/null @@ -1,18 +0,0 @@ -# -*- coding: utf-8 -*- -import json - -from flask_restx import fields - -from ppc_model.network.http.restx import api - -response_base = api.model('Response base info', { - 'errorCode': fields.Integer(description='return code'), - 'message': fields.String(description='return message') -}) - -response_job_status = api.inherit('Task status', response_base, { - 'data': fields.Raw(description='Task status data as key-value dictionary', example={ - 'status': 'RUNNING', - 'time_costs': '30s' - }) -}) diff --git a/python/ppc_scheduler/endpoints/job_controller.py b/python/ppc_scheduler/endpoints/job_controller.py deleted file mode 100644 index dbf8d9cb..00000000 --- a/python/ppc_scheduler/endpoints/job_controller.py +++ /dev/null @@ -1,70 +0,0 @@ -# -*- coding: 
utf-8 -*- - -from flask import request -from flask_restx import Resource - -from ppc_common.ppc_utils import utils -from ppc_common.ppc_utils.exception import PpcErrorCode, PpcException -from ppc_scheduler.common.global_context import components -from ppc_scheduler.endpoints.body_schema import response_job_status, response_base -from ppc_scheduler.endpoints.restx import api -from ppc_scheduler.common.global_job_manager import get_job_manager - -ns = api.namespace('/wedpr/v3/scheduler/job', - description='Operations related to run job') - - -@ns.route('/') -class JobCollection(Resource): - - @api.response(200, 'Job started successfully.', response_base) - def post(self, job_id): - """ - Run a specific job by job_id. - """ - request_body = request.get_json() - - if 'jobId' not in request_body: - raise PpcException( - PpcErrorCode.PARAMETER_CHECK_ERROR.get_code(), - f"Missing 'jobId' in request") - # if 'agency' not in request_body: - # raise PpcException( - # PpcErrorCode.PARAMETER_CHECK_ERROR.get_code(), - # f"Missing 'agency' in request") - if 'workflow' not in request_body: - raise PpcException( - PpcErrorCode.PARAMETER_CHECK_ERROR.get_code(), - f"Missing 'workflow' in request") - - components.logger().info(f"Recv run job request, job_id: {job_id}, request: {request_body}") - - get_job_manager().run_task(job_id, request_body) - return utils.BASE_RESPONSE - - @api.response(200, 'Job status retrieved successfully.', response_job_status) - def get(self, job_id): - """ - Get the status of a specific job by job_id. - """ - response = utils.BASE_RESPONSE - - components.logger().info(f"Recv query job request, job_id: {job_id}") - - status, time_costs = get_job_manager().status(job_id) - response['data'] = { - 'status': status, - 'time_costs': time_costs - } - return response - - @api.response(200, 'Job killed successfully.', response_base) - def delete(self, job_id): - """ - Kill a specific job by job_id. - """ - - components.logger().info(f"receive kill job request, job_id: {job_id}") - - get_job_manager().kill_job(job_id) - return utils.BASE_RESPONSE diff --git a/python/ppc_scheduler/endpoints/restx.py b/python/ppc_scheduler/endpoints/restx.py deleted file mode 100644 index 28f9829f..00000000 --- a/python/ppc_scheduler/endpoints/restx.py +++ /dev/null @@ -1,35 +0,0 @@ -# -*- coding: utf-8 -*- - -from flask_restx import Api - -from ppc_common.ppc_utils.exception import PpcException, PpcErrorCode -from ppc_model.common.global_context import components - -authorizations = { - 'apikey': { - 'type': 'apiKey', - 'in': 'header', - 'name': 'Authorization' - } -} - -api = Api(version='1.0', title='Ppc Scheduler Service', - authorizations=authorizations, security='apikey') - - -@api.errorhandler(PpcException) -def default_error_handler(e): - components.logger().exception(e) - info = e.to_dict() - response = {'errorCode': info['code'], 'message': info['message']} - components.logger().error(f"OnError: code: {info['code']}, message: {info['message']}") - return response, 500 - - -@api.errorhandler(BaseException) -def default_error_handler(e): - components.logger().exception(e) - message = 'unknown error.' 
- response = {'errorCode': PpcErrorCode.INTERNAL_ERROR, 'message': message} - components.logger().error(f"OnError: unknown error") - return response, 500 diff --git a/python/ppc_scheduler/job/__init__.py b/python/ppc_scheduler/job/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/job/job_manager.py b/python/ppc_scheduler/job/job_manager.py deleted file mode 100644 index 05e85a00..00000000 --- a/python/ppc_scheduler/job/job_manager.py +++ /dev/null @@ -1,154 +0,0 @@ -import datetime -import threading -import time -from typing import Union - -from readerwriterlock import rwlock - -from ppc_common.ppc_async_executor.async_thread_executor import AsyncThreadExecutor -from ppc_common.ppc_async_executor.thread_event_manager import ThreadEventManager -from ppc_common.ppc_utils.exception import PpcException, PpcErrorCode -from ppc_scheduler.common import log_utils -from ppc_scheduler.job.job_status import JobStatus -from ppc_scheduler.workflow.scheduler.scheduler import Scheduler -from ppc_scheduler.workflow.builder.flow_builder import FlowBuilder -from ppc_scheduler.workflow.common.job_context import JobContext - - -class JobManager: - def __init__(self, logger, - scheduler: Scheduler, - thread_event_manager: ThreadEventManager, - workspace: str, - job_timeout_h: Union[int, float]): - self.logger = logger - self._thread_event_manager = thread_event_manager - self._scheduler = scheduler - self._workspace = workspace - self._job_timeout_s = job_timeout_h * 3600 - self._rw_lock = rwlock.RWLockWrite() - self._jobs: dict[str, list] = {} - self._flow_builder = FlowBuilder(logger=logger) - self._async_executor = AsyncThreadExecutor( - event_manager=self._thread_event_manager, logger=logger) - self._cleanup_thread = threading.Thread(target=self._loop_action) - self._cleanup_thread.daemon = True - self._cleanup_thread.start() - - def run_task(self, job_id, request_body): - """ - run task - param job_id: job id - param request: job params - """ - # TODO: The database persists job information - with self._rw_lock.gen_wlock(): - if job_id in self._jobs: - self.logger.info( - f"Job already exists, job_id: {job_id}, status: {self._jobs[job_id][0]}") - return - self._jobs[job_id] = [ - JobStatus.RUNNING, datetime.datetime.now(), 0] - self.logger.info(log_utils.job_start_log_info(job_id)) - - # Create job context - job_context = JobContext.create_job_context( - request_body, self._workspace) - # Build job workflow - flow_context = self._flow_builder.build_flow_context( - job_id=job_context.job_id, workflow_configs=job_context.workflow_configs) - # Persistent workflow - self._flow_builder.save_flow_context(job_context.job_id, flow_context) - # Run workflow - self._async_executor.execute( - job_id, self._run_job_flow, self._on_task_finish, (job_context, flow_context)) - - def _run_job_flow(self, job_context, flow_context): - """ - run job flow - """ - - # the scheduler module starts scheduling tasks - self._scheduler.run(job_context, flow_context) - - def kill_job(self, job_id: str): - with self._rw_lock.gen_rlock(): - if job_id not in self._jobs or self._jobs[job_id][0] != JobStatus.RUNNING: - return - - self.logger.info(f"Kill job, job_id: {job_id}") - self._async_executor.kill(job_id) - - with self._rw_lock.gen_wlock(): - self._jobs[job_id][0] = JobStatus.FAILURE - - def status(self, job_id: str) -> tuple[str, float]: - """ - query task status - return: task status and task cost(s) - """ - with self._rw_lock.gen_rlock(): - if job_id not in self._jobs: - raise 
PpcException( - PpcErrorCode.JOB_NOT_FOUND.get_code(), - PpcErrorCode.JOB_NOT_FOUND.get_msg()) - status = self._jobs[job_id][0] - time_costs = self._jobs[job_id][2] - return status, time_costs - - def _on_task_finish(self, job_id: str, is_succeeded: bool, e: Exception = None): - with self._rw_lock.gen_wlock(): - time_costs = (datetime.datetime.now() - - self._jobs[job_id][1]).total_seconds() - self._jobs[job_id][2] = time_costs - if is_succeeded: - self._jobs[job_id][0] = JobStatus.SUCCESS - self.logger.info( - f"Job {job_id} completed, time_costs: {time_costs}s") - else: - self._jobs[job_id][0] = JobStatus.FAILURE - self.logger.warn( - f"Job {job_id} failed, time_costs: {time_costs}s, error: {e}") - self.logger.info(log_utils.job_end_log_info(job_id)) - - def _loop_action(self): - while True: - time.sleep(20) - self._terminate_timeout_jobs() - # TODO: store into the database - # self._cleanup_finished_jobs() - self._report_jobs() - - def _terminate_timeout_jobs(self): - jobs_to_kill = [] - with self._rw_lock.gen_rlock(): - for job_id, value in self._jobs.items(): - alive_time = (datetime.datetime.now() - - value[1]).total_seconds() - if alive_time >= self._job_timeout_s and value[0] == JobStatus.RUNNING: - jobs_to_kill.append(job_id) - - for job_id in jobs_to_kill: - self.logger.warn(f"Job is timeout, job_id: {job_id}") - self.kill_job(job_id) - - def _cleanup_finished_jobs(self): - jobs_to_cleanup = [] - with self._rw_lock.gen_rlock(): - for job_id, value in self._jobs.items(): - alive_time = (datetime.datetime.now() - - value[1]).total_seconds() - if alive_time >= self._job_timeout_s + 3600: - jobs_to_cleanup.append((job_id, value[3])) - self.logger.info(f"Job is finished, job_id: {job_id}") - with self._rw_lock.gen_wlock(): - for job_id, job_id in jobs_to_cleanup: - if job_id in self._jobs: - del self._jobs[job_id] - self._thread_event_manager.remove_event(job_id) - self.logger.info(f"Cleanup job cache, job_id: {job_id}") - - def _report_jobs(self): - with self._rw_lock.gen_rlock(): - job_count = len(self._jobs) - self.logger.info(f" ## Report job count: {job_count}") diff --git a/python/ppc_scheduler/job/job_status.py b/python/ppc_scheduler/job/job_status.py deleted file mode 100644 index c9cd8fa2..00000000 --- a/python/ppc_scheduler/job/job_status.py +++ /dev/null @@ -1,5 +0,0 @@ -class JobStatus: - RUNNING = 'RUNNING' - SUCCESS = 'SUCCESS' - FAILURE = 'FAILED' - KILLED = 'KILLED' diff --git a/python/ppc_scheduler/job/job_type.py b/python/ppc_scheduler/job/job_type.py deleted file mode 100644 index 22e67f02..00000000 --- a/python/ppc_scheduler/job/job_type.py +++ /dev/null @@ -1,12 +0,0 @@ - -class JobType: - PSI = "PSI" - MPC = "MPC" - PREPROCESSING = 'PREPROCESSING' - FEATURE_ENGINEERING = 'FEATURE_ENGINEERING' - TRAINING = 'TRAINING' - PREDICTION = 'PREDICTION' - - SHELL = "SHELL" - PYTHON = "PYTHON" - API = "API" diff --git a/python/ppc_scheduler/mpc_generator/__init__.py b/python/ppc_scheduler/mpc_generator/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/mpc_generator/generator.py b/python/ppc_scheduler/mpc_generator/generator.py deleted file mode 100644 index 86466192..00000000 --- a/python/ppc_scheduler/mpc_generator/generator.py +++ /dev/null @@ -1,468 +0,0 @@ -from enum import Enum - -import sqlparse -import sqlvalidator -from sqlparse.exceptions import SQLParseError -from sqlparse.sql import Comparison, Identifier, Function -from sqlparse.tokens import Punctuation, Operator, Name, Token - -from ppc_scheduler.mpc_generator 
import mpc_func_str -from ppc_common.ppc_utils.exception import PpcException, PpcErrorCode -from ppc_common.ppc_utils.utils import PPC_RESULT_FIELDS_FLAG, PPC_RESULT_VALUES_FLAG - - -class SqlPattern(Enum): - BASIC_ARITH_OPE = 1 - AGGR_FUNC_ONLY = 2 - AGGR_FUNC_WITH_GROUP_BY = 3 - - -INDENT = " " - -SUPPORTED_KEYWORDS = [ - 'SELECT', - 'FROM', - 'WHERE', - 'JOIN', - 'INNER JOIN', - 'ON', - 'AS', - 'GROUP BY', - 'COUNT', - 'SUM', - 'AVG', - 'MAX', - 'MIN', -] - -VALUE_TYPE = 'pfix' - -GROUP_BY_COLUMN_CODE = 'group_indexes_key[i]' - -DISPLAY_FIELDS_FUNC = 'set_display_field_names' - -DISPLAY_RESULT_FUNC = 'display_data' - - -class CodeGenerator: - - def __init__(self, sql_str): - self.sql_str = sql_str - - # three patterns supported - self.sql_pattern = SqlPattern.BASIC_ARITH_OPE - - # based on ID only - self.need_run_psi = False - - # record dataset sources - self.table_set = set() - - # 0: table number, 1: column index - self.group_by_column = [] - - # filter conditions - self.condition_list = [] - - self.result_fields = [] - - self.max_column_index = [] - - def sql_to_mpc_code(self): - operators = self.pre_parsing() - format_sql_str = sqlparse.format(self.sql_str, reindent=True, keyword_case='upper') - mpc_str = self.generate_common_code(format_sql_str) - mpc_str = self.generate_function_code(mpc_str) - mpc_str = self.generate_result_calculation_code(operators, mpc_str) - mpc_str = self.generate_result_print_code(mpc_str) - mpc_str = self.generate_mpc_execution_code(mpc_str) - mpc_str = self.replace_max_filed(mpc_str) - return mpc_str - - def pre_parsing(self): - try: - if not sqlvalidator.parse(self.sql_str).is_valid(): - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "bad sql statement") - - # format sql str - format_sql_str = sqlparse.format(self.sql_str, reindent=True, keyword_case='upper') - - tokens = sqlparse.parse(format_sql_str)[0].tokens - - # warning unsupported keywords - self.check_sql_tokens(tokens) - - # parse table aliases - aliases = self.find_table_alias(tokens, {}, False) - - # recover table aliases - new_sql_str = self.recover_table_name(tokens, aliases, '') - format_new_sql_str = sqlparse.format(new_sql_str, reindent=True, keyword_case='upper') - tokens = sqlparse.parse(format_new_sql_str)[0].tokens - - # parse filters (only 'id' based column alignment is supported currently) - self.find_table_and_condition(tokens, False) - - # check table suffix and number of participants - self.check_table() - - # ensure that all tables participate in alignment - self.check_table_alignment(self.need_run_psi, len(self.table_set)) - - self.max_column_index = [0 for _ in range(len(self.table_set))] - - self.check_sql_pattern(tokens) - - operators = self.extract_operators(format_new_sql_str) - - return operators - except SQLParseError: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "bad sql statement") - - def check_sql_tokens(self, tokens): - for token in tokens: - if token.is_keyword and token.value not in SUPPORTED_KEYWORDS: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), f"keyword '{token.value}' not supported") - if hasattr(token, 'tokens'): - self.check_sql_tokens(token.tokens) - - def find_table_alias(self, tokens, aliases, after_from): - end_current_round = False - for i in range(len(tokens)): - if after_from and tokens[i].value == 'AS': - # find a alias - end_current_round = True - aliases[tokens[i + 2].value] = tokens[i - 2].value - for i in range(len(tokens)): - if tokens[i].value == 'FROM': - after_from = True - if after_from 
and not end_current_round and hasattr(tokens[i], 'tokens'): - self.find_table_alias(tokens[i].tokens, aliases, after_from) - return aliases - - def recover_table_name(self, tokens, aliases, format_sql_str): - for i in range(len(tokens)): - if tokens[i].value == 'AS' and tokens[i + 2].value in aliases: - break - elif not tokens[i].is_group: - if tokens[i].value in aliases: - format_sql_str += aliases[tokens[i].value] - else: - format_sql_str += tokens[i].value - elif hasattr(tokens[i], 'tokens'): - format_sql_str = self.recover_table_name(tokens[i].tokens, aliases, format_sql_str) - return format_sql_str - - def find_table_and_condition(self, tokens, after_from): - for token in tokens: - if token.value == 'FROM': - after_from = True - if after_from: - if type(token) == Comparison: - self.check_equal_comparison(token.tokens) - self.condition_list.append(token.value) - if type(token) == Identifier and '.' not in token.value: - self.table_set.add(token.value) - elif hasattr(token, 'tokens'): - self.find_table_and_condition(token.tokens, after_from) - - def check_equal_comparison(self, tokens): - for i in range(len(tokens)): - if tokens[i].value == '=': - self.need_run_psi = True - elif tokens[i].value == '.' and tokens[i + 1].value != 'id': - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), - f"only 'id' based column alignment is supported currently") - elif hasattr(tokens[i], 'tokens'): - self.check_equal_comparison(tokens[i].tokens) - - def check_table_alignment(self, has_equal_comparison, table_count): - if has_equal_comparison: - column = self.condition_list[0].split('=')[0].strip() - table = column[0:column.find('.')] - - # all tables must be aligned - self.equal_comparison_dfs(table, [0 for _ in range(len(self.condition_list))]) - if len(self.table_set) != table_count: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "all tables must be aligned") - - def equal_comparison_dfs(self, table, flag_array): - for i in range(len(self.condition_list)): - if flag_array[i] == 0 and table in self.condition_list[i]: - flag_array[i] = 1 - columns = self.condition_list[i].split('=') - for column in columns: - table = column[0:column.find('.')].strip() - self.table_set.add(table) - self.equal_comparison_dfs(table, flag_array) - - def check_table(self): - table_count = len(self.table_set) - if table_count > 5 or table_count < 2: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "source must be 2 to 5 parties") - suffixes = set() - for table in self.table_set: - suffixes.add(table[-1]) - for i in range(table_count): - if str(i) not in suffixes: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "invalid suffix of table name") - - def update_max_field(self, table_number, field_number): - if field_number > self.max_column_index[table_number]: - self.max_column_index[table_number] = field_number - - def replace_max_filed(self, mpc_str): - for i in range(len(self.max_column_index)): - mpc_str = mpc_str.replace(f'$(source{i}_column_count)', str(self.max_column_index[i] + 1)) - return mpc_str - - def check_sql_pattern(self, tokens): - for i in range(len(tokens)): - if tokens[i].value == 'GROUP BY': - self.sql_pattern = SqlPattern.AGGR_FUNC_WITH_GROUP_BY - items = tokens[i + 2].value.split('.') - self.group_by_column = [int(items[0][-1]), get_column_number(items[1])] - elif type(tokens[i]) == Function: - self.sql_pattern = SqlPattern.AGGR_FUNC_ONLY - elif hasattr(tokens[i], 'tokens'): - self.check_sql_pattern(tokens[i].tokens) - - def extract_operators(self, 
format_sql_str): - start = format_sql_str.find("SELECT") - end = format_sql_str.find("FROM") - operators = format_sql_str[start + 6:end].split(',') - for i in range(len(operators)): - if ' AS ' in operators[i]: - index = operators[i].find(' AS ') - self.result_fields.append(operators[i][index + 4:].strip().strip('\n').strip('\'').strip('\"').strip()) - operators[i] = operators[i][:index].strip() - else: - self.result_fields.append(f"result{i}") - operators[i] = operators[i].strip() - if ' ' in self.result_fields[-1]: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "result field cannot contain space") - return operators - - def generate_common_code(self, format_sql_str): - table_count = len(self.table_set) - result_column_count = len(self.result_fields) - - if self.need_run_psi: - mpc_str = '# PSI_OPTION=True\n\n' - else: - mpc_str = '' - mpc_str += '# BIT_LENGTH = 128\n\n' - mpc_str += '# This file is generated automatically by ams\n' - mpc_str += f"'''\n{format_sql_str}\n'''\n\n" - mpc_str += "from ppc import *\n\n" - # mpc_str += "program.use_trunc_pr = True\n" - # mpc_str += "program.use_split(3)\n" - mpc_str += "n_threads = 8\n" - mpc_str += f"value_type = {VALUE_TYPE}\n\n" - if VALUE_TYPE == 'pfix': - mpc_str += f"pfix.set_precision(16, 47)\n\n" - - for i in range(table_count): - mpc_str += f"SOURCE{i} = {i}\n" - mpc_str += f"source{i}_record_count = $(source{i}_record_count)\n" - mpc_str += f"source{i}_column_count = $(source{i}_column_count)\n" - mpc_str += f"source{i}_record = Matrix(source{i}_record_count, source{i}_column_count, value_type)\n\n" - - if self.sql_pattern == SqlPattern.BASIC_ARITH_OPE: - mpc_str += "# basic arithmetic operation means that all parties have same number of record\n" - mpc_str += "result_record = $(source0_record_count)\n" - mpc_str += f"results = Matrix(result_record, {result_column_count}, value_type)\n\n\n" - elif self.sql_pattern == SqlPattern.AGGR_FUNC_ONLY: - mpc_str += f"results = Matrix(1, {result_column_count}, value_type)\n\n\n" - elif self.sql_pattern == SqlPattern.AGGR_FUNC_WITH_GROUP_BY: - mpc_str += "# group by means all parties have same number of record\n" - mpc_str += "source_record_count = $(source0_record_count)\n" - mpc_str += "result_record = cint(source_record_count)\n" - mpc_str += f"results = Matrix(source_record_count, {result_column_count}, value_type)\n\n\n" - - mpc_str += "def read_data_collection(data_collection, party_id):\n" - mpc_str += f"{INDENT}if data_collection.sizes[0] > 0:\n" - mpc_str += f"{INDENT}{INDENT}data_collection.input_from(party_id)\n\n\n" - - return mpc_str - - def generate_function_code(self, mpc_str): - if self.sql_pattern == SqlPattern.AGGR_FUNC_ONLY: - mpc_str += mpc_func_str.FUNC_COMPUTE_SUM - mpc_str += mpc_func_str.FUNC_COMPUTE_COUNT - mpc_str += mpc_func_str.FUNC_COMPUTE_AVG - mpc_str += mpc_func_str.FUNC_COMPUTE_MAX - mpc_str += mpc_func_str.FUNC_COMPUTE_MIN - elif self.sql_pattern == SqlPattern.AGGR_FUNC_WITH_GROUP_BY: - mpc_str += mpc_func_str.GROUP_BY_GLOBAL_VARIABLE - mpc_str += mpc_func_str.FUNC_COMPUTE_GROUP_BY_INDEXES - mpc_str += mpc_func_str.FUNC_COMPUTE_SUM_WITH_GROUP_BY - mpc_str += mpc_func_str.FUNC_COMPUTE_COUNT_WITH_GROUP_BY - mpc_str += mpc_func_str.FUNC_COMPUTE_AVG_WITH_GROUP_BY - mpc_str += mpc_func_str.FUNC_COMPUTE_MAX_WITH_GROUP_BY - mpc_str += mpc_func_str.FUNC_COMPUTE_MIN_WITH_GROUP_BY - return mpc_str - - def generate_result_calculation_code(self, operators, mpc_str): - for i in range(len(operators)): - tokens = sqlparse.parse(operators[i])[0].tokens - 
participants_set = set() - formula_str = self.generate_formula(tokens, '', participants_set) - if len(participants_set) == 1: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "disabled query pattern") - - if self.sql_pattern == SqlPattern.BASIC_ARITH_OPE or self.sql_pattern == SqlPattern.AGGR_FUNC_WITH_GROUP_BY: - mpc_str += f"def calculate_result_{i}():\n" \ - f"{INDENT}@for_range_opt_multithread(n_threads, result_record)\n" \ - f"{INDENT}def _(i):\n" \ - f"{INDENT}{INDENT}results[i][{i}] = {formula_str}\n\n\n" - elif self.sql_pattern == SqlPattern.AGGR_FUNC_ONLY: - mpc_str += f"def calculate_result_{i}():\n" \ - f"{INDENT}results[0][{i}] = {formula_str}\n\n\n" - return mpc_str - - def generate_result_print_code(self, mpc_str): - field_print_str = f"{PPC_RESULT_FIELDS_FLAG} = ['{self.result_fields[0]}'" - for i in range(1, len(self.result_fields)): - field_print_str += f", '{self.result_fields[i]}'" - field_print_str += ']' - - if self.sql_pattern == SqlPattern.BASIC_ARITH_OPE or self.sql_pattern == SqlPattern.AGGR_FUNC_WITH_GROUP_BY: - result_print_str = f"{PPC_RESULT_VALUES_FLAG} = [results[i][0].reveal()" - for i in range(1, len(self.result_fields)): - result_print_str += f", results[i][{i}].reveal()" - result_print_str += ']' - mpc_str += f"def print_results():\n" \ - f"{INDENT}{field_print_str}\n" \ - f"{INDENT}{DISPLAY_FIELDS_FUNC}({PPC_RESULT_FIELDS_FLAG})\n\n" \ - f"{INDENT}@for_range_opt(result_record)\n" \ - f"{INDENT}def _(i):\n" \ - f"{INDENT}{INDENT}{result_print_str}\n" \ - f"{INDENT}{INDENT}{DISPLAY_RESULT_FUNC}({PPC_RESULT_VALUES_FLAG})\n\n\n" - elif self.sql_pattern == SqlPattern.AGGR_FUNC_ONLY: - result_print_str = f"{PPC_RESULT_VALUES_FLAG} = [results[0][0].reveal()" - for i in range(1, len(self.result_fields)): - result_print_str += f", results[0][{i}].reveal()" - result_print_str += ']' - mpc_str += f"def print_results():\n" \ - f"{INDENT}{field_print_str}\n" \ - f"{INDENT}{DISPLAY_FIELDS_FUNC}({PPC_RESULT_FIELDS_FLAG})\n\n" \ - f"{INDENT}{result_print_str}\n" \ - f"{INDENT}{DISPLAY_RESULT_FUNC}({PPC_RESULT_VALUES_FLAG})\n\n\n" - return mpc_str - - def generate_mpc_execution_code(self, mpc_str): - mpc_str += 'def ppc_main():\n' - for i in range(len(self.table_set)): - mpc_str += f"{INDENT}read_data_collection(source{i}_record, SOURCE{i})\n" - - if self.sql_pattern == SqlPattern.AGGR_FUNC_WITH_GROUP_BY: - mpc_str += f"\n{INDENT}compute_group_by_indexes(source{self.group_by_column[0]}_record, " \ - f"{self.group_by_column[1]})\n\n" - - for i in range(len(self.result_fields)): - mpc_str += f"{INDENT}calculate_result_{i}()\n" - - mpc_str += f"\n{INDENT}print_results()\n\n\n" - - mpc_str += "ppc_main()\n" - return mpc_str - - def generate_formula(self, tokens, formula_str, participants_set): - for token in tokens: - if token.ttype == Punctuation \ - or token.ttype == Operator \ - or token.ttype == Token.Literal.Number.Integer \ - or token.ttype == Token.Operator.Comparison: - formula_str += token.value - elif type(token) == Function: - formula_str += self.handle_function(token) - elif type(token) == Identifier and token.tokens[0].ttype == Name and len(token.tokens) >= 3: - (table_number, field_number) = self.handle_basic_identifier(token) - if self.sql_pattern == SqlPattern.AGGR_FUNC_WITH_GROUP_BY: - if table_number != self.group_by_column[0] or field_number != self.group_by_column[1]: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "bad sql statement") - self.update_max_field(table_number, field_number) - formula_str += GROUP_BY_COLUMN_CODE 
- elif self.sql_pattern == SqlPattern.AGGR_FUNC_ONLY: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "disabled query pattern") - elif self.sql_pattern == SqlPattern.BASIC_ARITH_OPE: - if token.value == token.parent.value: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), - "disabled query pattern") - self.update_max_field(table_number, field_number) - formula_str += f"source{table_number}_record[i][{field_number}]" - participants_set.add(table_number) - elif hasattr(token, 'tokens'): - formula_str = self.generate_formula(token.tokens, formula_str, participants_set) - return formula_str - - def handle_function(self, token): - tokens = token.tokens - func_name = tokens[0].value - (table_number, field_number) = self.handle_parenthesis(tokens[1]) - self.update_max_field(table_number, field_number) - return self.func_to_formula(func_name, table_number, field_number) - - def func_to_formula(self, func, table_number, field_number): - if self.sql_pattern == SqlPattern.AGGR_FUNC_ONLY: - formula = { - 'COUNT': f"{mpc_func_str.FUNC_COMPUTE_COUNT_NAME}(source{table_number}_record_count)", - 'SUM': f"{mpc_func_str.FUNC_COMPUTE_SUM_NAME}(source{table_number}_record, " - f"source{table_number}_record_count, {field_number})", - 'AVG': f"{mpc_func_str.FUNC_COMPUTE_AVG_NAME}(source{table_number}_record, " - f"source{table_number}_record_count, {field_number})", - 'MAX': f"{mpc_func_str.FUNC_COMPUTE_MAX_NAME}(source{table_number}_record, " - f"source{table_number}_record_count, {field_number})", - 'MIN': f"{mpc_func_str.FUNC_COMPUTE_MIN_NAME}(source{table_number}_record, " - f"source{table_number}_record_count, {field_number})" - } - elif self.sql_pattern == SqlPattern.AGGR_FUNC_WITH_GROUP_BY: - formula = { - 'COUNT': f"{mpc_func_str.FUNC_COMPUTE_COUNT_WITH_GROUP_BY_NAME}(i)", - 'SUM': f"{mpc_func_str.FUNC_COMPUTE_SUM_WITH_GROUP_BY_NAME}(source{table_number}_record," - f" {field_number}, i)", - 'AVG': f"{mpc_func_str.FUNC_COMPUTE_AVG_WITH_GROUP_BY_NAME}(source{table_number}_record," - f" {field_number}, i)", - 'MAX': f"{mpc_func_str.FUNC_COMPUTE_MAX_WITH_GROUP_BY_NAME}(source{table_number}_record," - f" {field_number}, i)", - 'MIN': f"{mpc_func_str.FUNC_COMPUTE_MIN_WITH_GROUP_BY_NAME}(source{table_number}_record," - f" {field_number}, i)" - } - else: - formula = {} - - return formula.get(func, '') - - def handle_parenthesis(self, token): - for token in token.tokens: - if type(token) == Identifier: - (table_number, field_number) = self.handle_basic_identifier(token) - return table_number, field_number - - def handle_basic_identifier(self, token): - tokens = token.tokens - - if tokens[0].value not in self.table_set: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "table name not matched") - if tokens[1].value != '.': - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), "invalid identifier") - - field_num = get_column_number(tokens[2].value) - - return int(tokens[0].value[-1]), field_num - - -def get_column_number(field_name): - field_len = len(field_name) - field_num = 0 - for i in range(field_len, 0, -1): - try: - int(field_name[i - 1:field_len]) - except ValueError: - if i == field_len: - raise PpcException(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), - f"invalid field suffix of table column '{field_name}'") - field_num = int(field_name[i:field_len]) - break - return field_num diff --git a/python/ppc_scheduler/mpc_generator/mpc_func_str.py b/python/ppc_scheduler/mpc_generator/mpc_func_str.py deleted file mode 100644 index 87501124..00000000 --- 
a/python/ppc_scheduler/mpc_generator/mpc_func_str.py +++ /dev/null @@ -1,64 +0,0 @@ -import os - -FILE_PATH = os.path.abspath(__file__) - -CURRENT_PATH = os.path.abspath(os.path.dirname(FILE_PATH) + os.path.sep + ".") - -AGGR_FUNC_SAMPLE_PATH = f"{CURRENT_PATH}{os.sep}mpc_sample{os.sep}aggr_func_only.mpc" -GROUP_BY_SAMPLE_PATH = f"{CURRENT_PATH}{os.sep}mpc_sample{os.sep}aggr_func_with_group_by.mpc" - -FUNC_COMPUTE_SUM_NAME = 'compute_sum' -FUNC_COMPUTE_COUNT_NAME = 'compute_count' -FUNC_COMPUTE_AVG_NAME = 'compute_avg' -FUNC_COMPUTE_MAX_NAME = 'compute_max' -FUNC_COMPUTE_MIN_NAME = 'compute_min' - -FUNC_COMPUTE_GROUP_BY_INDEXES_NAME = 'compute_group_by_indexes' -FUNC_COMPUTE_SUM_WITH_GROUP_BY_NAME = 'compute_sum_with_group_by' -FUNC_COMPUTE_COUNT_WITH_GROUP_BY_NAME = 'compute_count_with_group_by' -FUNC_COMPUTE_AVG_WITH_GROUP_BY_NAME = 'compute_avg_with_group_by' -FUNC_COMPUTE_MAX_WITH_GROUP_BY_NAME = 'compute_max_with_group_by' -FUNC_COMPUTE_MIN_WITH_GROUP_BY_NAME = 'compute_min_with_group_by' - -with open(AGGR_FUNC_SAMPLE_PATH, "r") as file: - AGGR_FUNC_SAMPLE_STR = file.read() - -with open(GROUP_BY_SAMPLE_PATH, "r") as file: - GROUP_BY_SAMPLE_STR = file.read() - - -def get_body_str_by_name(start_str, end_str, sql_pattern): - if sql_pattern == 1: - source_str = AGGR_FUNC_SAMPLE_STR - elif sql_pattern == 2: - source_str = GROUP_BY_SAMPLE_STR - else: - return '' - - start_index = source_str.find(start_str) - source_str = source_str[start_index:] - - end_index = source_str.find(end_str) + len(end_str) - return source_str[:end_index] - - -def get_func_str_by_name(func_name, sql_pattern): - start_str = f"def {func_name}" - end_str = "\n\n\n" - return get_body_str_by_name(start_str, end_str, sql_pattern) - - -FUNC_COMPUTE_SUM = get_func_str_by_name(FUNC_COMPUTE_SUM_NAME, 1) -FUNC_COMPUTE_COUNT = get_func_str_by_name(FUNC_COMPUTE_COUNT_NAME, 1) -FUNC_COMPUTE_AVG = get_func_str_by_name(FUNC_COMPUTE_AVG_NAME, 1) -FUNC_COMPUTE_MAX = get_func_str_by_name(FUNC_COMPUTE_MAX_NAME, 1) -FUNC_COMPUTE_MIN = get_func_str_by_name(FUNC_COMPUTE_MIN_NAME, 1) - -GROUP_BY_GLOBAL_VARIABLE = get_body_str_by_name("# matrix of indexes", "\n\n\n", 2) - -FUNC_COMPUTE_GROUP_BY_INDEXES = get_func_str_by_name(FUNC_COMPUTE_GROUP_BY_INDEXES_NAME, 2) -FUNC_COMPUTE_SUM_WITH_GROUP_BY = get_func_str_by_name(FUNC_COMPUTE_SUM_WITH_GROUP_BY_NAME, 2) -FUNC_COMPUTE_COUNT_WITH_GROUP_BY = get_func_str_by_name(FUNC_COMPUTE_COUNT_WITH_GROUP_BY_NAME, 2) -FUNC_COMPUTE_AVG_WITH_GROUP_BY = get_func_str_by_name(FUNC_COMPUTE_AVG_WITH_GROUP_BY_NAME, 2) -FUNC_COMPUTE_MAX_WITH_GROUP_BY = get_func_str_by_name(FUNC_COMPUTE_MAX_WITH_GROUP_BY_NAME, 2) -FUNC_COMPUTE_MIN_WITH_GROUP_BY = get_func_str_by_name(FUNC_COMPUTE_MIN_WITH_GROUP_BY_NAME, 2) diff --git a/python/ppc_scheduler/mpc_generator/mpc_sample/aggr_func_only.mpc b/python/ppc_scheduler/mpc_generator/mpc_sample/aggr_func_only.mpc deleted file mode 100644 index 9ddb3c34..00000000 --- a/python/ppc_scheduler/mpc_generator/mpc_sample/aggr_func_only.mpc +++ /dev/null @@ -1,136 +0,0 @@ -# PSI_OPTION=True - -# BIT_LENGTH = 128 - -# This file is generated automatically by ams -''' -SELECT COUNT(s1.field3) + COUNT(s2.field3) AS r0, - SUM(s1.field3) + COUNT(s0.field0) AS 'count', - (MAX(s0.field1) + MAX(s2.field1)) / 2 AS r1, - (AVG(s1.field2) + AVG(s2.field2)) / 2 AS r2, - MIN(s1.field0) - MIN(s0.field0) AS r3 -FROM (source0 AS s0 - INNER JOIN source1 AS s1 ON s0.id = s1.id) -INNER JOIN source2 AS s2 ON s0.id = s2.id; -''' - -from ppc import * - -n_threads = 8 -value_type = pfix - 
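The mpc_func_str module deleted above does not build code itself; it slices reusable function bodies out of the sample .mpc files by locating a "def <name>" marker and cutting at the next triple-newline gap. A small, hedged sketch of that slicing, using an inline sample string instead of the removed sample files (names here are illustrative):

# Illustrative sketch of the template slicing done by get_func_str_by_name.
SAMPLE = (
    "def compute_sum(source, record_count, col_index):\n"
    "    ...\n"
    "\n\n\n"
    "def compute_count(record_count):\n"
    "    return record_count\n"
    "\n\n\n"
)

def func_str_by_name(sample, func_name, end_marker="\n\n\n"):
    start = sample.find(f"def {func_name}")
    end = sample.find(end_marker, start) + len(end_marker)
    return sample[start:end]

# func_str_by_name(SAMPLE, "compute_count") returns the second def block,
# including the trailing blank lines that separate functions in the samples.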
-pfix.set_precision(16, 47) - -SOURCE0 = 0 -source0_record_count = $(source0_record_count) -source0_column_count = 2 -source0_record = Matrix(source0_record_count, source0_column_count, value_type) - -SOURCE1 = 1 -source1_record_count = $(source1_record_count) -source1_column_count = 4 -source1_record = Matrix(source1_record_count, source1_column_count, value_type) - -SOURCE2 = 2 -source2_record_count = $(source2_record_count) -source2_column_count = 4 -source2_record = Matrix(source2_record_count, source2_column_count, value_type) - -results = Matrix(1, 5, value_type) - - -def read_data_collection(data_collection, party_id): - if data_collection.sizes[0] > 0: - data_collection.input_from(party_id) - - -def compute_sum(source, record_count, col_index): - records_sum = Array(1, value_type) - records_sum[0] = source[0][col_index] - - @for_range(1, record_count) - def _(i): - records_sum[0] = records_sum[0] + source[i][col_index] - - return records_sum[0] - - -def compute_count(record_count): - return record_count - - -def compute_avg(source, record_count, col_index): - records_sum = Array(1, value_type) - records_sum[0] = source[0][col_index] - - @for_range(1, record_count) - def _(i): - records_sum[0] = records_sum[0] + source[i][col_index] - - return records_sum[0] / record_count - - -def compute_max(source, record_count, col_index): - max_record = Array(1, value_type) - max_record[0] = source[0][col_index] - - @for_range(1, record_count) - def _(i): - max_record[0] = condition(max_record[0] < source[i][col_index], source[i][col_index], max_record[0]) - - return max_record[0] - - -def compute_min(source, record_count, col_index): - min_record = Array(1, value_type) - min_record[0] = source[0][col_index] - - @for_range(1, record_count) - def _(i): - min_record[0] = condition(min_record[0] > source[i][col_index], source[i][col_index], min_record[0]) - - return min_record[0] - - -def calculate_result_0(): - results[0][0] = compute_count(source1_record_count)+compute_count(source2_record_count) - - -def calculate_result_1(): - results[0][1] = compute_sum(source1_record, source1_record_count, 3)+compute_count(source0_record_count) - - -def calculate_result_2(): - results[0][2] = (compute_max(source0_record, source0_record_count, 1)+compute_max(source2_record, source2_record_count, 1))/2 - - -def calculate_result_3(): - results[0][3] = (compute_avg(source1_record, source1_record_count, 2)+compute_avg(source2_record, source2_record_count, 2))/2 - - -def calculate_result_4(): - results[0][4] = compute_min(source1_record, source1_record_count, 0)-compute_min(source0_record, source0_record_count, 0) - - -def print_results(): - result_fields = ['r0', 'count', 'r1', 'r2', 'r3'] - set_display_field_names(result_fields) - - result_values = [results[0][0].reveal(), results[0][1].reveal(), results[0][2].reveal(), results[0][3].reveal(), results[0][4].reveal()] - display_data(result_values) - - -def ppc_main(): - read_data_collection(source0_record, SOURCE0) - read_data_collection(source1_record, SOURCE1) - read_data_collection(source2_record, SOURCE2) - calculate_result_0() - calculate_result_1() - calculate_result_2() - calculate_result_3() - calculate_result_4() - - print_results() - - -ppc_main() diff --git a/python/ppc_scheduler/mpc_generator/mpc_sample/aggr_func_with_group_by.mpc b/python/ppc_scheduler/mpc_generator/mpc_sample/aggr_func_with_group_by.mpc deleted file mode 100644 index f1a01339..00000000 --- a/python/ppc_scheduler/mpc_generator/mpc_sample/aggr_func_with_group_by.mpc +++ /dev/null @@ 
-1,234 +0,0 @@ -# PSI_OPTION=True - -# BIT_LENGTH = 128 - -# This file is generated automatically by ams -''' -SELECT 3*s1.field4 AS r0, - COUNT(s1.field4) AS 'count', - AVG(s0.field1) * 2 + s1.field4 AS r1, - (SUM(s0.field2) + SUM(s1.field2))/(COUNT(s1.field3) + 100/(MIN(s0.field1)+MIN(s1.field1))) + 10, - MAX(s1.field1), - MIN(s2.field2) -FROM (source0 AS s0 - INNER JOIN source1 AS s1 ON s0.id = s1.id) -INNER JOIN source2 AS s2 ON s0.id = s2.id -GROUP BY s1.field4; -''' - -from ppc import * - -n_threads = 8 -value_type = pfix - -pfix.set_precision(16, 47) - -SOURCE0 = 0 -source0_record_count = $(source0_record_count) -source0_column_count = 3 -source0_record = Matrix(source0_record_count, source0_column_count, value_type) - -SOURCE1 = 1 -source1_record_count = $(source1_record_count) -source1_column_count = 5 -source1_record = Matrix(source1_record_count, source1_column_count, value_type) - -SOURCE2 = 2 -source2_record_count = $(source2_record_count) -source2_column_count = 3 -source2_record = Matrix(source2_record_count, source2_column_count, value_type) - -# group by means all parties have same number of record -source_record_count = $(source0_record_count) -result_record = cint(source_record_count) -results = Matrix(source_record_count, 6, value_type) - - -def read_data_collection(data_collection, party_id): - if data_collection.sizes[0] > 0: - data_collection.input_from(party_id) - - -# matrix of indexes after group by: -# 0 1 2 -# 0 count1 start_index end_index -# 1 count2 start_index end_index -# 2 count3 start_index end_index -# ... -# source_record_count - 1 ... -group_indexes_key = Array(source_record_count, value_type) -group_indexes_matrix = Matrix(source_record_count, 3, pint) -group_column = Array(source_record_count, value_type) - - -def compute_group_by_indexes(source, col_index): - # group_count group_index group_flag - group_states = Array(3, cint) - - @for_range_opt(source_record_count) - def _(i): - group_column[i] = source[i][col_index] - group_states[1] = 0 - group_states[2] = 0 - - @for_range(group_states[0]) - def _(j): - @if_(pint(group_indexes_key[j] == source[i][col_index]).reveal()) - def _(): - group_states[1] = j - group_states[2] = 1 - - @if_e(group_states[2] == 0) - def _(): - # new item - group_indexes_key[group_states[0]] = source[i][col_index] - group_indexes_matrix[group_states[0]][0] = 1 - group_indexes_matrix[group_states[0]][1] = i - group_indexes_matrix[group_states[0]][2] = i - group_states[0] = group_states[0] + 1 - - @else_ - def _(): - group_indexes_matrix[group_states[1]][0] = group_indexes_matrix[group_states[1]][0] + 1 - group_indexes_matrix[group_states[1]][2] = i - - global result_record - result_record = group_states[0] - - -def compute_sum_with_group_by(source, col_index, group_row_index): - records_sum = Array(1, value_type) - - start_index = group_indexes_matrix[group_row_index][1].reveal() - end_index = group_indexes_matrix[group_row_index][2].reveal() - - records_sum[0] = source[start_index][col_index] - - @for_range(start_index + 1, end_index + 1) - def _(i): - @if_(pint(group_indexes_key[group_row_index] == group_column[i]).reveal()) - def _(): - records_sum[0] = records_sum[0] + source[i][col_index] - - return records_sum[0] - - -def compute_count_with_group_by(group_row_index): - return group_indexes_matrix[group_row_index][0] - - -def compute_avg_with_group_by(source, col_index, group_row_index): - records_sum = Array(1, value_type) - - start_index = group_indexes_matrix[group_row_index][1].reveal() - end_index = 
group_indexes_matrix[group_row_index][2].reveal() - - records_sum[0] = source[start_index][col_index] - - @for_range(start_index + 1, end_index + 1) - def _(i): - @if_(pint(group_indexes_key[group_row_index] == group_column[i]).reveal()) - def _(): - records_sum[0] = records_sum[0] + source[i][col_index] - - return value_type(records_sum[0] / group_indexes_matrix[group_row_index][0]) - - -def compute_max_with_group_by(source, col_index, group_row_index): - max_records = Array(1, value_type) - - start_index = group_indexes_matrix[group_row_index][1].reveal() - end_index = group_indexes_matrix[group_row_index][2].reveal() - - max_records[0] = source[start_index][col_index] - - @for_range(start_index + 1, end_index + 1) - def _(i): - @if_(pint(group_indexes_key[group_row_index] == group_column[i]).reveal()) - def _(): - max_records[0] = condition(max_records[0] < source[i][col_index], source[i][col_index], max_records[0]) - - return max_records[0] - - -def compute_min_with_group_by(source, col_index, group_row_index): - min_records = Array(1, value_type) - - start_index = group_indexes_matrix[group_row_index][1].reveal() - end_index = group_indexes_matrix[group_row_index][2].reveal() - - min_records[0] = source[start_index][col_index] - - @for_range(start_index + 1, end_index + 1) - def _(i): - @if_(pint(group_indexes_key[group_row_index] == group_column[i]).reveal()) - def _(): - min_records[0] = condition(min_records[0] > source[i][col_index], source[i][col_index], min_records[0]) - - return min_records[0] - - -def calculate_result_0(): - @for_range_opt_multithread(n_threads, result_record) - def _(i): - results[i][0] = 3*group_indexes_key[i] - - -def calculate_result_1(): - @for_range_opt_multithread(n_threads, result_record) - def _(i): - results[i][1] = compute_count_with_group_by(i) - - -def calculate_result_2(): - @for_range_opt_multithread(n_threads, result_record) - def _(i): - results[i][2] = compute_avg_with_group_by(source0_record, 1, i)*2+group_indexes_key[i] - - -def calculate_result_3(): - @for_range_opt_multithread(n_threads, result_record) - def _(i): - results[i][3] = (compute_sum_with_group_by(source0_record, 2, i)+compute_sum_with_group_by(source1_record, 2, i))/(compute_count_with_group_by(i)+100/(compute_min_with_group_by(source0_record, 1, i)+compute_min_with_group_by(source1_record, 1, i)))+10 - - -def calculate_result_4(): - @for_range_opt_multithread(n_threads, result_record) - def _(i): - results[i][4] = compute_max_with_group_by(source1_record, 1, i) - - -def calculate_result_5(): - @for_range_opt_multithread(n_threads, result_record) - def _(i): - results[i][5] = compute_min_with_group_by(source2_record, 2, i) - - -def print_results(): - result_fields = ['r0', 'count', 'r1', 'result3', 'result4', 'result5'] - set_display_field_names(result_fields) - - @for_range_opt(result_record) - def _(i): - result_values = [results[i][0].reveal(), results[i][1].reveal(), results[i][2].reveal(), results[i][3].reveal(), results[i][4].reveal(), results[i][5].reveal()] - display_data(result_values) - - -def ppc_main(): - read_data_collection(source0_record, SOURCE0) - read_data_collection(source1_record, SOURCE1) - read_data_collection(source2_record, SOURCE2) - - compute_group_by_indexes(source1_record, 4) - - calculate_result_0() - calculate_result_1() - calculate_result_2() - calculate_result_3() - calculate_result_4() - calculate_result_5() - - print_results() - - -ppc_main() diff --git a/python/ppc_scheduler/mpc_generator/mpc_sample/basic_arith_ope.mpc 
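The group-by sample removed above centres on compute_group_by_indexes, which walks the grouping column once and, for each distinct key, records a (count, first_index, last_index) row that the aggregate helpers then reuse. The same bookkeeping in plain Python, as a hedged illustration of the algorithm only (not of the MPC Array/Matrix types):

# Plain-Python illustration of the (count, start_index, end_index) bookkeeping
# built by compute_group_by_indexes in the removed sample.
def group_by_indexes(column):
    keys, matrix = [], []          # matrix[j] = [count, start_index, end_index]
    for i, value in enumerate(column):
        for j, key in enumerate(keys):
            if key == value:
                matrix[j][0] += 1  # one more row in this group
                matrix[j][2] = i   # extend the group's last index
                break
        else:
            keys.append(value)     # first time this key is seen
            matrix.append([1, i, i])
    return keys, matrix

# group_by_indexes([7, 7, 9, 7]) -> ([7, 9], [[3, 0, 3], [1, 2, 2]])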
b/python/ppc_scheduler/mpc_generator/mpc_sample/basic_arith_ope.mpc deleted file mode 100644 index fa2fc97d..00000000 --- a/python/ppc_scheduler/mpc_generator/mpc_sample/basic_arith_ope.mpc +++ /dev/null @@ -1,78 +0,0 @@ -# PSI_OPTION=True - -# BIT_LENGTH = 128 - -# This file is generated automatically by ams -''' -SELECT 3*(s1.field3 + s2.field3) - s0.field3 AS r0, - (s0.field1 + s2.field1) / 2 * s1.field1 AS r1 -FROM (source0 AS s0 - INNER JOIN source1 AS s1 ON s0.id = s1.id) -INNER JOIN source2 AS s2 ON s0.id = s2.id; -''' - -from ppc import * - -n_threads = 8 -value_type = pfix - -pfix.set_precision(16, 47) - -SOURCE0 = 0 -source0_record_count = $(source0_record_count) -source0_column_count = 4 -source0_record = Matrix(source0_record_count, source0_column_count, value_type) - -SOURCE1 = 1 -source1_record_count = $(source1_record_count) -source1_column_count = 4 -source1_record = Matrix(source1_record_count, source1_column_count, value_type) - -SOURCE2 = 2 -source2_record_count = $(source2_record_count) -source2_column_count = 4 -source2_record = Matrix(source2_record_count, source2_column_count, value_type) - -# basic arithmetic operation means that all parties have same number of record -result_record = $(source0_record_count) -results = Matrix(result_record, 2, value_type) - - -def read_data_collection(data_collection, party_id): - if data_collection.sizes[0] > 0: - data_collection.input_from(party_id) - - -def calculate_result_0(): - @for_range_opt_multithread(n_threads, result_record) - def _(i): - results[i][0] = 3*(source1_record[i][3]+source2_record[i][3])-source0_record[i][3] - - -def calculate_result_1(): - @for_range_opt_multithread(n_threads, result_record) - def _(i): - results[i][1] = (source0_record[i][1]+source2_record[i][1])/2*source1_record[i][1] - - -def print_results(): - result_fields = ['r0', 'r1'] - set_display_field_names(result_fields) - - @for_range_opt(result_record) - def _(i): - result_values = [results[i][0].reveal(), results[i][1].reveal()] - display_data(result_values) - - -def ppc_main(): - read_data_collection(source0_record, SOURCE0) - read_data_collection(source1_record, SOURCE1) - read_data_collection(source2_record, SOURCE2) - calculate_result_0() - calculate_result_1() - - print_results() - - -ppc_main() diff --git a/python/ppc_scheduler/mpc_generator/test_generator.py b/python/ppc_scheduler/mpc_generator/test_generator.py deleted file mode 100644 index 9a37a2e1..00000000 --- a/python/ppc_scheduler/mpc_generator/test_generator.py +++ /dev/null @@ -1,101 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -import os -import unittest - -from ppc_scheduler.mpc_generator.generator import CodeGenerator -from ppc_common.ppc_utils.exception import PpcException, PpcErrorCode - -FILE_PATH = os.path.abspath(__file__) - -CURRENT_PATH = os.path.abspath(os.path.dirname(FILE_PATH) + os.path.sep + ".") - -BASIC_ARITH_OPE_PATH = f"{CURRENT_PATH}{os.sep}mpc_sample{os.sep}basic_arith_ope.mpc" -AGGR_FUNC_SAMPLE_PATH = f"{CURRENT_PATH}{os.sep}mpc_sample{os.sep}aggr_func_only.mpc" -GROUP_BY_SAMPLE_PATH = f"{CURRENT_PATH}{os.sep}mpc_sample{os.sep}aggr_func_with_group_by.mpc" - -with open(BASIC_ARITH_OPE_PATH, "r") as file: - BASIC_ARITH_OPE_STR = file.read() - -with open(AGGR_FUNC_SAMPLE_PATH, "r") as file: - AGGR_FUNC_SAMPLE_STR = file.read() - -with open(GROUP_BY_SAMPLE_PATH, "r") as file: - GROUP_BY_SAMPLE_STR = file.read() - - -class TestGenerator(unittest.TestCase): - - def test_bad_sql(self): - try: - sql = "select from a from b where c = d" - code_generator = 
CodeGenerator(sql) - code_generator.sql_to_mpc_code() - except PpcException as e: - self.assertEqual(PpcErrorCode.ALGORITHM_BAD_SQL.get_code(), e.code) - - sql = "select a0.f1 + b1.f1 from a0, b1 where a0.id=b1.id" - code_generator = CodeGenerator(sql) - self.assertIsNotNone(code_generator.sql_to_mpc_code()) - - def test_unsupported_keyword(self): - try: - sql = "select s0.f1 + s1.f1 from s0, s1 where s0.f1 > 1 and s0.f1 < 10" - code_generator = CodeGenerator(sql) - code_generator.sql_to_mpc_code() - except PpcException as e: - self.assertEqual("keyword 'AND' not supported", e.message) - - def test_disabled_query_pattern(self): - try: - sql = "select s0.f1, 3 + s1.f1 from s0, s1" - code_generator = CodeGenerator(sql) - code_generator.sql_to_mpc_code() - except PpcException as e: - self.assertEqual("disabled query pattern", e.message) - - try: - sql = "select s0.f1, s1.f1 + s1.f1 from s0, s1" - code_generator = CodeGenerator(sql) - code_generator.sql_to_mpc_code() - except PpcException as e: - self.assertEqual("disabled query pattern", e.message) - - def test_basic_pattern(self): - sql = "SELECT 3*(s1.field3 + s2.field3) - s0.field3 AS r0, \ - (s0.field1 + s2.field1) / 2 * s1.field1 AS r1\ - FROM (source0 AS s0\ - INNER JOIN source1 AS s1 ON s0.id = s1.id)\ - INNER JOIN source2 AS s2 ON s0.id = s2.id;" - code_generator = CodeGenerator(sql) - self.assertEqual(BASIC_ARITH_OPE_STR, code_generator.sql_to_mpc_code()) - - def test_single_aggre_pattern(self): - sql = "SELECT COUNT(s1.field3) + COUNT(s2.field3) AS r0,\ - SUM(s1.field3) + COUNT(s0.field0) AS 'count',\ - (MAX(s0.field1) + MAX(s2.field1)) / 2 AS r1,\ - (AVG(s1.field2) + AVG(s2.field2)) / 2 AS r2,\ - MIN(s1.field0) - MIN(s0.field0) AS r3\ - FROM (source0 AS s0\ - INNER JOIN source1 AS s1 ON s0.id = s1.id)\ - INNER JOIN source2 AS s2 ON s0.id = s2.id;" - code_generator = CodeGenerator(sql) - self.assertEqual(AGGR_FUNC_SAMPLE_STR, code_generator.sql_to_mpc_code()) - - def test_group_by_pattern(self): - sql = "SELECT 3*s1.field4 AS r0,\ - COUNT(s1.field4) AS 'count', \ - AVG(s0.field1) * 2 + s1.field4 AS r1,\ - (SUM(s0.field2) + SUM(s1.field2))/(COUNT(s1.field3) + 100/(MIN(s0.field1)+MIN(s1.field1))) + 10,\ - MAX(s1.field1),\ - MIN(s2.field2)\ - FROM (source0 AS s0\ - INNER JOIN source1 AS s1 ON s0.id = s1.id)\ - INNER JOIN source2 AS s2 ON s0.id = s2.id\ - GROUP BY s1.field4;" - code_generator = CodeGenerator(sql) - self.assertEqual(GROUP_BY_SAMPLE_STR, code_generator.sql_to_mpc_code()) - - -if __name__ == '__main__': - unittest.main(verbosity=1) diff --git a/python/ppc_scheduler/node/__init__.py b/python/ppc_scheduler/node/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/node/computing_node_client/__init__.py b/python/ppc_scheduler/node/computing_node_client/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/node/computing_node_client/model_node_client.py b/python/ppc_scheduler/node/computing_node_client/model_node_client.py deleted file mode 100644 index 0bb9d9fe..00000000 --- a/python/ppc_scheduler/node/computing_node_client/model_node_client.py +++ /dev/null @@ -1,95 +0,0 @@ -import json -import time - -from ppc_common.ppc_utils import http_utils -from ppc_common.ppc_utils.exception import PpcException, PpcErrorCode - -RUN_MODEL_API_PREFIX = "/api/ppc-model/pml/run-model-task/" -GET_MODEL_LOG_API_PREFIX = "/api/ppc-model/pml/record-model-log/" - - -class ModelClient: - def __init__(self, logger, endpoint, token, polling_interval_s: int = 5, 
max_retries: int = 5, retry_delay_s: int = 5): - self.logger = logger - self.endpoint = endpoint - self.token = token - self.polling_interval_s = polling_interval_s - self.max_retries = max_retries - self.retry_delay_s = retry_delay_s - self._completed_status = 'COMPLETED' - self._failed_status = 'FAILED' - - def run(self, *args): - - params = args[0] - if type(params) == str: - params = json.loads(params) - - task_id = params['task_id'] - - try: - self.logger.info(f"model client begin to run model task {task_id}") - response = self._send_request_with_retry(http_utils.send_post_request, - endpoint=self.endpoint, - uri=RUN_MODEL_API_PREFIX + task_id, - params=params) - check_response(response) - return self._poll_task_status(task_id) - except Exception as e: - self.logger.error(f"model client run model task exception, task: {task_id}, e: {e}") - raise e - - def kill(self, task_id): - try: - self.logger.info(f"model client begin to kill model task {task_id}") - response = self._send_request_with_retry(http_utils.send_delete_request, - endpoint=self.endpoint, - uri=RUN_MODEL_API_PREFIX + task_id) - check_response(response) - self.logger.info(f"model client task {task_id} was killed") - return response - except Exception as e: - self.logger.warn(f"model client kill task {task_id} exception, e: {e}") - raise e - - def _poll_task_status(self, task_id): - while True: - response = self._send_request_with_retry(http_utils.send_get_request, - endpoint=self.endpoint, - uri=RUN_MODEL_API_PREFIX + task_id) - check_response(response) - if response['data']['status'] == self._completed_status: - self.logger.info(f"model client task {task_id} completed, response: {response['data']}") - return response - elif response['data']['status'] == self._failed_status: - self.logger.warn(f"model client task {task_id} failed, response: {response['data']}") - raise PpcException(PpcErrorCode.CALL_SCS_ERROR.get_code(), response['data']) - else: - time.sleep(self.polling_interval_s) - - def get_remote_log(self, remote_id): - response = self._send_request_with_retry(http_utils.send_get_request, - endpoint=self.endpoint, - uri=GET_MODEL_LOG_API_PREFIX + remote_id) - check_response(response) - return response['data'] - - def _send_request_with_retry(self, request_func, *args, **kwargs): - attempt = 0 - while attempt < self.max_retries: - try: - response = request_func(*args, **kwargs) - return response - except Exception as e: - self.logger.warn(f"Request failed: {e}, attempt {attempt + 1}/{self.max_retries}") - attempt += 1 - if attempt < self.max_retries: - time.sleep(self.retry_delay_s) - else: - self.logger.warn(f"Request failed after {self.max_retries} attempts") - raise e - - -def check_response(response): - if response['errorCode'] != 0: - raise PpcException(PpcErrorCode.CALL_SCS_ERROR.get_code(), response['message']) diff --git a/python/ppc_scheduler/node/computing_node_client/mpc_node_client.py b/python/ppc_scheduler/node/computing_node_client/mpc_node_client.py deleted file mode 100644 index f64ece20..00000000 --- a/python/ppc_scheduler/node/computing_node_client/mpc_node_client.py +++ /dev/null @@ -1,32 +0,0 @@ -import random - -from ppc_common.ppc_utils import http_utils, utils -from ppc_scheduler.node.computing_node_client.utils import check_privacy_service_response - - -class MpcClient: - def __init__(self, endpoint): - self.endpoint = endpoint - - def run(self, job_info, token): - params = { - 'jsonrpc': '2', - 'method': 'run', - 'token': token, - 'id': random.randint(1, 65535), - 'params': job_info - } - 
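Both the ModelClient above and the PsiClient below wrap every HTTP call in the same bounded-retry helper and then poll the task status until it reaches COMPLETED or FAILED. A generic, hedged sketch of that pattern follows; the function and parameter names are illustrative, not the removed API:

import time

# Illustrative sketch of the bounded-retry / poll-until-terminal pattern used
# by the removed computing-node clients.
def call_with_retry(request_func, max_retries=5, retry_delay_s=5, **kwargs):
    for attempt in range(max_retries):
        try:
            return request_func(**kwargs)
        except Exception:
            if attempt + 1 == max_retries:
                raise
            time.sleep(retry_delay_s)

def poll_until_done(get_status, polling_interval_s=5):
    while True:
        status = get_status()
        if status == 'COMPLETED':
            return status
        if status == 'FAILED':
            raise RuntimeError("task failed")
        time.sleep(polling_interval_s)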
response = http_utils.send_post_request(self.endpoint, None, params) - check_privacy_service_response(response) - return response['result'] - - def kill(self, job_id, token): - params = { - 'jsonrpc': '2', - 'method': 'kill', - 'token': token, - 'id': random.randint(1, 65535), - 'params': {'jobId': job_id} - } - http_utils.send_post_request(self.endpoint, None, params) - return utils.make_response(0, "success", None) diff --git a/python/ppc_scheduler/node/computing_node_client/psi_client.py b/python/ppc_scheduler/node/computing_node_client/psi_client.py deleted file mode 100644 index 7dcd5ba3..00000000 --- a/python/ppc_scheduler/node/computing_node_client/psi_client.py +++ /dev/null @@ -1,74 +0,0 @@ -import json -import random -import time - -from ppc_common.ppc_utils import http_utils -from ppc_common.ppc_utils.exception import PpcException, PpcErrorCode -from ppc_scheduler.node.computing_node_client.utils import check_privacy_service_response - - -class PsiClient: - def __init__(self, logger, endpoint, token, polling_interval_s: int = 3, max_retries: int = 3, retry_delay_s: int = 3): - self.logger = logger - self.endpoint = endpoint - self.token = token - self.polling_interval_s = polling_interval_s - self.max_retries = max_retries - self.retry_delay_s = retry_delay_s - self._get_task_status_method = 'getTaskStatus' - self._completed_status = 'COMPLETED' - self._failed_status = 'FAILED' - - def run(self, *args): - - params = args[0] - if type(params) == str: - params = json.loads(params) - - task_id = params['taskID'] - - json_rpc_request = { - 'jsonrpc': '1', - 'method': 'asyncRunTask', - 'token': self.token, - 'id': random.randint(1, 65535), - 'params': params - } - response = self._send_request_with_retry(http_utils.send_post_request, self.endpoint, None, json_rpc_request) - check_privacy_service_response(response) - return self._poll_task_status(task_id) - - def _poll_task_status(self, task_id: str): - while True: - params = { - 'jsonrpc': '1', - 'method': 'getTaskStatus', - 'token': self.token, - 'id': random.randint(1, 65535), - 'params': { - 'taskID': task_id, - } - } - response = self._send_request_with_retry(http_utils.send_post_request, self.endpoint, None, params) - check_privacy_service_response(response) - if response['result']['status'] == self._completed_status: - return response['result'] - elif response['result']['status'] == self._failed_status: - self.logger.warn(f"task {task_id} failed, response: {response['data']}") - raise PpcException(PpcErrorCode.CALL_SCS_ERROR.get_code(), response['data']) - time.sleep(self.polling_interval_s) - - def _send_request_with_retry(self, request_func, *args, **kwargs): - attempt = 0 - while attempt < self.max_retries: - try: - response = request_func(*args, **kwargs) - return response - except Exception as e: - self.logger.warn(f"Request failed: {e}, attempt {attempt + 1}/{self.max_retries}") - attempt += 1 - if attempt < self.max_retries: - time.sleep(self.retry_delay_s) - else: - self.logger.warn(f"Request failed after {self.max_retries} attempts") - raise e diff --git a/python/ppc_scheduler/node/computing_node_client/utils.py b/python/ppc_scheduler/node/computing_node_client/utils.py deleted file mode 100644 index 3d3527c8..00000000 --- a/python/ppc_scheduler/node/computing_node_client/utils.py +++ /dev/null @@ -1,8 +0,0 @@ -from ppc_common.ppc_utils.exception import PpcErrorCode, PpcException - - -def check_privacy_service_response(response): - if 'result' not in response.keys(): - raise 
PpcException(PpcErrorCode.CALL_SCS_ERROR.get_code(), "http request error") - elif 0 != response['result']['code'] or response['result']['status'] == 'FAILED': - raise PpcException(PpcErrorCode.CALL_SCS_ERROR.get_code(), response['result']['message']) diff --git a/python/ppc_scheduler/node/node_manager.py b/python/ppc_scheduler/node/node_manager.py deleted file mode 100644 index c180714b..00000000 --- a/python/ppc_scheduler/node/node_manager.py +++ /dev/null @@ -1,31 +0,0 @@ -from ppc_scheduler.database import computing_node_mapper -from ppc_scheduler.workflow.common.worker_type import WorkerType - - -class ComputingNodeManager: - type_map = { - WorkerType.T_PSI: 'PSI', - WorkerType.T_MPC: 'MPC', - WorkerType.T_MODEL: 'MODEL' - } - - def __init__(self, components): - self.components = components - - def add_node(self, node_id: str, url: str, worker_type: str): - with self.components.create_sql_session() as session: - computing_node_mapper.insert_computing_node( - session, node_id, url, self.type_map[worker_type], 0) - - def remove_node(self, url: str, worker_type: str): - with self.components.create_sql_session() as session: - computing_node_mapper.delete_computing_node( - session, url, self.type_map[worker_type]) - - def get_node(self, worker_type: str): - with self.components.create_sql_session() as session: - return computing_node_mapper.get_and_update_min_loading_url(session, self.type_map[worker_type]) - - def release_node(self, url: str, worker_type: str): - with self.components.create_sql_session() as session: - return computing_node_mapper.release_loading(session, url, self.type_map[worker_type]) diff --git a/python/ppc_scheduler/ppc_scheduler_app.py b/python/ppc_scheduler/ppc_scheduler_app.py deleted file mode 100644 index 0195236f..00000000 --- a/python/ppc_scheduler/ppc_scheduler_app.py +++ /dev/null @@ -1,125 +0,0 @@ -# Note: here can't be refactored by autopep - -import os -import sys -import argparse - -current_file_path = os.path.abspath(__file__) -current_file_real_path = os.path.realpath(current_file_path) - - -current_dir = os.path.dirname(current_file_real_path) -parent_dir = os.path.dirname(current_dir) - -sys.path.append(current_dir) -sys.path.append(parent_dir) -print(sys.path) - -from ppc_scheduler.endpoints.restx import api -from ppc_scheduler.endpoints.job_controller import ns as job_namespace -from ppc_common.ppc_async_executor.thread_event_manager import ThreadEventManager -from ppc_scheduler.job.job_manager import JobManager -from ppc_scheduler.workflow.scheduler.scheduler import Scheduler -from ppc_scheduler.common.global_job_manager import update_job_manager -from ppc_scheduler.common.global_context import components -from ppc_scheduler.workflow.scheduler.scheduler_api import SchedulerApi -from paste.translogger import TransLogger -from flask import Flask, Blueprint -from cheroot.wsgi import Server as WSGIServer - -app = Flask(__name__) - -def init_thread_event_manager(): - thread_event_manager = ThreadEventManager() - return thread_event_manager - -def init_scheduler(config_data, workspace: str, logger): - scheduler_api = Scheduler(workspace, logger=logger) - return scheduler_api - -def init_job_manager(config_data, workspace: str, thread_event_manager: ThreadEventManager, scheduler: SchedulerApi, logger): - - job_timeout_h = config_data['JOB_TIMEOUT_H'] - - job_manager = JobManager( - logger=logger, - scheduler=scheduler, - thread_event_manager=thread_event_manager, - workspace=workspace, - job_timeout_h=job_timeout_h - ) - - logger.info("Initialize job 
manager, job_timeout_h: %s", job_timeout_h) - - return job_manager - -def initialize_app(app, config_path, log_config_path): - # init log first - components.init_log(log_config_path=log_config_path) - logger = components.logger() - - # init config - config_data = components.init_config(config_path=config_path) - # workspaces - workspace = config_data['WORKSPACE'] - logger.info(f" ==> Initialize workspace: {workspace}") - - # event manager - thread_event_manager = init_thread_event_manager() - - # scheduler - scheduler = init_scheduler(config_data=config_data, workspace=workspace, logger=logger) - - # job manager - job_manager = init_job_manager(config_data=config_data, workspace=workspace, thread_event_manager=thread_event_manager, scheduler=scheduler, logger=logger) - - update_job_manager(job_manager) - - # initialize application components - components.init_all(config_data=config_data) - components.update_thread_event_manager(thread_event_manager) - - app.config.update(config_data) - - blueprint = Blueprint('api', __name__, url_prefix='/api') - api.init_app(blueprint) - api.add_namespace(job_namespace) - app.register_blueprint(blueprint) - - components.logger().info(app.url_map) - -def main(config_path, log_config_path): - - print(f"Using config: {config_path}") - print(f"Using logging config: {log_config_path}") - - initialize_app(app, config_path, log_config_path) - - app.config['SECRET_KEY'] = os.urandom(24) - - listen_ip = app.config['HTTP_HOST'] - listen_port = app.config['HTTP_PORT'] - thread_num = app.config['HTTP_THREAD_NUM'] - - server = WSGIServer((listen_ip, listen_port), - TransLogger(app, setup_console_handler=False), numthreads=thread_num) - - protocol = 'http' - message = f"Starting wedpr scheduler server at {protocol}://{listen_ip}:{listen_port}" - print(message) - components.logger().info(message) - server.start() - -if __name__ == "__main__": - # Create ArgumentParser - parser = argparse.ArgumentParser(description='wedpr scheduler') - # Add argument - parser.add_argument('--config', default='./conf/application.yml', - help='Path to the configuration file') - parser.add_argument('--log_config', default='./conf/logging.conf', - help='Path to the logging configuration file') - # Parser argument - args = parser.parse_args() - - # Run main program - main(config_path=args.config, log_config_path=args.log_config) diff --git a/python/ppc_scheduler/scripts/start.sh b/python/ppc_scheduler/scripts/start.sh deleted file mode 100755 index 0dbd65b6..00000000 --- a/python/ppc_scheduler/scripts/start.sh +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env bash - -dirpath="$(cd "$(dirname "$0")" && pwd)" -cd $dirpath - -LANG=zh_CN.UTF-8 -############################################################################## -## -## Wedpr scheduler service start up script for UN*X -## -############################################################################## - -# @function: output log with red color (error log) -# @param: content: error message - -function LOG_ERROR() { - local content=${1} - echo -e "\033[31m"${content}"\033[0m" -} - -# @function: output information log -# @param: content: information message -function LOG_INFO() { - local content=${1} - echo -e "\033[32m"${content}"\033[0m" -} - -function Usage() { - LOG_INFO "Usage:start the console" - LOG_INFO "./start_scheduler.sh" -} - -function check_python() { - # check python - if ! command -v python >/dev/null 2>&1; then - echo "Python is not installed." 
- exit 1 - fi - - # check python version, 3.9 support - python_version=$(python --version) - if [[ $python_version =~ Python\ 3.9.* ]]; then - echo "Python version is 3.9, as required." - else - echo "Python version is not 3.9. Installed version is: $python_version" - exit 2 - fi -} - -# source venv/bin/activate - -pid=$(ps aux | grep $dirpath/ppc_scheduler/ppc_scheduler_app.py | grep -v grep | awk '{print $2}') -if [ ! -z ${pid} ]; then - echo " the scheduler has started, pid is ${pid}." - exit 0 -fi - -nohup python $dirpath/ppc_scheduler/ppc_scheduler_app.py --config $dirpath/conf/application.yml --log_config $dirpath/conf/logging.conf >/dev/null 2>&1 & - -sleep 3 - -pid=$(ps aux | grep $dirpath/ppc_scheduler/ppc_scheduler_app.py | grep -v grep | awk '{print $2}') -if [ ! -z ${pid} ]; then - echo " start scheduler successfully, pid is ${pid}." - exit 0 -else - echo " start scheduler failed." - exit 1 -fi diff --git a/python/ppc_scheduler/scripts/stop.sh b/python/ppc_scheduler/scripts/stop.sh deleted file mode 100755 index 686122f6..00000000 --- a/python/ppc_scheduler/scripts/stop.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env bash - -dirpath="$(cd "$(dirname "$0")" && pwd)" -cd $dirpath - -LANG=zh_CN.UTF-8 -############################################################################## -## -## Wedpr scheduler service start up script for UN*X -## -############################################################################## - -# @function: output log with red color (error log) -# @param: content: error message - -function LOG_ERROR() { - local content=${1} - echo -e "\033[31m"${content}"\033[0m" -} - -# @function: output information log -# @param: content: information message -function LOG_INFO() { - local content=${1} - echo -e "\033[32m"${content}"\033[0m" -} - -function Usage() { - LOG_INFO "Usage:start the console" - LOG_INFO "./start_scheduler.sh" -} - -pid=$(ps aux | grep $dirpath/ppc_scheduler/ppc_scheduler_app.py | grep -v grep | awk '{print $2}') - -if [ ! -z ${pid} ]; then - kill -9 ${pid} - echo " scheduler is running, pid is ${pid}, kill it." - exit 0 -else - echo " scheduler is not running." 
- exit 1 -fi diff --git a/python/ppc_scheduler/workflow/__init__.py b/python/ppc_scheduler/workflow/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/workflow/builder/__init__.py b/python/ppc_scheduler/workflow/builder/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/workflow/builder/flow_builder.py b/python/ppc_scheduler/workflow/builder/flow_builder.py deleted file mode 100644 index 207ff7fe..00000000 --- a/python/ppc_scheduler/workflow/builder/flow_builder.py +++ /dev/null @@ -1,86 +0,0 @@ -# -*- coding: utf-8 -*- -from datetime import datetime -import json -from ppc_common.db_models.job_worker_record import JobWorkerRecord -from ppc_scheduler.common.global_context import components -from ppc_scheduler.database import job_worker_mapper -from ppc_scheduler.workflow.common import flow_utils -from ppc_scheduler.workflow.common.worker_status import WorkerStatus - - -class FlowBuilder: - def __init__(self, logger): - self.logger = logger - - def build_flow_context(self, job_id, workflow_configs): - self.logger.info(f"## start build_flow_context, job_id: {job_id}") - flow_context = {} - index_type_map = {} - for workflow_config in workflow_configs: - workflow_type = workflow_config['type'] - workflow_index = workflow_config['index'] - workflow_args = workflow_config['args'] - index_type_map[workflow_index] = workflow_type - - for workflow_config in workflow_configs: - workflow_type = workflow_config['type'] - workflow_index = workflow_config['index'] - workflow_args = workflow_config['args'] - worker_id = flow_utils.cat_worker_id(job_id, workflow_index, workflow_type) - upstreams = [] - inputs_statement = [] - inputs_statement_tuple = [] - if 'upstreams' in workflow_config: - for upstream_config in workflow_config["upstreams"]: - index = upstream_config['index'] - upstream_id = flow_utils.cat_worker_id(job_id, index, index_type_map[index]) - upstreams.append(upstream_id), - if 'output_input_map' in upstream_config: - for mapping in upstream_config.get("output_input_map", []): - output_index, input_index = mapping.split(":") - inputs_statement_tuple.append((upstream_id, int(output_index), int(input_index))) - - inputs_statement_tuple.sort(key=lambda x: x[2]) - for upstream_id, output_index, _ in inputs_statement_tuple: - inputs_statement.append( - { - 'output_index': output_index, - 'upstream': upstream_id - } - ) - - worker_context = { - 'job_id': job_id, - 'worker_id': worker_id, - 'type': workflow_type, - 'status': WorkerStatus.PENDING, - 'args': workflow_args, - 'upstreams': upstreams, - 'inputs_statement': inputs_statement - } - - self.logger.debug(f"## mid build_flow_context, work_context:\n{worker_context}") - - flow_context[worker_id] = worker_context - self.logger.info(f"## end build_flow_context, flow_context:\n{flow_context}") - - return flow_context - - def save_flow_context(self, job_id, flow_context): - self.logger.info(f"## start save flow context, job_id: {job_id}") - with components.create_sql_session() as session: - for worker_id in flow_context: - worker_context = flow_context[worker_id] - insert_success = job_worker_mapper.insert_job_worker(session, worker_context) - if insert_success: - # insert - self.logger.info(f"## Save worker context successfully, job_id: {job_id}, worker_id: {worker_id}, work_context:\n{worker_context}") - continue - # worker already exist - worker_record = job_worker_mapper.query_job_worker(session,job_id, worker_id) - worker_context['status'] = 
worker_record.status - worker_context['args'] = json.loads(worker_record.args) - worker_context['upstreams'] = json.loads(worker_record.upstreams) - worker_context['inputs_statement'] = json.loads(worker_record.inputs_statement) - self.logger.info(f"Load worker context successfully, job_id: {job_id}, worker_id: {worker_id}, work_context:\n{worker_context}") - self.logger.info(f"## end save flow context, job_id: {job_id}") \ No newline at end of file diff --git a/python/ppc_scheduler/workflow/common/__init__.py b/python/ppc_scheduler/workflow/common/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/workflow/common/codec.py b/python/ppc_scheduler/workflow/common/codec.py deleted file mode 100644 index 45fa3e7d..00000000 --- a/python/ppc_scheduler/workflow/common/codec.py +++ /dev/null @@ -1,19 +0,0 @@ -import json - -def deserialize_worker_outputs(outputs_str): - return json.loads(outputs_str) - -def serialize_worker_outputs(outputs): - return json.dumps(outputs) - -def deserialize_upstreams(upstreams_str): - return json.loads(upstreams_str) - -def serialize_upstreams(upstreams): - return json.dumps(upstreams) - -def deserialize_inputs_statement(inputs_statement_str): - return json.loads(inputs_statement_str) - -def serialize_inputs_statement(inputs_statement): - return json.dumps(inputs_statement) \ No newline at end of file diff --git a/python/ppc_scheduler/workflow/common/flow_utils.py b/python/ppc_scheduler/workflow/common/flow_utils.py deleted file mode 100644 index e2486e4b..00000000 --- a/python/ppc_scheduler/workflow/common/flow_utils.py +++ /dev/null @@ -1,34 +0,0 @@ -from ppc_scheduler.workflow.common.worker_type import WorkerType - - -def cat_worker_id(job_id, index, worker_type): - return f"{job_id}_{index}_{worker_type}" - - -def success_id(job_id): - return cat_worker_id(job_id, 0, WorkerType.T_ON_SUCCESS) - - -def failure_id(job_id): - return cat_worker_id(job_id, 0, WorkerType.T_ON_FAILURE) - - -def to_origin_inputs(worker_inputs): - inputs = [] - for each in worker_inputs: - output_index = each['output_index'] - upstream_outputs = each['upstream_outputs'] - inputs.append(upstream_outputs[output_index]) - return inputs - - -def to_worker_inputs(job_workers, inputs_statement): - worker_inputs = [] - for each in inputs_statement: - output_index = each['output_index'] - upstream = each['upstream'] - worker_inputs.append({ - 'output_index': output_index, - 'upstream_outputs': job_workers[upstream] - }) - return worker_inputs diff --git a/python/ppc_scheduler/workflow/common/job_context.py b/python/ppc_scheduler/workflow/common/job_context.py deleted file mode 100644 index 387af114..00000000 --- a/python/ppc_scheduler/workflow/common/job_context.py +++ /dev/null @@ -1,74 +0,0 @@ -import os - -from ppc_scheduler.job.job_type import JobType -from ppc_scheduler.mpc_generator.generator import CodeGenerator -from ppc_scheduler.common.global_context import components - - -class JobContext: - PSI_PREPARE_FILE = "psi_inputs" - PSI_RESULT_INDEX_FILE = "psi_result_index" - PSI_RESULT_FILE = "psi_result" - MPC_PREPARE_FILE = "mpc_prepare.csv" - MPC_RESULT_FILE = "mpc_result.csv" - MPC_OUTPUT_FILE = "mpc_output.txt" - HDFS_STORAGE_PATH = "/user/ppc/" - - def __init__(self, job_id, workspace): - self.job_id = job_id - self.workspace = workspace - - self.workflow_view_path = 'workflow_view' - self.job_cache_dir = "{}{}{}".format(self.workspace, os.sep, self.job_id) - # self.dataset_file_path = "{}{}{}".format(self.job_cache_dir, os.sep, 
self.dataset_id) - self.psi_prepare_path = "{}{}{}".format(self.job_cache_dir, os.sep, JobContext.PSI_PREPARE_FILE) - self.psi_result_index_path = "{}{}{}".format(self.job_cache_dir, os.sep, JobContext.PSI_RESULT_INDEX_FILE) - self.psi_result_path = "{}{}{}".format(self.job_cache_dir, os.sep, JobContext.PSI_RESULT_FILE) - self.mpc_file_name = "{}.mpc".format(self.job_id) - self.mpc_model_module_name = "{}.json".format(self.job_id) - self.mpc_file_path = "{}{}{}".format(self.job_cache_dir, os.sep, self.mpc_file_name) - self.mpc_prepare_path = "{}{}{}".format(self.job_cache_dir, os.sep, JobContext.MPC_PREPARE_FILE) - self.mpc_result_path = "{}{}{}".format(self.job_cache_dir, os.sep, JobContext.MPC_RESULT_FILE) - self.mpc_output_path = "{}{}{}".format(self.job_cache_dir, os.sep, JobContext.MPC_OUTPUT_FILE) - - @staticmethod - def create_job_context(request: object, workspace: str): - job_context = JobContext(request, workspace) - - """ - request format - { - "jobId": "job_id", - "agency": "WeBank", - "workflow": [ - { - "index": 1, - "type": "WorkerType1", - "args": [ - "arg1", - "arg2" - ] - }, - { - "index": 2, - "type": "WorkerType1", - "args": [ - "arg1", - "arg2" - ], - "upstreams": [ - { - "index": 1 - } - ] - } - ] - } - """ - - job_context.request = request - job_context.job_id = request['jobId'] - job_context.agency = request['agency'] - job_context.workflow_configs = request['workflow'] - - return job_context \ No newline at end of file diff --git a/python/ppc_scheduler/workflow/common/worker_status.py b/python/ppc_scheduler/workflow/common/worker_status.py deleted file mode 100644 index f915f473..00000000 --- a/python/ppc_scheduler/workflow/common/worker_status.py +++ /dev/null @@ -1,7 +0,0 @@ -class WorkerStatus: - PENDING = 'PENDING' - RUNNING = 'RUNNING' - SUCCESS = 'SUCCESS' - FAILURE = 'FAILURE' - TIMEOUT = 'TIMEOUT' - KILLED = 'KILLED' diff --git a/python/ppc_scheduler/workflow/common/worker_type.py b/python/ppc_scheduler/workflow/common/worker_type.py deleted file mode 100644 index 1a7314f2..00000000 --- a/python/ppc_scheduler/workflow/common/worker_type.py +++ /dev/null @@ -1,14 +0,0 @@ -class WorkerType: - # generic job worker - T_API = 'API' - T_PYTHON = 'PYTHON' - T_SHELL = 'SHELL' - - # specific job worker - T_PSI = 'PSI' - T_MPC = 'MPC' - T_MODEL = "MODEL" - - # finish job - T_ON_SUCCESS = 'T_ON_SUCCESS' - T_ON_FAILURE = 'T_ON_FAILURE' diff --git a/python/ppc_scheduler/workflow/scheduler/__init__.py b/python/ppc_scheduler/workflow/scheduler/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/workflow/scheduler/scheduler.py b/python/ppc_scheduler/workflow/scheduler/scheduler.py deleted file mode 100644 index 82f6262d..00000000 --- a/python/ppc_scheduler/workflow/scheduler/scheduler.py +++ /dev/null @@ -1,100 +0,0 @@ -import time -from prefect import Flow -from prefect.executors import LocalDaskExecutor -from prefect.triggers import all_successful, any_failed - -from ppc_scheduler.workflow.common import flow_utils -from ppc_scheduler.workflow.common.job_context import JobContext -from ppc_scheduler.workflow.common.worker_status import WorkerStatus -from ppc_scheduler.workflow.common.worker_type import WorkerType -from ppc_scheduler.workflow.builder.flow_builder import FlowBuilder -from ppc_scheduler.workflow.worker.worker_factory import WorkerFactory -from ppc_scheduler.workflow.scheduler.scheduler_api import SchedulerApi - - -class Scheduler(SchedulerApi): - def __init__(self, workspace, logger): - self.workspace = workspace - 
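Given the request format documented in JobContext above, the removed flow builder names each worker by concatenating job id, index and type (flow_utils.cat_worker_id) and resolves every upstreams entry to the id of an earlier worker. A small hedged sketch of that wiring for the documented two-step example request (variable names are illustrative):

# Illustrative wiring of the documented example request; worker ids follow the
# f"{job_id}_{index}_{worker_type}" rule of the removed cat_worker_id helper.
def cat_worker_id(job_id, index, worker_type):
    return f"{job_id}_{index}_{worker_type}"

request = {
    "jobId": "job_id",
    "workflow": [
        {"index": 1, "type": "WorkerType1", "args": []},
        {"index": 2, "type": "WorkerType1", "args": [], "upstreams": [{"index": 1}]},
    ],
}

index_to_type = {w["index"]: w["type"] for w in request["workflow"]}
for w in request["workflow"]:
    worker_id = cat_worker_id(request["jobId"], w["index"], w["type"])
    upstream_ids = [cat_worker_id(request["jobId"], u["index"], index_to_type[u["index"]])
                    for u in w.get("upstreams", [])]
    # e.g. "job_id_2_WorkerType1" depends on ["job_id_1_WorkerType1"]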
self.logger = logger - - def run(self, job_context: JobContext, flow_context: dict): - - job_id = job_context.job_id - job_flow = Flow(f"job_flow_{job_id}") - - finish_job_on_success = WorkerFactory.build_worker( - job_context, - flow_utils.success_id(job_id), - WorkerType.T_ON_SUCCESS, - None - ) - - finish_job_on_success.trigger = all_successful - finish_job_on_success.bind(worker_status=WorkerStatus.PENDING, worker_inputs=[], flow=job_flow) - job_flow.add_task(finish_job_on_success) - - # set reference task to bind job flow status - job_flow.set_reference_tasks([finish_job_on_success]) - - # create a final job worker to handle failure - finish_job_on_failure = WorkerFactory.build_worker( - job_context, - flow_utils.failure_id(job_id), - WorkerType.T_ON_FAILURE, - None - ) - - # do finish_job_on_failure while any job worker failed - finish_job_on_failure.trigger = any_failed - finish_job_on_failure.bind(worker_status=WorkerStatus.PENDING, worker_inputs=[], flow=job_flow) - job_flow.add_task(finish_job_on_failure) - - job_workers = {} - # create main job workers - for worker_id in flow_context: - worker_type = flow_context[worker_id]['type'] - worker_args = flow_context[worker_id]['args'] - - job_worker = WorkerFactory.build_worker(job_context=job_context, worker_id=worker_id, worker_type=worker_type, worker_args=worker_args) - job_flow.add_task(job_worker) - job_workers[worker_id] = job_worker - - # set upstream for final job - finish_job_on_success.set_upstream(job_worker, flow=job_flow) - finish_job_on_failure.set_upstream(job_worker, flow=job_flow) - - # customize main job workers - for worker_id in flow_context: - # set upstream - upstreams = flow_context[worker_id]['upstreams'] - status = flow_context[worker_id]['status'] - for upstream in upstreams: - if upstream not in job_workers: - raise Exception(-1, f"Not found upstream job worker : {upstream}, " - f"job_id: {job_context.job_id}") - job_workers[worker_id].set_upstream(job_workers[upstream], flow=job_flow) - - # bind worker inputs - inputs_statement = flow_context[worker_id]['inputs_statement'] - worker_inputs = flow_utils.to_worker_inputs(job_workers, inputs_statement) - job_workers[worker_id].bind(worker_status=status, - worker_inputs=worker_inputs, flow=job_flow) - - # enable parallel execution - job_flow.executor = LocalDaskExecutor() - - # - start_time = time.time() - - # run dag workflow - job_flow_state = job_flow.run() - - end_time = time.time() - - self.logger.info(f" ## Job worker result, job: {job_id}, success: {job_flow_state.is_successful()}, costs: {end_time - start_time}, flow_state: {job_flow_state}") - - # save workflow view as file - job_flow.visualize(job_flow_state, job_context.workflow_view_path, 'svg') - - if not job_flow_state.is_successful(): - raise Exception(-1, f"Job run failed, job_id: {job_id}") diff --git a/python/ppc_scheduler/workflow/scheduler/scheduler_api.py b/python/ppc_scheduler/workflow/scheduler/scheduler_api.py deleted file mode 100644 index 9ef5a25f..00000000 --- a/python/ppc_scheduler/workflow/scheduler/scheduler_api.py +++ /dev/null @@ -1,8 +0,0 @@ -from abc import ABC, abstractmethod - -from ppc_scheduler.workflow.common.job_context import JobContext - -class SchedulerApi(ABC): - @abstractmethod - def run(self, job_context: JobContext, flow_context: dict): - pass \ No newline at end of file diff --git a/python/ppc_scheduler/workflow/worker/__init__.py b/python/ppc_scheduler/workflow/worker/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git 
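The Scheduler.run method deleted above builds a Prefect flow with two sentinel workers: an on-success task triggered by all_successful, which also serves as the flow's reference task, and an on-failure task triggered by any_failed; every real worker is set upstream of both. Stripped of the Prefect API, the dependency shape it constructs looks roughly like the sketch below (a plain-Python illustration, not the removed implementation):

# Illustrative dependency shape built by the removed Scheduler.run:
# every worker feeds both sentinels; success requires all workers to succeed,
# failure fires as soon as any worker fails.
def build_edges(job_id, worker_ids):
    on_success = f"{job_id}_0_T_ON_SUCCESS"
    on_failure = f"{job_id}_0_T_ON_FAILURE"
    edges = []
    for worker_id in worker_ids:
        edges.append((worker_id, on_success))   # trigger: all_successful
        edges.append((worker_id, on_failure))   # trigger: any_failed
    return on_success, on_failure, edges

# build_edges("job1", ["job1_1_PSI", "job1_2_MPC"]) yields edges from both
# workers into the two sentinel tasks.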
a/python/ppc_scheduler/workflow/worker/api_worker.py b/python/ppc_scheduler/workflow/worker/api_worker.py deleted file mode 100644 index 0160caf5..00000000 --- a/python/ppc_scheduler/workflow/worker/api_worker.py +++ /dev/null @@ -1,10 +0,0 @@ -from ppc_scheduler.workflow.worker.worker import Worker - - -class ApiWorker(Worker): - - def __init__(self, components, job_context, worker_id, worker_type, worker_args, *args, **kwargs): - super().__init__(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - - def engine_run(self, worker_inputs): - ... diff --git a/python/ppc_scheduler/workflow/worker/default_worker.py b/python/ppc_scheduler/workflow/worker/default_worker.py deleted file mode 100644 index 02d2cf5f..00000000 --- a/python/ppc_scheduler/workflow/worker/default_worker.py +++ /dev/null @@ -1,37 +0,0 @@ -import os -import time - -from ppc_common.ppc_utils import utils -from ppc_scheduler.common import log_utils -from ppc_scheduler.workflow.common.worker_type import WorkerType -from ppc_scheduler.workflow.worker.worker import Worker - - -class DefaultWorker(Worker): - - def __init__(self, components, job_context, worker_id, worker_type, worker_args, *args, **kwargs): - super().__init__(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - - def engine_run(self, worker_inputs): - # log_utils.upload_job_log(self.components.storage_client, self.job_context.job_id) - # self._save_workflow_view_file() - if self.worker_type == WorkerType.T_ON_FAILURE: - # notice job manager that this job has failed - raise Exception("Job run failed, job_id: " + self.job_context.job_id) - - # TODO: - # def _save_workflow_view_file(self): - # file = f"{self.job_context.workflow_view_path}.svg" - - # if not utils.file_exists(file): - # return - - # try_count = 10 - # while try_count > 0: - # if utils.file_exists(file): - # break - # time.sleep(1) - # try_count -= 1 - # self.components.storage_client.upload_file(file, - # self.job_context.job_id + os.sep + - # self.job_context.workflow_view_path) diff --git a/python/ppc_scheduler/workflow/worker/engine/__init__.py b/python/ppc_scheduler/workflow/worker/engine/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/ppc_scheduler/workflow/worker/engine/model_engine.py b/python/ppc_scheduler/workflow/worker/engine/model_engine.py deleted file mode 100644 index cd104310..00000000 --- a/python/ppc_scheduler/workflow/worker/engine/model_engine.py +++ /dev/null @@ -1,52 +0,0 @@ -import time - -from ppc_scheduler.workflow.common.job_context import JobContext -from ppc_scheduler.workflow.common.worker_type import WorkerType -from ppc_scheduler.workflow.worker.engine.work_engine import WorkerEngine - - -class ModelWorkerEngine(WorkerEngine): - def __init__(self, model_client, worker_type, worker_id, components, job_context: JobContext): - self.model_client = model_client - self.worker_type = worker_type - self.worker_id = worker_id - self.components = components - self.job_context = job_context - self.logger = self.components.logger() - - def run(self, *args) -> list: - if self.worker_type == WorkerType.T_MODEL: - pass - else: - raise ValueError(f"Unsupported worker type: {self.worker_type}") - - job_id = self.job_context.job_id - start_time = time.time() - - self.logger.info( - f"## model engine run begin, job_id={job_id}, worker_id={self.worker_id}, args: {args}") - - # send job request to model node and wait for the job to finish - self.model_client.run(*args) - - time_costs = time.time() - 
start_time - self.logger.info( - f"## model engine run finished, job_id={job_id}, timecost: {time_costs}s") - - # args = { - # 'job_id': job_id, - # 'task_id': task_id, - # 'task_type': 'PREPROCESSING', - # 'dataset_id': self.job_context.dataset_id, - # 'dataset_storage_path': dataset_storage_path, - # 'job_algorithm_type': self.job_context.job_type, - # 'need_run_psi': self.job_context.need_run_psi, - # 'model_dict': self.job_context.model_config_dict - # } - # self.log.info(f"start prepare_xgb, job_id: {job_id}, task_id: {task_id}, args: {args}") - # self.model_client.run(args) - # self.log.info( - # f"call compute_xgb_job service success, job: {job_id}, " - # f"task_id: {task_id}, timecost: {time.time() - start}") - - return [] diff --git a/python/ppc_scheduler/workflow/worker/engine/mpc_engine.py b/python/ppc_scheduler/workflow/worker/engine/mpc_engine.py deleted file mode 100644 index f55d091e..00000000 --- a/python/ppc_scheduler/workflow/worker/engine/mpc_engine.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -import os -from time import time - -import pandas as pd - -from ppc_common.ppc_utils import utils -from ppc_scheduler.mpc_generator.generator import CodeGenerator -from ppc_scheduler.workflow.common.job_context import JobContext -from ppc_scheduler.workflow.worker.engine.work_engine import WorkerEngine - - -class MpcWorkerEngine(WorkerEngine): - def __init__(self, mpc_client, worker_type, components, job_context: JobContext): - self.mpc_client = mpc_client - self.worker_type = worker_type - self.components = components - self.job_context = job_context - self.logger = self.components.logger() - - def run(self, *args) -> list: - - job_id = self.job_context.job_id - start_time = time.time() - - self.logger.info(f"computing mpc begin, job_id={job_id}") - - time_costs = time.time() - start_time - self.logger.info(f"computing mpc finished, job_id={job_id}, timecost: {time_costs}s") - - return [self.job_context.mpc_output_path] \ No newline at end of file diff --git a/python/ppc_scheduler/workflow/worker/engine/psi_engine.py b/python/ppc_scheduler/workflow/worker/engine/psi_engine.py deleted file mode 100644 index b6e735cc..00000000 --- a/python/ppc_scheduler/workflow/worker/engine/psi_engine.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -import os -import time - -from ppc_common.ppc_utils import utils, common_func -from ppc_scheduler.workflow.common.job_context import JobContext -from ppc_scheduler.workflow.worker.engine.work_engine import WorkerEngine - - -class PsiWorkerEngine(WorkerEngine): - def __init__(self, psi_client, worker_id, worker_type, components, job_context: JobContext): - self.psi_client = psi_client - self.worker_id = worker_id - self.worker_type = worker_type - self.components = components - self.job_context = job_context - self.logger = self.components.logger() - - def run(self, *args) -> list: - job_id = self.job_context.job_id - start_time = time.time() - - self.logger.info(f"## psi engine run begin, job_id={job_id}, worker_id={self.worker_id}, args: {args}") - - # send job request to psi node and wait for the job to finish - self.psi_client.run(*args) - - time_costs = time.time() - start_time - self.logger.info(f"## psi engine run finished, job_id={job_id}, timecost: {time_costs}s") - - return [JobContext.HDFS_STORAGE_PATH + job_id + os.sep + self.job_context.PSI_RESULT_INDEX_FILE] \ No newline at end of file diff --git a/python/ppc_scheduler/workflow/worker/engine/shell_engine.py 
b/python/ppc_scheduler/workflow/worker/engine/shell_engine.py deleted file mode 100644 index a406f8a7..00000000 --- a/python/ppc_scheduler/workflow/worker/engine/shell_engine.py +++ /dev/null @@ -1,16 +0,0 @@ -import os -from ppc_scheduler.workflow.worker.engine.work_engine import WorkerEngine -from ppc_scheduler.common.global_context import components - -class ShellEngine(WorkerEngine): - - def __init__(self, cmd: str) -> None: - super().__init__() - self.cmd = cmd - - def run(self, *args): - # print("shell engine is processing.") - components.logger().info(f"shell engine is processing, cmd: {self.cmd}") - - result = os.system(self.cmd) - return list(str(result)) \ No newline at end of file diff --git a/python/ppc_scheduler/workflow/worker/engine/work_engine.py b/python/ppc_scheduler/workflow/worker/engine/work_engine.py deleted file mode 100644 index 6cff7e23..00000000 --- a/python/ppc_scheduler/workflow/worker/engine/work_engine.py +++ /dev/null @@ -1,6 +0,0 @@ -from abc import ABC, abstractmethod - -class WorkerEngine(ABC): - @abstractmethod - def run(self, *args): - pass \ No newline at end of file diff --git a/python/ppc_scheduler/workflow/worker/model_worker.py b/python/ppc_scheduler/workflow/worker/model_worker.py deleted file mode 100644 index 0268e705..00000000 --- a/python/ppc_scheduler/workflow/worker/model_worker.py +++ /dev/null @@ -1,19 +0,0 @@ -from ppc_scheduler.node.computing_node_client.model_node_client import ModelClient -from ppc_scheduler.workflow.worker.engine.model_engine import ModelWorkerEngine -from ppc_scheduler.workflow.worker.worker import Worker - - -class ModelWorker(Worker): - - def __init__(self, components, job_context, worker_id, worker_type, worker_args, *args, **kwargs): - super().__init__(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - - def engine_run(self, worker_inputs): - model_client_node = self.node_manager.get_node(self.worker_type) - model_client = ModelClient(self.components.logger(), model_client_node[0], model_client_node[1]) - model_engine = ModelWorkerEngine(model_client, self.worker_type, self.worker_id, self.components, self.job_context) - try: - outputs = model_engine.run(*self.worker_args) - return outputs - finally: - self.node_manager.release_node(model_client_node, self.worker_type) diff --git a/python/ppc_scheduler/workflow/worker/mpc_worker.py b/python/ppc_scheduler/workflow/worker/mpc_worker.py deleted file mode 100644 index c8fc64eb..00000000 --- a/python/ppc_scheduler/workflow/worker/mpc_worker.py +++ /dev/null @@ -1,19 +0,0 @@ -from ppc_scheduler.node.computing_node_client.mpc_node_client import MpcClient -from ppc_scheduler.workflow.worker.engine.mpc_engine import MpcWorkerEngine -from ppc_scheduler.workflow.worker.worker import Worker - - -class MpcWorker(Worker): - - def __init__(self, components, job_context, worker_id, worker_type, worker_args, *args, **kwargs): - super().__init__(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - - def engine_run(self, worker_inputs) -> list: - node_endpoint = self.node_manager.get_node(self.worker_type) - mpc_client = MpcClient(node_endpoint) - mpc_engine = MpcWorkerEngine(mpc_client, self.worker_type, self.components, self.job_context) - try: - outputs = mpc_engine.run() - return outputs - finally: - self.node_manager.release_node(node_endpoint, self.worker_type) diff --git a/python/ppc_scheduler/workflow/worker/psi_worker.py b/python/ppc_scheduler/workflow/worker/psi_worker.py deleted file mode 100644 index 
bf596268..00000000 --- a/python/ppc_scheduler/workflow/worker/psi_worker.py +++ /dev/null @@ -1,21 +0,0 @@ -from ppc_scheduler.node.computing_node_client.psi_client import PsiClient -from ppc_scheduler.workflow.worker.engine.psi_engine import PsiWorkerEngine -from ppc_scheduler.workflow.worker.worker import Worker - - -class PsiWorker(Worker): - - def __init__(self, components, job_context, worker_id, worker_type, worker_args, *args, **kwargs): - super().__init__(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - - def engine_run(self, worker_inputs) -> list: - psi_client_node = self.node_manager.get_node(self.worker_type) - logger = self.components.logger() - logger.info(f"## getting psi client : {psi_client_node}") - psi_client = PsiClient(logger, psi_client_node[0], psi_client_node[1]) - psi_engine = PsiWorkerEngine(psi_client, self.worker_id, self.worker_type, self.components, self.job_context) - try: - outputs = psi_engine.run(*self.worker_args) - return outputs - finally: - self.node_manager.release_node(psi_client_node, self.worker_type) diff --git a/python/ppc_scheduler/workflow/worker/python_worker.py b/python/ppc_scheduler/workflow/worker/python_worker.py deleted file mode 100644 index 9d5e295b..00000000 --- a/python/ppc_scheduler/workflow/worker/python_worker.py +++ /dev/null @@ -1,10 +0,0 @@ -from ppc_scheduler.workflow.worker.worker import Worker - - -class PythonWorker(Worker): - - def __init__(self, components, job_context, worker_id, worker_type, worker_args, *args, **kwargs): - super().__init__(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - - def engine_run(self, worker_inputs): - ... diff --git a/python/ppc_scheduler/workflow/worker/shell_worker.py b/python/ppc_scheduler/workflow/worker/shell_worker.py deleted file mode 100644 index 3317593d..00000000 --- a/python/ppc_scheduler/workflow/worker/shell_worker.py +++ /dev/null @@ -1,16 +0,0 @@ -from ppc_scheduler.workflow.worker.worker import Worker -from ppc_scheduler.workflow.worker.engine.shell_engine import ShellEngine - - -class ShellWorker(Worker): - - def __init__(self, components, job_context, worker_id, worker_type, worker_args, *args, **kwargs): - super().__init__(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - - def engine_run(self, worker_inputs): - - # self.log_worker() - - mpc_engine = ShellEngine(cmd=self.worker_args[0]) - outputs = mpc_engine.run() - return outputs diff --git a/python/ppc_scheduler/workflow/worker/worker.py b/python/ppc_scheduler/workflow/worker/worker.py deleted file mode 100644 index 290179f6..00000000 --- a/python/ppc_scheduler/workflow/worker/worker.py +++ /dev/null @@ -1,117 +0,0 @@ -import time - -from func_timeout import FunctionTimedOut -from prefect import Task -from prefect.engine import signals - -from ppc_scheduler.common import log_utils -from ppc_scheduler.database import job_worker_mapper -from ppc_scheduler.node.node_manager import ComputingNodeManager -from ppc_scheduler.workflow.common import codec, flow_utils -from ppc_scheduler.workflow.common.worker_status import WorkerStatus -from ppc_scheduler.workflow.common.worker_type import WorkerType - - -class Worker(Task): - def __init__(self, components, job_context, worker_id, worker_type, worker_args, retries=0, retry_delay_s=0, *args, **kwargs): - super().__init__(*args, **kwargs) - self.components = components - self.node_manager = ComputingNodeManager(components) - self.logger = components.logger() - self.job_context = 
job_context - self.worker_id = worker_id - self.worker_type = worker_type - self.worker_args = worker_args - self.retries = retries - self.retry_delay_s = retry_delay_s - - def log_worker(self): - self.components.logger().info(" ## View worker, job_id = %s, work_id = %s", self.job_context.job_id, self.worker_id) - - def engine_run(self, worker_inputs) -> list: - # this func should be implemented by subclass - ... - - def run(self, worker_status, worker_inputs): - self.components.logger().info("Worker begin run, job_id = %s, work_id = %s, work_status = %s", self.job_context.job_id, self.worker_id, worker_status) - start_time = time.time() - try: - # job is killed - if self.components.thread_event_manager.event_status(self.job_context.job_id): - self._write_failed_status(WorkerStatus.KILLED) - self.logger.warn( - f" worker was killed, job_id: {self.job_context.job_id}, worker_id: {self.worker_id}") - raise signals.FAIL(message='killed!') - - if worker_status == WorkerStatus.SUCCESS: - self.logger.info( - f" Worker has been executed successfully, job_id: {self.job_context.job_id}, worker_id: {self.worker_id}") - return self._load_output_from_db() - - inputs = [] - if self.worker_type != WorkerType.T_ON_SUCCESS \ - and self.worker_type != WorkerType.T_ON_FAILURE: - inputs = flow_utils.to_origin_inputs(worker_inputs) - - outputs = self._try_run_task(inputs) - if outputs is not None: - self._save_worker_result(outputs) - except FunctionTimedOut: - self._write_failed_status(WorkerStatus.FAILURE) - end_time = time.time() - self.logger.error( - f"[OnError] job worker was timeout, job_id: {self.job_context.job_id}, worker_id: {self.worker_id}, elapsed time: {end_time - start_time}") - raise signals.FAIL(message='timeout!') - except BaseException as be: - self._write_failed_status(WorkerStatus.FAILURE) - self.logger.error(f"[OnError] job worker failed, job_id: {self.job_context.job_id}, worker_id: {self.worker_id}") - self.logger.exception(be) - raise signals.FAIL(message='failed!') - - def _try_run_task(self, inputs): - start_time = time.time() - # self.logger.info(f"Worker try run task, job_id: {self.job_context.job_id}, worker_id: {self.worker_id}, inputs: {inputs}") - # parse inputs for worker - if self.retries: - attempt = 0 - while attempt <= self.retries: - try: - self.logger.info(log_utils.worker_start_log_info(self.worker_id)) - outputs = self.engine_run(inputs) - self.logger.info(log_utils.worker_end_log_info(self.worker_id)) - return outputs - except Exception as e: - attempt += 1 - if attempt > self.retries: - self.logger.error( - f"worker failed after {self.retries} attempts, " - f"job_id: {self.job_context.job_id}, worker_id: {self.worker_id}") - raise e - else: - self.logger.warn( - f"worker failed, attempts: {attempt}" - f"job_id: {self.job_context.job_id}, worker_id: {self.worker_id}") - self.logger.exception(e) - time.sleep(self.retry_delay_s) - else: - # outputs = self.engine_run(inputs) - self.logger.info(log_utils.worker_start_log_info(self.worker_id)) - outputs = self.engine_run(inputs) - self.logger.info(log_utils.worker_end_log_info(self.worker_id)) - end_time = time.time() - # self.logger.info(f"Worker try run task end, job_id: {self.job_context.job_id}, worker: {self.worker_id}, elapsed time: {end_time - start_time}, outputs: {outputs}") - return outputs - - def _load_output_from_db(self): - with self.components.create_sql_session() as session: - worker_record = job_worker_mapper.query_job_worker(session, self.job_context.job_id, self.worker_id) - return 
codec.deserialize_worker_outputs(worker_record.outputs) - - def _save_worker_result(self, outputs): - with self.components.create_sql_session() as session: - job_worker_mapper.update_job_worker(session, self.job_context.job_id, self.worker_id, - WorkerStatus.SUCCESS, outputs) - - def _write_failed_status(self, status): - with self.components.create_sql_session() as session: - job_worker_mapper.update_job_worker(session, self.job_context.job_id, self.worker_id, status, []) diff --git a/python/ppc_scheduler/workflow/worker/worker_factory.py b/python/ppc_scheduler/workflow/worker/worker_factory.py deleted file mode 100644 index c8573185..00000000 --- a/python/ppc_scheduler/workflow/worker/worker_factory.py +++ /dev/null @@ -1,34 +0,0 @@ -from ppc_common.ppc_utils.exception import PpcErrorCode, PpcException -from ppc_scheduler.common.global_context import components -from ppc_scheduler.workflow.common.worker_type import WorkerType -from ppc_scheduler.workflow.worker.api_worker import ApiWorker -from ppc_scheduler.workflow.worker.default_worker import DefaultWorker -from ppc_scheduler.workflow.worker.model_worker import ModelWorker -from ppc_scheduler.workflow.worker.mpc_worker import MpcWorker -from ppc_scheduler.workflow.worker.psi_worker import PsiWorker -from ppc_scheduler.workflow.worker.python_worker import PythonWorker -from ppc_scheduler.workflow.worker.shell_worker import ShellWorker - - -class WorkerFactory: - - @staticmethod - def build_worker(job_context, worker_id, worker_type, worker_args, *args, **kwargs): - if worker_type == WorkerType.T_API: - return ApiWorker(components, job_context, worker_id, worker_type, worker_args, *args, *kwargs) - elif worker_type == WorkerType.T_PYTHON: - return PythonWorker(components, job_context, worker_id, worker_type, worker_args, *args, *kwargs) - elif worker_type == WorkerType.T_SHELL: - return ShellWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - elif worker_type == WorkerType.T_PSI: - return PsiWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - elif worker_type == WorkerType.T_MPC: - return MpcWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - elif worker_type == WorkerType.T_MODEL: - return ModelWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - elif worker_type == WorkerType.T_ON_SUCCESS or \ - worker_type == WorkerType.T_ON_FAILURE: - return DefaultWorker(components, job_context, worker_id, worker_type, worker_args, *args, **kwargs) - else: - raise PpcException(PpcErrorCode.UNSUPPORTED_WORK_TYPE, - f"Unsupported worker type: {worker_type}") diff --git a/python/requirements.txt b/python/requirements.txt index e464227f..b92953e4 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -37,7 +37,6 @@ google~=3.0.0 paste~=3.5.0 func_timeout==4.3.0 cheroot==8.5.2 -prefect==1.4.0 gmssl~=3.2.1 readerwriterlock~=1.0.4 jsoncomment~=0.2.3