Skip to content

Commit

Permalink
rename ppc_dev to wedpr-ml-toolkit (#55)
Browse files Browse the repository at this point in the history
* update secure lr

* update model and predict

* update ppc_dev

* update model setting

* Update booster.py

* update wedpr_ml_toolkit
  • Loading branch information
yanxinyi620 authored Oct 16, 2024
1 parent abf582f commit 456e83c
Show file tree
Hide file tree
Showing 23 changed files with 197 additions and 149 deletions.
53 changes: 0 additions & 53 deletions python/ppc_dev/job_exceuter/hdfs_client.py

This file was deleted.

35 changes: 0 additions & 35 deletions python/ppc_dev/wedpr_data/data_context.py

This file was deleted.

File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ def __init__(self, project_id, user_name, pws_endpoint=None, hdfs_endpoint=None,
self.pws_endpoint = pws_endpoint
self.hdfs_endpoint = hdfs_endpoint
self.token = token
self.workspace = os.path.join(self.project_id, self.user_name)
self.workspace = './milestone2'
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ppc_dev.common.base_context import BaseContext
from wedpr_ml_toolkit.common.base_context import BaseContext


class BaseResult:
Expand Down
File renamed without changes.
47 changes: 47 additions & 0 deletions python/wedpr_ml_toolkit/job_exceuter/hdfs_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import pandas as pd
import io

from ppc_common.deps_services import storage_loader


class HDFSApi:
def __init__(self, hdfs_endpoint):
self.hdfs_endpoint = hdfs_endpoint

config_data = {}
config_data['STORAGE_TYPE'] = 'HDFS'
config_data['HDFS_URL'] = self.hdfs_endpoint
config_data['HDFS_ENDPOINT'] = self.hdfs_endpoint
self.storage_client = storage_loader.load(config_data, logger=None)

def upload(self, dataframe, hdfs_path):
"""
上传Pandas DataFrame到HDFS
:param dataframe: 要上传的Pandas DataFrame
:param hdfs_path: HDFS目标路径
:return: 响应信息
"""
# 将DataFrame转换为CSV格式
csv_buffer = io.StringIO()
dataframe.to_csv(csv_buffer, index=False)
self.storage_client.save_data(csv_buffer.getvalue(), hdfs_path)
return

def download(self, hdfs_path):
"""
从HDFS下载数据并返回为Pandas DataFrame
:param hdfs_path: HDFS文件路径
:return: Pandas DataFrame
"""
content = self.storage_client.get_data(hdfs_path)
dataframe = pd.read_csv(io.BytesIO(content))
return dataframe

def download_byte(self, hdfs_path):
"""
从HDFS下载数据
:param hdfs_path: HDFS文件路径
:return: text
"""
content = self.storage_client.get_data(hdfs_path)
return content
Original file line number Diff line number Diff line change
@@ -1,36 +1,47 @@
import random
import time
import requests

from ppc_common.ppc_utils import http_utils
from ppc_common.ppc_utils.exception import PpcException, PpcErrorCode


PWS_URL = '/api/wedpr/v3/project/submitJob'


class PWSApi:
def __init__(self, endpoint, token,
polling_interval_s: int = 5, max_retries: int = 5, retry_delay_s: int = 5):
self.endpoint = endpoint
self.pws_url = endpoint + PWS_URL
self.token = token
self.polling_interval_s = polling_interval_s
self.max_retries = max_retries
self.retry_delay_s = retry_delay_s
self._async_run_task_method = 'asyncRunTask'
self._get_task_status_method = 'getTaskStatus'
self._completed_status = 'COMPLETED'
self._failed_status = 'FAILED'

def run(self, datasets, params):
params = {
'jsonrpc': '1',
'method': self._async_run_task_method,
'token': self.token,
'id': random.randint(1, 65535),
'dataset': datasets,
'params': params
def run(self, params):

headers = {
"Authorization": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MzEzMTUwMTksInVzZXIiOiJ7XCJ1c2VybmFtZVwiOlwiZmx5aHVhbmcxXCIsXCJncm91cEluZm9zXCI6W3tcImdyb3VwSWRcIjpcIjEwMDAwMDAwMDAwMDAwMDBcIixcImdyb3VwTmFtZVwiOlwi5Yid5aeL55So5oi357uEXCIsXCJncm91cEFkbWluTmFtZVwiOlwiYWRtaW5cIn1dLFwicm9sZU5hbWVcIjpcIm9yaWdpbmFsX3VzZXJcIixcInBlcm1pc3Npb25zXCI6bnVsbCxcImFjY2Vzc0tleUlEXCI6bnVsbCxcImFkbWluXCI6ZmFsc2V9In0.1jZFOVbiISzCvvE9SOsTx0IWb0-OQc3o3rJgCu9GM9A",
"content-type": "application/json"
}
response = self._send_request_with_retry(http_utils.send_post_request, self.endpoint, None, params)

payload = {
"job": {
"jobType": params['jobType'],
"projectName": params['projectName'],
"param": params['param']
},
"taskParties": params['taskParties'],
"datasetList": params['datasetList']
}

response = requests.request("POST", self.pws_url, json=payload, headers=headers)
if response.status_code != 200:
raise Exception(f"创建任务失败: {response.json()}")
return self._poll_task_status(response.job_id, self.token)
return
# return self._poll_task_status(response.data, self.token)

def _poll_task_status(self, job_id, token):
while True:
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os

from ppc_dev.wedpr_data.data_context import DataContext
from ppc_dev.common.base_result import BaseResult
from wedpr_ml_toolkit.wedpr_data.data_context import DataContext
from wedpr_ml_toolkit.common.base_result import BaseResult


class FeResult(BaseResult):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from ppc_common.ppc_utils import utils

from ppc_dev.wedpr_data.data_context import DataContext
from ppc_dev.common.base_result import BaseResult
from ppc_dev.job_exceuter.hdfs_client import HDFSApi
from wedpr_ml_toolkit.wedpr_data.data_context import DataContext
from wedpr_ml_toolkit.common.base_result import BaseResult
from wedpr_ml_toolkit.job_exceuter.hdfs_client import HDFSApi


class ModelResult(BaseResult):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os

from ppc_dev.wedpr_data.data_context import DataContext
from ppc_dev.common.base_result import BaseResult
from wedpr_ml_toolkit.wedpr_data.data_context import DataContext
from wedpr_ml_toolkit.common.base_result import BaseResult


class PSIResult(BaseResult):
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,26 @@
import pandas as pd
from sklearn import metrics

from ppc_dev.common.base_context import BaseContext
from ppc_dev.utils.agency import Agency
from ppc_dev.wedpr_data.wedpr_data import WedprData
from ppc_dev.wedpr_data.data_context import DataContext
from ppc_dev.wedpr_session.wedpr_session import WedprSession
from wedpr_ml_toolkit.common.base_context import BaseContext
from wedpr_ml_toolkit.utils.agency import Agency
from wedpr_ml_toolkit.wedpr_data.wedpr_data import WedprData
from wedpr_ml_toolkit.wedpr_data.data_context import DataContext
from wedpr_ml_toolkit.wedpr_session.wedpr_session import WedprSession


# 从jupyter环境中获取project_id等信息
# create workspace
# 相同项目/刷新专家模式project_id固定
project_id = 'p-123'
user = 'admin'
my_agency='WeBank'
pws_endpoint = '0.0.0.0:0000'
hdfs_endpoint = '0.0.0.0:0001'
user = 'flyhuang1'
my_agency='sgd'
pws_endpoint = 'http://139.159.202.235:8005' # http
hdfs_endpoint = 'http://192.168.0.18:50070' # client
token = 'abc...'


# 自定义合作方机构
partner_agency1='SG'
partner_agency1='webank'
partner_agency2='TX'

# 初始化project ctx 信息
Expand All @@ -36,23 +36,28 @@
# pd.Dataframe
df = pd.DataFrame({
'id': np.arange(0, 100), # id列,顺序整数
'y': np.random.randint(0, 2, size=100),
**{f'x{i}': np.random.rand(100) for i in range(1, 11)} # x1到x10列,随机数
})
dataset1 = WedprData(ctx, values=df, agency=agency1)

dataset1 = WedprData(ctx, values=df, agency=agency1, is_label_holder=True)
dataset1.storage_client = None
dataset1.save_values(path='./project_id/user/data/d-101')
dataset1.save_values(path='d-101') # './milestone2\\sgd\\flyhuang1\\share\\d-101'

# hdfs_path
dataset2 = WedprData(ctx, dataset_path='./data_path/d-123', agency=agency2, is_label_holder=True)
dataset2 = WedprData(ctx, dataset_path='/user/ppc/milestone2/webank/flyhuang/d-9606695119693829', agency=agency2)
dataset2.storage_client = None
dataset2.load_values()

# 支持更新dataset的values数据
df2 = pd.DataFrame({
'id': np.arange(0, 100), # id列,顺序整数
'y': np.random.randint(0, 2, size=100),
**{f'x{i}': np.random.rand(100) for i in range(1, 11)} # x1到x10列,随机数
})
dataset2.update_values(values=df2)
# dataset2.load_values()
if dataset2.storage_client is None:
# 支持更新dataset的values数据
df2 = pd.DataFrame({
'id': np.arange(0, 100), # id列,顺序整数
**{f'z{i}': np.random.rand(100) for i in range(1, 11)} # x1到x10列,随机数
})
dataset2.update_values(values=df2)
if dataset1.storage_client is not None:
dataset1.update_values(path='/user/ppc/milestone2/sgd/flyhuang1/d-9606704699156485')
dataset1.load_values()

# 构建 dataset context
dataset = DataContext(dataset1, dataset2)
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
46 changes: 46 additions & 0 deletions python/wedpr_ml_toolkit/wedpr_data/data_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os

from wedpr_ml_toolkit.utils import utils


class DataContext:

def __init__(self, *datasets):
self.datasets = list(datasets)
self.ctx = self.datasets[0].ctx

self._check_datasets()

def _save_dataset(self, dataset):
if dataset.dataset_path is None:
dataset.dataset_id = utils.make_id(utils.IdPrefixEnum.DATASET.value)
dataset.dataset_path = os.path.join(dataset.storage_workspace, dataset.dataset_id)
if dataset.storage_client is not None:
dataset.storage_client.upload(dataset.values, dataset.dataset_path)

def _check_datasets(self):
for dataset in self.datasets:
self._save_dataset(dataset)

def to_psi_format(self, merge_filed, result_receiver_id_list):
dataset_psi = []
for dataset in self.datasets:
if dataset.agency.agency_id in result_receiver_id_list:
result_receiver = "true"
else:
result_receiver = "false"
dataset_psi_info = {"idFields": [merge_filed],
"dataset": {"owner": dataset.ctx.user_name,
"ownerAgency": dataset.agency.agency_id,
"path": dataset.dataset_path,
"storageTypeStr": "HDFS",
"datasetID": dataset.dataset_id},
"receiveResult": result_receiver}
dataset_psi.append(dataset_psi_info)
return dataset_psi

def to_model_formort(self):
dataset_model = []
for dataset in self.datasets:
dataset_model.append(dataset.dataset_path)
return dataset_model
Loading

0 comments on commit 456e83c

Please sign in to comment.