Code cleanup and proper logging for pareto and clustering (#1180)
* refactored pareto_optimizer code

* removed unused unit test

* Added logging for Pareto module

* cleaned up unused functions and unimplemented code

* cluster loggers

---------

Co-authored-by: Dhaval Patel <[email protected]>
dhavalpatel624624 and Dhaval Patel authored Dec 6, 2024
1 parent 634980b commit 0b98a2f
Showing 12 changed files with 119 additions and 639 deletions.
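The logging changes follow the standard per-module pattern: each module keeps its own logger (the self.logger seen throughout the diffs) and routes DataFrame snapshots through the new RobynLogger.log_df helper added below. A minimal sketch of that wiring, assuming the usual logging.getLogger setup; the setup itself is not part of this diff:

import logging

# Assumed per-module logger setup: each module logs under its own dotted
# name, so output can be filtered per module (e.g. robyn.modeling.pareto).
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)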
31 changes: 31 additions & 0 deletions python/src/robyn/common/logger.py
@@ -0,0 +1,31 @@
# pyre-strict

import logging

import pandas as pd


class RobynLogger:

    @staticmethod
    def log_df(
        logger,
        df: pd.DataFrame,
        logLevel: int = logging.DEBUG,
        print_head: bool = False,
    ):
        """
        Log the shape and first few rows of a DataFrame.

        Args:
            logger: Logger to write to.
            df (pd.DataFrame): DataFrame to log.
            logLevel (int): Level to log at (default: logging.DEBUG).
            print_head (bool): If True, also log the first few rows.
        """
        if df is None:
            logger.log(logLevel, "DataFrame is None")
            return

        logger.log(logLevel, f"DataFrame columns: {df.columns}")
        logger.log(logLevel, f"DataFrame Shape: {df.shape}")
        if print_head:
            logger.log(logLevel, f"DataFrame Head: {df.head()}")
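A hypothetical call site for the new helper, mirroring how the diffs below use it (the sample frame here is invented for illustration):

import logging

import pandas as pd

from robyn.common.logger import RobynLogger

logger = logging.getLogger(__name__)
df = pd.DataFrame({"sol_id": ["1_1_1", "1_1_2"], "mape": [0.12, 0.08]})

# Default: logs columns and shape at DEBUG.
RobynLogger.log_df(logger, df)

# As used for top solutions in cluster_builder.py: also log the first rows.
RobynLogger.log_df(logger, df, logLevel=logging.INFO, print_head=True)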
71 changes: 20 additions & 51 deletions python/src/robyn/modeling/clustering/cluster_builder.py
@@ -7,6 +7,7 @@
import re
import numpy as np
import pandas as pd
from robyn.common.logger import RobynLogger
from robyn.modeling.clustering.clustering_config import ClusterBy, ClusteringConfig
from robyn.data.entities.enums import DependentVarType
from robyn.modeling.entities.clustering_results import (
@@ -71,7 +72,7 @@ def cluster_models(self, config: ClusteringConfig) -> ClusteredResult:
aux = self.pareto_result.media_vec_collect.columns
type_index = aux.get_loc("type")
config.all_media = aux[1:type_index].tolist()
self.logger.debug("Auto-detected media channels: %s", config.all_media)
self.logger.info("Auto-detected media channels: %s", config.all_media)

self.logger.info("Clustering by: %s", config.cluster_by)
if config.cluster_by == ClusterBy.HYPERPARAMETERS:
@@ -84,6 +85,7 @@ def cluster_models(self, config: ClusteringConfig) -> ClusteredResult:
self.logger.error(f"Invalid clustering method: {config.cluster_by}")
raise ValueError("Invalid clustering method")

RobynLogger.log_df(self.logger, df)
ignored_columns = [
"sol_id",
"mape",
@@ -139,6 +141,7 @@ def cluster_models(self, config: ClusteringConfig) -> ClusteredResult:
self.logger.debug("Selecting top solutions")
top_solutions = self._select_top_solutions(result.cluster_data, config)
self.logger.info(f"Selected {len(top_solutions)} top solutions")
RobynLogger.log_df(self.logger, top_solutions, print_head=True)

self.logger.debug("Calculating confidence intervals")
ci_results: ConfidenceIntervalCollectionData = (
@@ -170,13 +173,15 @@ def cluster_models(self, config: ClusteringConfig) -> ClusteredResult:
ci_results, config
),
)
clustered_data = result.cluster_data.assign(
top_sol=result.cluster_data["sol_id"].isin(top_solutions["sol_id"]),
cluster=result.cluster_data["cluster"].astype(int),
)

RobynLogger.log_df(self.logger, clustered_data)
self.logger.info("Clustering process completed successfully")
return ClusteredResult(
cluster_data=result.cluster_data.assign(
top_sol=result.cluster_data["sol_id"].isin(top_solutions["sol_id"]),
cluster=result.cluster_data["cluster"].astype(int),
),
cluster_data=clustered_data,
top_solutions=top_solutions,
wss=result.wss,
cluster_ci=cluster_ci,
@@ -454,6 +459,9 @@ def clusterKmeans(
result.cluster_data = pd.concat(
[result.cluster_data, df_copy["cluster"]], axis=1
)
self.logger.debug("Clustered data after KMeans:")
RobynLogger.log_df(self.logger, result.cluster_data)

df = df_copy

self.logger.debug("Calculating cluster means and counts")
@@ -468,6 +476,8 @@ def clusterKmeans(
cluster_means["n"] = cluster_counts

self.logger.debug(f"Cluster sizes: {dict(cluster_counts)}")
self.logger.debug("Cluster means:")
RobynLogger.log_df(self.logger, cluster_means)

# Check for potentially problematic clusters
min_cluster_size = cluster_counts.min()
@@ -489,47 +499,6 @@

return result

def _bootstrap_sampling(
    self, sample: pd.Series, boot_n: int
) -> Dict[str, List[float]]:
    """Calculate bootstrap confidence interval"""
    self.logger.debug(f"Starting bootstrap sampling with n={boot_n}")

    if len(sample[~sample.isna()]) > 1:
        sample_n = len(sample)
        sample_mean = np.mean(sample)

        self.logger.debug(f"Performing bootstrap with sample size={sample_n}")
        boot_sample = np.random.choice(
            sample, size=(boot_n, sample_n), replace=True
        )
        boot_means = np.mean(boot_sample, axis=1)
        se = np.std(boot_means)
        me = stats.t.ppf(0.975, sample_n - 1) * se
        sample_me = np.sqrt(sample_n) * me
        ci = [sample_mean - sample_me, sample_mean + sample_me]

        self.logger.debug(
            f"Bootstrap results: CI=[{ci[0]:.4f}, {ci[1]:.4f}], SE={se:.4f}"
        )
        return {
            "boot_means": boot_means,
            "ci": ci,
            "se": [float(se)],
        }
    else:
        self.logger.warning("Insufficient non-NA samples for bootstrap analysis")
        ci = (
            [np.nan, np.nan]
            if sample.isna().all()
            else [sample.iloc[0], sample.iloc[0]]
        )
        return {
            "boot_means": sample.tolist(),
            "ci": ci,
            "se": [0.0],
        }

def _select_optimal_clusters(
self,
df: pd.DataFrame,
@@ -766,14 +735,14 @@ def _select_top_solutions(
df["rank"] = df.groupby("cluster", observed=False)["error_score"].rank(
method="dense"
)

self.logger.info(f"Selected {len(df)} solutions from {initial_solutions} total")
self.logger.debug(f"Final dataset shape: {df.shape}")

return df[
df = df[
["cluster", "rank"]
+ [col for col in df.columns if col not in ["cluster", "rank"]]
]
self.logger.info(f"Selected {len(df)} solutions from {initial_solutions} total")
RobynLogger.log_df(self.logger, df)

return df
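For context on the block above, the dense per-cluster rank it reorders around behaves like this toy example (data invented):

import pandas as pd

df = pd.DataFrame({
    "cluster": [1, 1, 2, 2],
    "error_score": [0.3, 0.1, 0.2, 0.2],
})

# method="dense": ties share a rank and no ranks are skipped within a cluster.
df["rank"] = df.groupby("cluster", observed=False)["error_score"].rank(method="dense")
# cluster 1 -> ranks 2.0 and 1.0; cluster 2 -> ranks 1.0 and 1.0 (tie)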

def _bootstrap_sampling(
self, sample: pd.Series, boot_n: int, seed: int = 1
…
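The _bootstrap_sampling deleted above duplicated a seeded variant that survives the cleanup (its signature, with seed: int = 1, is visible just above). A sketch of what the retained helper plausibly computes, assuming it matches the removed body plus seeding; bootstrap_ci is a hypothetical name, and this is a reconstruction, not the committed code:

import numpy as np
from scipy import stats

def bootstrap_ci(sample, boot_n, seed=1):
    # Hypothetical seeded equivalent of the removed helper: resample with
    # replacement, then form a t-based confidence interval around the mean.
    rng = np.random.default_rng(seed)
    values = np.asarray(sample, dtype=float)
    values = values[~np.isnan(values)]
    n = len(values)
    boot_means = rng.choice(values, size=(boot_n, n), replace=True).mean(axis=1)
    se = np.std(boot_means)
    margin = np.sqrt(n) * stats.t.ppf(0.975, n - 1) * se
    mean = float(values.mean())
    return {
        "boot_means": boot_means,
        "ci": [mean - margin, mean + margin],
        "se": [float(se)],
    }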
10 changes: 10 additions & 0 deletions python/src/robyn/modeling/pareto/data_aggregator.py
@@ -4,6 +4,7 @@
import logging

import pandas as pd
from robyn.common.logger import RobynLogger
from robyn.modeling.entities.modeloutputs import ModelOutputs, Trial


@@ -73,13 +74,22 @@ def aggregate_model_data(self, calibrated: bool) -> Dict[str, pd.DataFrame]:
result_hyp_param = pd.concat(result_hyp_param_list, ignore_index=True)
x_decomp_agg = pd.concat(x_decomp_agg_list, ignore_index=True)

self.logger.debug("Aggregated result_hyp_param:")
RobynLogger.log_df(self.logger, result_hyp_param)

self.logger.debug("Aggregated x_decomp_agg:")
RobynLogger.log_df(self.logger, x_decomp_agg)

self._check_sol_id(result_hyp_param, x_decomp_agg)

result_calibration = self._process_calibration_data(trials, calibrated)

if not hyper_fixed:
self._add_iterations(result_hyp_param, x_decomp_agg, result_calibration)

self.logger.debug("Processed result_calibration:")
RobynLogger.log_df(self.logger, result_calibration)

self._merge_bootstrap_results(x_decomp_agg)

return {
…
189 changes: 0 additions & 189 deletions python/src/robyn/modeling/pareto/immediate_carryover.py

This file was deleted.

[Diffs for the remaining 8 changed files are not shown.]
