diff --git a/evaluation/experiments/utils.libsonnet b/evaluation/experiments/utils.libsonnet
index bce239d4a..4ad4a069a 100644
--- a/evaluation/experiments/utils.libsonnet
+++ b/evaluation/experiments/utils.libsonnet
@@ -102,6 +102,7 @@ local outputs_step_name(config) =
     config.task_name + std.get(config.task_kwargs, "task_rename", "");
 
 local outputs_ref(config) = {type: "ref", ref: outputs_step_name(config)};
+local processed_outputs_ref(config) = {type: "ref", ref: "processed_" + outputs_step_name(config)};
 
 local create_outputs_steps(model_task_configs) = std.foldl(
     function(x, config) x + {
@@ -119,12 +120,30 @@ local create_outputs_steps(model_task_configs) = std.foldl(
     {}
 );
 
+local create_process_outputs_steps(model_task_configs) = std.foldl(
+    function(x, config) x + {
+        ["processed_" + outputs_step_name(config)]: {
+            type: "process-outputs",
+            outputs: outputs_ref(config),
+            step_resources: {
+                gpu_count: 0
+            }
+        },
+    },
+    model_task_configs,
+    {}
+);
+
 local all_outputs(model_task_configs) = [
     outputs_ref(config)
     for config in model_task_configs
 ];
 
+local all_processed_outputs(model_task_configs) = [
+    processed_outputs_ref(config)
+    for config in model_task_configs
+];
+
 local all_pred_kwargs(model_task_configs) = [
     config.prediction_kwargs
     for config in model_task_configs
@@ -149,6 +168,20 @@ local create_outputs_as_rows_steps(model_task_configs, gsheet) =
         }
     };
 
+local create_processed_outputs_as_rows_multiple_metrics_steps(model_task_configs, gsheet) =
+    {
+        "combine-all-outputs": {
+            type: "write-outputs-as-rows-multiple-metrics",
+            outputs: all_processed_outputs(model_task_configs),
+            models: all_models(model_task_configs),
+            prediction_kwargs: all_pred_kwargs(model_task_configs),
+            gsheet: gsheet,
+            step_resources: {
+                gpu_count: 0
+            }
+        }
+    };
+
 local create_pipeline(models, task_sets, gsheet) =
 
     // Model steps
@@ -175,9 +208,39 @@ local create_pipeline(models, task_sets, gsheet) =
 
     all_steps;
 
+local create_fine_grained_pipeline(models, task_sets, gsheet) =
+
+    // Model steps
+    local model_location_steps = create_model_location_steps(models);
+    local catwalk_model_steps = create_catwalk_model_steps(models);
+
+    // Task steps
+    local task_configs = flatten_task_sets(task_sets);
+    local task_steps = create_task_steps(task_configs);
+
+    // Prediction and metrics
+    local model_task_configs = model_task_cross_product(models, task_configs);
+    local outputs_steps = create_outputs_steps(model_task_configs);
+
+    local processed_outputs_steps = create_process_outputs_steps(model_task_configs);
+
+    // Aggregate results for each task set and model combination
+    local combine_all_outputs = create_processed_outputs_as_rows_multiple_metrics_steps(model_task_configs, gsheet);
+
+    local all_steps =
+        model_location_steps +
+        catwalk_model_steps +
+        task_steps +
+        outputs_steps +
+        processed_outputs_steps +
+        combine_all_outputs;
+
+    all_steps;
+
 {
-    create_pipeline: create_pipeline
+    create_pipeline: create_pipeline,
+    create_fine_grained_pipeline: create_fine_grained_pipeline
 }
 
 
 /*local wandb_log_step = {
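Note: each model/task config that `create_process_outputs_steps` folds in compiles to a single CPU-only `process-outputs` step keyed by the prefixed step name. A minimal sketch of the compiled fragment for one pair, written as a Python literal (the step name `outputs_example` is hypothetical):

```python
# Hypothetical compiled fragment for one model/task config whose outputs step
# is named "outputs_example"; the layout mirrors the Jsonnet above.
step_fragment = {
    "processed_outputs_example": {
        "type": "process-outputs",  # the Step registered in run_catwalk.py below
        "outputs": {"type": "ref", "ref": "outputs_example"},  # processed_outputs_ref
        "step_resources": {"gpu_count": 0},  # post-processing needs no GPU
    }
}
```

`create_fine_grained_pipeline` then concatenates these fragments with the model, task, and outputs steps, exactly as `create_pipeline` does, and adds the single `combine-all-outputs` aggregation step.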
diff --git a/evaluation/steps/run_catwalk.py b/evaluation/steps/run_catwalk.py
index f6444d438..ccea68096 100644
--- a/evaluation/steps/run_catwalk.py
+++ b/evaluation/steps/run_catwalk.py
@@ -6,6 +6,7 @@
 from pydoc import locate
 from typing import Any, Dict, List, Optional
 
+import numpy as np
 import pandas as pd
 import pytz
 from catwalk.dependencies.lm_eval.utils import simple_parse_args_string
@@ -128,6 +129,40 @@ def run(
         }
 
 
+@Step.register("process-outputs")
+class ProcessOutputs(Step):
+    VERSION = "002"
+
+    def run(
+        self,
+        outputs: Dict,
+        **kwargs,
+    ) -> Dict:
+        task_name = outputs["task"]
+        new_metrics: Dict[str, Dict] = {}
+        if "subdomain" in outputs["instance_predictions"][0]["instance"]:
+            new_metrics[f"ppl_token_{task_name}_subdomains"] = {}
+            sum_logits: Dict[str, float] = {}
+            num_tokens: Dict[str, int] = {}
+            for instance_prediction in outputs["instance_predictions"]:
+                subdomain = instance_prediction["instance"]["subdomain"]
+                sum_logits[subdomain] = (
+                    sum_logits.get(subdomain, 0) + instance_prediction["prediction"]["model_output"]["sum_logits"]
+                )
+                num_tokens[subdomain] = (
+                    num_tokens.get(subdomain, 0) + instance_prediction["prediction"]["model_output"]["num_tokens"]
+                )
+
+            for subdomain in sum_logits:
+                new_metrics[f"ppl_token_{task_name}_subdomains"][subdomain] = np.exp(
+                    -sum_logits[subdomain] / num_tokens[subdomain]
+                )
+
+        outputs["metrics"].update(new_metrics)
+
+        return outputs
+
+
 @Step.register("predict-and-calculate-metrics")
 class PredictAndCalculateMetricsStep(Step):
     VERSION = "003"
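The per-subdomain metric above is token-level perplexity: the exponential of the negative token-weighted mean log-likelihood, pooled over all instances in a subdomain rather than averaged per instance. A self-contained sketch with toy numbers (illustrative values, not from a real run):

```python
import numpy as np

# Toy instance predictions carrying only the fields the step reads.
instances = [
    {"subdomain": "web", "sum_logits": -120.0, "num_tokens": 50},
    {"subdomain": "web", "sum_logits": -80.0, "num_tokens": 30},
    {"subdomain": "code", "sum_logits": -40.0, "num_tokens": 25},
]

sum_logits: dict = {}
num_tokens: dict = {}
for inst in instances:
    # Pool log-likelihoods and token counts per subdomain.
    sum_logits[inst["subdomain"]] = sum_logits.get(inst["subdomain"], 0.0) + inst["sum_logits"]
    num_tokens[inst["subdomain"]] = num_tokens.get(inst["subdomain"], 0) + inst["num_tokens"]

ppl = {s: float(np.exp(-sum_logits[s] / num_tokens[s])) for s in sum_logits}
print(ppl)  # {'web': 12.18..., 'code': 4.95...}
```

Pooling by tokens means longer documents contribute proportionally more, which is the usual convention for corpus-level perplexity.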
@@ -271,16 +306,65 @@ def run(
             tsv_outputs.append(row)
 
         if gsheet:
-            self._write_to_gsheet(gsheet, tsv_outputs)
+            write_to_gsheet(gsheet, tsv_outputs)
 
         return tsv_outputs
 
-    def _write_to_gsheet(self, gsheet: str, rows: List[Dict]):
-        import pygsheets
-
-        client = pygsheets.authorize(service_account_json=os.environ["GDRIVE_SERVICE_ACCOUNT_JSON"])
-        sheet = client.open(gsheet)
-        worksheet = sheet[0]  # TODO: pass in sheet title, etc.
-        current_df = worksheet.get_as_df()
-        new_df = pd.concat([current_df, pd.DataFrame(rows)])
-        worksheet.set_dataframe(new_df, (1, 1), nan="")
+
+@Step.register("write-outputs-as-rows-multiple-metrics")
+class WriteOutputsAsRowsMultipleMetrics(Step):
+    VERSION = "001"
+
+    def run(
+        self, models: List[str], outputs: List[Dict], prediction_kwargs: List[Dict], gsheet: Optional[str] = None
+    ) -> Dict[str, List[Dict]]:
+        per_metric_type_tsv_outputs: Dict[str, List[Dict]] = {}
+        for idx, d in enumerate(outputs):
+            model = models[idx]
+            pred_kwargs = copy.deepcopy(DEFAULT_PREDICTION_KWARGS)
+            pred_kwargs.update(prediction_kwargs[idx])
+            tsv_outputs: List[Dict] = []
+            for metric_type_name, metrics_dict in d["metrics"].items():
+                row = {}
+                row["date"] = datetime.now(tz=pytz.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
+                row["model"] = model
+                row["model_kwargs"] = d["model_kwargs"]
+                row["full_model"] = f"lm::pretrained={model}"
+                row["task"] = d["task"]
+                row["processing_time"] = d["processing_time"]
+                row["num_instances"] = d["num_instances"]
+                row["tango_workspace"] = self.workspace.url
+                row["tango_step"] = self.unique_id
+                for metric_name in metrics_dict:
+                    row[metric_name] = metrics_dict[metric_name]
+
+                row.update(pred_kwargs)
+                per_metric_type_tsv_outputs[metric_type_name] = per_metric_type_tsv_outputs.get(
+                    metric_type_name, []
+                ) + [row]
+
+        if gsheet:
+            for metric_type_name, tsv_outputs in per_metric_type_tsv_outputs.items():
+                write_to_gsheet(gsheet, tsv_outputs, sheet_title=metric_type_name)
+
+        return per_metric_type_tsv_outputs
+
+
+def write_to_gsheet(gsheet: str, rows: List[Dict], sheet_title: str = "Sheet1"):
+    import pygsheets
+
+    # collect the rows into a dataframe
+    new_df = pd.DataFrame(rows)
+
+    client = pygsheets.authorize(service_account_json=os.environ["GDRIVE_SERVICE_ACCOUNT_JSON"])
+    sheet = client.open(gsheet)
+
+    # create the worksheet if it doesn't exist yet
+    if sheet_title in [s.title for s in sheet.worksheets()]:
+        worksheet = sheet.worksheet_by_title(sheet_title)
+    else:
+        sheet.add_worksheet(rows=new_df.shape[0], cols=new_df.shape[1], title=sheet_title)
+        worksheet = sheet.worksheet_by_title(sheet_title)
+    current_df = worksheet.get_as_df()
+    new_df = pd.concat([current_df, new_df])
+    worksheet.set_dataframe(new_df, (1, 1), nan="")
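For reference, `WriteOutputsAsRowsMultipleMetrics` fans each step output into one row per metric type, and each metric type lands on its own worksheet via `sheet_title`. A minimal sketch of the grouping with toy data (field names trimmed to the essentials; values are illustrative):

```python
from datetime import datetime, timezone
from typing import Dict, List

# Toy step output: metrics keyed by metric type, as "process-outputs" leaves them.
d = {
    "task": "c4",
    "metrics": {"ppl_token_c4_subdomains": {"web": 12.18, "code": 4.95}},
}

per_metric_type_rows: Dict[str, List[Dict]] = {}
for metric_type_name, metrics_dict in d["metrics"].items():
    row = {
        "date": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC"),
        "task": d["task"],
        **metrics_dict,  # one column per subdomain
    }
    per_metric_type_rows.setdefault(metric_type_name, []).append(row)

print(per_metric_type_rows)  # each key becomes a sheet_title for write_to_gsheet
```

One caveat on `write_to_gsheet`: the append is read-modify-write (`get_as_df` followed by `set_dataframe`), so two steps writing the same worksheet concurrently could drop each other's rows.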