From 8ba652fa0cfb0a50423c50259844f83e8aae6846 Mon Sep 17 00:00:00 2001 From: Ignas Baranauskas Date: Thu, 22 Aug 2024 14:54:50 +0100 Subject: [PATCH 1/3] Chnage the isort profile to black, and add pkg dir for black and flake8 Signed-off-by: Ignas Baranauskas --- .pre-commit-config.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0681a816207..f191e042b50 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,17 +10,17 @@ repos: hooks: - id: isort name: isort - entry: isort --profile google + entry: isort --profile black - repo: https://github.com/psf/black rev: 24.2.0 hooks: - id: black - files: (sdk|examples)/.* + files: (sdk|examples|pkg)/.* - repo: https://github.com/pycqa/flake8 rev: 7.1.1 hooks: - id: flake8 - files: (sdk|examples)/.* + files: (sdk|examples|pkg)/.* exclude: | (?x)^( .*zz_generated.deepcopy.*| From 9fe89575321a1edaaeeb1d81b3dd06bcd362611b Mon Sep 17 00:00:00 2001 From: Ignas Baranauskas Date: Thu, 22 Aug 2024 14:55:47 +0100 Subject: [PATCH 2/3] Fix the formating Signed-off-by: Ignas Baranauskas --- cmd/earlystopping/medianstop/v1beta1/main.py | 2 +- .../v1beta1/tfevent-metricscollector/main.py | 4 +- cmd/suggestion/hyperband/v1beta1/main.py | 2 +- cmd/suggestion/hyperopt/v1beta1/main.py | 2 +- cmd/suggestion/nas/darts/v1beta1/main.py | 2 +- cmd/suggestion/nas/enas/v1beta1/main.py | 2 +- cmd/suggestion/optuna/v1beta1/main.py | 2 +- cmd/suggestion/pbt/v1beta1/main.py | 2 +- cmd/suggestion/skopt/v1beta1/main.py | 2 +- .../kubeflow-pipelines/mpi-job-horovod.py | 22 +- .../trial-images/darts-cnn-cifar10/model.py | 4 +- .../darts-cnn-cifar10/run_trial.py | 6 +- .../enas-cnn-cifar10/ModelConstructor.py | 11 +- .../trial-images/enas-cnn-cifar10/RunTrial.py | 6 +- .../enas-cnn-cifar10/op_library.py | 22 +- .../trial-images/pytorch-mnist/mnist.py | 3 +- .../tf-mnist-with-summaries/mnist.py | 4 +- .../v1beta1/medianstop/service.py | 160 +++++++---- pkg/metricscollector/v1beta1/common/pns.py | 28 +- .../tfevent_loader.py | 46 ++-- pkg/suggestion/v1beta1/hyperband/parameter.py | 18 +- .../v1beta1/hyperband/parsing_util.py | 51 ++-- pkg/suggestion/v1beta1/hyperband/service.py | 256 ++++++++++-------- .../v1beta1/hyperopt/base_service.py | 87 +++--- pkg/suggestion/v1beta1/hyperopt/service.py | 58 ++-- .../v1beta1/internal/base_health_service.py | 33 ++- .../v1beta1/internal/search_space.py | 68 +++-- pkg/suggestion/v1beta1/internal/trial.py | 7 +- .../v1beta1/nas/common/validation.py | 56 +++- pkg/suggestion/v1beta1/nas/darts/service.py | 88 +++--- .../v1beta1/nas/enas/AlgorithmSettings.py | 46 ++-- pkg/suggestion/v1beta1/nas/enas/Controller.py | 120 +++++--- pkg/suggestion/v1beta1/nas/enas/Operation.py | 12 +- pkg/suggestion/v1beta1/nas/enas/service.py | 253 ++++++++++------- pkg/suggestion/v1beta1/optuna/base_service.py | 52 ++-- pkg/suggestion/v1beta1/optuna/service.py | 9 +- pkg/suggestion/v1beta1/pbt/service.py | 23 +- pkg/suggestion/v1beta1/skopt/base_service.py | 82 ++++-- pkg/suggestion/v1beta1/skopt/service.py | 77 ++++-- .../kubeflow/katib/api/katib_client.py | 5 +- .../kubeflow/katib/api/katib_client_test.py | 27 +- .../kubeflow/katib/api/report_metrics.py | 5 +- .../scripts/gh-actions/run-e2e-experiment.py | 6 +- .../scripts/gh-actions/run-e2e-tune-api.py | 3 +- test/e2e/v1beta1/scripts/gh-actions/verify.py | 3 +- .../v1beta1/suggestion/test_darts_service.py | 7 +- 46 files changed, 1085 insertions(+), 699 deletions(-) diff --git a/cmd/earlystopping/medianstop/v1beta1/main.py b/cmd/earlystopping/medianstop/v1beta1/main.py index 240517a76c3..132564d12cb 100644 --- a/cmd/earlystopping/medianstop/v1beta1/main.py +++ b/cmd/earlystopping/medianstop/v1beta1/main.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures import logging import time +from concurrent import futures import grpc diff --git a/cmd/metricscollector/v1beta1/tfevent-metricscollector/main.py b/cmd/metricscollector/v1beta1/tfevent-metricscollector/main.py index 21ab7e20bd2..274ed59ba48 100644 --- a/cmd/metricscollector/v1beta1/tfevent-metricscollector/main.py +++ b/cmd/metricscollector/v1beta1/tfevent-metricscollector/main.py @@ -13,9 +13,7 @@ # limitations under the License. import argparse -from logging import getLogger -from logging import INFO -from logging import StreamHandler +from logging import INFO, StreamHandler, getLogger import api_pb2 import api_pb2_grpc diff --git a/cmd/suggestion/hyperband/v1beta1/main.py b/cmd/suggestion/hyperband/v1beta1/main.py index 21dd46e4d9e..d2a3c2dce1f 100644 --- a/cmd/suggestion/hyperband/v1beta1/main.py +++ b/cmd/suggestion/hyperband/v1beta1/main.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures import time +from concurrent import futures import grpc diff --git a/cmd/suggestion/hyperopt/v1beta1/main.py b/cmd/suggestion/hyperopt/v1beta1/main.py index c459d5b532c..10d4497c20c 100644 --- a/cmd/suggestion/hyperopt/v1beta1/main.py +++ b/cmd/suggestion/hyperopt/v1beta1/main.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures import time +from concurrent import futures import grpc diff --git a/cmd/suggestion/nas/darts/v1beta1/main.py b/cmd/suggestion/nas/darts/v1beta1/main.py index a1926ad8326..f0b8f6a1f97 100644 --- a/cmd/suggestion/nas/darts/v1beta1/main.py +++ b/cmd/suggestion/nas/darts/v1beta1/main.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures import time +from concurrent import futures import grpc diff --git a/cmd/suggestion/nas/enas/v1beta1/main.py b/cmd/suggestion/nas/enas/v1beta1/main.py index 62dda9c810c..399ed275a47 100644 --- a/cmd/suggestion/nas/enas/v1beta1/main.py +++ b/cmd/suggestion/nas/enas/v1beta1/main.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures import time +from concurrent import futures import grpc diff --git a/cmd/suggestion/optuna/v1beta1/main.py b/cmd/suggestion/optuna/v1beta1/main.py index 435933f4858..cadd393d704 100644 --- a/cmd/suggestion/optuna/v1beta1/main.py +++ b/cmd/suggestion/optuna/v1beta1/main.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures import time +from concurrent import futures import grpc diff --git a/cmd/suggestion/pbt/v1beta1/main.py b/cmd/suggestion/pbt/v1beta1/main.py index 9e5efb133a6..7f16ffad432 100644 --- a/cmd/suggestion/pbt/v1beta1/main.py +++ b/cmd/suggestion/pbt/v1beta1/main.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures import time +from concurrent import futures import grpc diff --git a/cmd/suggestion/skopt/v1beta1/main.py b/cmd/suggestion/skopt/v1beta1/main.py index 55d6215529b..d2541042855 100644 --- a/cmd/suggestion/skopt/v1beta1/main.py +++ b/cmd/suggestion/skopt/v1beta1/main.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures import time +from concurrent import futures import grpc diff --git a/examples/v1beta1/kubeflow-pipelines/mpi-job-horovod.py b/examples/v1beta1/kubeflow-pipelines/mpi-job-horovod.py index ef48d916bb8..800012a2650 100644 --- a/examples/v1beta1/kubeflow-pipelines/mpi-job-horovod.py +++ b/examples/v1beta1/kubeflow-pipelines/mpi-job-horovod.py @@ -29,17 +29,19 @@ # https://www.kubeflow.org/docs/components/training/mpi/ import kfp -from kfp import components import kfp.dsl as dsl -from kubeflow.katib import ApiClient -from kubeflow.katib import V1beta1AlgorithmSetting -from kubeflow.katib import V1beta1AlgorithmSpec -from kubeflow.katib import V1beta1ExperimentSpec -from kubeflow.katib import V1beta1FeasibleSpace -from kubeflow.katib import V1beta1ObjectiveSpec -from kubeflow.katib import V1beta1ParameterSpec -from kubeflow.katib import V1beta1TrialParameterSpec -from kubeflow.katib import V1beta1TrialTemplate +from kfp import components +from kubeflow.katib import ( + ApiClient, + V1beta1AlgorithmSetting, + V1beta1AlgorithmSpec, + V1beta1ExperimentSpec, + V1beta1FeasibleSpace, + V1beta1ObjectiveSpec, + V1beta1ParameterSpec, + V1beta1TrialParameterSpec, + V1beta1TrialTemplate, +) @dsl.pipeline( diff --git a/examples/v1beta1/trial-images/darts-cnn-cifar10/model.py b/examples/v1beta1/trial-images/darts-cnn-cifar10/model.py index 21232b17a9b..df61fe24014 100644 --- a/examples/v1beta1/trial-images/darts-cnn-cifar10/model.py +++ b/examples/v1beta1/trial-images/darts-cnn-cifar10/model.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from operations import FactorizedReduce -from operations import MixedOp -from operations import StdConv import torch import torch.nn as nn import torch.nn.functional as F +from operations import FactorizedReduce, MixedOp, StdConv class Cell(nn.Module): diff --git a/examples/v1beta1/trial-images/darts-cnn-cifar10/run_trial.py b/examples/v1beta1/trial-images/darts-cnn-cifar10/run_trial.py index 07f454a41cc..0a1f145bd6d 100644 --- a/examples/v1beta1/trial-images/darts-cnn-cifar10/run_trial.py +++ b/examples/v1beta1/trial-images/darts-cnn-cifar10/run_trial.py @@ -16,13 +16,13 @@ import argparse import json -from architect import Architect -from model import NetworkCNN import numpy as np -from search_space import SearchSpace import torch import torch.nn as nn import utils +from architect import Architect +from model import NetworkCNN +from search_space import SearchSpace def main(): diff --git a/examples/v1beta1/trial-images/enas-cnn-cifar10/ModelConstructor.py b/examples/v1beta1/trial-images/enas-cnn-cifar10/ModelConstructor.py index 8d482dc70fb..64b517c5b77 100644 --- a/examples/v1beta1/trial-images/enas-cnn-cifar10/ModelConstructor.py +++ b/examples/v1beta1/trial-images/enas-cnn-cifar10/ModelConstructor.py @@ -14,16 +14,9 @@ import json -from keras.layers import Dense -from keras.layers import Dropout -from keras.layers import GlobalAveragePooling2D -from keras.layers import Input +from keras.layers import Dense, Dropout, GlobalAveragePooling2D, Input from keras.models import Model -from op_library import concat -from op_library import conv -from op_library import dw_conv -from op_library import reduction -from op_library import sp_conv +from op_library import concat, conv, dw_conv, reduction, sp_conv class ModelConstructor(object): diff --git a/examples/v1beta1/trial-images/enas-cnn-cifar10/RunTrial.py b/examples/v1beta1/trial-images/enas-cnn-cifar10/RunTrial.py index 7471019afae..f0db82bea79 100644 --- a/examples/v1beta1/trial-images/enas-cnn-cifar10/RunTrial.py +++ b/examples/v1beta1/trial-images/enas-cnn-cifar10/RunTrial.py @@ -14,13 +14,11 @@ import argparse +import tensorflow as tf from keras.datasets import cifar10 from ModelConstructor import ModelConstructor from tensorflow import keras -import tensorflow as tf -from tensorflow.keras.layers import RandomFlip -from tensorflow.keras.layers import RandomTranslation -from tensorflow.keras.layers import Rescaling +from tensorflow.keras.layers import RandomFlip, RandomTranslation, Rescaling from tensorflow.keras.utils import to_categorical if __name__ == "__main__": diff --git a/examples/v1beta1/trial-images/enas-cnn-cifar10/op_library.py b/examples/v1beta1/trial-images/enas-cnn-cifar10/op_library.py index 919f92fb5b9..eebcff7db49 100644 --- a/examples/v1beta1/trial-images/enas-cnn-cifar10/op_library.py +++ b/examples/v1beta1/trial-images/enas-cnn-cifar10/op_library.py @@ -12,17 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from keras import backend as K -from keras.layers import Activation -from keras.layers import AveragePooling2D -from keras.layers import BatchNormalization -from keras.layers import concatenate -from keras.layers import Conv2D -from keras.layers import DepthwiseConv2D -from keras.layers import MaxPooling2D -from keras.layers import SeparableConv2D -from keras.layers import ZeroPadding2D import numpy as np +from keras import backend as K +from keras.layers import ( + Activation, + AveragePooling2D, + BatchNormalization, + Conv2D, + DepthwiseConv2D, + MaxPooling2D, + SeparableConv2D, + ZeroPadding2D, + concatenate, +) def concat(inputs): diff --git a/examples/v1beta1/trial-images/pytorch-mnist/mnist.py b/examples/v1beta1/trial-images/pytorch-mnist/mnist.py index fdc87925dea..7ecc911cbb4 100644 --- a/examples/v1beta1/trial-images/pytorch-mnist/mnist.py +++ b/examples/v1beta1/trial-images/pytorch-mnist/mnist.py @@ -24,8 +24,7 @@ import torch.nn as nn import torch.nn.functional as F import torch.optim as optim -from torchvision import datasets -from torchvision import transforms +from torchvision import datasets, transforms WORLD_SIZE = int(os.environ.get("WORLD_SIZE", 1)) diff --git a/examples/v1beta1/trial-images/tf-mnist-with-summaries/mnist.py b/examples/v1beta1/trial-images/tf-mnist-with-summaries/mnist.py index 4aac254bdcc..ca65ff5bbe6 100644 --- a/examples/v1beta1/trial-images/tf-mnist-with-summaries/mnist.py +++ b/examples/v1beta1/trial-images/tf-mnist-with-summaries/mnist.py @@ -17,9 +17,7 @@ import tensorflow as tf from tensorflow.keras import Model -from tensorflow.keras.layers import Conv2D -from tensorflow.keras.layers import Dense -from tensorflow.keras.layers import Flatten +from tensorflow.keras.layers import Conv2D, Dense, Flatten class MyModel(Model): diff --git a/pkg/earlystopping/v1beta1/medianstop/service.py b/pkg/earlystopping/v1beta1/medianstop/service.py index 2e4d02acfc2..94c9fdc6bda 100644 --- a/pkg/earlystopping/v1beta1/medianstop/service.py +++ b/pkg/earlystopping/v1beta1/medianstop/service.py @@ -12,17 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime import logging import multiprocessing +from datetime import datetime from typing import Iterable, Optional import grpc -from kubernetes import client -from kubernetes import config +from kubernetes import client, config -from pkg.apis.manager.v1beta1.python import api_pb2 -from pkg.apis.manager.v1beta1.python import api_pb2_grpc +from pkg.apis.manager.v1beta1.python import api_pb2, api_pb2_grpc logger = logging.getLogger() logging.basicConfig(level=logging.INFO) @@ -39,7 +37,6 @@ class MedianStopService(api_pb2_grpc.EarlyStoppingServicer): - def __init__(self): super(MedianStopService, self).__init__() self.is_first_run = True @@ -52,22 +49,30 @@ def __init__(self): # Assume that Trial namespace = Suggestion namespace. try: - with open('/var/run/secrets/kubernetes.io/serviceaccount/namespace', 'r') as f: + with open( + "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r" + ) as f: self.namespace = f.readline() # Set config and api instance for k8s client. config.load_incluster_config() # This is used when service is not running in k8s, e.g. for unit tests. except Exception as e: - logger.info("{}. Service is not running in Kubernetes Pod, \"{}\" namespace is used".format( - e, DEFAULT_NAMESPACE - )) + logger.info( + '{}. Service is not running in Kubernetes Pod, "{}" namespace is used'.format( + e, DEFAULT_NAMESPACE + ) + ) self.namespace = DEFAULT_NAMESPACE # Set config and api instance for k8s client. config.load_kube_config() self.api_instance = client.CustomObjectsApi() - def ValidateEarlyStoppingSettings(self, request: api_pb2.ValidateEarlyStoppingSettingsRequest, context: grpc.ServicerContext) -> api_pb2.ValidateEarlyStoppingSettingsReply: + def ValidateEarlyStoppingSettings( + self, + request: api_pb2.ValidateEarlyStoppingSettingsRequest, + context: grpc.ServicerContext, + ) -> api_pb2.ValidateEarlyStoppingSettingsReply: is_valid, message = self.validate_early_stopping_spec(request.early_stopping) if not is_valid: context.set_code(grpc.StatusCode.INVALID_ARGUMENT) @@ -78,7 +83,9 @@ def ValidateEarlyStoppingSettings(self, request: api_pb2.ValidateEarlyStoppingSe def validate_early_stopping_spec(self, early_stopping_spec): algorithm_name = early_stopping_spec.algorithm_name if algorithm_name == "medianstop": - return self.validate_medianstop_setting(early_stopping_spec.algorithm_settings) + return self.validate_medianstop_setting( + early_stopping_spec.algorithm_settings + ) else: return False, "unknown algorithm name {}".format(algorithm_name) @@ -88,59 +95,90 @@ def validate_medianstop_setting(early_stopping_settings): try: if setting.name == "min_trials_required": if not (int(setting.value) > 0): - return False, "min_trials_required must be greater than zero (>0)" + return ( + False, + "min_trials_required must be greater than zero (>0)", + ) elif setting.name == "start_step": if not (int(setting.value) >= 1): - return False, "start_step must be greater or equal than one (>=1)" + return ( + False, + "start_step must be greater or equal than one (>=1)", + ) else: - return False, "unknown setting {} for algorithm medianstop".format(setting.name) + return False, "unknown setting {} for algorithm medianstop".format( + setting.name + ) except Exception as e: - return False, "failed to validate {}({}): {}".format(setting.name, setting.value, e) + return False, "failed to validate {}({}): {}".format( + setting.name, setting.value, e + ) return True, "" - def GetEarlyStoppingRules(self, request: api_pb2.GetEarlyStoppingRulesRequest, context: grpc.ServicerContext) -> api_pb2.GetSuggestionsReply: + def GetEarlyStoppingRules( + self, + request: api_pb2.GetEarlyStoppingRulesRequest, + context: grpc.ServicerContext, + ) -> api_pb2.GetSuggestionsReply: logger.info("Get new early stopping rules") # Get required values for the first call. if self.is_first_run: self.is_first_run = False # Get early stopping settings. - self.get_early_stopping_settings(request.experiment.spec.early_stopping.algorithm_settings) - logger.info("Median stopping settings are: min_trials_required: {}, start_step: {}".format( - self.min_trials_required, self.start_step)) + self.get_early_stopping_settings( + request.experiment.spec.early_stopping.algorithm_settings + ) + logger.info( + "Median stopping settings are: min_trials_required: {}, start_step: {}".format( + self.min_trials_required, self.start_step + ) + ) # Get comparison type and objective metric if request.experiment.spec.objective.type == api_pb2.MAXIMIZE: self.comparison = api_pb2.LESS else: self.comparison = api_pb2.GREATER - self.objective_metric = request.experiment.spec.objective.objective_metric_name + self.objective_metric = ( + request.experiment.spec.objective.objective_metric_name + ) # Get DB manager address. It should have host and port. # For example: katib-db-manager.kubeflow:6789 - default one. - self.db_manager_address = request.db_manager_address.split(':') + self.db_manager_address = request.db_manager_address.split(":") if len(self.db_manager_address) != 2: - raise Exception("Invalid Katib DB manager service address: {}".format(self.db_manager_address)) + raise Exception( + "Invalid Katib DB manager service address: {}".format( + self.db_manager_address + ) + ) early_stopping_rules = [] median = self.get_median_value(request.trials) if median is not None: - early_stopping_rules.append(api_pb2.EarlyStoppingRule( - name=self.objective_metric, - value=str(median), - comparison=self.comparison, - start_step=self.start_step, - )) - - logger.info("New early stopping rules are:\n {}\n\n".format(early_stopping_rules)) + early_stopping_rules.append( + api_pb2.EarlyStoppingRule( + name=self.objective_metric, + value=str(median), + comparison=self.comparison, + start_step=self.start_step, + ) + ) + + logger.info( + "New early stopping rules are:\n {}\n\n".format(early_stopping_rules) + ) return api_pb2.GetEarlyStoppingRulesReply( early_stopping_rules=early_stopping_rules ) - def get_early_stopping_settings(self, early_stopping_settings: Iterable[api_pb2.EarlyStoppingSetting]): + def get_early_stopping_settings( + self, early_stopping_settings: Iterable[api_pb2.EarlyStoppingSetting] + ): for setting in early_stopping_settings: if setting.name == "min_trials_required": self.min_trials_required = int(setting.value) @@ -168,8 +206,11 @@ def get_median_value(self, trials: Iterable[api_pb2.Trial]) -> Optional[float]: ) # Get only first start_step metrics. - # Since metrics are collected consistently and ordered by time, we slice top start_step metrics. - first_x_logs = get_log_response.observation_log.metric_logs[:self.start_step] + # Since metrics are collected consistently and ordered by time, + # we slice top start_step metrics. + first_x_logs = get_log_response.observation_log.metric_logs[ + : self.start_step + ] metric_sum = 0 for log in first_x_logs: metric_sum += float(log.metric.value) @@ -177,22 +218,33 @@ def get_median_value(self, trials: Iterable[api_pb2.Trial]) -> Optional[float]: # Get average metric value for the Trial. new_average = metric_sum / len(first_x_logs) self.trials_avg_history[trial.name] = new_average - logger.info("Adding new succeeded Trial: {} with average metrics value: {}".format( - trial.name, new_average)) - logger.info("Trials average log history: {}".format(self.trials_avg_history)) + logger.info( + "Adding new succeeded Trial: {} with average metrics value: {}".format( + trial.name, new_average + ) + ) + logger.info( + "Trials average log history: {}".format(self.trials_avg_history) + ) # If count of succeeded Trials is greater than min_trials_required, calculate median. if len(self.trials_avg_history) >= self.min_trials_required: - median = sum(list(self.trials_avg_history.values())) / len(self.trials_avg_history) + median = sum(list(self.trials_avg_history.values())) / len( + self.trials_avg_history + ) logger.info("Generate new Median value: {}".format(median)) return median # Else, return None. - logger.info("Count of succeeded Trials: {} is less than min_trials_required: {}".format( - len(self.trials_avg_history), self.min_trials_required - )) + logger.info( + "Count of succeeded Trials: {} is less than min_trials_required: {}".format( + len(self.trials_avg_history), self.min_trials_required + ) + ) return None - def SetTrialStatus(self, request: api_pb2.SetTrialStatusRequest, context: grpc.ServicerContext) -> api_pb2.SetTrialStatusReply: + def SetTrialStatus( + self, request: api_pb2.SetTrialStatusRequest, context: grpc.ServicerContext + ) -> api_pb2.SetTrialStatusReply: trial_name = request.trial_name logger.info("Update status for Trial: {}".format(trial_name)) @@ -205,7 +257,8 @@ def SetTrialStatus(self, request: api_pb2.SetTrialStatusRequest, context: grpc.S self.namespace, TRIAL_PLURAL, trial_name, - async_req=True) + async_req=True, + ) trial = None try: @@ -214,7 +267,10 @@ def SetTrialStatus(self, request: api_pb2.SetTrialStatusRequest, context: grpc.S raise Exception("Timeout trying to get Katib Trial") except Exception as e: raise Exception( - "Get Trial: {} in namespace: {} failed. Exception: {}".format(trial_name, self.namespace, e)) + "Get Trial: {} in namespace: {} failed. Exception: {}".format( + trial_name, self.namespace, e + ) + ) time_now = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") @@ -237,13 +293,19 @@ def SetTrialStatus(self, request: api_pb2.SetTrialStatusRequest, context: grpc.S TRIAL_PLURAL, trial_name, trial, - async_req=True) + async_req=True, + ) except Exception as e: raise Exception( "Update status for Trial: {} in namespace: {} failed. Exception: {}".format( - trial_name, self.namespace, e)) - - logger.info("Changed status to: {} for Trial: {} in namespace: {}\n\n".format( - STATUS_EARLY_STOPPED, trial_name, self.namespace)) + trial_name, self.namespace, e + ) + ) + + logger.info( + "Changed status to: {} for Trial: {} in namespace: {}\n\n".format( + STATUS_EARLY_STOPPED, trial_name, self.namespace + ) + ) return api_pb2.SetTrialStatusReply() diff --git a/pkg/metricscollector/v1beta1/common/pns.py b/pkg/metricscollector/v1beta1/common/pns.py index 86f5563bbec..a4d74beaaaf 100644 --- a/pkg/metricscollector/v1beta1/common/pns.py +++ b/pkg/metricscollector/v1beta1/common/pns.py @@ -25,17 +25,20 @@ def WaitMainProcesses(pool_interval, timout, wait_all, completed_marked_dir): Hold metrics collector parser until required pids are finished """ - if not sys.platform.startswith('linux'): + if not sys.platform.startswith("linux"): raise Exception("Platform '{}' unsupported".format(sys.platform)) pids, main_pid = GetMainProcesses(completed_marked_dir) - return WaitPIDs(pids, main_pid, pool_interval, timout, wait_all, completed_marked_dir) + return WaitPIDs( + pids, main_pid, pool_interval, timout, wait_all, completed_marked_dir + ) def GetMainProcesses(completed_marked_dir): """ - Return array with all running processes pids and main process pid which metrics collector is waiting. + Return array with all running processes pids + and main process pid which metrics collector is waiting. """ pids = set() main_pid = 0 @@ -59,7 +62,10 @@ def GetMainProcesses(completed_marked_dir): # In addition to that, command line contains completed marker for the main pid. # For example: echo completed > /var/log/katib/$$$$.pid # completed_marked_dir is the directory for completed marker, e.g. /var/log/katib - if main_pid == 0 or ("echo {} > {}".format(const.TRAINING_COMPLETED, completed_marked_dir) in cmd_lind): + if main_pid == 0 or ( + "echo {} > {}".format(const.TRAINING_COMPLETED, completed_marked_dir) + in cmd_lind + ): main_pid = pid pids.add(pid) @@ -92,16 +98,24 @@ def WaitPIDs(pids, main_pid, pool_interval, timout, wait_all, completed_marked_d path = "/proc/{}".format(pid) if not os.path.exists(path): if pid == main_pid: - # For main_pid we check if file with "completed" marker exists if completed_marked_dir is set + # For main_pid we check if file with "completed" + # marker exists if completed_marked_dir is set if completed_marked_dir: - mark_file = os.path.join(completed_marked_dir, "{}.pid".format(pid)) + mark_file = os.path.join( + completed_marked_dir, "{}.pid".format(pid) + ) # Check if file contains "completed" marker with open(mark_file) as file_obj: contents = file_obj.read() if contents.strip() != const.TRAINING_COMPLETED: raise Exception( "Unable to find marker: {} in file: {} with contents: {} for pid: {}".format( - const.TRAINING_COMPLETED, mark_file, contents, pid)) + const.TRAINING_COMPLETED, + mark_file, + contents, + pid, + ) + ) # Add main pid to finished pids set finished_pids.add(pid) # Exit loop if wait all is false because main pid is finished diff --git a/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py b/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py index 931014d1c15..9ba71e3fc67 100644 --- a/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py +++ b/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py @@ -21,19 +21,18 @@ # Check TFJob example for more information: # https://github.com/kubeflow/katib/blob/master/examples/v1beta1/kubeflow-training-operator/tfjob-mnist-with-summaries.yaml#L16-L22 -from datetime import datetime -from logging import getLogger -from logging import INFO -from logging import StreamHandler import os +from datetime import datetime +from logging import INFO, StreamHandler, getLogger import api_pb2 import rfc3339 -from tensorboard.backend.event_processing.event_accumulator import \ - EventAccumulator -from tensorboard.backend.event_processing.event_accumulator import TensorEvent -from tensorboard.backend.event_processing.tag_types import TENSORS import tensorflow as tf +from tensorboard.backend.event_processing.event_accumulator import ( + EventAccumulator, + TensorEvent, +) +from tensorboard.backend.event_processing.tag_types import TENSORS from pkg.metricscollector.v1beta1.common import const @@ -55,18 +54,25 @@ def parse_summary(self, tfefile): for tag in event_accumulator.Tags()[TENSORS]: for m in self.metric_names: - tfefile_parent_dir = os.path.dirname(m) if len(m.split("/")) >= 2 else os.path.dirname(tfefile) + tfefile_parent_dir = ( + os.path.dirname(m) + if len(m.split("/")) >= 2 + else os.path.dirname(tfefile) + ) basedir_name = os.path.dirname(tfefile) - if not tag.startswith(m.split("/")[-1]) or not basedir_name.endswith(tfefile_parent_dir): + if not tag.startswith(m.split("/")[-1]) or not basedir_name.endswith( + tfefile_parent_dir + ): continue for tensor in event_accumulator.Tensors(tag): ml = api_pb2.MetricLog( - time_stamp=rfc3339.rfc3339(datetime.fromtimestamp(tensor.wall_time)), + time_stamp=rfc3339.rfc3339( + datetime.fromtimestamp(tensor.wall_time) + ), metric=api_pb2.Metric( - name=m, - value=str(tf.make_ndarray(tensor.tensor_proto)) - ) + name=m, value=str(tf.make_ndarray(tensor.tensor_proto)) + ), ) metric_logs.append(ml) @@ -109,12 +115,14 @@ def parse_file(self, directory): api_pb2.MetricLog( time_stamp=rfc3339.rfc3339(datetime.now()), metric=api_pb2.Metric( - name=self.metrics[0], - value=const.UNAVAILABLE_METRIC_VALUE - ) + name=self.metrics[0], value=const.UNAVAILABLE_METRIC_VALUE + ), ) ] - self.logger.info("Objective metric {} is not found in training logs, {} value is reported".format( - self.metrics[0], const.UNAVAILABLE_METRIC_VALUE)) + self.logger.info( + "Objective metric {} is not found in training logs, {} value is reported".format( + self.metrics[0], const.UNAVAILABLE_METRIC_VALUE + ) + ) return api_pb2.ObservationLog(metric_logs=mls) diff --git a/pkg/suggestion/v1beta1/hyperband/parameter.py b/pkg/suggestion/v1beta1/hyperband/parameter.py index b6713ab748f..89a58318b7e 100644 --- a/pkg/suggestion/v1beta1/hyperband/parameter.py +++ b/pkg/suggestion/v1beta1/hyperband/parameter.py @@ -42,8 +42,17 @@ class ParameterConfig: {"name": "cat_param", "values": ["true", "false"], "number": 2}. """ - def __init__(self, name_ids, dim, lower_bounds, upper_bounds, - parameter_types, names, discrete_info, categorical_info): + def __init__( + self, + name_ids, + dim, + lower_bounds, + upper_bounds, + parameter_types, + names, + discrete_info, + categorical_info, + ): self.name_ids = name_ids self.dim = dim self.lower_bounds = np.array(lower_bounds).reshape((1, dim)) @@ -62,6 +71,7 @@ def create_scaler(self): return scaler def random_sample(self): - new_sample = np.random.uniform(self.lower_bounds, self.upper_bounds, - size=(1, self.dim)) + new_sample = np.random.uniform( + self.lower_bounds, self.upper_bounds, size=(1, self.dim) + ) return new_sample diff --git a/pkg/suggestion/v1beta1/hyperband/parsing_util.py b/pkg/suggestion/v1beta1/hyperband/parsing_util.py index 4136fe7246b..81860a7fbe3 100644 --- a/pkg/suggestion/v1beta1/hyperband/parsing_util.py +++ b/pkg/suggestion/v1beta1/hyperband/parsing_util.py @@ -26,14 +26,14 @@ def _deal_with_discrete(feasible_values, current_value): - """ function to embed the current values to the feasible discrete space""" + """function to embed the current values to the feasible discrete space""" diff = np.subtract(feasible_values, current_value) diff = np.absolute(diff) return feasible_values[np.argmin(diff)] def _deal_with_categorical(feasible_values, one_hot_values): - """ function to do the one hot encoding of the categorical values """ + """function to do the one hot encoding of the categorical values""" index = np.argmax(one_hot_values) return feasible_values[int(index)] @@ -61,17 +61,18 @@ def parse_parameter_configs(parameter_configs): discrete_values = [int(x) for x in param.feasible_space.list] new_lower = min(discrete_values) new_upper = max(discrete_values) - discrete_info.append( - {"name": param.name, "values": discrete_values}) + discrete_info.append({"name": param.name, "values": discrete_values}) elif param.parameter_type == api_pb2.CATEGORICAL: num_feasible = len(param.feasible_space.list) new_lower = [0 for _ in range(num_feasible)] new_upper = [1 for _ in range(num_feasible)] - categorical_info.append({ - "name": param.name, - "values": param.feasible_space.list, - "number": num_feasible, - }) + categorical_info.append( + { + "name": param.name, + "values": param.feasible_space.list, + "number": num_feasible, + } + ) if isinstance(new_lower, Iterable): # handles categorical parameters lower_bounds.extend(new_lower) upper_bounds.extend(new_upper) @@ -80,14 +81,16 @@ def parse_parameter_configs(parameter_configs): lower_bounds.append(new_lower) upper_bounds.append(new_upper) dim += 1 - parsed_config = ParameterConfig(name_ids, - dim, - lower_bounds, - upper_bounds, - parameter_types, - names, - discrete_info, - categorical_info) + parsed_config = ParameterConfig( + name_ids, + dim, + lower_bounds, + upper_bounds, + parameter_types, + names, + discrete_info, + categorical_info, + ) return parsed_config @@ -97,8 +100,7 @@ def parse_previous_observations(parameters_list, dim, name_id, types, categorica offset = 0 for p in parameters: map_id = name_id[p.name] - if types[map_id] in [api_pb2.DOUBLE, api_pb2.INT, - api_pb2.DISCRETE]: + if types[map_id] in [api_pb2.DOUBLE, api_pb2.INT, api_pb2.DISCRETE]: parsed_X[row_idx, offset] = float(p.value) offset += 1 elif types[map_id] == api_pb2.CATEGORICAL: @@ -120,8 +122,10 @@ def parse_metric(y_train, goal): return y_array -def parse_x_next_vector(x_next, param_types, param_names, discrete_info, categorical_info): - """ parse the next suggestion to the proper format """ +def parse_x_next_vector( + x_next, param_types, param_names, discrete_info, categorical_info +): + """parse the next suggestion to the proper format""" counter = 0 result = [] if isinstance(x_next, np.ndarray): @@ -136,8 +140,7 @@ def parse_x_next_vector(x_next, param_types, param_names, discrete_info, categor elif par_type == api_pb2.DISCRETE: for param in discrete_info: if param["name"] == par_name: - value = _deal_with_discrete(param["values"], - x_next[counter]) + value = _deal_with_discrete(param["values"], x_next[counter]) counter = counter + 1 break elif par_type == api_pb2.CATEGORICAL: @@ -145,7 +148,7 @@ def parse_x_next_vector(x_next, param_types, param_names, discrete_info, categor if param["name"] == par_name: value = _deal_with_categorical( feasible_values=param["values"], - one_hot_values=x_next[counter:counter + param["number"]], + one_hot_values=x_next[counter : counter + param["number"]], ) counter = counter + param["number"] break diff --git a/pkg/suggestion/v1beta1/hyperband/service.py b/pkg/suggestion/v1beta1/hyperband/service.py index 50919826d0a..e69a15865f0 100644 --- a/pkg/suggestion/v1beta1/hyperband/service.py +++ b/pkg/suggestion/v1beta1/hyperband/service.py @@ -13,21 +13,18 @@ # limitations under the License. import logging -from logging import DEBUG -from logging import getLogger -from logging import StreamHandler import math import traceback +from logging import DEBUG, StreamHandler, getLogger import grpc -from pkg.apis.manager.v1beta1.python import api_pb2 -from pkg.apis.manager.v1beta1.python import api_pb2_grpc +from pkg.apis.manager.v1beta1.python import api_pb2, api_pb2_grpc from pkg.suggestion.v1beta1.hyperband import parsing_util from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer logger = getLogger(__name__) -FORMAT = '%(asctime)-15s Experiment %(experiment_name)s %(message)s' +FORMAT = "%(asctime)-15s Experiment %(experiment_name)s %(message)s" logging.basicConfig(format=FORMAT) handler = StreamHandler() handler.setLevel(DEBUG) @@ -55,12 +52,17 @@ def GetSuggestions(self, request, context): trials = self._make_bracket(experiment, param) for trial in trials: - reply.parameter_assignments.add(assignments=trial.parameter_assignments.assignments) + reply.parameter_assignments.add( + assignments=trial.parameter_assignments.assignments + ) reply.algorithm.CopyFrom(HyperBandParam.generate(param)) return reply except Exception as e: - logger.error("Fail to generate trials: \n%s", - traceback.format_exc(), extra={"experiment_name": experiment.name}) + logger.error( + "Fail to generate trials: \n%s", + traceback.format_exc(), + extra={"experiment_name": experiment.name}, + ) raise e def _update_hbParameters(self, param): @@ -73,10 +75,13 @@ def _new_hbParameters(self, param): param.current_i = 0 if param.current_s >= 0: # when param.current_s < 0, hyperband algorithm reaches the end - param.n = int(math.ceil(float(param.s_max + 1) * ( - float(param.eta**param.current_s) / float(param.current_s+1)))) - param.r = param.r_l * \ - param.eta**(-param.current_s) + param.n = int( + math.ceil( + float(param.s_max + 1) + * (float(param.eta**param.current_s) / float(param.current_s + 1)) + ) + ) + param.r = param.r_l * param.eta ** (-param.current_s) def _make_bracket(self, experiment, param): if param.evaluating_trials == 0: @@ -88,48 +93,76 @@ def _make_bracket(self, experiment, param): else: param.evaluating_trials = 0 - logger.info("HyperBand Param eta %d.", - param.eta, extra={"experiment_name": experiment.name}) - logger.info("HyperBand Param R %d.", - param.r_l, extra={"experiment_name": experiment.name}) - logger.info("HyperBand Param sMax %d.", - param.s_max, extra={"experiment_name": experiment.name}) - logger.info("HyperBand Param B %d.", - param.b_l, extra={"experiment_name": experiment.name}) - logger.info("HyperBand Param n %d.", - param.n, extra={"experiment_name": experiment.name}) - logger.info("HyperBand Param r %d.", - param.r, extra={"experiment_name": experiment.name}) - logger.info("HyperBand Param s %d.", - param.current_s, extra={"experiment_name": experiment.name}) - logger.info("HyperBand Param i %d.", - param.current_i, extra={"experiment_name": experiment.name}) - logger.info("HyperBand evaluating trials count %d.", - param.evaluating_trials, extra={"experiment_name": experiment.name}) - logger.info("HyperBand budget resource name %s.", - param.resource_name, extra={"experiment_name": experiment.name}) + logger.info( + "HyperBand Param eta %d.", + param.eta, + extra={"experiment_name": experiment.name}, + ) + logger.info( + "HyperBand Param R %d.", + param.r_l, + extra={"experiment_name": experiment.name}, + ) + logger.info( + "HyperBand Param sMax %d.", + param.s_max, + extra={"experiment_name": experiment.name}, + ) + logger.info( + "HyperBand Param B %d.", + param.b_l, + extra={"experiment_name": experiment.name}, + ) + logger.info( + "HyperBand Param n %d.", param.n, extra={"experiment_name": experiment.name} + ) + logger.info( + "HyperBand Param r %d.", param.r, extra={"experiment_name": experiment.name} + ) + logger.info( + "HyperBand Param s %d.", + param.current_s, + extra={"experiment_name": experiment.name}, + ) + logger.info( + "HyperBand Param i %d.", + param.current_i, + extra={"experiment_name": experiment.name}, + ) + logger.info( + "HyperBand evaluating trials count %d.", + param.evaluating_trials, + extra={"experiment_name": experiment.name}, + ) + logger.info( + "HyperBand budget resource name %s.", + param.resource_name, + extra={"experiment_name": experiment.name}, + ) if param.evaluating_trials == 0: self._new_hbParameters(param) return trialSpecs def _make_child_bracket(self, experiment, param): - n_i = math.ceil(param.n * param.eta**(-param.current_i)) + n_i = math.ceil(param.n * param.eta ** (-param.current_i)) top_trials_num = int(math.ceil(n_i / param.eta)) self._update_hbParameters(param) r_i = int(param.r * param.eta**param.current_i) last_trials = self._get_top_trial( - param.evaluating_trials, top_trials_num, experiment) - trialSpecs = self._copy_trials( - last_trials, r_i, param.resource_name) + param.evaluating_trials, top_trials_num, experiment + ) + trialSpecs = self._copy_trials(last_trials, r_i, param.resource_name) - logger.info("Generate %d trials by child bracket.", - top_trials_num, extra={"experiment_name": experiment.name}) + logger.info( + "Generate %d trials by child bracket.", + top_trials_num, + extra={"experiment_name": experiment.name}, + ) return trialSpecs def _get_last_trials(self, all_trials, latest_trials_num): - sorted_trials = sorted( - all_trials, key=lambda trial: trial.status.start_time) + sorted_trials = sorted(all_trials, key=lambda trial: trial.status.start_time) if len(sorted_trials) > latest_trials_num: return sorted_trials[-latest_trials_num:] else: @@ -151,11 +184,14 @@ def get_objective_value(t): for t in latest_trials: if t.status.condition != api_pb2.TrialStatus.TrialConditionType.SUCCEEDED: raise Exception( - "There are some trials which are not completed yet for experiment %s." % experiment.name) + "There are some trials which are not completed yet for experiment %s." + % experiment.name + ) if objective_type == api_pb2.MAXIMIZE: top_trials.extend( - sorted(latest_trials, key=get_objective_value, reverse=True)) + sorted(latest_trials, key=get_objective_value, reverse=True) + ) else: top_trials.extend(sorted(latest_trials, key=get_objective_value)) return top_trials[:top_trials_num] @@ -169,8 +205,9 @@ def _copy_trials(self, trials, r_i, resourceName): value = str(r_i) else: value = assignment.value - trial_spec.parameter_assignments.assignments.add(name=assignment.name, - value=value) + trial_spec.parameter_assignments.assignments.add( + name=assignment.name, value=value + ) trialSpecs.append(trial_spec) return trialSpecs @@ -178,7 +215,8 @@ def _make_master_bracket(self, experiment, param): n = param.n r = int(param.r) parameter_config = parsing_util.parse_parameter_configs( - experiment.spec.parameter_specs.parameters) + experiment.spec.parameter_specs.parameters + ) trial_specs = [] for _ in range(n): sample = parameter_config.random_sample() @@ -187,16 +225,21 @@ def _make_master_bracket(self, experiment, param): parameter_config.parameter_types, parameter_config.names, parameter_config.discrete_info, - parameter_config.categorical_info) + parameter_config.categorical_info, + ) trial_spec = api_pb2.TrialSpec() for hp in suggestion: - if hp['name'] == param.resource_name: - hp['value'] = str(r) - trial_spec.parameter_assignments.assignments.add(name=hp['name'], - value=str(hp['value'])) + if hp["name"] == param.resource_name: + hp["value"] = str(r) + trial_spec.parameter_assignments.assignments.add( + name=hp["name"], value=str(hp["value"]) + ) trial_specs.append(trial_spec) - logger.info("Generate %d trials by master bracket.", - n, extra={"experiment_name": experiment.name}) + logger.info( + "Generate %d trials by master bracket.", + n, + extra={"experiment_name": experiment.name}, + ) return trial_specs def _set_validate_context_error(self, context, error_message): @@ -212,14 +255,20 @@ def ValidateAlgorithmSettings(self, request, context): for setting in settings: setting_dict[setting.name] = setting.value if "r_l" not in setting_dict or "resource_name" not in setting_dict: - return self._set_validate_context_error(context, "r_l and resource_name must be set.") + return self._set_validate_context_error( + context, "r_l and resource_name must be set." + ) try: rl = float(setting_dict["r_l"]) except Exception: - return self._set_validate_context_error(context, "r_l must be a positive float number.") + return self._set_validate_context_error( + context, "r_l must be a positive float number." + ) else: if rl < 0: - return self._set_validate_context_error(context, "r_l must be a positive float number.") + return self._set_validate_context_error( + context, "r_l must be a positive float number." + ) if "eta" in setting_dict: eta = int(float(setting_dict["eta"])) @@ -228,11 +277,12 @@ def ValidateAlgorithmSettings(self, request, context): else: eta = 3 - smax = int(math.log(rl)/math.log(eta)) + smax = int(math.log(rl) / math.log(eta)) max_parallel = int(math.ceil(eta**smax)) if request.experiment.spec.parallel_trial_count < max_parallel: - return self._set_validate_context_error(context, - "parallelTrialCount must be not less than %d." % max_parallel) + return self._set_validate_context_error( + context, "parallelTrialCount must be not less than %d." % max_parallel + ) valid_resourceName = False for param in params: @@ -240,17 +290,27 @@ def ValidateAlgorithmSettings(self, request, context): valid_resourceName = True break if not valid_resourceName: - return self._set_validate_context_error(context, - "value of resource_name setting must be in parameters.") + return self._set_validate_context_error( + context, "value of resource_name setting must be in parameters." + ) return api_pb2.ValidateAlgorithmSettingsReply() class HyperBandParam(object): - def __init__(self, eta=3, s_max=-1, r_l=-1, - b_l=-1, r=-1, n=-1, current_s=-2, - current_i=-1, resource_name="", - evaluating_trials=0): + def __init__( + self, + eta=3, + s_max=-1, + r_l=-1, + b_l=-1, + r=-1, + n=-1, + current_s=-2, + current_i=-1, + resource_name="", + evaluating_trials=0, + ): self.eta = eta self.s_max = s_max self.r_l = r_l @@ -265,45 +325,24 @@ def __init__(self, eta=3, s_max=-1, r_l=-1, @staticmethod def generate(param): algorithm_settings = [ + api_pb2.AlgorithmSetting(name="eta", value=str(param.eta)), + api_pb2.AlgorithmSetting(name="s_max", value=str(param.s_max)), + api_pb2.AlgorithmSetting(name="r_l", value=str(param.r_l)), + api_pb2.AlgorithmSetting(name="b_l", value=str(param.b_l)), + api_pb2.AlgorithmSetting(name="r", value=str(param.r)), + api_pb2.AlgorithmSetting(name="n", value=str(param.n)), + api_pb2.AlgorithmSetting(name="current_s", value=str(param.current_s)), + api_pb2.AlgorithmSetting(name="current_i", value=str(param.current_i)), + api_pb2.AlgorithmSetting(name="resource_name", value=param.resource_name), api_pb2.AlgorithmSetting( - name="eta", - value=str(param.eta) - ), api_pb2.AlgorithmSetting( - name="s_max", - value=str(param.s_max) - ), api_pb2.AlgorithmSetting( - name="r_l", - value=str(param.r_l) - ), api_pb2.AlgorithmSetting( - name="b_l", - value=str(param.b_l) - ), api_pb2.AlgorithmSetting( - name="r", - value=str(param.r) - ), api_pb2.AlgorithmSetting( - name="n", - value=str(param.n) - ), api_pb2.AlgorithmSetting( - name="current_s", - value=str(param.current_s) - ), api_pb2.AlgorithmSetting( - name="current_i", - value=str(param.current_i) - ), api_pb2.AlgorithmSetting( - name="resource_name", - value=param.resource_name - ), api_pb2.AlgorithmSetting( - name="evaluating_trials", - value=str(param.evaluating_trials) - )] - return api_pb2.AlgorithmSpec( - algorithm_settings=algorithm_settings - ) + name="evaluating_trials", value=str(param.evaluating_trials) + ), + ] + return api_pb2.AlgorithmSpec(algorithm_settings=algorithm_settings) @staticmethod def convert(alg_settings): - """Convert the algorithm settings to HyperBandParam. - """ + """Convert the algorithm settings to HyperBandParam.""" param = HyperBandParam() # Set the param from the algorithm settings. for setting in alg_settings: @@ -328,8 +367,7 @@ def convert(alg_settings): elif setting.name == "resource_name": param.resource_name = setting.value else: - logger.info( - "Unknown HyperBand Param %s, ignore it", setting.name) + logger.info("Unknown HyperBand Param %s, ignore it", setting.name) if param.current_s == -1: # Hyperband outlerloop has finished logger.info("HyperBand outlerloop has finished.") @@ -339,8 +377,7 @@ def convert(alg_settings): if param.eta <= 0: param.eta = 3 if param.s_max < 0: - param.s_max = int( - math.log(param.r_l) / math.log(param.eta)) + param.s_max = int(math.log(param.r_l) / math.log(param.eta)) if param.b_l < 0: param.b_l = (param.s_max + 1) * param.r_l if param.current_s < 0: @@ -348,10 +385,13 @@ def convert(alg_settings): if param.current_i < 0: param.current_i = 0 if param.n < 0: - param.n = int(math.ceil(float(param.s_max + 1) * ( - float(param.eta**param.current_s) / float(param.current_s+1)))) + param.n = int( + math.ceil( + float(param.s_max + 1) + * (float(param.eta**param.current_s) / float(param.current_s + 1)) + ) + ) if param.r < 0: - param.r = param.r_l * \ - param.eta**(-param.current_s) + param.r = param.r_l * param.eta ** (-param.current_s) return param diff --git a/pkg/suggestion/v1beta1/hyperopt/base_service.py b/pkg/suggestion/v1beta1/hyperopt/base_service.py index 894a98637bf..82bc9a4a1e2 100644 --- a/pkg/suggestion/v1beta1/hyperopt/base_service.py +++ b/pkg/suggestion/v1beta1/hyperopt/base_service.py @@ -17,11 +17,13 @@ import hyperopt import numpy as np -from pkg.suggestion.v1beta1.internal.constant import CATEGORICAL -from pkg.suggestion.v1beta1.internal.constant import DISCRETE -from pkg.suggestion.v1beta1.internal.constant import DOUBLE -from pkg.suggestion.v1beta1.internal.constant import INTEGER -from pkg.suggestion.v1beta1.internal.constant import MAX_GOAL +from pkg.suggestion.v1beta1.internal.constant import ( + CATEGORICAL, + DISCRETE, + DOUBLE, + INTEGER, + MAX_GOAL, +) from pkg.suggestion.v1beta1.internal.trial import Assignment logger = logging.getLogger(__name__) @@ -31,14 +33,13 @@ class BaseHyperoptService(object): - def __init__(self, - algorithm_name=TPE_ALGORITHM_NAME, - algorithm_conf=None, - search_space=None): + def __init__( + self, algorithm_name=TPE_ALGORITHM_NAME, algorithm_conf=None, search_space=None + ): self.algorithm_name = algorithm_name self.algorithm_conf = algorithm_conf or {} # pop common configurations - random_state = self.algorithm_conf.pop('random_state', None) + random_state = self.algorithm_conf.pop("random_state", None) if self.algorithm_name == TPE_ALGORITHM_NAME: self.hyperopt_algorithm = hyperopt.tpe.suggest @@ -62,21 +63,20 @@ def create_hyperopt_domain(self): for param in self.search_space.params: if param.type == INTEGER: hyperopt_search_space[param.name] = hyperopt.hp.quniform( - param.name, - float(param.min), - float(param.max), - float(param.step)) + param.name, float(param.min), float(param.max), float(param.step) + ) elif param.type == DOUBLE: hyperopt_search_space[param.name] = hyperopt.hp.uniform( - param.name, - float(param.min), - float(param.max)) + param.name, float(param.min), float(param.max) + ) elif param.type == CATEGORICAL or param.type == DISCRETE: hyperopt_search_space[param.name] = hyperopt.hp.choice( - param.name, param.list) + param.name, param.list + ) self.hyperopt_domain = hyperopt.Domain( - None, hyperopt_search_space, pass_expr_memo_ctrl=None) + None, hyperopt_search_space, pass_expr_memo_ctrl=None + ) def create_fmin(self): self.fmin = hyperopt.FMinIter( @@ -85,7 +85,8 @@ def create_fmin(self): trials=hyperopt.Trials(), max_evals=-1, rstate=self.hyperopt_rstate, - verbose=False) + verbose=False, + ) self.fmin.catch_eval_exceptions = False @@ -112,7 +113,10 @@ def getSuggestions(self, trials, current_request_number): # Insert Trial assignment to the misc hyperopt_trial_misc = dict( - tid=new_id[0], cmd=self.hyperopt_domain.cmd, workdir=self.hyperopt_domain.workdir) + tid=new_id[0], + cmd=self.hyperopt_domain.cmd, + workdir=self.hyperopt_domain.workdir, + ) for param in self.search_space.params: parameter_value = None for assignment in trial.assignments: @@ -135,9 +139,7 @@ def getSuggestions(self, trials, current_request_number): hyperopt_trial_miscs.append(hyperopt_trial_misc) # Insert Trial name to the spec - hyperopt_trial_spec = { - "trial-name": trial.name - } + hyperopt_trial_spec = {"trial-name": trial.name} hyperopt_trial_specs.append(hyperopt_trial_spec) # Insert Trial result to the result @@ -149,7 +151,7 @@ def getSuggestions(self, trials, current_request_number): objective_for_hyperopt = -1 * objective_for_hyperopt hyperopt_trial_result = { "loss": objective_for_hyperopt, - "status": hyperopt.STATUS_OK + "status": hyperopt.STATUS_OK, } hyperopt_trial_results.append(hyperopt_trial_result) @@ -160,7 +162,8 @@ def getSuggestions(self, trials, current_request_number): tids=hyperopt_trial_new_ids, specs=hyperopt_trial_specs, results=hyperopt_trial_results, - miscs=hyperopt_trial_miscs) + miscs=hyperopt_trial_miscs, + ) for i, _ in enumerate(hyperopt_trials): hyperopt_trials[i]["state"] = hyperopt.JOB_STATE_DONE @@ -208,7 +211,8 @@ def getSuggestions(self, trials, current_request_number): new_ids=hyperopt_trial_new_ids, domain=self.fmin.domain, trials=self.fmin.trials, - seed=random_state) + seed=random_state, + ) elif self.algorithm_name == TPE_ALGORITHM_NAME: # n_startup_jobs indicates for how many Trials we run random suggestion # This must be current_request_number value @@ -222,24 +226,30 @@ def getSuggestions(self, trials, current_request_number): trials=self.fmin.trials, seed=random_state, n_startup_jobs=current_request_number, - **self.algorithm_conf) + **self.algorithm_conf + ) self.is_first_run = False else: for i in range(current_request_number): # hyperopt_algorithm always returns one new Trial - new_trials.append(self.hyperopt_algorithm( - new_ids=[hyperopt_trial_new_ids[i]], - domain=self.fmin.domain, - trials=self.fmin.trials, - seed=random_state, - n_startup_jobs=current_request_number, - **self.algorithm_conf)[0]) + new_trials.append( + self.hyperopt_algorithm( + new_ids=[hyperopt_trial_new_ids[i]], + domain=self.fmin.domain, + trials=self.fmin.trials, + seed=random_state, + n_startup_jobs=current_request_number, + **self.algorithm_conf + )[0] + ) # Construct return advisor Trials from new hyperopt Trials list_of_assignments = [] for trial in new_trials: - vals = trial['misc']['vals'] - list_of_assignments.append(BaseHyperoptService.convert(self.search_space, vals)) + vals = trial["misc"]["vals"] + list_of_assignments.append( + BaseHyperoptService.convert(self.search_space, vals) + ) if len(list_of_assignments) > 0: logger.info("GetSuggestions returns {} new Trial\n".format(len(new_trials))) @@ -256,5 +266,6 @@ def convert(search_space, vals): assignments.append(Assignment(param.name, vals[param.name][0])) elif param.type == CATEGORICAL or param.type == DISCRETE: assignments.append( - Assignment(param.name, param.list[vals[param.name][0]])) + Assignment(param.name, param.list[vals[param.name][0]]) + ) return assignments diff --git a/pkg/suggestion/v1beta1/hyperopt/service.py b/pkg/suggestion/v1beta1/hyperopt/service.py index d3a9cdfe7c5..ab837173375 100644 --- a/pkg/suggestion/v1beta1/hyperopt/service.py +++ b/pkg/suggestion/v1beta1/hyperopt/service.py @@ -16,14 +16,11 @@ import grpc -from pkg.apis.manager.v1beta1.python import api_pb2 -from pkg.apis.manager.v1beta1.python import api_pb2_grpc +from pkg.apis.manager.v1beta1.python import api_pb2, api_pb2_grpc from pkg.suggestion.v1beta1.hyperopt.base_service import BaseHyperoptService from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer -from pkg.suggestion.v1beta1.internal.search_space import \ - HyperParameterSearchSpace -from pkg.suggestion.v1beta1.internal.trial import Assignment -from pkg.suggestion.v1beta1.internal.trial import Trial +from pkg.suggestion.v1beta1.internal.search_space import HyperParameterSearchSpace +from pkg.suggestion.v1beta1.internal.trial import Assignment, Trial logger = logging.getLogger(__name__) @@ -40,25 +37,28 @@ def GetSuggestions(self, request, context): Main function to provide suggestion. """ name, config = OptimizerConfiguration.convert_algorithm_spec( - request.experiment.spec.algorithm) + request.experiment.spec.algorithm + ) if self.is_first_run: search_space = HyperParameterSearchSpace.convert(request.experiment) self.base_service = BaseHyperoptService( - algorithm_name=name, - algorithm_conf=config, - search_space=search_space) + algorithm_name=name, algorithm_conf=config, search_space=search_space + ) self.is_first_run = False trials = Trial.convert(request.trials) - new_assignments = self.base_service.getSuggestions(trials, request.current_request_number) + new_assignments = self.base_service.getSuggestions( + trials, request.current_request_number + ) return api_pb2.GetSuggestionsReply( parameter_assignments=Assignment.generate(new_assignments) ) def ValidateAlgorithmSettings(self, request, context): is_valid, message = OptimizerConfiguration.validate_algorithm_spec( - request.experiment.spec.algorithm) + request.experiment.spec.algorithm + ) if not is_valid: context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_details(message) @@ -68,15 +68,15 @@ def ValidateAlgorithmSettings(self, request, context): class OptimizerConfiguration: __conversion_dict = { - 'tpe': { - 'gamma': lambda x: float(x), - 'prior_weight': lambda x: float(x), - 'n_EI_candidates': lambda x: int(x), + "tpe": { + "gamma": lambda x: float(x), + "prior_weight": lambda x: float(x), + "n_EI_candidates": lambda x: int(x), "random_state": lambda x: int(x), }, "random": { "random_state": lambda x: int(x), - } + }, } @classmethod @@ -92,9 +92,9 @@ def convert_algorithm_spec(cls, algorithm_spec): @classmethod def validate_algorithm_spec(cls, algorithm_spec): algo_name = algorithm_spec.algorithm_name - if algo_name == 'tpe': + if algo_name == "tpe": return cls._validate_tpe_setting(algorithm_spec.algorithm_settings) - elif algo_name == 'random': + elif algo_name == "random": return cls._validate_random_setting(algorithm_spec.algorithm_settings) else: return False, "unknown algorithm name {}".format(algo_name) @@ -103,23 +103,24 @@ def validate_algorithm_spec(cls, algorithm_spec): def _validate_tpe_setting(cls, algorithm_settings): for s in algorithm_settings: try: - if s.name == 'gamma': + if s.name == "gamma": if not 1 > float(s.value) > 0: return False, "gamma should be in the range of (0, 1)" - elif s.name == 'prior_weight': + elif s.name == "prior_weight": if not float(s.value) > 0: return False, "prior_weight should be great than zero" - elif s.name == 'n_EI_candidates': + elif s.name == "n_EI_candidates": if not int(s.value) > 0: return False, "n_EI_candidates should be great than zero" - elif s.name == 'random_state': + elif s.name == "random_state": if not int(s.value) >= 0: return False, "random_state should be great or equal than zero" else: return False, "unknown setting {} for algorithm tpe".format(s.name) except Exception as e: return False, "failed to validate {name}({value}): {exception}".format( - name=s.name, value=s.value, exception=e) + name=s.name, value=s.value, exception=e + ) return True, "" @@ -127,13 +128,16 @@ def _validate_tpe_setting(cls, algorithm_settings): def _validate_random_setting(cls, algorithm_settings): for s in algorithm_settings: try: - if s.name == 'random_state': + if s.name == "random_state": if not (int(s.value) >= 0): return False, "random_state should be great or equal than zero" else: - return False, "unknown setting {} for algorithm random".format(s.name) + return False, "unknown setting {} for algorithm random".format( + s.name + ) except Exception as e: return False, "failed to validate {name}({value}): {exception}".format( - name=s.name, value=s.value, exception=e) + name=s.name, value=s.value, exception=e + ) return True, "" diff --git a/pkg/suggestion/v1beta1/internal/base_health_service.py b/pkg/suggestion/v1beta1/internal/base_health_service.py index 3660c884ed8..13950975b66 100644 --- a/pkg/suggestion/v1beta1/internal/base_health_service.py +++ b/pkg/suggestion/v1beta1/internal/base_health_service.py @@ -22,7 +22,7 @@ from pkg.apis.manager.health.python import health_pb2 as _health_pb2 from pkg.apis.manager.health.python import health_pb2_grpc as _health_pb2_grpc -SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name +SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name["Health"].full_name class _Watcher: @@ -74,9 +74,7 @@ def send_response_callback(response): class HealthServicer(_health_pb2_grpc.HealthServicer): """Servicer handling RPCs for service statuses.""" - def __init__(self, - experimental_non_blocking=True, - experimental_thread_pool=None): + def __init__(self, experimental_non_blocking=True, experimental_thread_pool=None): self._lock = threading.RLock() self._server_status = {} self._send_response_callbacks = {} @@ -89,8 +87,7 @@ def _on_close_callback(self, send_response_callback, service): def callback(): with self._lock: - self._send_response_callbacks[service].remove( - send_response_callback) + self._send_response_callbacks[service].remove(send_response_callback) send_response_callback(None) return callback @@ -114,19 +111,22 @@ def Watch(self, request, context, send_response_callback=None): # generator. blocking_watcher = _Watcher() send_response_callback = _watcher_to_send_response_callback_adapter( - blocking_watcher) + blocking_watcher + ) service = request.service with self._lock: status = self._server_status.get(service) if status is None: - status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member - send_response_callback( - _health_pb2.HealthCheckResponse(status=status)) + status = ( + _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN + ) # pylint: disable=no-member + send_response_callback(_health_pb2.HealthCheckResponse(status=status)) if service not in self._send_response_callbacks: self._send_response_callbacks[service] = set() self._send_response_callbacks[service].add(send_response_callback) context.add_callback( - self._on_close_callback(send_response_callback, service)) + self._on_close_callback(send_response_callback, service) + ) return blocking_watcher def set(self, service, status): @@ -144,9 +144,11 @@ def set(self, service, status): self._server_status[service] = status if service in self._send_response_callbacks: for send_response_callback in self._send_response_callbacks[ - service]: + service + ]: send_response_callback( - _health_pb2.HealthCheckResponse(status=status)) + _health_pb2.HealthCheckResponse(status=status) + ) def enter_graceful_shutdown(self): """Permanently sets the status of all services to NOT_SERVING. @@ -162,6 +164,7 @@ def enter_graceful_shutdown(self): return else: for service in self._server_status: - self.set(service, - _health_pb2.HealthCheckResponse.NOT_SERVING) # pylint: disable=no-member + self.set( + service, _health_pb2.HealthCheckResponse.NOT_SERVING + ) # pylint: disable=no-member self._gracefully_shutting_down = True diff --git a/pkg/suggestion/v1beta1/internal/search_space.py b/pkg/suggestion/v1beta1/internal/search_space.py index d50955810e5..6698a160df3 100644 --- a/pkg/suggestion/v1beta1/internal/search_space.py +++ b/pkg/suggestion/v1beta1/internal/search_space.py @@ -16,12 +16,14 @@ import numpy as np -from pkg.apis.manager.v1beta1.python import api_pb2 as api -from pkg.suggestion.v1beta1.internal.constant import CATEGORICAL -from pkg.suggestion.v1beta1.internal.constant import DISCRETE -from pkg.suggestion.v1beta1.internal.constant import DOUBLE -from pkg.suggestion.v1beta1.internal.constant import INTEGER import pkg.suggestion.v1beta1.internal.constant as constant +from pkg.apis.manager.v1beta1.python import api_pb2 as api +from pkg.suggestion.v1beta1.internal.constant import ( + CATEGORICAL, + DISCRETE, + DOUBLE, + INTEGER, +) logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) @@ -40,8 +42,7 @@ def convert(experiment): elif experiment.spec.objective.type == api.MINIMIZE: search_space.goal = constant.MIN_GOAL for p in experiment.spec.parameter_specs.parameters: - search_space.params.append( - HyperParameterSearchSpace.convert_parameter(p)) + search_space.params.append(HyperParameterSearchSpace.convert_parameter(p)) return search_space @staticmethod @@ -50,15 +51,21 @@ def convert_to_combinations(search_space): for parameter in search_space.params: if parameter.type == INTEGER: - combinations[parameter.name] = range(int(parameter.min), int(parameter.max)+1, int(parameter.step)) + combinations[parameter.name] = range( + int(parameter.min), int(parameter.max) + 1, int(parameter.step) + ) elif parameter.type == DOUBLE: if parameter.step == "" or parameter.step is None: raise Exception( - "Param {} step is nil; For discrete search space, all parameters must include step". - format(parameter.name) + "Param {} step is nil; For discrete search space, all parameters must include step".format( + parameter.name + ) ) - double_list = np.arange(float(parameter.min), float(parameter.max)+float(parameter.step), - float(parameter.step)) + double_list = np.arange( + float(parameter.min), + float(parameter.max) + float(parameter.step), + float(parameter.step), + ) if double_list[-1] > float(parameter.max): double_list = double_list[:-1] combinations[parameter.name] = double_list @@ -68,8 +75,11 @@ def convert_to_combinations(search_space): return combinations def __str__(self): - return "HyperParameterSearchSpace(goal: {}, ".format(self.goal) + \ - "params: {})".format(", ".join([element.__str__() for element in self.params])) + return "HyperParameterSearchSpace(goal: {}, ".format( + self.goal + ) + "params: {})".format( + ", ".join([element.__str__() for element in self.params]) + ) @staticmethod def convert_parameter(p): @@ -78,16 +88,26 @@ def convert_parameter(p): step = 1 if p.feasible_space.step is not None and p.feasible_space.step != "": step = p.feasible_space.step - return HyperParameter.int(p.name, p.feasible_space.min, p.feasible_space.max, step) + return HyperParameter.int( + p.name, p.feasible_space.min, p.feasible_space.max, step + ) elif p.parameter_type == api.DOUBLE: - return HyperParameter.double(p.name, p.feasible_space.min, p.feasible_space.max, p.feasible_space.step) + return HyperParameter.double( + p.name, + p.feasible_space.min, + p.feasible_space.max, + p.feasible_space.step, + ) elif p.parameter_type == api.CATEGORICAL: return HyperParameter.categorical(p.name, p.feasible_space.list) elif p.parameter_type == api.DISCRETE: return HyperParameter.discrete(p.name, p.feasible_space.list) else: logger.error( - "Cannot get the type for the parameter: %s (%s)", p.name, p.parameter_type) + "Cannot get the type for the parameter: %s (%s)", + p.name, + p.parameter_type, + ) class HyperParameter(object): @@ -101,11 +121,15 @@ def __init__(self, name, type_, min_, max_, list_, step): def __str__(self): if self.type == constant.INTEGER or self.type == constant.DOUBLE: - return "HyperParameter(name: {}, type: {}, min: {}, max: {}, step: {})".format( - self.name, self.type, self.min, self.max, self.step) + return ( + "HyperParameter(name: {}, type: {}, min: {}, max: {}, step: {})".format( + self.name, self.type, self.min, self.max, self.step + ) + ) else: return "HyperParameter(name: {}, type: {}, list: {})".format( - self.name, self.type, ", ".join(self.list)) + self.name, self.type, ", ".join(self.list) + ) @staticmethod def int(name, min_, max_, step): @@ -117,7 +141,9 @@ def double(name, min_, max_, step): @staticmethod def categorical(name, lst): - return HyperParameter(name, constant.CATEGORICAL, 0, 0, [str(e) for e in lst], 0) + return HyperParameter( + name, constant.CATEGORICAL, 0, 0, [str(e) for e in lst], 0 + ) @staticmethod def discrete(name, lst): diff --git a/pkg/suggestion/v1beta1/internal/trial.py b/pkg/suggestion/v1beta1/internal/trial.py index 906617d0c0f..59ec6e8fefb 100644 --- a/pkg/suggestion/v1beta1/internal/trial.py +++ b/pkg/suggestion/v1beta1/internal/trial.py @@ -41,8 +41,11 @@ def __init__( def convert(trials): res = [] for trial in trials: - if trial.status.condition == api.TrialStatus.TrialConditionType.SUCCEEDED or \ - trial.status.condition == api.TrialStatus.TrialConditionType.EARLYSTOPPED: + if ( + trial.status.condition == api.TrialStatus.TrialConditionType.SUCCEEDED + or trial.status.condition + == api.TrialStatus.TrialConditionType.EARLYSTOPPED + ): new_trial = Trial.convertTrial(trial) if new_trial is not None: res.append(Trial.convertTrial(trial)) diff --git a/pkg/suggestion/v1beta1/nas/common/validation.py b/pkg/suggestion/v1beta1/nas/common/validation.py index 26e510d51a0..b91831e098e 100644 --- a/pkg/suggestion/v1beta1/nas/common/validation.py +++ b/pkg/suggestion/v1beta1/nas/common/validation.py @@ -38,26 +38,56 @@ def validate_operations(operations: list[api_pb2.Operation]) -> (bool, str): # Check ParameterType if not parameter.parameter_type: - return False, "Missing ParameterType in ParameterConfig:\n{}".format(parameter) + return False, "Missing ParameterType in ParameterConfig:\n{}".format( + parameter + ) # Check List in Categorical or Discrete Type - if parameter.parameter_type == api_pb2.CATEGORICAL or parameter.parameter_type == api_pb2.DISCRETE: + if ( + parameter.parameter_type == api_pb2.CATEGORICAL + or parameter.parameter_type == api_pb2.DISCRETE + ): if not parameter.feasible_space.list: - return False, "Missing List in ParameterConfig.feasibleSpace:\n{}".format(parameter) + return ( + False, + "Missing List in ParameterConfig.feasibleSpace:\n{}".format( + parameter + ), + ) # Check Max, Min, Step in Int or Double Type - elif parameter.parameter_type == api_pb2.INT or parameter.parameter_type == api_pb2.DOUBLE: - if not parameter.feasible_space.min and not parameter.feasible_space.max: - return False, "Missing Max and Min in ParameterConfig.feasibleSpace:\n{}".format(parameter) + elif ( + parameter.parameter_type == api_pb2.INT + or parameter.parameter_type == api_pb2.DOUBLE + ): + if ( + not parameter.feasible_space.min + and not parameter.feasible_space.max + ): + return ( + False, + "Missing Max and Min in ParameterConfig.feasibleSpace:\n{}".format( + parameter + ), + ) try: - if (parameter.parameter_type == api_pb2.DOUBLE and - (not parameter.feasible_space.step or float(parameter.feasible_space.step) <= 0)): - return False, \ - "Step parameter should be > 0 in ParameterConfig.feasibleSpace:\n{}".format(parameter) + if parameter.parameter_type == api_pb2.DOUBLE and ( + not parameter.feasible_space.step + or float(parameter.feasible_space.step) <= 0 + ): + return ( + False, + "Step parameter should be > 0 in ParameterConfig.feasibleSpace:\n{}".format( + parameter + ), + ) except Exception as e: - return False, \ - "failed to validate ParameterConfig.feasibleSpace \n{parameter}):\n{exception}".format( - parameter=parameter, exception=e) + return ( + False, + "failed to validate ParameterConfig.feasibleSpace \n{parameter}):\n{exception}".format( + parameter=parameter, exception=e + ), + ) return True, "" diff --git a/pkg/suggestion/v1beta1/nas/darts/service.py b/pkg/suggestion/v1beta1/nas/darts/service.py index 55e7751a8f3..835aaa58513 100644 --- a/pkg/suggestion/v1beta1/nas/darts/service.py +++ b/pkg/suggestion/v1beta1/nas/darts/service.py @@ -14,14 +14,11 @@ import json import logging -from logging import getLogger -from logging import INFO -from logging import StreamHandler +from logging import INFO, StreamHandler, getLogger import grpc -from pkg.apis.manager.v1beta1.python import api_pb2 -from pkg.apis.manager.v1beta1.python import api_pb2_grpc +from pkg.apis.manager.v1beta1.python import api_pb2, api_pb2_grpc from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer from pkg.suggestion.v1beta1.nas.common.validation import validate_operations @@ -33,7 +30,7 @@ def __init__(self): self.is_first_run = True self.logger = getLogger(__name__) - FORMAT = '%(asctime)-15s Experiment %(experiment_name)s %(message)s' + FORMAT = "%(asctime)-15s Experiment %(experiment_name)s %(message)s" logging.basicConfig(format=FORMAT) handler = StreamHandler() handler.setLevel(INFO) @@ -62,8 +59,8 @@ def GetSuggestions(self, request, context): search_space_json = json.dumps(search_space) algorithm_settings_json = json.dumps(algorithm_settings) - search_space_str = str(search_space_json).replace('\"', '\'') - algorithm_settings_str = str(algorithm_settings_json).replace('\"', '\'') + search_space_str = str(search_space_json).replace('"', "'") + algorithm_settings_str = str(algorithm_settings_json).replace('"', "'") self.is_first_run = False @@ -84,17 +81,14 @@ def GetSuggestions(self, request, context): api_pb2.GetSuggestionsReply.ParameterAssignments( assignments=[ api_pb2.ParameterAssignment( - name="algorithm-settings", - value=algorithm_settings_str + name="algorithm-settings", value=algorithm_settings_str ), api_pb2.ParameterAssignment( - name="search-space", - value=search_space_str + name="search-space", value=search_space_str ), api_pb2.ParameterAssignment( - name="num-layers", - value=num_layers - ) + name="num-layers", value=num_layers + ), ] ) ) @@ -114,27 +108,29 @@ def get_search_space(operations): # Currently support only one Categorical parameter - filter size opt_spec = list(operation.parameter_specs.parameters)[0] for filter_size in list(opt_spec.feasible_space.list): - search_space.append(opt_type+"_{}x{}".format(filter_size, filter_size)) + search_space.append( + opt_type + "_{}x{}".format(filter_size, filter_size) + ) return search_space def get_algorithm_settings(settings_raw): algorithm_settings_default = { - "num_epochs": 50, - "w_lr": 0.025, - "w_lr_min": 0.001, - "w_momentum": 0.9, - "w_weight_decay": 3e-4, - "w_grad_clip": 5., - "alpha_lr": 3e-4, - "alpha_weight_decay": 1e-3, - "batch_size": 128, - "num_workers": 4, - "init_channels": 16, - "print_step": 50, - "num_nodes": 4, - "stem_multiplier": 3, + "num_epochs": 50, + "w_lr": 0.025, + "w_lr_min": 0.001, + "w_momentum": 0.9, + "w_weight_decay": 3e-4, + "w_grad_clip": 5.0, + "alpha_lr": 3e-4, + "alpha_weight_decay": 1e-3, + "batch_size": 128, + "num_workers": 4, + "init_channels": 16, + "print_step": 50, + "num_nodes": 4, + "stem_multiplier": 3, } for setting in settings_raw: @@ -162,7 +158,9 @@ def validate_algorithm_spec(spec: api_pb2.ExperimentSpec) -> (bool, str): # validate_algorithm_settings is implemented based on quark0/darts and pt.darts. # quark0/darts: https://github.com/quark0/darts # pt.darts: https://github.com/khanrc/pt.darts -def validate_algorithm_settings(algorithm_settings: list[api_pb2.AlgorithmSetting]) -> (bool, str): +def validate_algorithm_settings( + algorithm_settings: list[api_pb2.AlgorithmSetting], +) -> (bool, str): for s in algorithm_settings: try: if s.name == "num_epochs": @@ -172,17 +170,23 @@ def validate_algorithm_settings(algorithm_settings: list[api_pb2.AlgorithmSettin # Validate learning rate if s.name in {"w_lr", "w_lr_min", "alpha_lr"}: if not float(s.value) >= 0.0: - return False, "{} should be greater than or equal to zero".format(s.name) + return False, "{} should be greater than or equal to zero".format( + s.name + ) # Validate weight decay if s.name in {"w_weight_decay", "alpha_weight_decay"}: if not float(s.value) >= 0.0: - return False, "{} should be greater than or equal to zero".format(s.name) + return False, "{} should be greater than or equal to zero".format( + s.name + ) # Validate w_momentum and w_grad_clip if s.name in {"w_momentum", "w_grad_clip"}: if not float(s.value) >= 0.0: - return False, "{} should be greater than or equal to zero".format(s.name) + return False, "{} should be greater than or equal to zero".format( + s.name + ) if s.name == "batch_size": if s.value != "None" and not int(s.value) >= 1: @@ -193,12 +197,20 @@ def validate_algorithm_settings(algorithm_settings: list[api_pb2.AlgorithmSettin return False, "num_workers should be greater than or equal to zero" # Validate "init_channels", "print_step", "num_nodes" and "stem_multiplier" - if s.name in {"init_channels", "print_step", "num_nodes", "stem_multiplier"}: + if s.name in { + "init_channels", + "print_step", + "num_nodes", + "stem_multiplier", + }: if not int(s.value) >= 1: - return False, "{} should be greater than or equal to one".format(s.name) + return False, "{} should be greater than or equal to one".format( + s.name + ) except Exception as e: - return False, "failed to validate {name}({value}): {exception}".format(name=s.name, value=s.value, - exception=e) + return False, "failed to validate {name}({value}): {exception}".format( + name=s.name, value=s.value, exception=e + ) return True, "" diff --git a/pkg/suggestion/v1beta1/nas/enas/AlgorithmSettings.py b/pkg/suggestion/v1beta1/nas/enas/AlgorithmSettings.py index abf3ac39e03..685ff4cf25d 100644 --- a/pkg/suggestion/v1beta1/nas/enas/AlgorithmSettings.py +++ b/pkg/suggestion/v1beta1/nas/enas/AlgorithmSettings.py @@ -14,34 +14,38 @@ algorithmSettingsValidator = { - "controller_hidden_size": [int, [1, 'inf']], - "controller_temperature": [float, [0, 'inf']], - "controller_tanh_const": [float, [0, 'inf']], - "controller_entropy_weight": [float, [0.0, 'inf']], - "controller_baseline_decay": [float, [0.0, 1.0]], - "controller_learning_rate": [float, [0.0, 1.0]], - "controller_skip_target": [float, [0.0, 1.0]], - "controller_skip_weight": [float, [0.0, 'inf']], - "controller_train_steps": [int, [1, 'inf']], - "controller_log_every_steps": [int, [1, 'inf']], + "controller_hidden_size": [int, [1, "inf"]], + "controller_temperature": [float, [0, "inf"]], + "controller_tanh_const": [float, [0, "inf"]], + "controller_entropy_weight": [float, [0.0, "inf"]], + "controller_baseline_decay": [float, [0.0, 1.0]], + "controller_learning_rate": [float, [0.0, 1.0]], + "controller_skip_target": [float, [0.0, 1.0]], + "controller_skip_weight": [float, [0.0, "inf"]], + "controller_train_steps": [int, [1, "inf"]], + "controller_log_every_steps": [int, [1, "inf"]], } enableNoneSettingsList = [ - "controller_temperature", "controller_tanh_const", "controller_entropy_weight", "controller_skip_weight"] + "controller_temperature", + "controller_tanh_const", + "controller_entropy_weight", + "controller_skip_weight", +] def parseAlgorithmSettings(settings_raw): algorithm_settings_default = { - "controller_hidden_size": 64, - "controller_temperature": 5., - "controller_tanh_const": 2.25, - "controller_entropy_weight": 1e-5, - "controller_baseline_decay": 0.999, - "controller_learning_rate": 5e-5, - "controller_skip_target": 0.4, - "controller_skip_weight": 0.8, - "controller_train_steps": 50, - "controller_log_every_steps": 10, + "controller_hidden_size": 64, + "controller_temperature": 5.0, + "controller_tanh_const": 2.25, + "controller_entropy_weight": 1e-5, + "controller_baseline_decay": 0.999, + "controller_learning_rate": 5e-5, + "controller_skip_target": 0.4, + "controller_skip_weight": 0.8, + "controller_train_steps": 50, + "controller_log_every_steps": 10, } for setting in settings_raw: diff --git a/pkg/suggestion/v1beta1/nas/enas/Controller.py b/pkg/suggestion/v1beta1/nas/enas/Controller.py index 12a43a32c56..42b4c84b5e5 100644 --- a/pkg/suggestion/v1beta1/nas/enas/Controller.py +++ b/pkg/suggestion/v1beta1/nas/enas/Controller.py @@ -17,19 +17,21 @@ class Controller(object): - def __init__(self, - num_layers=12, - num_operations=16, - controller_hidden_size=64, - controller_temperature=5., - controller_tanh_const=2.25, - controller_entropy_weight=1e-5, - controller_baseline_decay=0.999, - controller_learning_rate=5e-5, - controller_skip_target=0.4, - controller_skip_weight=0.8, - controller_name="controller", - logger=None): + def __init__( + self, + num_layers=12, + num_operations=16, + controller_hidden_size=64, + controller_temperature=5.0, + controller_tanh_const=2.25, + controller_entropy_weight=1e-5, + controller_baseline_decay=0.999, + controller_learning_rate=5e-5, + controller_skip_target=0.4, + controller_skip_weight=0.8, + controller_name="controller", + logger=None, + ): self.logger = logger self.logger.info(">>> Building Controller\n") @@ -59,23 +61,38 @@ def _build_params(self): with tf.compat.v1.variable_scope(self.controller_name, initializer=initializer): with tf.compat.v1.variable_scope("lstm"): - self.w_lstm = tf.compat.v1.get_variable("w", [2 * hidden_size, 4 * hidden_size]) + self.w_lstm = tf.compat.v1.get_variable( + "w", [2 * hidden_size, 4 * hidden_size] + ) self.g_emb = tf.compat.v1.get_variable("g_emb", [1, hidden_size]) with tf.compat.v1.variable_scope("embedding"): - self.w_emb = tf.compat.v1.get_variable("w", [self.num_operations, hidden_size]) + self.w_emb = tf.compat.v1.get_variable( + "w", [self.num_operations, hidden_size] + ) with tf.compat.v1.variable_scope("softmax"): - self.w_soft = tf.compat.v1.get_variable("w", [hidden_size, self.num_operations]) - - with tf.compat.v1.variable_scope('attention'): - self.attn_w_1 = tf.compat.v1.get_variable('w_1', [hidden_size, hidden_size]) - self.attn_w_2 = tf.compat.v1.get_variable("w_2", [hidden_size, hidden_size]) - self.attn_v = tf.compat.v1.get_variable('v', [hidden_size, 1]) - - num_params = sum([np.prod(v.shape) - for v in tf.compat.v1.trainable_variables() if v.name.startswith(self.controller_name)]) + self.w_soft = tf.compat.v1.get_variable( + "w", [hidden_size, self.num_operations] + ) + + with tf.compat.v1.variable_scope("attention"): + self.attn_w_1 = tf.compat.v1.get_variable( + "w_1", [hidden_size, hidden_size] + ) + self.attn_w_2 = tf.compat.v1.get_variable( + "w_2", [hidden_size, hidden_size] + ) + self.attn_v = tf.compat.v1.get_variable("v", [hidden_size, 1]) + + num_params = sum( + [ + np.prod(v.shape) + for v in tf.compat.v1.trainable_variables() + if v.name.startswith(self.controller_name) + ] + ) self.logger.info(">>> Controller has {} Trainable params\n".format(num_params)) def _build_sampler(self): @@ -97,8 +114,10 @@ def _build_sampler(self): prev_c = tf.zeros([1, hidden_size], tf.float32) prev_h = tf.zeros([1, hidden_size], tf.float32) - skip_targets = tf.constant([1.0 - self.controller_skip_target, self.controller_skip_target], - dtype=tf.float32) + skip_targets = tf.constant( + [1.0 - self.controller_skip_target, self.controller_skip_target], + dtype=tf.float32, + ) inputs = self.g_emb @@ -121,7 +140,8 @@ def _build_sampler(self): arc_seq.append(func) log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits( - logits=logits, labels=func) + logits=logits, labels=func + ) sample_log_probs.append(log_prob) entropy = log_prob * tf.exp(-log_prob) @@ -153,17 +173,23 @@ def _build_sampler(self): arc_seq.append(skip_index) skip_prob = tf.sigmoid(logits) - kl = skip_prob * tf.math.log(skip_prob/skip_targets) + kl = skip_prob * tf.math.log(skip_prob / skip_targets) kl = tf.reduce_sum(input_tensor=kl) skip_penalties.append(kl) log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits( - logits=logits, labels=skip_index) + logits=logits, labels=skip_index + ) - sample_log_probs.append(tf.reduce_sum(input_tensor=log_prob, keepdims=True)) + sample_log_probs.append( + tf.reduce_sum(input_tensor=log_prob, keepdims=True) + ) entropy = tf.stop_gradient( - tf.reduce_sum(input_tensor=log_prob * tf.exp(-log_prob), keepdims=True)) + tf.reduce_sum( + input_tensor=log_prob * tf.exp(-log_prob), keepdims=True + ) + ) sample_entropies.append(entropy) skip_index = tf.dtypes.cast(skip_index, tf.float32) @@ -173,7 +199,7 @@ def _build_sampler(self): inputs = tf.matmul(skip_index, tf.concat(all_h, axis=0)) - inputs /= (1.0 + tf.reduce_sum(input_tensor=skip_index)) + inputs /= 1.0 + tf.reduce_sum(input_tensor=skip_index) else: inputs = self.g_emb @@ -201,7 +227,9 @@ def build_trainer(self): self.reward = self.child_val_accuracy - normalize = tf.dtypes.cast((self.num_layers * (self.num_layers - 1) / 2), tf.float32) + normalize = tf.dtypes.cast( + (self.num_layers * (self.num_layers - 1) / 2), tf.float32 + ) self.skip_rate = tf.dtypes.cast((self.skip_count / normalize), tf.float32) if self.controller_entropy_weight is not None: @@ -210,7 +238,9 @@ def build_trainer(self): self.sample_log_probs = tf.reduce_sum(input_tensor=self.sample_log_probs) self.baseline = tf.Variable(0.0, dtype=tf.float32, trainable=False) baseline_update = tf.compat.v1.assign_sub( - self.baseline, (1 - self.controller_baseline_decay) * (self.baseline - self.reward)) + self.baseline, + (1 - self.controller_baseline_decay) * (self.baseline - self.reward), + ) with tf.control_dependencies([baseline_update]): self.reward = tf.identity(self.reward) @@ -221,16 +251,24 @@ def build_trainer(self): self.loss += self.controller_skip_weight * self.skip_penalties self.train_step = tf.Variable( - 0, dtype=tf.int32, trainable=False, name=self.controller_name + '_train_step') - - tf_variables = [var for var in tf.compat.v1.trainable_variables() - if var.name.startswith(self.controller_name)] + 0, + dtype=tf.int32, + trainable=False, + name=self.controller_name + "_train_step", + ) + + tf_variables = [ + var + for var in tf.compat.v1.trainable_variables() + if var.name.startswith(self.controller_name) + ] self.train_op, self.grad_norm = _build_train_op( loss=self.loss, tf_variables=tf_variables, train_step=self.train_step, - learning_rate=self.controller_learning_rate) + learning_rate=self.controller_learning_rate, + ) # TODO: will remove this function and use tf.nn.LSTMCell instead @@ -252,6 +290,8 @@ def _build_train_op(loss, tf_variables, train_step, learning_rate): grads = tf.gradients(ys=loss, xs=tf_variables) grad_norm = tf.linalg.global_norm(grads) - train_op = optimizer.apply_gradients(zip(grads, tf_variables), global_step=train_step) + train_op = optimizer.apply_gradients( + zip(grads, tf_variables), global_step=train_step + ) return train_op, grad_norm diff --git a/pkg/suggestion/v1beta1/nas/enas/Operation.py b/pkg/suggestion/v1beta1/nas/enas/Operation.py index 3fe94df792e..56b9b6f4780 100644 --- a/pkg/suggestion/v1beta1/nas/enas/Operation.py +++ b/pkg/suggestion/v1beta1/nas/enas/Operation.py @@ -27,9 +27,9 @@ def __init__(self, opt_id, opt_type, opt_params): def get_dict(self): opt_dict = dict() - opt_dict['opt_id'] = self.opt_id - opt_dict['opt_type'] = self.opt_type - opt_dict['opt_params'] = self.opt_params + opt_dict["opt_id"] = self.opt_id + opt_dict["opt_type"] = self.opt_type + opt_dict["opt_params"] = self.opt_params return opt_dict def print_op(self, logger): @@ -68,14 +68,12 @@ def _parse_operations(self): spec_min = int(ispec.feasible_space.min) spec_max = int(ispec.feasible_space.max) spec_step = int(ispec.feasible_space.step) - avail_space[spec_name] = range( - spec_min, spec_max+1, spec_step) + avail_space[spec_name] = range(spec_min, spec_max + 1, spec_step) elif ispec.parameter_type == api_pb2.DOUBLE: spec_min = float(ispec.feasible_space.min) spec_max = float(ispec.feasible_space.max) spec_step = float(ispec.feasible_space.step) - double_list = np.arange( - spec_min, spec_max+spec_step, spec_step) + double_list = np.arange(spec_min, spec_max + spec_step, spec_step) if double_list[-1] > spec_max: del double_list[-1] avail_space[spec_name] = double_list diff --git a/pkg/suggestion/v1beta1/nas/enas/service.py b/pkg/suggestion/v1beta1/nas/enas/service.py index c0e7970f127..029d7511216 100644 --- a/pkg/suggestion/v1beta1/nas/enas/service.py +++ b/pkg/suggestion/v1beta1/nas/enas/service.py @@ -14,24 +14,20 @@ import json import logging -from logging import getLogger -from logging import INFO -from logging import StreamHandler import os +from logging import INFO, StreamHandler, getLogger import grpc import tensorflow as tf -from pkg.apis.manager.v1beta1.python import api_pb2 -from pkg.apis.manager.v1beta1.python import api_pb2_grpc +from pkg.apis.manager.v1beta1.python import api_pb2, api_pb2_grpc from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer from pkg.suggestion.v1beta1.nas.common.validation import validate_operations -from pkg.suggestion.v1beta1.nas.enas.AlgorithmSettings import \ - algorithmSettingsValidator -from pkg.suggestion.v1beta1.nas.enas.AlgorithmSettings import \ - enableNoneSettingsList -from pkg.suggestion.v1beta1.nas.enas.AlgorithmSettings import \ - parseAlgorithmSettings +from pkg.suggestion.v1beta1.nas.enas.AlgorithmSettings import ( + algorithmSettingsValidator, + enableNoneSettingsList, + parseAlgorithmSettings, +) from pkg.suggestion.v1beta1.nas.enas.Controller import Controller from pkg.suggestion.v1beta1.nas.enas.Operation import SearchSpace @@ -43,8 +39,7 @@ def __init__(self, request, logger): self.experiment = request.experiment self.num_trials = 1 self.tf_graph = tf.Graph() - self.ctrl_cache_file = "ctrl_cache/{}.ckpt".format( - self.experiment_name) + self.ctrl_cache_file = "ctrl_cache/{}.ckpt".format(self.experiment_name) self.suggestion_step = 0 self.algorithm_settings = None self.controller = None @@ -55,12 +50,18 @@ def __init__(self, request, logger): self.search_space = None self.opt_direction = None self.objective_name = None - self.logger.info("-" * 100 + "\nSetting Up Suggestion for Experiment {}\n".format( - self.experiment_name) + "-" * 100) + self.logger.info( + "-" * 100 + + "\nSetting Up Suggestion for Experiment {}\n".format(self.experiment_name) + + "-" * 100 + ) self._get_experiment_param() self._setup_controller() - self.logger.info(">>> Suggestion for Experiment {} has been initialized.\n".format( - self.experiment_name)) + self.logger.info( + ">>> Suggestion for Experiment {} has been initialized.\n".format( + self.experiment_name + ) + ) def _get_experiment_param(self): # this function need to @@ -96,53 +97,69 @@ def _get_experiment_param(self): self.print_algorithm_settings() def _setup_controller(self): - with self.tf_graph.as_default(): - self.controller = Controller( num_layers=self.num_layers, num_operations=self.num_operations, - controller_hidden_size=self.algorithm_settings['controller_hidden_size'], - controller_temperature=self.algorithm_settings['controller_temperature'], - controller_tanh_const=self.algorithm_settings['controller_tanh_const'], - controller_entropy_weight=self.algorithm_settings['controller_entropy_weight'], - controller_baseline_decay=self.algorithm_settings['controller_baseline_decay'], - controller_learning_rate=self.algorithm_settings["controller_learning_rate"], - controller_skip_target=self.algorithm_settings['controller_skip_target'], - controller_skip_weight=self.algorithm_settings['controller_skip_weight'], + controller_hidden_size=self.algorithm_settings[ + "controller_hidden_size" + ], + controller_temperature=self.algorithm_settings[ + "controller_temperature" + ], + controller_tanh_const=self.algorithm_settings["controller_tanh_const"], + controller_entropy_weight=self.algorithm_settings[ + "controller_entropy_weight" + ], + controller_baseline_decay=self.algorithm_settings[ + "controller_baseline_decay" + ], + controller_learning_rate=self.algorithm_settings[ + "controller_learning_rate" + ], + controller_skip_target=self.algorithm_settings[ + "controller_skip_target" + ], + controller_skip_weight=self.algorithm_settings[ + "controller_skip_weight" + ], controller_name="Ctrl_" + self.experiment_name, - logger=self.logger) + logger=self.logger, + ) self.controller.build_trainer() def print_search_space(self): if self.search_space is None: - self.logger.warning( - "Error! The Suggestion has not yet been initialized!") + self.logger.warning("Error! The Suggestion has not yet been initialized!") return self.logger.info( - ">>> Search Space for Experiment {}".format(self.experiment_name)) + ">>> Search Space for Experiment {}".format(self.experiment_name) + ) for opt in self.search_space: opt.print_op(self.logger) self.logger.info( - "There are {} operations in total.\n".format(self.num_operations)) + "There are {} operations in total.\n".format(self.num_operations) + ) def print_algorithm_settings(self): if self.algorithm_settings is None: - self.logger.warning( - "Error! The Suggestion has not yet been initialized!") + self.logger.warning("Error! The Suggestion has not yet been initialized!") return - self.logger.info(">>> Parameters of LSTM Controller for Experiment {}\n".format( - self.experiment_name)) + self.logger.info( + ">>> Parameters of LSTM Controller for Experiment {}\n".format( + self.experiment_name + ) + ) for spec in self.algorithm_settings: if len(spec) > 22: - self.logger.info("{}:\t{}".format( - spec, self.algorithm_settings[spec])) + self.logger.info("{}:\t{}".format(spec, self.algorithm_settings[spec])) else: - self.logger.info("{}:\t\t{}".format( - spec, self.algorithm_settings[spec])) + self.logger.info( + "{}:\t\t{}".format(spec, self.algorithm_settings[spec]) + ) self.logger.info("") @@ -154,7 +171,7 @@ def __init__(self, logger=None): self.experiment = None if logger is None: self.logger = getLogger(__name__) - FORMAT = '%(asctime)-15s Experiment %(experiment_name)s %(message)s' + FORMAT = "%(asctime)-15s Experiment %(experiment_name)s %(message)s" logging.basicConfig(format=FORMAT) handler = StreamHandler() handler.setLevel(INFO) @@ -175,18 +192,21 @@ def ValidateAlgorithmSettings(self, request, context): # Validate GraphConfig # Check InputSize if not graph_config.input_sizes: - return self.set_validate_context_error(context, - "Missing InputSizes in GraphConfig:\n{}".format(graph_config)) + return self.set_validate_context_error( + context, "Missing InputSizes in GraphConfig:\n{}".format(graph_config) + ) # Check OutputSize if not graph_config.output_sizes: - return self.set_validate_context_error(context, - "Missing OutputSizes in GraphConfig:\n{}".format(graph_config)) + return self.set_validate_context_error( + context, "Missing OutputSizes in GraphConfig:\n{}".format(graph_config) + ) # Check NumLayers if not graph_config.num_layers: - return self.set_validate_context_error(context, - "Missing NumLayers in GraphConfig:\n{}".format(graph_config)) + return self.set_validate_context_error( + context, "Missing NumLayers in GraphConfig:\n{}".format(graph_config) + ) # Validate Operations is_valid, message = validate_operations(nas_config.operations.operation) @@ -204,34 +224,43 @@ def ValidateAlgorithmSettings(self, request, context): try: converted_value = setting_type(setting.value) except Exception as e: - return self.set_validate_context_error(context, - "Algorithm Setting {} must be {} type: exception {}".format( - setting.name, setting_type.__name__, e)) + return self.set_validate_context_error( + context, + "Algorithm Setting {} must be {} type: exception {}".format( + setting.name, setting_type.__name__, e + ), + ) if setting_type == float: - if (converted_value <= setting_range[0] or - (setting_range[1] != 'inf' and converted_value > setting_range[1])): + if converted_value <= setting_range[0] or ( + setting_range[1] != "inf" and converted_value > setting_range[1] + ): return self.set_validate_context_error( - context, "Algorithm Setting {}: {} with {} type must be in range ({}, {}]".format( + context, + "Algorithm Setting {}: {} with {} type must be in range ({}, {}]".format( setting.name, converted_value, setting_type.__name__, setting_range[0], - setting_range[1]) + setting_range[1], + ), ) elif converted_value < setting_range[0]: return self.set_validate_context_error( - context, "Algorithm Setting {}: {} with {} type must be in range [{}, {})".format( + context, + "Algorithm Setting {}: {} with {} type must be in range [{}, {})".format( setting.name, converted_value, setting_type.__name__, setting_range[0], - setting_range[1]) + setting_range[1], + ), ) else: - return self.set_validate_context_error(context, - "Unknown Algorithm Setting name: {}".format(setting.name)) + return self.set_validate_context_error( + context, "Unknown Algorithm Setting name: {}".format(setting.name) + ) self.logger.info("All Experiment Settings are Valid") return api_pb2.ValidateAlgorithmSettingsReply() @@ -248,11 +277,18 @@ def GetSuggestions(self, request, context): experiment = self.experiment if request.current_request_number > 0: experiment.num_trials = request.current_request_number - self.logger.info("-" * 100 + "\nSuggestion Step {} for Experiment {}\n".format( - experiment.suggestion_step, experiment.experiment_name) + "-" * 100) + self.logger.info( + "-" * 100 + + "\nSuggestion Step {} for Experiment {}\n".format( + experiment.suggestion_step, experiment.experiment_name + ) + + "-" * 100 + ) self.logger.info("") - self.logger.info(">>> Current Request Number:\t\t{}".format(experiment.num_trials)) + self.logger.info( + ">>> Current Request Number:\t\t{}".format(experiment.num_trials) + ) self.logger.info("") with experiment.tf_graph.as_default(): @@ -272,14 +308,16 @@ def GetSuggestions(self, request, context): } if self.is_first_run: - self.logger.info(">>> First time running suggestion for {}. Random architecture will be given.".format( - experiment.experiment_name)) + self.logger.info( + ">>> First time running suggestion for {}. Random architecture will be given.".format( + experiment.experiment_name + ) + ) with tf.compat.v1.Session() as sess: sess.run(tf.compat.v1.global_variables_initializer()) candidates = list() for _ in range(experiment.num_trials): - candidates.append( - sess.run(controller_ops["sample_arc"])) + candidates.append(sess.run(controller_ops["sample_arc"])) # TODO: will use PVC to store the checkpoint to protect against unexpected suggestion pod restart saver.save(sess, experiment.ctrl_cache_file) @@ -300,11 +338,16 @@ def GetSuggestions(self, request, context): # then fail the task because it may indicate that the training container has errors. if result is None: self.logger.warning( - ">>> Suggestion has spawned trials, but they all failed.") + ">>> Suggestion has spawned trials, but they all failed." + ) self.logger.warning( - ">>> Please check whether the training container is correctly implemented") - self.logger.info(">>> Experiment {} failed".format( - experiment.experiment_name)) + ">>> Please check whether the training container is correctly implemented" + ) + self.logger.info( + ">>> Experiment {} failed".format( + experiment.experiment_name + ) + ) return [] # This LSTM network is designed to maximize the metrics @@ -313,26 +356,35 @@ def GetSuggestions(self, request, context): if experiment.opt_direction == api_pb2.MINIMIZE: result = -result - self.logger.info(">>> Suggestion updated. LSTM Controller Training\n") - log_every = experiment.algorithm_settings["controller_log_every_steps"] - for ctrl_step in range(1, experiment.algorithm_settings["controller_train_steps"]+1): + self.logger.info( + ">>> Suggestion updated. LSTM Controller Training\n" + ) + log_every = experiment.algorithm_settings[ + "controller_log_every_steps" + ] + for ctrl_step in range( + 1, experiment.algorithm_settings["controller_train_steps"] + 1 + ): run_ops = [ controller_ops["loss"], controller_ops["entropy"], controller_ops["grad_norm"], controller_ops["baseline"], controller_ops["skip_rate"], - controller_ops["train_op"] + controller_ops["train_op"], ] loss, entropy, grad_norm, baseline, skip_rate, _ = sess.run( fetches=run_ops, - feed_dict={controller_ops["child_val_accuracy"]: result}) + feed_dict={controller_ops["child_val_accuracy"]: result}, + ) controller_step = sess.run(controller_ops["train_step"]) if ctrl_step % log_every == 0: log_string = "" - log_string += "Controller Step: {} - ".format(controller_step) + log_string += "Controller Step: {} - ".format( + controller_step + ) log_string += "Loss: {:.4f} - ".format(loss) log_string += "Entropy: {:.9} - ".format(entropy) log_string += "Gradient Norm: {:.7f} - ".format(grad_norm) @@ -342,8 +394,7 @@ def GetSuggestions(self, request, context): candidates = list() for _ in range(experiment.num_trials): - candidates.append( - sess.run(controller_ops["sample_arc"])) + candidates.append(sess.run(controller_ops["sample_arc"])) saver.save(sess, experiment.ctrl_cache_file) @@ -355,27 +406,30 @@ def GetSuggestions(self, request, context): organized_arc = [0 for _ in range(experiment.num_layers)] record = 0 for layer in range(experiment.num_layers): - organized_arc[layer] = arc[record: record + layer + 1] + organized_arc[layer] = arc[record : record + layer + 1] record += layer + 1 organized_candidates.append(organized_arc) nn_config = dict() - nn_config['num_layers'] = experiment.num_layers - nn_config['input_sizes'] = experiment.input_sizes - nn_config['output_sizes'] = experiment.output_sizes - nn_config['embedding'] = dict() + nn_config["num_layers"] = experiment.num_layers + nn_config["input_sizes"] = experiment.input_sizes + nn_config["output_sizes"] = experiment.output_sizes + nn_config["embedding"] = dict() for layer in range(experiment.num_layers): opt = organized_arc[layer][0] - nn_config['embedding'][opt] = experiment.search_space[opt].get_dict() + nn_config["embedding"][opt] = experiment.search_space[opt].get_dict() organized_arc_json = json.dumps(organized_arc) nn_config_json = json.dumps(nn_config) - organized_arc_str = str(organized_arc_json).replace('\"', '\'') - nn_config_str = str(nn_config_json).replace('\"', '\'') + organized_arc_str = str(organized_arc_json).replace('"', "'") + nn_config_str = str(nn_config_json).replace('"', "'") self.logger.info( - "\n>>> New Neural Network Architecture Candidate #{} (internal representation):".format(i)) + "\n>>> New Neural Network Architecture Candidate #{} (internal representation):".format( + i + ) + ) self.logger.info(organized_arc_json) self.logger.info("\n>>> Corresponding Seach Space Description:") self.logger.info(nn_config_str) @@ -384,20 +438,21 @@ def GetSuggestions(self, request, context): api_pb2.GetSuggestionsReply.ParameterAssignments( assignments=[ api_pb2.ParameterAssignment( - name="architecture", - value=organized_arc_str + name="architecture", value=organized_arc_str ), api_pb2.ParameterAssignment( - name="nn_config", - value=nn_config_str - ) + name="nn_config", value=nn_config_str + ), ] ) ) self.logger.info("") - self.logger.info(">>> {} Trials were created for Experiment {}".format( - experiment.num_trials, experiment.experiment_name)) + self.logger.info( + ">>> {} Trials were created for Experiment {}".format( + experiment.num_trials, experiment.experiment_name + ) + ) self.logger.info("") experiment.suggestion_step += 1 @@ -423,11 +478,15 @@ def GetEvaluationResult(self, trials_list): failed_trials.append(t.name) n_completed = len(completed_trials) - self.logger.info(">>> By now: {} Trials succeeded, {} Trials failed".format( - n_completed, len(failed_trials))) + self.logger.info( + ">>> By now: {} Trials succeeded, {} Trials failed".format( + n_completed, len(failed_trials) + ) + ) for tname in completed_trials: - self.logger.info("Trial: {}, Value: {}".format( - tname, completed_trials[tname])) + self.logger.info( + "Trial: {}, Value: {}".format(tname, completed_trials[tname]) + ) for tname in failed_trials: self.logger.info("Trial: {} was failed".format(tname)) diff --git a/pkg/suggestion/v1beta1/optuna/base_service.py b/pkg/suggestion/v1beta1/optuna/base_service.py index ce790173024..0a18f395771 100644 --- a/pkg/suggestion/v1beta1/optuna/base_service.py +++ b/pkg/suggestion/v1beta1/optuna/base_service.py @@ -16,21 +16,19 @@ import optuna -from pkg.suggestion.v1beta1.internal.constant import CATEGORICAL -from pkg.suggestion.v1beta1.internal.constant import DISCRETE -from pkg.suggestion.v1beta1.internal.constant import DOUBLE -from pkg.suggestion.v1beta1.internal.constant import INTEGER -from pkg.suggestion.v1beta1.internal.constant import MAX_GOAL -from pkg.suggestion.v1beta1.internal.search_space import \ - HyperParameterSearchSpace +from pkg.suggestion.v1beta1.internal.constant import ( + CATEGORICAL, + DISCRETE, + DOUBLE, + INTEGER, + MAX_GOAL, +) +from pkg.suggestion.v1beta1.internal.search_space import HyperParameterSearchSpace from pkg.suggestion.v1beta1.internal.trial import Assignment class BaseOptunaService(object): - def __init__(self, - algorithm_name="", - algorithm_config=None, - search_space=None): + def __init__(self, algorithm_name="", algorithm_config=None, search_space=None): self.algorithm_name = algorithm_name self.algorithm_config = algorithm_config self.search_space = search_space @@ -56,7 +54,9 @@ def _create_sampler(self): return optuna.samplers.RandomSampler(**self.algorithm_config) elif self.algorithm_name == "grid": - combinations = HyperParameterSearchSpace.convert_to_combinations(self.search_space) + combinations = HyperParameterSearchSpace.convert_to_combinations( + self.search_space + ) return optuna.samplers.GridSampler(combinations, **self.algorithm_config) def get_suggestions(self, trials, current_request_number): @@ -67,13 +67,17 @@ def get_suggestions(self, trials, current_request_number): def _ask(self, current_request_number): list_of_assignments = [] for _ in range(current_request_number): - optuna_trial = self.study.ask(fixed_distributions=self._get_optuna_search_space()) + optuna_trial = self.study.ask( + fixed_distributions=self._get_optuna_search_space() + ) assignments = [Assignment(k, v) for k, v in optuna_trial.params.items()] list_of_assignments.append(assignments) assignments_key = self._get_assignments_key(assignments) - self.assignments_to_optuna_number[assignments_key].append(optuna_trial.number) + self.assignments_to_optuna_number[assignments_key].append( + optuna_trial.number + ) return list_of_assignments @@ -84,13 +88,17 @@ def _tell(self, trials): value = float(trial.target_metric.value) assignments_key = self._get_assignments_key(trial.assignments) - optuna_trial_numbers = self.assignments_to_optuna_number[assignments_key] + optuna_trial_numbers = self.assignments_to_optuna_number[ + assignments_key + ] if len(optuna_trial_numbers) != 0: trial_number = optuna_trial_numbers.pop(0) self.study.tell(trial_number, value) else: - raise ValueError("An unknown trial has been passed in the GetSuggestion request.") + raise ValueError( + "An unknown trial has been passed in the GetSuggestion request." + ) @staticmethod def _get_assignments_key(assignments): @@ -102,9 +110,15 @@ def _get_optuna_search_space(self): search_space = {} for param in self.search_space.params: if param.type == INTEGER: - search_space[param.name] = optuna.distributions.IntDistribution(int(param.min), int(param.max)) + search_space[param.name] = optuna.distributions.IntDistribution( + int(param.min), int(param.max) + ) elif param.type == DOUBLE: - search_space[param.name] = optuna.distributions.FloatDistribution(float(param.min), float(param.max)) + search_space[param.name] = optuna.distributions.FloatDistribution( + float(param.min), float(param.max) + ) elif param.type == CATEGORICAL or param.type == DISCRETE: - search_space[param.name] = optuna.distributions.CategoricalDistribution(param.list) + search_space[param.name] = optuna.distributions.CategoricalDistribution( + param.list + ) return search_space diff --git a/pkg/suggestion/v1beta1/optuna/service.py b/pkg/suggestion/v1beta1/optuna/service.py index a99d0043676..d42987ecf26 100644 --- a/pkg/suggestion/v1beta1/optuna/service.py +++ b/pkg/suggestion/v1beta1/optuna/service.py @@ -18,13 +18,10 @@ import grpc -from pkg.apis.manager.v1beta1.python import api_pb2 -from pkg.apis.manager.v1beta1.python import api_pb2_grpc +from pkg.apis.manager.v1beta1.python import api_pb2, api_pb2_grpc from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer -from pkg.suggestion.v1beta1.internal.search_space import \ - HyperParameterSearchSpace -from pkg.suggestion.v1beta1.internal.trial import Assignment -from pkg.suggestion.v1beta1.internal.trial import Trial +from pkg.suggestion.v1beta1.internal.search_space import HyperParameterSearchSpace +from pkg.suggestion.v1beta1.internal.trial import Assignment, Trial from pkg.suggestion.v1beta1.optuna.base_service import BaseOptunaService logger = logging.getLogger(__name__) diff --git a/pkg/suggestion/v1beta1/pbt/service.py b/pkg/suggestion/v1beta1/pbt/service.py index 38087786e7a..70c69a25348 100644 --- a/pkg/suggestion/v1beta1/pbt/service.py +++ b/pkg/suggestion/v1beta1/pbt/service.py @@ -20,15 +20,14 @@ import grpc import numpy as np -from pkg.apis.manager.v1beta1.python import api_pb2 -from pkg.apis.manager.v1beta1.python import api_pb2_grpc -from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer import pkg.suggestion.v1beta1.internal.constant as constant -from pkg.suggestion.v1beta1.internal.search_space import HyperParameter -from pkg.suggestion.v1beta1.internal.search_space import \ - HyperParameterSearchSpace -from pkg.suggestion.v1beta1.internal.trial import Assignment -from pkg.suggestion.v1beta1.internal.trial import Trial +from pkg.apis.manager.v1beta1.python import api_pb2, api_pb2_grpc +from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer +from pkg.suggestion.v1beta1.internal.search_space import ( + HyperParameter, + HyperParameterSearchSpace, +) +from pkg.suggestion.v1beta1.internal.trial import Assignment, Trial logger = logging.getLogger(__name__) @@ -97,9 +96,11 @@ def GetSuggestions(self, request, context): request.experiment.name, int(settings["n_population"]), float(settings["truncation_threshold"]), - None - if not "resample_probability" in settings - else float(settings["resample_probability"]), + ( + None + if not "resample_probability" in settings + else float(settings["resample_probability"]) + ), search_space, objective_metric, objective_scale, diff --git a/pkg/suggestion/v1beta1/skopt/base_service.py b/pkg/suggestion/v1beta1/skopt/base_service.py index 434c3a8b908..4ac56e30729 100644 --- a/pkg/suggestion/v1beta1/skopt/base_service.py +++ b/pkg/suggestion/v1beta1/skopt/base_service.py @@ -17,11 +17,13 @@ import skopt -from pkg.suggestion.v1beta1.internal.constant import CATEGORICAL -from pkg.suggestion.v1beta1.internal.constant import DISCRETE -from pkg.suggestion.v1beta1.internal.constant import DOUBLE -from pkg.suggestion.v1beta1.internal.constant import INTEGER -from pkg.suggestion.v1beta1.internal.constant import MAX_GOAL +from pkg.suggestion.v1beta1.internal.constant import ( + CATEGORICAL, + DISCRETE, + DOUBLE, + INTEGER, + MAX_GOAL, +) from pkg.suggestion.v1beta1.internal.trial import Assignment logger = logging.getLogger(__name__) @@ -32,13 +34,15 @@ class BaseSkoptService(object): Refer to https://github.com/scikit-optimize/scikit-optimize . """ - def __init__(self, - base_estimator="GP", - n_initial_points=10, - acq_func="gp_hedge", - acq_optimizer="auto", - random_state=None, - search_space=None): + def __init__( + self, + base_estimator="GP", + n_initial_points=10, + acq_func="gp_hedge", + acq_optimizer="auto", + random_state=None, + search_space=None, + ): self.base_estimator = base_estimator self.n_initial_points = n_initial_points self.acq_func = acq_func @@ -56,14 +60,22 @@ def create_optimizer(self): for param in self.search_space.params: if param.type == INTEGER: - skopt_search_space.append(skopt.space.Integer( - int(param.min), int(param.max), name=param.name)) + skopt_search_space.append( + skopt.space.Integer(int(param.min), int(param.max), name=param.name) + ) elif param.type == DOUBLE: - skopt_search_space.append(skopt.space.Real( - float(param.min), float(param.max), "log-uniform", name=param.name)) + skopt_search_space.append( + skopt.space.Real( + float(param.min), + float(param.max), + "log-uniform", + name=param.name, + ) + ) elif param.type == CATEGORICAL or param.type == DISCRETE: skopt_search_space.append( - skopt.space.Categorical(param.list, name=param.name)) + skopt.space.Categorical(param.list, name=param.name) + ) self.skopt_optimizer = skopt.Optimizer( skopt_search_space, @@ -71,20 +83,27 @@ def create_optimizer(self): n_initial_points=self.n_initial_points, acq_func=self.acq_func, acq_optimizer=self.acq_optimizer, - random_state=self.random_state) + random_state=self.random_state, + ) def getSuggestions(self, trials, current_request_number): """ Get the new suggested trials with skopt algorithm. """ logger.info("-" * 100 + "\n") - logger.info("New GetSuggestions call with current request number: {}\n".format(current_request_number)) + logger.info( + "New GetSuggestions call with current request number: {}\n".format( + current_request_number + ) + ) skopt_suggested = [] loss_for_skopt = [] if len(trials) > self.succeeded_trials or self.succeeded_trials == 0: self.succeeded_trials = len(trials) if self.succeeded_trials != 0: - logger.info("Succeeded Trials changed: {}\n".format(self.succeeded_trials)) + logger.info( + "Succeeded Trials changed: {}\n".format(self.succeeded_trials) + ) for trial in trials: if trial.name not in self.recorded_trials_names: self.recorded_trials_names.append(trial.name) @@ -113,11 +132,21 @@ def getSuggestions(self, trials, current_request_number): logger.info("Objective values: {}\n".format(loss_for_skopt)) t1 = datetime.datetime.now() self.skopt_optimizer.tell(skopt_suggested, loss_for_skopt) - logger.info("Optimizer tell method takes {} seconds".format((datetime.datetime.now()-t1).seconds)) - logger.info("List of recorded Trials names: {}\n".format(self.recorded_trials_names)) + logger.info( + "Optimizer tell method takes {} seconds".format( + (datetime.datetime.now() - t1).seconds + ) + ) + logger.info( + "List of recorded Trials names: {}\n".format( + self.recorded_trials_names + ) + ) else: - logger.error("Succeeded Trials didn't change: {}\n".format(self.succeeded_trials)) + logger.error( + "Succeeded Trials didn't change: {}\n".format(self.succeeded_trials) + ) logger.info("Running Optimizer ask to query new parameters for Trials\n") @@ -127,9 +156,12 @@ def getSuggestions(self, trials, current_request_number): for suggestion in skopt_suggested: logger.info("New suggested parameters for Trial: {}".format(suggestion)) return_trial_list.append( - BaseSkoptService.convert(self.search_space, suggestion)) + BaseSkoptService.convert(self.search_space, suggestion) + ) - logger.info("GetSuggestions returns {} new Trials\n\n".format(len(return_trial_list))) + logger.info( + "GetSuggestions returns {} new Trials\n\n".format(len(return_trial_list)) + ) return return_trial_list @staticmethod diff --git a/pkg/suggestion/v1beta1/skopt/service.py b/pkg/suggestion/v1beta1/skopt/service.py index 4ac0041866f..f970a74fabf 100644 --- a/pkg/suggestion/v1beta1/skopt/service.py +++ b/pkg/suggestion/v1beta1/skopt/service.py @@ -16,13 +16,10 @@ import grpc -from pkg.apis.manager.v1beta1.python import api_pb2 -from pkg.apis.manager.v1beta1.python import api_pb2_grpc +from pkg.apis.manager.v1beta1.python import api_pb2, api_pb2_grpc from pkg.suggestion.v1beta1.internal.base_health_service import HealthServicer -from pkg.suggestion.v1beta1.internal.search_space import \ - HyperParameterSearchSpace -from pkg.suggestion.v1beta1.internal.trial import Assignment -from pkg.suggestion.v1beta1.internal.trial import Trial +from pkg.suggestion.v1beta1.internal.search_space import HyperParameterSearchSpace +from pkg.suggestion.v1beta1.internal.trial import Assignment, Trial from pkg.suggestion.v1beta1.skopt.base_service import BaseSkoptService logger = logging.getLogger(__name__) @@ -40,7 +37,8 @@ def GetSuggestions(self, request, context): Main function to provide suggestion. """ algorithm_name, config = OptimizerConfiguration.convert_algorithm_spec( - request.experiment.spec.algorithm) + request.experiment.spec.algorithm + ) if self.is_first_run: search_space = HyperParameterSearchSpace.convert(request.experiment) @@ -50,18 +48,22 @@ def GetSuggestions(self, request, context): acq_func=config.acq_func, acq_optimizer=config.acq_optimizer, random_state=config.random_state, - search_space=search_space) + search_space=search_space, + ) self.is_first_run = False trials = Trial.convert(request.trials) - new_trials = self.base_service.getSuggestions(trials, request.current_request_number) + new_trials = self.base_service.getSuggestions( + trials, request.current_request_number + ) return api_pb2.GetSuggestionsReply( parameter_assignments=Assignment.generate(new_trials) ) def ValidateAlgorithmSettings(self, request, context): is_valid, message = OptimizerConfiguration.validate_algorithm_spec( - request.experiment.spec.algorithm) + request.experiment.spec.algorithm + ) if not is_valid: context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_details(message) @@ -70,11 +72,14 @@ def ValidateAlgorithmSettings(self, request, context): class OptimizerConfiguration(object): - def __init__(self, base_estimator="GP", - n_initial_points=10, - acq_func="gp_hedge", - acq_optimizer="auto", - random_state=None): + def __init__( + self, + base_estimator="GP", + n_initial_points=10, + acq_func="gp_hedge", + acq_optimizer="auto", + random_state=None, + ): self.base_estimator = base_estimator self.n_initial_points = n_initial_points self.acq_func = acq_func @@ -102,7 +107,9 @@ def validate_algorithm_spec(cls, algorithm_spec): algo_name = algorithm_spec.algorithm_name if algo_name == "bayesianoptimization": - return cls._validate_bayesianoptimization_setting(algorithm_spec.algorithm_settings) + return cls._validate_bayesianoptimization_setting( + algorithm_spec.algorithm_settings + ) else: return False, "unknown algorithm name {}".format(algo_name) @@ -112,23 +119,47 @@ def _validate_bayesianoptimization_setting(cls, algorithm_settings): try: if s.name == "base_estimator": if s.value not in ["GP", "RF", "ET", "GBRT"]: - return False, "base_estimator {} is not supported in Bayesian optimization".format(s.value) + return ( + False, + "base_estimator {} is not supported in Bayesian optimization".format( + s.value + ), + ) elif s.name == "n_initial_points": if not (int(s.value) >= 0): - return False, "n_initial_points should be great or equal than zero" + return ( + False, + "n_initial_points should be great or equal than zero", + ) elif s.name == "acq_func": if s.value not in ["gp_hedge", "LCB", "EI", "PI", "EIps", "PIps"]: - return False, "acq_func {} is not supported in Bayesian optimization".format(s.value) + return ( + False, + "acq_func {} is not supported in Bayesian optimization".format( + s.value + ), + ) elif s.name == "acq_optimizer": if s.value not in ["auto", "sampling", "lbfgs"]: - return False, "acq_optimizer {} is not supported in Bayesian optimization".format(s.value) + return ( + False, + "acq_optimizer {} is not supported in Bayesian optimization".format( + s.value + ), + ) elif s.name == "random_state": if not (int(s.value) >= 0): return False, "random_state should be great or equal than zero" else: - return False, "unknown setting {} for algorithm bayesianoptimization".format(s.name) + return ( + False, + "unknown setting {} for algorithm bayesianoptimization".format( + s.name + ), + ) except Exception as e: - return False, "failed to validate {name}({value}): {exception}".format(name=s.name, value=s.value, - exception=e) + return False, "failed to validate {name}({value}): {exception}".format( + name=s.name, value=s.value, exception=e + ) return True, "" diff --git a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py index a58291b2117..18bb0bd26a9 100644 --- a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py +++ b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py @@ -20,13 +20,12 @@ from typing import Any, Callable, Dict, List, Optional, Union import grpc +import kubeflow.katib.katib_api_pb2 as katib_api_pb2 from kubeflow.katib import models from kubeflow.katib.api_client import ApiClient from kubeflow.katib.constants import constants -import kubeflow.katib.katib_api_pb2 as katib_api_pb2 from kubeflow.katib.utils import utils -from kubernetes import client -from kubernetes import config +from kubernetes import client, config logger = logging.getLogger(__name__) diff --git a/sdk/python/v1beta1/kubeflow/katib/api/katib_client_test.py b/sdk/python/v1beta1/kubeflow/katib/api/katib_client_test.py index 6df5f2bded4..f02728f4413 100644 --- a/sdk/python/v1beta1/kubeflow/katib/api/katib_client_test.py +++ b/sdk/python/v1beta1/kubeflow/katib/api/katib_client_test.py @@ -1,20 +1,21 @@ import multiprocessing from typing import List, Optional -from unittest.mock import Mock -from unittest.mock import patch - -from kubeflow.katib import KatibClient -from kubeflow.katib import V1beta1AlgorithmSpec -from kubeflow.katib import V1beta1Experiment -from kubeflow.katib import V1beta1ExperimentSpec -from kubeflow.katib import V1beta1FeasibleSpace -from kubeflow.katib import V1beta1ObjectiveSpec -from kubeflow.katib import V1beta1ParameterSpec -from kubeflow.katib import V1beta1TrialParameterSpec -from kubeflow.katib import V1beta1TrialTemplate +from unittest.mock import Mock, patch + +import pytest +from kubeflow.katib import ( + KatibClient, + V1beta1AlgorithmSpec, + V1beta1Experiment, + V1beta1ExperimentSpec, + V1beta1FeasibleSpace, + V1beta1ObjectiveSpec, + V1beta1ParameterSpec, + V1beta1TrialParameterSpec, + V1beta1TrialTemplate, +) from kubeflow.katib.constants import constants from kubernetes.client import V1ObjectMeta -import pytest TEST_RESULT_SUCCESS = "success" diff --git a/sdk/python/v1beta1/kubeflow/katib/api/report_metrics.py b/sdk/python/v1beta1/kubeflow/katib/api/report_metrics.py index 74b7db90505..5e5f2996f5d 100644 --- a/sdk/python/v1beta1/kubeflow/katib/api/report_metrics.py +++ b/sdk/python/v1beta1/kubeflow/katib/api/report_metrics.py @@ -12,14 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime -from datetime import timezone import os +from datetime import datetime, timezone from typing import Any, Dict import grpc -from kubeflow.katib.constants import constants import kubeflow.katib.katib_api_pb2 as katib_api_pb2 +from kubeflow.katib.constants import constants from kubeflow.katib.utils import utils diff --git a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.py b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.py index efbe0539e73..a7c70b47c38 100644 --- a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.py +++ b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.py @@ -1,14 +1,12 @@ import argparse import logging -from kubeflow.katib import ApiClient -from kubeflow.katib import KatibClient -from kubeflow.katib import models +import yaml +from kubeflow.katib import ApiClient, KatibClient, models from kubeflow.katib.constants import constants from kubeflow.katib.utils.utils import FakeResponse from kubernetes import client from verify import verify_experiment_results -import yaml # Experiment timeout is 40 min. EXPERIMENT_TIMEOUT = 60 * 40 diff --git a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py index 1ca3596af95..c9d1cb2ee43 100644 --- a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py +++ b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py @@ -1,8 +1,7 @@ import argparse import logging -from kubeflow.katib import KatibClient -from kubeflow.katib import search +from kubeflow.katib import KatibClient, search from kubernetes import client from verify import verify_experiment_results diff --git a/test/e2e/v1beta1/scripts/gh-actions/verify.py b/test/e2e/v1beta1/scripts/gh-actions/verify.py index cbc522d8344..c1514f6da12 100644 --- a/test/e2e/v1beta1/scripts/gh-actions/verify.py +++ b/test/e2e/v1beta1/scripts/gh-actions/verify.py @@ -1,7 +1,6 @@ import time -from kubeflow.katib import KatibClient -from kubeflow.katib import models +from kubeflow.katib import KatibClient, models from kubeflow.katib.constants import constants from kubernetes import client diff --git a/test/unit/v1beta1/suggestion/test_darts_service.py b/test/unit/v1beta1/suggestion/test_darts_service.py index c3cc792ba76..9355364ff2b 100644 --- a/test/unit/v1beta1/suggestion/test_darts_service.py +++ b/test/unit/v1beta1/suggestion/test_darts_service.py @@ -19,9 +19,10 @@ import grpc_testing from pkg.apis.manager.v1beta1.python import api_pb2 -from pkg.suggestion.v1beta1.nas.darts.service import DartsService -from pkg.suggestion.v1beta1.nas.darts.service import \ - validate_algorithm_settings +from pkg.suggestion.v1beta1.nas.darts.service import ( + DartsService, + validate_algorithm_settings, +) class TestDarts(unittest.TestCase): From 39e57ff8e01194ea0cc987bfa82d1a01072856d2 Mon Sep 17 00:00:00 2001 From: Ignas Baranauskas Date: Thu, 22 Aug 2024 15:23:56 +0100 Subject: [PATCH 3/3] Fix flake8 lint issues Signed-off-by: Ignas Baranauskas --- pkg/metricscollector/v1beta1/common/pns.py | 3 +- .../tfevent_loader.py | 9 ++--- .../v1beta1/hyperopt/base_service.py | 14 ++++---- .../v1beta1/internal/search_space.py | 5 ++- pkg/suggestion/v1beta1/internal/trial.py | 13 ++++--- .../v1beta1/nas/common/validation.py | 15 ++++---- pkg/suggestion/v1beta1/nas/enas/service.py | 34 ++++++++++++------- pkg/suggestion/v1beta1/optuna/service.py | 3 +- pkg/suggestion/v1beta1/pbt/service.py | 11 +++--- 9 files changed, 58 insertions(+), 49 deletions(-) diff --git a/pkg/metricscollector/v1beta1/common/pns.py b/pkg/metricscollector/v1beta1/common/pns.py index a4d74beaaaf..24f0882b72b 100644 --- a/pkg/metricscollector/v1beta1/common/pns.py +++ b/pkg/metricscollector/v1beta1/common/pns.py @@ -109,7 +109,8 @@ def WaitPIDs(pids, main_pid, pool_interval, timout, wait_all, completed_marked_d contents = file_obj.read() if contents.strip() != const.TRAINING_COMPLETED: raise Exception( - "Unable to find marker: {} in file: {} with contents: {} for pid: {}".format( + "Unable to find marker: {} in file: {} with contents: {} " + "for pid: {}".format( const.TRAINING_COMPLETED, mark_file, contents, diff --git a/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py b/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py index 9ba71e3fc67..f41597f9237 100644 --- a/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py +++ b/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py @@ -13,7 +13,8 @@ # limitations under the License. # TFEventFileParser parses tfevent files and returns an ObservationLog of the metrics specified. -# When the event file is under a directory(e.g. test dir), please specify "{{dirname}}/{{metrics name}}" +# When the event file is under a directory(e.g. test dir), please specify +# "{{dirname}}/{{metrics name}}" # For example, in the Tensorflow MNIST Classification With Summaries: # https://github.com/kubeflow/katib/blob/master/examples/v1beta1/trial-images/tf-mnist-with-summaries/mnist.py. # The "accuracy" and "loss" metric is saved under "train" and "test" directories. @@ -28,10 +29,7 @@ import api_pb2 import rfc3339 import tensorflow as tf -from tensorboard.backend.event_processing.event_accumulator import ( - EventAccumulator, - TensorEvent, -) +from tensorboard.backend.event_processing.event_accumulator import EventAccumulator from tensorboard.backend.event_processing.tag_types import TENSORS from pkg.metricscollector.v1beta1.common import const @@ -53,7 +51,6 @@ def parse_summary(self, tfefile): event_accumulator.Reload() for tag in event_accumulator.Tags()[TENSORS]: for m in self.metric_names: - tfefile_parent_dir = ( os.path.dirname(m) if len(m.split("/")) >= 2 diff --git a/pkg/suggestion/v1beta1/hyperopt/base_service.py b/pkg/suggestion/v1beta1/hyperopt/base_service.py index 82bc9a4a1e2..c794ae6fb60 100644 --- a/pkg/suggestion/v1beta1/hyperopt/base_service.py +++ b/pkg/suggestion/v1beta1/hyperopt/base_service.py @@ -58,7 +58,8 @@ def __init__( self.is_first_run = True def create_hyperopt_domain(self): - # Construct search space, example: {"x": hyperopt.hp.uniform('x', -10, 10), "x2": hyperopt.hp.uniform('x2', -10, 10)} + # Construct search space, example: {"x": hyperopt.hp.uniform('x', -10, 10), "x2": + # hyperopt.hp.uniform('x2', -10, 10)} hyperopt_search_space = {} for param in self.search_space.params: if param.type == INTEGER: @@ -108,7 +109,8 @@ def getSuggestions(self, trials, current_request_number): new_id = self.fmin.trials.new_trial_ids(1) hyperopt_trial_new_ids.append(new_id[0]) hyperopt_trial_miscs_idxs = {} - # Example: {'l1_normalization': [0.1], 'learning_rate': [0.1], 'hidden2': [1], 'optimizer': [1]} + # Example: {'l1_normalization': [0.1], 'learning_rate': [0.1], + # 'hidden2': [1], 'optimizer': [1]} hyperopt_trial_miscs_vals = {} # Insert Trial assignment to the misc @@ -147,7 +149,8 @@ def getSuggestions(self, trials, current_request_number): # TODO: Do we need to analyse additional_metrics? objective_for_hyperopt = float(trial.target_metric.value) if self.search_space.goal == MAX_GOAL: - # Now hyperopt only supports fmin and we need to reverse objective value for maximization + # Now hyperopt only supports fmin and we need to reverse + # objective value for maximization objective_for_hyperopt = -1 * objective_for_hyperopt hyperopt_trial_result = { "loss": objective_for_hyperopt, @@ -156,7 +159,6 @@ def getSuggestions(self, trials, current_request_number): hyperopt_trial_results.append(hyperopt_trial_result) if len(trials) > 0: - # Create new Trial doc hyperopt_trials = hyperopt.Trials().new_trial_docs( tids=hyperopt_trial_new_ids, @@ -226,7 +228,7 @@ def getSuggestions(self, trials, current_request_number): trials=self.fmin.trials, seed=random_state, n_startup_jobs=current_request_number, - **self.algorithm_conf + **self.algorithm_conf, ) self.is_first_run = False else: @@ -239,7 +241,7 @@ def getSuggestions(self, trials, current_request_number): trials=self.fmin.trials, seed=random_state, n_startup_jobs=current_request_number, - **self.algorithm_conf + **self.algorithm_conf, )[0] ) diff --git a/pkg/suggestion/v1beta1/internal/search_space.py b/pkg/suggestion/v1beta1/internal/search_space.py index 6698a160df3..7e920d7c363 100644 --- a/pkg/suggestion/v1beta1/internal/search_space.py +++ b/pkg/suggestion/v1beta1/internal/search_space.py @@ -57,9 +57,8 @@ def convert_to_combinations(search_space): elif parameter.type == DOUBLE: if parameter.step == "" or parameter.step is None: raise Exception( - "Param {} step is nil; For discrete search space, all parameters must include step".format( - parameter.name - ) + "Param {} step is nil; For discrete search space, all parameters " + "must include step".format(parameter.name) ) double_list = np.arange( float(parameter.min), diff --git a/pkg/suggestion/v1beta1/internal/trial.py b/pkg/suggestion/v1beta1/internal/trial.py index 59ec6e8fefb..37d0a6891b0 100644 --- a/pkg/suggestion/v1beta1/internal/trial.py +++ b/pkg/suggestion/v1beta1/internal/trial.py @@ -80,11 +80,14 @@ def __str__(self): ", ".join([str(e) for e in self.assignments]) ) else: - return "Trial(assignment: {}, metric_name: {}, metric: {}, additional_metrics: {})".format( - ", ".join([str(e) for e in self.assignments]), - self.metric_name, - self.target_metric, - ", ".join(str(e) for e in self.additional_metrics), + return ( + "Trial(assignment: {}, metric_name: {}, metric: {}, " + "additional_metrics: {})".format( + ", ".join(str(e) for e in self.assignments), + self.metric_name, + self.target_metric, + ", ".join(str(e) for e in self.additional_metrics), + ) ) diff --git a/pkg/suggestion/v1beta1/nas/common/validation.py b/pkg/suggestion/v1beta1/nas/common/validation.py index b91831e098e..8535d0ee985 100644 --- a/pkg/suggestion/v1beta1/nas/common/validation.py +++ b/pkg/suggestion/v1beta1/nas/common/validation.py @@ -16,10 +16,8 @@ def validate_operations(operations: list[api_pb2.Operation]) -> (bool, str): - # Validate each operation for operation in operations: - # Check OperationType if not operation.operation_type: return False, "Missing operationType in Operation:\n{}".format(operation) @@ -31,7 +29,6 @@ def validate_operations(operations: list[api_pb2.Operation]) -> (bool, str): # Validate each ParameterConfig in Operation parameters_list = list(operation.parameter_specs.parameters) for parameter in parameters_list: - # Check Name if not parameter.name: return False, "Missing Name in ParameterConfig:\n{}".format(parameter) @@ -78,16 +75,16 @@ def validate_operations(operations: list[api_pb2.Operation]) -> (bool, str): ): return ( False, - "Step parameter should be > 0 in ParameterConfig.feasibleSpace:\n{}".format( - parameter - ), + "Step parameter should be > 0 in ParameterConfig.feasibleSpace:\n" + "{}".format(parameter), ) except Exception as e: return ( False, - "failed to validate ParameterConfig.feasibleSpace \n{parameter}):\n{exception}".format( - parameter=parameter, exception=e - ), + ( + "failed to validate ParameterConfig.feasibleSpace \n" + "{parameter}):\n{exception}" + ).format(parameter=parameter, exception=e), ) return True, "" diff --git a/pkg/suggestion/v1beta1/nas/enas/service.py b/pkg/suggestion/v1beta1/nas/enas/service.py index 029d7511216..94534d3f076 100644 --- a/pkg/suggestion/v1beta1/nas/enas/service.py +++ b/pkg/suggestion/v1beta1/nas/enas/service.py @@ -237,7 +237,10 @@ def ValidateAlgorithmSettings(self, request, context): ): return self.set_validate_context_error( context, - "Algorithm Setting {}: {} with {} type must be in range ({}, {}]".format( + ( + "Algorithm Setting {}: {} with {} type must be in range " + "({}, {})" + ).format( setting.name, converted_value, setting_type.__name__, @@ -309,7 +312,8 @@ def GetSuggestions(self, request, context): if self.is_first_run: self.logger.info( - ">>> First time running suggestion for {}. Random architecture will be given.".format( + ">>> First time running suggestion for {}. " + "Random architecture will be given.".format( experiment.experiment_name ) ) @@ -319,7 +323,8 @@ def GetSuggestions(self, request, context): for _ in range(experiment.num_trials): candidates.append(sess.run(controller_ops["sample_arc"])) - # TODO: will use PVC to store the checkpoint to protect against unexpected suggestion pod restart + # TODO: will use PVC to store the checkpoint to protect + # against unexpected suggestion pod restart saver.save(sess, experiment.ctrl_cache_file) self.is_first_run = False @@ -331,17 +336,22 @@ def GetSuggestions(self, request, context): result = self.GetEvaluationResult(request.trials) # TODO: (andreyvelich) I deleted this part, should it be handle by controller? - # Sometimes training container may fail and GetEvaluationResult() will return None + # Sometimes training container may fail and GetEvaluationResult() + # will return None # In this case, the Suggestion will: - # 1. Firstly try to respawn the previous trials after waiting for RESPAWN_SLEEP seconds - # 2. If respawning the trials for RESPAWN_LIMIT times still cannot collect valid results, - # then fail the task because it may indicate that the training container has errors. + # 1. Firstly try to respawn the previous trials after waiting for + # RESPAWN_SLEEP seconds + # 2. If respawning the trials for RESPAWN_LIMIT times still cannot + # collect valid results, + # then fail the task because it may indicate that the training + # container has errors. if result is None: self.logger.warning( ">>> Suggestion has spawned trials, but they all failed." ) self.logger.warning( - ">>> Please check whether the training container is correctly implemented" + ">>> Please check whether the training container " + "is correctly implemented" ) self.logger.info( ">>> Experiment {} failed".format( @@ -351,7 +361,8 @@ def GetSuggestions(self, request, context): return [] # This LSTM network is designed to maximize the metrics - # However, if the user wants to minimize the metrics, we can take the negative of the result + # However, if the user wants to minimize the metrics, + # we can take the negative of the result if experiment.opt_direction == api_pb2.MINIMIZE: result = -result @@ -426,9 +437,8 @@ def GetSuggestions(self, request, context): nn_config_str = str(nn_config_json).replace('"', "'") self.logger.info( - "\n>>> New Neural Network Architecture Candidate #{} (internal representation):".format( - i - ) + "\n>>> New Neural Network Architecture Candidate #{} " + "(internal representation):".format(i) ) self.logger.info(organized_arc_json) self.logger.info("\n>>> Corresponding Seach Space Description:") diff --git a/pkg/suggestion/v1beta1/optuna/service.py b/pkg/suggestion/v1beta1/optuna/service.py index d42987ecf26..c793e615727 100644 --- a/pkg/suggestion/v1beta1/optuna/service.py +++ b/pkg/suggestion/v1beta1/optuna/service.py @@ -244,7 +244,8 @@ def _validate_grid_setting(cls, experiment): if max_trial_count > num_combinations: return ( False, - "Max Trial Count: {max_trial} > all possible search combinations: {combinations}".format( + "Max Trial Count: {max_trial} > all possible search combinations: " + "{combinations}".format( max_trial=max_trial_count, combinations=num_combinations ), ) diff --git a/pkg/suggestion/v1beta1/pbt/service.py b/pkg/suggestion/v1beta1/pbt/service.py index 70c69a25348..7791390eb59 100644 --- a/pkg/suggestion/v1beta1/pbt/service.py +++ b/pkg/suggestion/v1beta1/pbt/service.py @@ -27,7 +27,7 @@ HyperParameter, HyperParameterSearchSpace, ) -from pkg.suggestion.v1beta1.internal.trial import Assignment, Trial +from pkg.suggestion.v1beta1.internal.trial import Assignment logger = logging.getLogger(__name__) @@ -70,7 +70,8 @@ def ValidateAlgorithmSettings(self, request, context): ): return self._set_validate_context_error( context, - "Param(resample_probability) should be null to perturb at 0.8 or 1.2, or be between 0 and 1, inclusive, to resample", + "Param(resample_probability) should be null to perturb at 0.8 or 1.2, " + "or be between 0 and 1, inclusive, to resample", ) return api_pb2.ValidateAlgorithmSettingsReply() @@ -98,7 +99,7 @@ def GetSuggestions(self, request, context): float(settings["truncation_threshold"]), ( None - if not "resample_probability" in settings + if "resample_probability" not in settings else float(settings["resample_probability"]) ), search_space, @@ -184,7 +185,7 @@ def get(self): labels = { "pbt.suggestion.katib.kubeflow.org/generation": self.generation, } - if not self.parent is None: + if self.parent is not None: labels["pbt.suggestion.katib.kubeflow.org/parent"] = self.parent return assignments, labels, self.uid @@ -284,9 +285,7 @@ def get(self): return obj.get() def update(self, trial): - trial_labels = trial.spec.labels uid = trial.name - generation = trial_labels["pbt.suggestion.katib.kubeflow.org/generation"] # Do not update active/pending trials if trial.status.condition in (