Skip to content

Commit

Permalink
add ppml and ppc-model-gateway (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull authored Aug 21, 2024
1 parent 1a2c0ec commit d95e5f3
Show file tree
Hide file tree
Showing 194 changed files with 16,071 additions and 0 deletions.
Empty file added python/__init__.py
Empty file.
Empty file added python/ppc_common/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions python/ppc_common/application-sample.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# mysql or dm
DB_TYPE: "mysql"
SQLALCHEMY_DATABASE_URI: "mysql://root:[email protected]:3306/ppc?autocommit=true&charset=utf8"
# SQLALCHEMY_DATABASE_URI: "dm+dmPython://ppcv16:[email protected]:5236"

MPC_BIT_LENGTH: [@IDC_PPCS_COMMON_MPC_BIT]
6 changes: 6 additions & 0 deletions python/ppc_common/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# mysql or dm
DB_TYPE: "mysql"
SQLALCHEMY_DATABASE_URI: "mysql://root:[email protected]:3306/ppc?autocommit=true&charset=utf8"
# SQLALCHEMY_DATABASE_URI: "dm+dmPython://ppcv16:[email protected]:5236"

MPC_BIT_LENGTH: 1000
18 changes: 18 additions & 0 deletions python/ppc_common/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
import os
import yaml


dirName, _ = os.path.split(os.path.abspath(__file__))
config_path = '{}/application.yml'.format(dirName)

CONFIG_DATA = {}


def read_config():
global CONFIG_DATA
with open(config_path, 'rb') as f:
CONFIG_DATA = yaml.safe_load(f.read())


read_config()
6 changes: 6 additions & 0 deletions python/ppc_common/db_models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from flask_sqlalchemy import SQLAlchemy


db = SQLAlchemy()

# __all__ = ['computation_provider','data_provider','job_computation_queue','job_data_queue', 'job_result']
12 changes: 12 additions & 0 deletions python/ppc_common/db_models/file_object_meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from ppc_common.db_models import db
from sqlalchemy import text


class FileObjectMeta(db.Model):
__tablename__ = 't_file_object'
file_path = db.Column(db.String(255), primary_key=True)
file_count = db.Column(db.Integer)
create_time = db.Column(db.TIMESTAMP(
True), nullable=False, server_default=text('NOW()'))
last_update_time = db.Column(db.TIMESTAMP(True), nullable=False, server_default=text(
'CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'))
10 changes: 10 additions & 0 deletions python/ppc_common/db_models/file_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from ppc_common.db_models import db


class FilePathRecord(db.Model):
__tablename__ = 't_file_path'
path = db.Column(db.String(255), primary_key=True)
storage_type = db.Column(db.String(255))
file_id = db.Column(db.String(255))
file_hash = db.Column(db.String(255))
create_time = db.Column(db.BigInteger)
14 changes: 14 additions & 0 deletions python/ppc_common/db_models/job_unit_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

from ppc_common.db_models import db


class JobUnitRecord(db.Model):
__tablename__ = 't_job_unit'
unit_id = db.Column(db.String(100), primary_key=True)
job_id = db.Column(db.String(255), index=True)
type = db.Column(db.String(255))
status = db.Column(db.String(255), index=True)
upstream_units = db.Column(db.Text)
inputs_statement = db.Column(db.Text)
outputs = db.Column(db.Text)
update_time = db.Column(db.BigInteger)
Empty file.
85 changes: 85 additions & 0 deletions python/ppc_common/deps_services/file_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
from enum import Enum
from abc import ABC, abstractmethod


class SplitMode(Enum):
NONE = 0 # not split
SIZE = 1 # split by size
LINES = 2 # split by lines


class FileObject(ABC):
@abstractmethod
def split(self, split_mode, granularity):
"""
split large file into many small files with given granularity
"""
pass

@abstractmethod
def download(self, enforce_flush=False):
"""
download the files
"""
pass

@abstractmethod
def upload(self, split_mode, granularity):
"""
upload the files
"""
pass

@abstractmethod
def delete(self):
"""
delete the files
"""
pass

@abstractmethod
def existed(self) -> bool:
"""
check the file object exist or not
"""
pass

@abstractmethod
def rename(self, storage_path: str):
"""
rename the file object
"""
pass

@abstractmethod
def hit_local_cache(self):
"""hit the local cache or not
"""
pass

@abstractmethod
def get_local_path(self):
"""get the local path
"""
pass

@abstractmethod
def get_remote_path(self):
"""get the remote path
"""
pass

@abstractmethod
def get_data(self):
"""get data
"""
pass

@abstractmethod
def save_data(self, data, split_mode, granularity):
pass

@abstractmethod
def update_data(self, updated_data, split_mode, granularity):
pass
97 changes: 97 additions & 0 deletions python/ppc_common/deps_services/hdfs_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import os
from typing import AnyStr

from hdfs.client import InsecureClient
from ppc_common.ppc_utils import common_func
from ppc_common.deps_services.storage_api import StorageApi, StorageType

from ppc_common.ppc_utils import utils


class HdfsStorage(StorageApi):

DEFAULT_HDFS_USER = "ppc"
DEFAULT_HDFS_USER_PATH = "/user/"

def __init__(self, endpoint, hdfs_user, hdfs_home=None):

# self.client = Client('http://127.0.0.1:9870')
self.endpoint = endpoint
self._user = common_func.get_config_value(
"HDFS_USER", HdfsStorage.DEFAULT_HDFS_USER, hdfs_user, False)
self._hdfs_storage_path = hdfs_home
if hdfs_home is None:
self._hdfs_storage_path = os.path.join(
HdfsStorage.DEFAULT_HDFS_USER_PATH, self._user)

self.client = InsecureClient(endpoint, user=self._user)
# print(self.client.list('/'))
# print(self.client.list('/user/root/'))

def get_home_path(self):
return self._hdfs_storage_path

def storage_type(self):
return StorageType.HDFS

def download_file(self, hdfs_path, local_file_path, enable_cache=False):
# hit the cache
if enable_cache is True and utils.file_exists(local_file_path):
return
if utils.file_exists(local_file_path):
utils.delete_file(local_file_path)
local_path = os.path.dirname(local_file_path)
if len(local_path) > 0 and not os.path.exists(local_path):
os.makedirs(local_path)
self.client.download(os.path.join(self._hdfs_storage_path,
hdfs_path), local_file_path)
return

def upload_file(self, local_file_path, hdfs_path):
self.make_file_path(hdfs_path)
self.client.upload(os.path.join(self._hdfs_storage_path, hdfs_path),
local_file_path, overwrite=True)
return

def make_file_path(self, hdfs_path):
hdfs_dir = os.path.dirname(hdfs_path)
if self.client.status(os.path.join(self._hdfs_storage_path, hdfs_dir), strict=False) is None:
self.client.makedirs(os.path.join(
self._hdfs_storage_path, hdfs_dir))
return

def delete_file(self, hdfs_path):
self.client.delete(os.path.join(
self._hdfs_storage_path, hdfs_path), recursive=True)
return

def save_data(self, data: AnyStr, hdfs_path):
self.make_file_path(hdfs_path)
self.client.write(os.path.join(self._hdfs_storage_path,
hdfs_path), data, overwrite=True)
return

def get_data(self, hdfs_path) -> AnyStr:
with self.client.read(os.path.join(self._hdfs_storage_path, hdfs_path)) as reader:
content = reader.read()
return content

def mkdir(self, hdfs_dir):
self.client.makedirs(hdfs_dir)

def file_existed(self, hdfs_path):
if self.client.status(os.path.join(self._hdfs_storage_path, hdfs_path), strict=False) is None:
return False
return True

def file_rename(self, old_hdfs_path, hdfs_path):
old_path = os.path.join(self._hdfs_storage_path, old_hdfs_path)
new_path = os.path.join(self._hdfs_storage_path, hdfs_path)
# return for the file not exists
if not self.file_existed(old_path):
return
parent_path = os.path.dirname(new_path)
if len(parent_path) > 0 and not self.file_existed(parent_path):
self.mkdir(parent_path)
self.client.rename(old_path, new_path)
return
65 changes: 65 additions & 0 deletions python/ppc_common/deps_services/mysql_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
from ppc_common.deps_services.sql_storage_api import SQLStorageAPI
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
from sqlalchemy import delete
from sqlalchemy import text
from contextlib import contextmanager


class MySQLStorage(SQLStorageAPI):
def __init__(self, storage_config):
self._engine_url = storage_config.engine_url
self._storage_config = storage_config
connect_args = {}
if storage_config.db_name is not None:
connect_args = {'schema': storage_config.db_name}
self._mysql_engine = create_engine(self._engine_url, pool_recycle=self._storage_config.pool_recycle,
pool_size=self._storage_config.pool_size, max_overflow=self._storage_config.max_overflow,
pool_timeout=self._storage_config.pool_timeout, connect_args=connect_args)
self._session_factory = sessionmaker(bind=self._mysql_engine)
# Note: scoped_session is threadLocal
self._session = scoped_session(self._session_factory)

@contextmanager
def _get_session(self):
session = self._session()
try:
yield session
session.commit()
except Exception:
session.rollback()
self._session.remove()
raise
finally:
session.close()

def query(self, object, condition):
"""
query according to the condition
"""
with self._get_session() as session:
return session.query(object).filter(condition)

def merge(self, record):
"""merge the given record to db
Args:
record (Any): the record should been inserted
"""
with self._get_session() as session:
session.merge(record)

def execute(self, sql: str):
text_sql = text(sql)
with self._get_session() as session:
session.execute(text_sql)

def delete(self, object, condition):
"""delete according to condition
Args:
object (Any): the object
condition (Any): the condition
"""
stmt = delete(object).where(condition)
with self._get_session() as session:
session.execute(stmt)
7 changes: 7 additions & 0 deletions python/ppc_common/deps_services/serialize_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
from enum import Enum


class SerializeType(Enum):
CSV = 'csv'
JSON = 'gain'
Loading

0 comments on commit d95e5f3

Please sign in to comment.