From ca55a8b4ee1829f54dfe9100852ac79f79d652cf Mon Sep 17 00:00:00 2001 From: ivanzhovannik Date: Tue, 12 Mar 2024 17:28:13 +0100 Subject: [PATCH 1/8] applied initial version4 syntax --- v6_logistic_regression_py/__init__.py | 48 ++++++++++++++++++--------- v6_logistic_regression_py/example.py | 27 ++++++++------- v6_logistic_regression_py/helper.py | 9 ++--- 3 files changed, 51 insertions(+), 33 deletions(-) diff --git a/v6_logistic_regression_py/__init__.py b/v6_logistic_regression_py/__init__.py index db7e83d..428b31d 100644 --- a/v6_logistic_regression_py/__init__.py +++ b/v6_logistic_regression_py/__init__.py @@ -12,18 +12,25 @@ from sklearn.linear_model import LogisticRegression from sklearn.metrics import log_loss from sklearn.metrics import confusion_matrix -from vantage6.tools.util import info +from typing import List +from vantage6.algorithm.client import AlgorithmClient +from vantage6.algorithm.tools.util import info +from vantage6.algorithm.tools.decorators import algorithm_client, data from v6_logistic_regression_py.helper import coordinate_task from v6_logistic_regression_py.helper import set_initial_params from v6_logistic_regression_py.helper import set_model_params from v6_logistic_regression_py.helper import get_model_parameters - +@algorithm_client def master( - client, data: pd.DataFrame, predictors: list, outcome: str, - classes: list, max_iter: int = 15, delta: float = 0.01, - org_ids: list = None + client: AlgorithmClient, + predictors: List[str], + outcome: str, + classes: list, + max_iter: int = 15, + delta: float = 0.01, + org_ids: List[int] = None ) -> dict: """ Master algorithm that coordinates the tasks and performs averaging @@ -54,8 +61,8 @@ def master( # Get all organization ids that are within the collaboration or # use the provided ones - info('Collecting participating organizations') - organizations = client.get_organizations_in_my_collaboration() + info('Collecting the identification of the participating organizations') + organizations = client.organization.list() ids = [organization.get('id') for organization in organizations if not org_ids or organization.get('id') in org_ids] @@ -150,9 +157,12 @@ def master( return result - -def RPC_logistic_regression_partial( - df: pd.DataFrame, parameters, predictors, outcome +@data(1) +def logistic_regression_partial( + df: pd.DataFrame, + parameters, + predictors, + outcome ) -> dict: """ Partial method for federated logistic regression @@ -202,8 +212,12 @@ def RPC_logistic_regression_partial( return results -def RPC_compute_loss_partial( - df: pd.DataFrame, model, predictors, outcome +@data(1) +def compute_loss_partial( + df: pd.DataFrame, + model, + predictors, + outcome ) -> dict: """ Partial method for calculation of loss @@ -242,9 +256,13 @@ def RPC_compute_loss_partial( return results -def RPC_run_validation( - df: pd.DataFrame, parameters: list, classes: list, - predictors: list, outcome: str +@data(1) +def run_validation( + df: pd.DataFrame, + parameters: list, + classes: list, + predictors: list, + outcome: str ) -> dict: """ Method for running model validation diff --git a/v6_logistic_regression_py/example.py b/v6_logistic_regression_py/example.py index 0f3bb84..ca5cd98 100644 --- a/v6_logistic_regression_py/example.py +++ b/v6_logistic_regression_py/example.py @@ -3,28 +3,27 @@ """ Sample code to test the federated algorithm with a mock client """ import os -from vantage6.tools.mock_client import ClientMockProtocol +from pathlib import Path +from vantage6.algorithm.tools.mock_client import MockAlgorithmClient # Start mock client -data_dir = os.path.join( - os.getcwd(), 'v6_logistic_regression_py', 'local' -) -client = ClientMockProtocol( - datasets=[ - os.path.join(data_dir, 'data1.csv'), - os.path.join(data_dir, 'data2.csv') - ], +data_directory = Path('./v6_logistic_regression_py') / 'local' +dataset_1 = {"database": data_directory / "data1.csv", "db_type": "csv"} +dataset_2 = {"database": data_directory / "data2.csv", "db_type": "csv"} + +client = MockAlgorithmClient( + datasets=[[dataset_1], [dataset_2]], module='v6_logistic_regression_py' ) # Get mock organisations -organizations = client.get_organizations_in_my_collaboration() +organizations = client.organization.list() print(organizations) ids = [organization['id'] for organization in organizations] # Check master method -master_task = client.create_new_task( +master_task = client.task.create( input_={ 'master': True, 'method': 'master', @@ -37,7 +36,7 @@ 'delta': 0.0001 } }, - organization_ids=[0, 1] + organizations=[0, 1] ) results = client.get_results(master_task.get('id')) model = results[0]['model'] @@ -46,7 +45,7 @@ print(f'Number of iterations: {iteration}') # Check validation method -master_task = client.create_new_task( +master_task = client.task.create( input_={ 'master': False, 'method': 'run_validation', @@ -57,7 +56,7 @@ 'outcome': 'vital_status', } }, - organization_ids=[0] + organizations=[0] ) results = client.get_results(master_task.get('id')) accuracy = results[0]['score'] diff --git a/v6_logistic_regression_py/helper.py b/v6_logistic_regression_py/helper.py index 35d4ce5..574a51f 100644 --- a/v6_logistic_regression_py/helper.py +++ b/v6_logistic_regression_py/helper.py @@ -7,14 +7,15 @@ import numpy as np from sklearn.linear_model import LogisticRegression -from vantage6.tools.util import info +from vantage6.algorithm.client import AlgorithmClient +from vantage6.algorithm.tools.util import info from typing import Tuple, Union XY = Tuple[np.ndarray, np.ndarray] LogRegParams = Union[XY, Tuple[np.ndarray]] -def coordinate_task(client, input: dict, ids: list) -> list: +def coordinate_task(client: AlgorithmClient, input: dict, ids: list) -> list: """ Coordinate tasks to be sent to data nodes, which includes dispatching the task, waiting for results to return and collect completed results @@ -35,9 +36,9 @@ def coordinate_task(client, input: dict, ids: list) -> list: # Create a new task for the desired organizations info('Dispatching node tasks') - task = client.create_new_task( + task = client.task.create( input_=input, - organization_ids=ids + organizations=ids ) # Wait for nodes to return results From 240a9e9ca47a7807bf9402c428aa8fea1554c2a1 Mon Sep 17 00:00:00 2001 From: ivanzhovannik Date: Tue, 12 Mar 2024 18:03:13 +0100 Subject: [PATCH 2/8] added model export/import --- v6_logistic_regression_py/__init__.py | 21 +++++++++-- v6_logistic_regression_py/helper.py | 53 ++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/v6_logistic_regression_py/__init__.py b/v6_logistic_regression_py/__init__.py index 428b31d..33f4967 100644 --- a/v6_logistic_regression_py/__init__.py +++ b/v6_logistic_regression_py/__init__.py @@ -21,6 +21,9 @@ from v6_logistic_regression_py.helper import set_initial_params from v6_logistic_regression_py.helper import set_model_params from v6_logistic_regression_py.helper import get_model_parameters +from v6_logistic_regression_py.helper import init_model, export_model + +MODEL_ATTRIBUTE_KEYS = ["coef_", "intercept_", ] @algorithm_client def master( @@ -202,6 +205,8 @@ def logistic_regression_partial( warnings.simplefilter('ignore') model.fit(X, y) info('Training round finished') + + model_dict = export_model(model, attribute_keys=[]) # Results results = { @@ -292,10 +297,18 @@ def run_validation( y = df[outcome].values # Logistic regression model - model = LogisticRegression() - model.coef_ = np.array(parameters[1]) - model.intercept_ = np.array(parameters[0]) - model.classes_ = np.array(classes) + model = init_model( + LogisticRegression, + model_attributes=dict( + intercept_ = np.array(parameters[0]), + coef_ = np.array(parameters[1]), + classes_ = np.array(classes) + ) + ) + # model = LogisticRegression() + # model.coef_ = np.array(parameters[1]) + # model.intercept_ = np.array(parameters[0]) + # model.classes_ = np.array(classes) # Compute model accuracy score = model.score(X, y) diff --git a/v6_logistic_regression_py/helper.py b/v6_logistic_regression_py/helper.py index 574a51f..22ab199 100644 --- a/v6_logistic_regression_py/helper.py +++ b/v6_logistic_regression_py/helper.py @@ -6,10 +6,11 @@ import numpy as np +from sklearn.base import BaseEstimator from sklearn.linear_model import LogisticRegression from vantage6.algorithm.client import AlgorithmClient from vantage6.algorithm.tools.util import info -from typing import Tuple, Union +from typing import Any, Dict, List, Tuple, Type, Union XY = Tuple[np.ndarray, np.ndarray] LogRegParams = Union[XY, Tuple[np.ndarray]] @@ -83,3 +84,53 @@ def set_model_params( if model.fit_intercept: model.intercept_ = params[1] return model + + +def init_model( + model_class: Type[BaseEstimator], + model_attributes: Dict[str, Any], + *model_init_args: Any, **model_init_kwargs: Any +) -> BaseEstimator: + """ + Initializes an instance of the provided model class with corresponding model attributes. + + Parameters + ---------- + model_class : Type[BaseEstimator] + Class of the model to be initialized. + model_attributes : Dict[str, Any] + Dictionary mapping attribute names to their corresponding values. + model_init_args : Any + Positional arguments for model class initialization. + model_init_kwargs : Any + Keyword arguments for model class initialization. + + Returns + ------- + model : BaseEstimator + The initialized model object. + """ + model = model_class(*model_init_args, **model_init_kwargs) + for key, value in model_attributes.items(): + setattr(model, key, value) + return model + + +def export_model(model: BaseEstimator, attribute_keys: List[str]) -> Dict[str, Any]: + """ + Exports model attributes given a model and list of attributes. + + Parameters + ---------- + model : BaseEstimator + An instance of the Scikit-Learn's BaseEstimator (such as LogisticRegression, SVC, etc.). + attribute_keys : List[str] + List of attribute names that are to be extracted from the model. + + Returns + ------- + attributes : Dict[str, Any] + A Dictionary mapping attribute keys to their corresponding values extracted from the model. + """ + attributes = {key: getattr(model, key) for key in attribute_keys} + return attributes \ No newline at end of file From 3a517474a6e435a110d0ee504bd52484720d886b Mon Sep 17 00:00:00 2001 From: ivanzhovannik Date: Wed, 13 Mar 2024 15:39:44 +0100 Subject: [PATCH 3/8] made everything work in version4, still need to clean up stuff --- v6_logistic_regression_py/__init__.py | 152 +++++++++++++------------- v6_logistic_regression_py/example.py | 22 ++-- v6_logistic_regression_py/helper.py | 132 ++++++++++++++-------- 3 files changed, 174 insertions(+), 132 deletions(-) diff --git a/v6_logistic_regression_py/__init__.py b/v6_logistic_regression_py/__init__.py index 33f4967..709b40b 100644 --- a/v6_logistic_regression_py/__init__.py +++ b/v6_logistic_regression_py/__init__.py @@ -12,18 +12,20 @@ from sklearn.linear_model import LogisticRegression from sklearn.metrics import log_loss from sklearn.metrics import confusion_matrix -from typing import List +from typing import Dict, List from vantage6.algorithm.client import AlgorithmClient from vantage6.algorithm.tools.util import info from vantage6.algorithm.tools.decorators import algorithm_client, data from v6_logistic_regression_py.helper import coordinate_task -from v6_logistic_regression_py.helper import set_initial_params -from v6_logistic_regression_py.helper import set_model_params -from v6_logistic_regression_py.helper import get_model_parameters -from v6_logistic_regression_py.helper import init_model, export_model +from v6_logistic_regression_py.helper import ( + aggregate, + export_model, + initialize_model, +) -MODEL_ATTRIBUTE_KEYS = ["coef_", "intercept_", ] +MODEL_ATTRIBUTE_KEYS = ["coef_", "intercept_", "classes_"] +MODEL_AGGREGATION_KEYS = ["coef_", "intercept_"] @algorithm_client def master( @@ -70,10 +72,15 @@ def master( if not org_ids or organization.get('id') in org_ids] # Initialise the weights for the logistic regression - info('Initializing logistic regression weights') - model = LogisticRegression() - model = set_initial_params(model, len(predictors), classes) - parameters = get_model_parameters(model) + info('Initializing logistic regression estimator') + model_initial_attributes = dict( + classes_ =np.array(classes), + coef_ =np.zeros((1, len(predictors))), + intercept_=np.zeros((1,)) + ) + global_model = initialize_model(LogisticRegression, model_initial_attributes) + model_attributes = export_model(global_model, attribute_keys=MODEL_ATTRIBUTE_KEYS) + info(model_attributes) # The next steps are run until the maximum number of iterations or # convergence is reached @@ -82,11 +89,11 @@ def master( loss_diff = 2*delta while (iteration < max_iter) and (loss_diff > delta): # The input for the partial algorithm - info('Defining input parameters') + info(f'######## ITERATION #{iteration} #########') input_ = { 'method': 'logistic_regression_partial', 'kwargs': { - 'parameters': parameters, + 'model_attributes': model_attributes, 'predictors': predictors, 'outcome': outcome } @@ -94,35 +101,44 @@ def master( # Send partial task and collect results results = coordinate_task(client, input_, ids) - info(f'Results: {results}') - - # Average model weights with weighted average - info('Run global averaging for model weights') - coefficients = np.zeros((1, len(predictors))) - for i in range(coefficients.shape[1]): - coefficients[0, i] = np.sum([ - result['model'].coef_[0, i]*result['size'] - for result in results - ]) / np.sum([ - result['size'] for result in results - ]) - intercept = np.sum([ - result['model'].intercept_*result['size'] for result in results - ]) / np.sum([ - result['size'] for result in results - ]) - intercept = np.array([intercept]) + info(f'Results before aggregation: {results}') + + # # Reassign model parameters + # global_model = update_model(global_model, model_attributes=results['model_attributes']) + + # # Average model weights with weighted average + # info(f'Run global averaging for model weights: {results}') + # coefficients = np.zeros((1, len(predictors))) + # for i in range(coefficients.shape[1]): + # coefficients[0, i] = np.sum([ + # result['model'].coef_[0, i]*result['size'] + # for result in results + # ]) / np.sum([ + # result['size'] for result in results + # ]) + # intercept = np.sum([ + # result['model'].intercept_*result['size'] for result in results + # ]) / np.sum([ + # result['size'] for result in results + # ]) + # intercept = np.array([intercept]) # Re-define the global parameters - parameters = (coefficients, intercept) - model = set_model_params(model, parameters) + # parameters = (coefficients, intercept) + # model = set_model_params(model, parameters) + + # Aggregate the results + info("Aggregating partial modeling results") + global_model = aggregate(global_model, results=results, aggregation_keys=MODEL_AGGREGATION_KEYS) + info("Exporting global model") + global_model_attributes = export_model(model=global_model, attribute_keys=MODEL_ATTRIBUTE_KEYS) # The input for the partial algorithm that computes the loss info('Computing local losses') input_ = { 'method': 'compute_loss_partial', 'kwargs': { - 'model': model, + 'model_attributes': global_model_attributes, 'predictors': predictors, 'outcome': outcome } @@ -151,19 +167,17 @@ def master( # Update iterations counter iteration += 1 - # Final result - result = { - 'model': model, + return { + 'model_attributes': global_model_attributes, 'loss': loss, 'iteration': iteration } - return result @data(1) def logistic_regression_partial( df: pd.DataFrame, - parameters, + model_attributes: Dict[str, List[float]], predictors, outcome ) -> dict: @@ -173,7 +187,7 @@ def logistic_regression_partial( ---------- df DataFrame with input data - parameters + model_attributes Model weigths of logistic regression predictors List with columns to be used as predictors @@ -192,35 +206,32 @@ def logistic_regression_partial( X = df[predictors].values y = df[outcome].values - # Create LogisticRegression Model - model = LogisticRegression( + # Create local LogisticRegression estimator object + model_kwargs = dict( max_iter=1, # local epoch warm_start=True, # prevent refreshing weights when fitting ) - - # Fitting local model - model = set_model_params(model, parameters) + model = initialize_model(LogisticRegression, model_attributes=model_attributes, **model_kwargs) + # Ignore convergence failure due to low local epochs with warnings.catch_warnings(): warnings.simplefilter('ignore') model.fit(X, y) info('Training round finished') - model_dict = export_model(model, attribute_keys=[]) + model_attributes = export_model(model, attribute_keys=MODEL_ATTRIBUTE_KEYS) + info(f'MODEL ATTRIBUTES: {model_attributes}') - # Results - results = { - 'model': model, + return { + 'model_attributes': model_attributes, 'size': X.shape[0] } - return results - @data(1) def compute_loss_partial( df: pd.DataFrame, - model, + model_attributes: Dict[str, list], predictors, outcome ) -> dict: @@ -230,8 +241,8 @@ def compute_loss_partial( ---------- df DataFrame with input data - model - Logistic regression model object + model_attributes + Serializable model parameters in Dict[str, list] format predictors List with columns to be used as predictors outcome @@ -249,17 +260,17 @@ def compute_loss_partial( X = df[predictors].values y = df[outcome].values + # Initialize local model instance + model = initialize_model(LogisticRegression, model_attributes) + # Compute loss loss = log_loss(y, model.predict_proba(X)) - # Results - results = { + return { 'loss': loss, 'size': X.shape[0] } - return results - @data(1) def run_validation( @@ -296,32 +307,23 @@ def run_validation( X = df[predictors].values y = df[outcome].values - # Logistic regression model - model = init_model( - LogisticRegression, - model_attributes=dict( + # Initialize LogisticRegression estimator + model_attributes=dict( intercept_ = np.array(parameters[0]), coef_ = np.array(parameters[1]), classes_ = np.array(classes) ) - ) - # model = LogisticRegression() - # model.coef_ = np.array(parameters[1]) - # model.intercept_ = np.array(parameters[0]) - # model.classes_ = np.array(classes) + model = initialize_model(LogisticRegression, model_attributes) # Compute model accuracy score = model.score(X, y) - # Confusion matrix - cm = confusion_matrix( + # Compute confusion matrix + confusion_matrix_ = confusion_matrix( y, model.predict(X), labels=model.classes_ - ) + ).tolist() - # Results - results = { + return { 'score': score, - 'confusion_matrix': cm - } - - return results + 'confusion_matrix': confusion_matrix_ + } \ No newline at end of file diff --git a/v6_logistic_regression_py/example.py b/v6_logistic_regression_py/example.py index ca5cd98..1c2f4c3 100644 --- a/v6_logistic_regression_py/example.py +++ b/v6_logistic_regression_py/example.py @@ -4,8 +4,11 @@ """ import os from pathlib import Path +from sklearn.linear_model import LogisticRegression from vantage6.algorithm.tools.mock_client import MockAlgorithmClient +from v6_logistic_regression_py.helper import initialize_model + # Start mock client data_directory = Path('./v6_logistic_regression_py') / 'local' @@ -19,7 +22,7 @@ # Get mock organisations organizations = client.organization.list() -print(organizations) +print(f"Participating organizations: {organizations}") ids = [organization['id'] for organization in organizations] # Check master method @@ -32,18 +35,19 @@ 'predictors': ['t', 'n', 'm'], 'outcome': 'vital_status', 'classes': ['alive', 'dead'], - 'max_iter': 100, + 'max_iter': 10, 'delta': 0.0001 } }, - organizations=[0, 1] + organizations=[0] ) -results = client.get_results(master_task.get('id')) -model = results[0]['model'] -iteration = results[0]['iteration'] +results = client.result.get(master_task.get('id')) +model = initialize_model(LogisticRegression, results['model_attributes'])# +iteration = results['iteration'] print(model.coef_, model.intercept_) print(f'Number of iterations: {iteration}') +print([model.intercept_.tolist(), model.coef_.tolist()]) # Check validation method master_task = client.task.create( input_={ @@ -58,8 +62,8 @@ }, organizations=[0] ) -results = client.get_results(master_task.get('id')) -accuracy = results[0]['score'] -cm = results[0]['confusion_matrix'] +results = client.result.get(master_task.get('id')) +accuracy = results['score'] +cm = results['confusion_matrix'] print(f'Accuracy: {accuracy}') print(f'Confusion matrix: {cm}') diff --git a/v6_logistic_regression_py/helper.py b/v6_logistic_regression_py/helper.py index 22ab199..ac9348f 100644 --- a/v6_logistic_regression_py/helper.py +++ b/v6_logistic_regression_py/helper.py @@ -2,21 +2,19 @@ """ Helper functions for running logistic regression """ -import time - import numpy as np from sklearn.base import BaseEstimator from sklearn.linear_model import LogisticRegression from vantage6.algorithm.client import AlgorithmClient -from vantage6.algorithm.tools.util import info +from vantage6.algorithm.tools.util import info, warn from typing import Any, Dict, List, Tuple, Type, Union XY = Tuple[np.ndarray, np.ndarray] LogRegParams = Union[XY, Tuple[np.ndarray]] -def coordinate_task(client: AlgorithmClient, input: dict, ids: list) -> list: +def coordinate_task(client: AlgorithmClient, input_: dict, ids: List[int]) -> List[Dict[str, Any]]: """ Coordinate tasks to be sent to data nodes, which includes dispatching the task, waiting for results to return and collect completed results @@ -38,55 +36,67 @@ def coordinate_task(client: AlgorithmClient, input: dict, ids: list) -> list: # Create a new task for the desired organizations info('Dispatching node tasks') task = client.task.create( - input_=input, + input_=input_, organizations=ids ) # Wait for nodes to return results info('Waiting for results') - task_id = task.get('id') - task = client.get_task(task_id) - while not task.get('complete'): - task = client.get_task(task_id) - info('Waiting for results') - time.sleep(1) - - # Collecting results - info('Obtaining results') - results = client.get_results(task_id=task.get('id')) + results = client.wait_for_results(task_id=task.get("id"), interval=1) + info(f'Results obtained for {input_["method"]}!') return results -def set_initial_params(model: LogisticRegression, ncoef, classes): - """Sets initial parameters as zeros""" - model.classes_ = np.array(classes) - model.coef_ = np.zeros((1, ncoef)) - if model.fit_intercept: - model.intercept_ = np.zeros((1,)) - return model - - -def get_model_parameters(model): - """Returns the paramters of a sklearn LogisticRegression model""" - if model.fit_intercept: - params = (model.coef_, model.intercept_) - else: - params = (model.coef_,) - return params - - -def set_model_params( - model: LogisticRegression, params: LogRegParams -) -> LogisticRegression: - """Sets the parameters of a sklean LogisticRegression model""" - model.coef_ = params[0] - if model.fit_intercept: - model.intercept_ = params[1] - return model - +def aggregate( + global_model: BaseEstimator, + results: List[Dict[str, Any]], + aggregation_keys: List[str] +) -> BaseEstimator: + """ + Aggregate local results into a global model by weighted average of parameters. -def init_model( + Parameters + ---------- + global_model : BaseEstimator + Global model instance to be updated. + results: List[Dict[str, Any]] + List of local results, each a dictionary containing model attributes and the data size. + aggregation_keys: List[str] + List of keys, which values have to be aggregated. Other keys' values will be taken from the first site + + Returns + ------- + BaseEstimator + Updated global model with averaged parameters. + """ + # Global sample size + total_data_size = sum(result['size'] for result in results) + + # Initialize dictionary to hold summed attributes using numpy arrays + summed_attributes = { + key: np.zeros_like(results[0]['model_attributes'][key]) + for key in results[0]['model_attributes'] + if key in aggregation_keys} + + for result in results: + for key, value in result['model_attributes'].items(): + if key not in aggregation_keys: continue + # Ensure value is a numpy array for vectorized operations + current_value = np.array(value) + summed_attributes[key] += current_value * result['size'] # Weight by size + info(summed_attributes) + + # Calculate the weighted average of attributes + aggregated_attributes = {key: value / total_data_size for key, value in summed_attributes.items()} + + # Update the global model with averaged attributes + global_model = update_model(global_model, aggregated_attributes) + + return global_model + + +def initialize_model( model_class: Type[BaseEstimator], model_attributes: Dict[str, Any], *model_init_args: Any, **model_init_kwargs: Any @@ -111,14 +121,15 @@ def init_model( The initialized model object. """ model = model_class(*model_init_args, **model_init_kwargs) - for key, value in model_attributes.items(): - setattr(model, key, value) + model = update_model(model, model_attributes) return model def export_model(model: BaseEstimator, attribute_keys: List[str]) -> Dict[str, Any]: """ - Exports model attributes given a model and list of attributes. + Exports model attributes given a model and list of attribute keys. + + WARNING: nested dictionaries with numpy values were NOT TESTED Parameters ---------- @@ -132,5 +143,30 @@ def export_model(model: BaseEstimator, attribute_keys: List[str]) -> Dict[str, A attributes : Dict[str, Any] A Dictionary mapping attribute keys to their corresponding values extracted from the model. """ - attributes = {key: getattr(model, key) for key in attribute_keys} - return attributes \ No newline at end of file + attributes = {key: to_json_serializable(getattr(model, key)) for key in attribute_keys} + return attributes + + +def update_model( + model: BaseEstimator, + model_attributes: Dict[str, Any] +) -> BaseEstimator: + """Updates the model's attributes, converting lists to numpy arrays where possible.""" + for key, value in model_attributes.items(): + try: + # Convert lists to numpy arrays if all elements are numeric + if isinstance(value, list): + value = np.array(value) + except ValueError: + warn(f"Could not convert {key} attribute to a numpy array.") + setattr(model, key, value) + return model + + +def to_json_serializable(item: Union[np.ndarray, dict, Any]) -> Union[list, dict, Any]: + """Convert an item to a format that is serializable to JSON.""" + if isinstance(item, np.ndarray): + return item.tolist() + if isinstance(item, dict): + return {key: to_json_serializable(value) for key, value in item.items()} + return item \ No newline at end of file From 2df570bd161c42c0f6a86fe2b8128cd9d0ed358f Mon Sep 17 00:00:00 2001 From: ivanzhovannik Date: Wed, 13 Mar 2024 15:43:17 +0100 Subject: [PATCH 4/8] harmonized styling in helper.py --- v6_logistic_regression_py/helper.py | 151 ++++++++++++---------------- 1 file changed, 66 insertions(+), 85 deletions(-) diff --git a/v6_logistic_regression_py/helper.py b/v6_logistic_regression_py/helper.py index ac9348f..2851c1a 100644 --- a/v6_logistic_regression_py/helper.py +++ b/v6_logistic_regression_py/helper.py @@ -1,46 +1,31 @@ -# -*- coding: utf-8 -*- - -""" Helper functions for running logistic regression -""" import numpy as np - from sklearn.base import BaseEstimator -from sklearn.linear_model import LogisticRegression from vantage6.algorithm.client import AlgorithmClient from vantage6.algorithm.tools.util import info, warn -from typing import Any, Dict, List, Tuple, Type, Union +from typing import Any, Dict, List, Type, Union -XY = Tuple[np.ndarray, np.ndarray] -LogRegParams = Union[XY, Tuple[np.ndarray]] - -def coordinate_task(client: AlgorithmClient, input_: dict, ids: List[int]) -> List[Dict[str, Any]]: - """ Coordinate tasks to be sent to data nodes, which includes dispatching - the task, waiting for results to return and collect completed results +def coordinate_task(client: AlgorithmClient, input_: Dict[str, Any], ids: List[int]) -> List[Dict[str, Any]]: + """ + Coordinate tasks to be sent to data nodes. Parameters ---------- - client - Vantage6 user or mock client - input - Input parameters for the task, such as the method and its arguments - ids - List with organisation ids that will receive the task + client : AlgorithmClient + Vantage6 user or mock client. + input_ : Dict[str, Any] + Input parameters for the task, such as the method and its arguments. + ids : List[int] + List with organisation ids that will receive the task. Returns ------- - results - Collected partial results from all the nodes + List[Dict[str, Any]] + Collected partial results from all the nodes. """ - - # Create a new task for the desired organizations info('Dispatching node tasks') - task = client.task.create( - input_=input_, - organizations=ids - ) + task = client.task.create(input_=input_, organizations=ids) - # Wait for nodes to return results info('Waiting for results') results = client.wait_for_results(task_id=task.get("id"), interval=1) info(f'Results obtained for {input_["method"]}!') @@ -48,11 +33,7 @@ def coordinate_task(client: AlgorithmClient, input_: dict, ids: List[int]) -> Li return results -def aggregate( - global_model: BaseEstimator, - results: List[Dict[str, Any]], - aggregation_keys: List[str] -) -> BaseEstimator: +def aggregate(global_model: BaseEstimator, results: List[Dict[str, Any]], aggregation_keys: List[str]) -> BaseEstimator: """ Aggregate local results into a global model by weighted average of parameters. @@ -60,56 +41,38 @@ def aggregate( ---------- global_model : BaseEstimator Global model instance to be updated. - results: List[Dict[str, Any]] - List of local results, each a dictionary containing model attributes and the data size. - aggregation_keys: List[str] - List of keys, which values have to be aggregated. Other keys' values will be taken from the first site - + results : List[Dict[str, Any]] + List of local results, each containing model attributes and the data size. + aggregation_keys : List[str] + Keys whose values are to be aggregated. + Returns ------- BaseEstimator Updated global model with averaged parameters. """ - # Global sample size total_data_size = sum(result['size'] for result in results) + summed_attributes = {key: np.zeros_like(results[0]['model_attributes'][key]) for key in aggregation_keys} - # Initialize dictionary to hold summed attributes using numpy arrays - summed_attributes = { - key: np.zeros_like(results[0]['model_attributes'][key]) - for key in results[0]['model_attributes'] - if key in aggregation_keys} - for result in results: for key, value in result['model_attributes'].items(): - if key not in aggregation_keys: continue - # Ensure value is a numpy array for vectorized operations - current_value = np.array(value) - summed_attributes[key] += current_value * result['size'] # Weight by size - info(summed_attributes) + if key in aggregation_keys: + summed_attributes[key] += np.array(value) * result['size'] - # Calculate the weighted average of attributes aggregated_attributes = {key: value / total_data_size for key, value in summed_attributes.items()} + return update_model(global_model, aggregated_attributes) - # Update the global model with averaged attributes - global_model = update_model(global_model, aggregated_attributes) - - return global_model - -def initialize_model( - model_class: Type[BaseEstimator], - model_attributes: Dict[str, Any], - *model_init_args: Any, **model_init_kwargs: Any -) -> BaseEstimator: +def initialize_model(model_class: Type[BaseEstimator], model_attributes: Dict[str, Any], *model_init_args: Any, **model_init_kwargs: Any) -> BaseEstimator: """ Initializes an instance of the provided model class with corresponding model attributes. Parameters ---------- model_class : Type[BaseEstimator] - Class of the model to be initialized. + Class of the model to be initialized. model_attributes : Dict[str, Any] - Dictionary mapping attribute names to their corresponding values. + Attribute names and their corresponding values. model_init_args : Any Positional arguments for model class initialization. model_init_kwargs : Any @@ -117,54 +80,72 @@ def initialize_model( Returns ------- - model : BaseEstimator + BaseEstimator The initialized model object. """ model = model_class(*model_init_args, **model_init_kwargs) - model = update_model(model, model_attributes) - return model + return update_model(model, model_attributes) def export_model(model: BaseEstimator, attribute_keys: List[str]) -> Dict[str, Any]: """ Exports model attributes given a model and list of attribute keys. - WARNING: nested dictionaries with numpy values were NOT TESTED - Parameters ---------- model : BaseEstimator - An instance of the Scikit-Learn's BaseEstimator (such as LogisticRegression, SVC, etc.). + An instance of Scikit-Learn's BaseEstimator. attribute_keys : List[str] - List of attribute names that are to be extracted from the model. + Attribute names to be extracted from the model. Returns ------- - attributes : Dict[str, Any] - A Dictionary mapping attribute keys to their corresponding values extracted from the model. + Dict[str, Any] + Dictionary of attribute keys and their values. """ - attributes = {key: to_json_serializable(getattr(model, key)) for key in attribute_keys} - return attributes + return {key: to_json_serializable(getattr(model, key)) for key in attribute_keys} + +def update_model(model: BaseEstimator, model_attributes: Dict[str, Any]) -> BaseEstimator: + """ + Updates the model's attributes, attempting to convert lists to numpy arrays. + + Parameters + ---------- + model : BaseEstimator + The model to update. + model_attributes : Dict[str, Any] + Attributes to update in the model. -def update_model( - model: BaseEstimator, - model_attributes: Dict[str, Any] -) -> BaseEstimator: - """Updates the model's attributes, converting lists to numpy arrays where possible.""" + Returns + ------- + BaseEstimator + The updated model. + """ for key, value in model_attributes.items(): - try: - # Convert lists to numpy arrays if all elements are numeric - if isinstance(value, list): + if isinstance(value, list): + try: value = np.array(value) - except ValueError: - warn(f"Could not convert {key} attribute to a numpy array.") + except ValueError: + warn(f"Conversion failed for {key}, remains a list.") setattr(model, key, value) return model def to_json_serializable(item: Union[np.ndarray, dict, Any]) -> Union[list, dict, Any]: - """Convert an item to a format that is serializable to JSON.""" + """ + Converts an item to JSON-serializable format. Numpy arrays are converted to lists. + + Parameters + ---------- + item : Union[np.ndarray, dict, Any] + The item to convert. + + Returns + ------- + Union[list, dict, Any] + The JSON-serializable representation of the item. + """ if isinstance(item, np.ndarray): return item.tolist() if isinstance(item, dict): From 557e27c323cb53b4ed19c48cc4bf30b46c13ca6e Mon Sep 17 00:00:00 2001 From: ivanzhovannik Date: Wed, 13 Mar 2024 16:22:24 +0100 Subject: [PATCH 5/8] moved global grad calvulation outta master --- v6_logistic_regression_py/__init__.py | 290 ++++++++++++++++---------- 1 file changed, 180 insertions(+), 110 deletions(-) diff --git a/v6_logistic_regression_py/__init__.py b/v6_logistic_regression_py/__init__.py index 709b40b..b3a0457 100644 --- a/v6_logistic_regression_py/__init__.py +++ b/v6_logistic_regression_py/__init__.py @@ -1,7 +1,4 @@ -# -*- coding: utf-8 -*- - -""" Federated algorithm for logistic regression -Adapted from: +""" Federated logistic regression algorithm adapted from Flower's federated scikit-learn example: https://flower.dev/blog/2021-07-21-federated-scikit-learn-using-flower/ """ import warnings @@ -12,56 +9,151 @@ from sklearn.linear_model import LogisticRegression from sklearn.metrics import log_loss from sklearn.metrics import confusion_matrix -from typing import Dict, List +from typing import Any, Dict, List from vantage6.algorithm.client import AlgorithmClient from vantage6.algorithm.tools.util import info from vantage6.algorithm.tools.decorators import algorithm_client, data -from v6_logistic_regression_py.helper import coordinate_task from v6_logistic_regression_py.helper import ( aggregate, + coordinate_task, export_model, - initialize_model, + initialize_model ) MODEL_ATTRIBUTE_KEYS = ["coef_", "intercept_", "classes_"] MODEL_AGGREGATION_KEYS = ["coef_", "intercept_"] +@algorithm_client +def master( + client: AlgorithmClient, + predictors: List[str], + outcome: str, + classes: List[str], + max_iter: int = 15, + delta: float = 0.01, + org_ids: List[int] = None +) -> Dict[str, Any]: + """ + Orchestrates federated logistic regression training across nodes. + + Parameters + ---------- + client : AlgorithmClient + Vantage6 user or mock client. + predictors : List[str] + Columns to be used as predictors. + outcome : str + Column to be used as target variable. + classes : List[str] + List of class labels. + max_iter : int, optional + Maximum number of iterations for convergence. + delta : float, optional + Convergence threshold based on loss difference. + org_ids : List[int], optional + Specific organization IDs to involve in computation. + + Returns + ------- + Dict[str, any] + Aggregated model attributes, last loss value, and number of iterations performed. + """ + + # Identifying data nodes participating in the federated learning process. + info("Identifying participating organizations.") + organizations = client.organization.list() + ids = [org['id'] for org in organizations if not org_ids or org['id'] in org_ids] + + # Initializing the global logistic regression model with zero weights. + info("Initializing global logistic regression model.") + model_attrs = { + 'classes_': np.array(classes), + 'coef_': np.zeros((1, len(predictors))), + 'intercept_': np.zeros(1) + } + global_model = initialize_model(LogisticRegression, model_attrs) + + # Iteratively updating the global model based on local updates until convergence. + iteration, loss, loss_diff = 0, None, 2 * delta + while iteration < max_iter and loss_diff > delta: + info(f"Starting iteration #{iteration + 1}.") + + # Sending model training tasks to nodes and collecting updates. + model_attrs = export_model(global_model, MODEL_ATTRIBUTE_KEYS) + input_ = { + 'method': 'logistic_regression_partial', + 'kwargs': {'model_attributes': model_attrs, 'predictors': predictors, 'outcome': outcome} + } + partial_results = coordinate_task(client, input_, ids) + + # Aggregating updates into the global model and assessing convergence. + global_model = aggregate(global_model, partial_results, MODEL_AGGREGATION_KEYS) + new_loss = compute_global_loss(client, global_model, predictors, outcome, ids) + + loss_diff = abs(loss - new_loss) if loss is not None else 2 * delta + loss = new_loss + info(f"Iteration #{iteration + 1} completed. Loss difference: {loss_diff}.") + + iteration += 1 + + info("Federated training completed.") + + return { + 'model_attributes': export_model(global_model, MODEL_ATTRIBUTE_KEYS), + 'loss': loss, + 'iteration': iteration + } + +def compute_global_loss(client, model, predictors, outcome, ids): + """ + Helper function to compute global loss, abstracting detailed logging. + """ + model_attributes = export_model(model, MODEL_ATTRIBUTE_KEYS) + input_ = { + 'method': 'compute_loss_partial', + 'kwargs': {'model_attributes': model_attributes, 'predictors': predictors, 'outcome': outcome} + } + results = coordinate_task(client, input_, ids) + aggregated_sample_size = np.sum([res['size'] for res in results]) + aggregated_loss = np.sum([res['loss'] * res['size'] for res in results]) + new_loss = aggregated_loss / aggregated_sample_size + return new_loss + @algorithm_client def master( - client: AlgorithmClient, - predictors: List[str], - outcome: str, - classes: list, - max_iter: int = 15, - delta: float = 0.01, - org_ids: List[int] = None -) -> dict: - """ Master algorithm that coordinates the tasks and performs averaging + client: AlgorithmClient, + predictors: List[str], + outcome: str, + classes: List[str], + max_iter: int = 15, + delta: float = 0.01, + org_ids: List[int] = None +) -> Dict[str, any]: + """ + Coordinates federated logistic regression training across nodes. Parameters ---------- - client - Vantage6 user or mock client - data - DataFrame with the input data - predictors - List with columns to be used as predictors - outcome - Column to be used as outcome - classes - List with classes to be predicted - max_iter - Maximum number of iterations to perform - delta - Threshold for difference between losses to consider convergence - org_ids - List with organisation ids to be used + client : AlgorithmClient + Vantage6 user or mock client. + predictors : List[str] + Columns to be used as predictors. + outcome : str + Column to be used as target variable. + classes : List[str] + List of class labels. + max_iter : int, optional + Maximum number of iterations for convergence. + delta : float, optional + Convergence threshold based on loss difference. + org_ids : List[int], optional + Specific organization IDs to involve in computation. Returns ------- - results - Dictionary with the final averaged result + Dict[str, any] + Aggregated model attributes, last loss value, and number of iterations performed. """ # Get all organization ids that are within the collaboration or @@ -103,30 +195,6 @@ def master( results = coordinate_task(client, input_, ids) info(f'Results before aggregation: {results}') - # # Reassign model parameters - # global_model = update_model(global_model, model_attributes=results['model_attributes']) - - # # Average model weights with weighted average - # info(f'Run global averaging for model weights: {results}') - # coefficients = np.zeros((1, len(predictors))) - # for i in range(coefficients.shape[1]): - # coefficients[0, i] = np.sum([ - # result['model'].coef_[0, i]*result['size'] - # for result in results - # ]) / np.sum([ - # result['size'] for result in results - # ]) - # intercept = np.sum([ - # result['model'].intercept_*result['size'] for result in results - # ]) / np.sum([ - # result['size'] for result in results - # ]) - # intercept = np.array([intercept]) - - # Re-define the global parameters - # parameters = (coefficients, intercept) - # model = set_model_params(model, parameters) - # Aggregate the results info("Aggregating partial modeling results") global_model = aggregate(global_model, results=results, aggregation_keys=MODEL_AGGREGATION_KEYS) @@ -176,28 +244,29 @@ def master( @data(1) def logistic_regression_partial( - df: pd.DataFrame, - model_attributes: Dict[str, List[float]], - predictors, - outcome -) -> dict: - """ Partial method for federated logistic regression + df: pd.DataFrame, + model_attributes: Dict[str, List[float]], + predictors: List[str], + outcome: str +) -> Dict[str, any]: + """ + Fits logistic regression model on local dataset. Parameters ---------- - df - DataFrame with input data - model_attributes - Model weigths of logistic regression - predictors - List with columns to be used as predictors - outcome - Column to be used as outcome + df : pd.DataFrame + Local data frame. + model_attributes : Dict[str, List[float]] + Logistic regression model attributes (weights, intercepts). + predictors : List[str] + List of predictor variable names. + outcome : str + Outcome variable name. Returns ------- - results - Dictionary with local model + Dict[str, any] + Attributes of locally trained logistic regression model and local dataset size. """ # Drop rows with NaNs df = df.dropna(how='any') @@ -220,7 +289,6 @@ def logistic_regression_partial( info('Training round finished') model_attributes = export_model(model, attribute_keys=MODEL_ATTRIBUTE_KEYS) - info(f'MODEL ATTRIBUTES: {model_attributes}') return { 'model_attributes': model_attributes, @@ -230,28 +298,29 @@ def logistic_regression_partial( @data(1) def compute_loss_partial( - df: pd.DataFrame, - model_attributes: Dict[str, list], - predictors, - outcome -) -> dict: - """ Partial method for calculation of loss + df: pd.DataFrame, + model_attributes: Dict[str, list], + predictors: List[str], + outcome: str +) -> Dict[str, Any]: + """ + Computes logistic regression model loss on local dataset. Parameters ---------- - df - DataFrame with input data - model_attributes - Serializable model parameters in Dict[str, list] format - predictors - List with columns to be used as predictors - outcome - Column to be used as outcome + df : pd.DataFrame + Local data frame. + model_attributes : Dict[str, list] + Attributes of the logistic regression model. + predictors : List[str] + Predictor variables. + outcome : str + Outcome variable. Returns ------- - loss - Dictionary with local loss + Dict[str, Any] + Local loss and dataset size. """ # Drop rows with NaNs df = df.dropna(how='any') @@ -274,31 +343,32 @@ def compute_loss_partial( @data(1) def run_validation( - df: pd.DataFrame, - parameters: list, - classes: list, - predictors: list, - outcome: str -) -> dict: - """ Method for running model validation + df: pd.DataFrame, + parameters: List[np.ndarray], + classes: List[str], + predictors: List[str], + outcome: str +) -> Dict[str, Any]: + """ + Validates logistic regression model on local dataset. Parameters ---------- - df - DataFrame with input data - parameters - List with coefficients - classes - Classes to be predicted - predictors - List with columns to be used as predictors - outcome - Column to be used as outcome + df : pd.DataFrame + Local data frame for validation. + parameters : List[np.ndarray] + Model parameters for validation. + classes : List[str] + List of class labels. + predictors : List[str] + Predictor variables for validation. + outcome : str + Outcome variable for validation. Returns ------- - performance - Dictionary with performance metrics + Dict[str, Any] + Performance metrics including model accuracy and confusion matrix. """ # Drop rows with NaNs df = df.dropna(how='any') From 45ad34be6994efa61c43f5b79cc163bdfb4dbe7f Mon Sep 17 00:00:00 2001 From: "Ivan Ye. Zhovannik" <33182324+ivanzhovannik@users.noreply.github.com> Date: Wed, 13 Mar 2024 16:29:57 +0100 Subject: [PATCH 6/8] Updated Dockerfile for v4 --- Dockerfile | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8f47e80..f230f18 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,14 @@ # Basic python3 image as base -FROM harbor2.vantage6.ai/infrastructure/algorithm-base:3.4.2 +FROM harbor2.vantage6.ai/infrastructure/algorithm-base:4.2 +# Change this to the package name of your project. This needs to be the same +# as what you specified for the name in the `setup.py`. +ARG PKG_NAME="v6-logistic-regression-py" -# Package name -ARG PKG_NAME="v6_logistic_regression_py" - -# Install federated algorithm +# This will install your algorithm into this image. COPY . /app RUN pip install /app -# Tell docker to execute `docker_wrapper()` when the image is run. +# This will run your algorithm when the Docker container is started. ENV PKG_NAME=${PKG_NAME} -CMD python -c "from vantage6.tools.docker_wrapper import docker_wrapper; docker_wrapper('${PKG_NAME}')" +CMD python -c "from vantage6.algorithm.tools.wrap import wrap_algorithm; wrap_algorithm()" From 7dbd53541a41732fcd5c681c10df2f03f71553a7 Mon Sep 17 00:00:00 2001 From: "Ivan Ye. Zhovannik" <33182324+ivanzhovannik@users.noreply.github.com> Date: Wed, 13 Mar 2024 16:30:55 +0100 Subject: [PATCH 7/8] Updated setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index e51045b..e06cb7a 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ # Setup the package setup( - name='v6_logistic_regression_py', + name='v6-logistic-regression-py', version='1.0.0', description='Vantage6 algorithm for logistic regression', long_description=long_description, From a48c7ffa9b473d2ba61be491c9b7701af1259805 Mon Sep 17 00:00:00 2001 From: ivanzhovannik Date: Wed, 13 Mar 2024 16:32:17 +0100 Subject: [PATCH 8/8] updated example.py - minor cleaning --- v6_logistic_regression_py/example.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/v6_logistic_regression_py/example.py b/v6_logistic_regression_py/example.py index 1c2f4c3..b67f49f 100644 --- a/v6_logistic_regression_py/example.py +++ b/v6_logistic_regression_py/example.py @@ -1,8 +1,5 @@ -# -*- coding: utf-8 -*- - """ Sample code to test the federated algorithm with a mock client """ -import os from pathlib import Path from sklearn.linear_model import LogisticRegression from vantage6.algorithm.tools.mock_client import MockAlgorithmClient