added preprocessing
emdeeweegio committed Mar 15, 2024
1 parent ba7d6b4 commit 2accdf5
Showing 2 changed files with 32 additions and 130 deletions.
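In effect, the commit threads a trash_kwargs preprocessing payload from master through coordinate_task into both partial functions, where it is unpacked into the new trash_outcomes helper. A sketch of the task input a data station now receives; the predictor and outcome names are illustrative, while the trash_kwargs values are the committed defaults:

# Task input for one node after this commit (illustrative names and values).
input_ = {
    'method': 'logistic_regression_partial',
    'kwargs': {
        'model_attributes': {'classes_': [0, 1],
                             'coef_': [[0.0, 0.0]],
                             'intercept_': [0.0]},
        'predictors': ['age', 'tumour_volume'],
        'outcome': 'two_year_mortality',
        'trash_kwargs': {'survival_column': 'Survival.time',
                         'event_column': 'deadstatus.event',
                         'threshold': 730},
    },
}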
v6_logistic_regression_py/__init__.py (151 changes: 22 additions & 129 deletions)
@@ -18,11 +18,14 @@
aggregate,
coordinate_task,
export_model,
-    initialize_model
+    initialize_model,
+    trash_outcomes
)

MODEL_ATTRIBUTE_KEYS = ["coef_", "intercept_", "classes_"]
MODEL_AGGREGATION_KEYS = ["coef_", "intercept_"]


@algorithm_client
def master(
client: AlgorithmClient,
@@ -31,7 +34,11 @@ def master(
classes: List[str],
max_iter: int = 15,
delta: float = 0.01,
-    org_ids: List[int] = None
+    org_ids: List[int] = None,
+    trash_kwargs=dict(
+        survival_column="Survival.time",
+        event_column="deadstatus.event",
+        threshold=730)
) -> Dict[str, Any]:
"""
Orchestrates federated logistic regression training across nodes.
@@ -82,7 +89,7 @@
model_attrs = export_model(global_model, MODEL_ATTRIBUTE_KEYS)
input_ = {
'method': 'logistic_regression_partial',
-        'kwargs': {'model_attributes': model_attrs, 'predictors': predictors, 'outcome': outcome}
+        'kwargs': {'model_attributes': model_attrs, 'predictors': predictors, 'outcome': outcome, 'trash_kwargs': trash_kwargs}
}
partial_results = coordinate_task(client, input_, ids)

@@ -104,14 +111,14 @@
'iteration': iteration
}

-def compute_global_loss(client, model, predictors, outcome, ids):
+def compute_global_loss(client, model, predictors, outcome, ids, trash_kwargs):
"""
Helper function to compute global loss, abstracting detailed logging.
"""
model_attributes = export_model(model, MODEL_ATTRIBUTE_KEYS)
input_ = {
'method': 'compute_loss_partial',
-        'kwargs': {'model_attributes': model_attributes, 'predictors': predictors, 'outcome': outcome}
+        'kwargs': {'model_attributes': model_attributes, 'predictors': predictors, 'outcome': outcome, 'trash_kwargs': trash_kwargs}
}
results = coordinate_task(client, input_, ids)
aggregated_sample_size = np.sum([res['size'] for res in results])
@@ -120,134 +127,13 @@ def compute_global_loss(client, model, predictors, outcome, ids):
return new_loss


-@algorithm_client
-def master(
-    client: AlgorithmClient,
-    predictors: List[str],
-    outcome: str,
-    classes: List[str],
-    max_iter: int = 15,
-    delta: float = 0.01,
-    org_ids: List[int] = None
-) -> Dict[str, any]:
-    """
-    Coordinates federated logistic regression training across nodes.
-    Parameters
-    ----------
-    client : AlgorithmClient
-        Vantage6 user or mock client.
-    predictors : List[str]
-        Columns to be used as predictors.
-    outcome : str
-        Column to be used as target variable.
-    classes : List[str]
-        List of class labels.
-    max_iter : int, optional
-        Maximum number of iterations for convergence.
-    delta : float, optional
-        Convergence threshold based on loss difference.
-    org_ids : List[int], optional
-        Specific organization IDs to involve in computation.
-    Returns
-    -------
-    Dict[str, any]
-        Aggregated model attributes, last loss value, and number of iterations performed.
-    """
-
-    # Get all organization ids that are within the collaboration or
-    # use the provided ones
-    info('Collecting the identification of the participating organizations')
-    organizations = client.organization.list()
-    ids = [organization.get('id') for organization in organizations
-           if not org_ids or organization.get('id') in org_ids]
-
-    # Initialise the weights for the logistic regression
-    info('Initializing logistic regression estimator')
-    model_initial_attributes = dict(
-        classes_ =np.array(classes),
-        coef_ =np.zeros((1, len(predictors))),
-        intercept_=np.zeros((1,))
-    )
-    global_model = initialize_model(LogisticRegression, model_initial_attributes)
-    model_attributes = export_model(global_model, attribute_keys=MODEL_ATTRIBUTE_KEYS)
-    info(model_attributes)
-
-    # The next steps are run until the maximum number of iterations or
-    # convergence is reached
-    iteration = 0
-    loss = None
-    loss_diff = 2*delta
-    while (iteration < max_iter) and (loss_diff > delta):
-        # The input for the partial algorithm
-        info(f'######## ITERATION #{iteration} #########')
-        input_ = {
-            'method': 'logistic_regression_partial',
-            'kwargs': {
-                'model_attributes': model_attributes,
-                'predictors': predictors,
-                'outcome': outcome
-            }
-        }
-
-        # Send partial task and collect results
-        results = coordinate_task(client, input_, ids)
-        info(f'Results before aggregation: {results}')
-
-        # Aggregate the results
-        info("Aggregating partial modeling results")
-        global_model = aggregate(global_model, results=results, aggregation_keys=MODEL_AGGREGATION_KEYS)
-        info("Exporting global model")
-        global_model_attributes = export_model(model=global_model, attribute_keys=MODEL_ATTRIBUTE_KEYS)
-
-        # The input for the partial algorithm that computes the loss
-        info('Computing local losses')
-        input_ = {
-            'method': 'compute_loss_partial',
-            'kwargs': {
-                'model_attributes': global_model_attributes,
-                'predictors': predictors,
-                'outcome': outcome
-            }
-        }
-
-        # Send partial task and collect results
-        results = coordinate_task(client, input_, ids)
-        info(f'Results: {results}')
-
-        # Aggregating local losses into a global loss
-        info('Run global averaging for losses')
-        new_loss = np.sum([
-            result['loss']*result['size'] for result in results
-        ]) / np.sum([
-            result['size'] for result in results
-        ])
-
-        # Check convergence: we assume convergence when the difference in the
-        # global loss between iterations gets below a certain threshold,
-        # the difference is set to a value greater than delta in iteration zero
-        info('Checking convergence')
-        loss_diff = np.abs(loss - new_loss) if iteration != 0 else 2*delta
-        loss = new_loss
-        info(f'Difference is loss = {loss_diff}')
-
-        # Update iterations counter
-        iteration += 1
-
-    return {
-        'model_attributes': global_model_attributes,
-        'loss': loss,
-        'iteration': iteration
-    }


@data(1)
def logistic_regression_partial(
df: pd.DataFrame,
model_attributes: Dict[str, List[float]],
predictors: List[str],
-    outcome: str
+    outcome: str,
+    trash_kwargs: dict
) -> Dict[str, any]:
"""
Fits logistic regression model on local dataset.
@@ -271,6 +157,9 @@ def logistic_regression_partial(
# Drop rows with NaNs
df = df.dropna(how='any')

+    # REMOVE
+    df = trash_outcomes(df, outcome, **trash_kwargs)

# Get features and outcomes
X = df[predictors].values
y = df[outcome].values
@@ -301,7 +190,8 @@ def compute_loss_partial(
df: pd.DataFrame,
model_attributes: Dict[str, list],
predictors: List[str],
-    outcome: str
+    outcome: str,
+    trash_kwargs: dict
) -> Dict[str, Any]:
"""
Computes logistic regression model loss on local dataset.
@@ -325,6 +215,9 @@
# Drop rows with NaNs
df = df.dropna(how='any')

+    # REMOVE
+    df = trash_outcomes(df, outcome, **trash_kwargs)

# Get features and outcomes
X = df[predictors].values
y = df[outcome].values
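For context on the helpers used throughout this file: export_model serialises the listed estimator attributes, and initialize_model rebuilds an estimator from them, so a LogisticRegression becomes usable without calling fit. A minimal sketch of that round trip using plain setattr; the real helpers live in helper.py and may differ in detail:

import numpy as np
from sklearn.linear_model import LogisticRegression

# Rebuild a usable estimator from serialisable attributes, mimicking the
# export_model/initialize_model pair; the attribute values are illustrative.
attrs = {
    'classes_': np.array([0, 1]),
    'coef_': np.zeros((1, 2)),
    'intercept_': np.zeros((1,)),
}
model = LogisticRegression()
for key, value in attrs.items():
    setattr(model, key, value)
print(model.predict(np.array([[0.5, -1.0]])))  # prints [0] without fit()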
v6_logistic_regression_py/helper.py (11 changes: 10 additions & 1 deletion)
@@ -150,4 +150,13 @@ def to_json_serializable(item: Union[np.ndarray, dict, Any]) -> Union[list, dict
return item.tolist()
if isinstance(item, dict):
return {key: to_json_serializable(value) for key, value in item.items()}
-return item
\ No newline at end of file
+return item

+def trash_outcomes(
+        df,
+        outcome,
+        survival_column="Survival.time",
+        event_column="deadstatus.event",
+        threshold=730):
+    df[outcome] = (df[survival_column] <= threshold).astype(int)
+    return df
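As committed, trash_outcomes dichotomises the survival column at the threshold (730 days, i.e. two-year survival) and overwrites the outcome column; note that event_column is accepted but never read, so deaths after the threshold and censored patients are both labelled 0. A quick usage sketch with made-up data and a hypothetical outcome name:

import pandas as pd

from v6_logistic_regression_py.helper import trash_outcomes

# Made-up survival data using the default column names.
df = pd.DataFrame({
    'Survival.time': [100, 730, 800, 2000],
    'deadstatus.event': [1, 0, 1, 0],
})
df = trash_outcomes(df, outcome='two_year_mortality')
print(df['two_year_mortality'].tolist())  # [1, 1, 0, 0]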
