diff --git a/plugins/train/model/_base/model.py b/plugins/train/model/_base/model.py index 7915cd1d30a..d3d3f6202dd 100644 --- a/plugins/train/model/_base/model.py +++ b/plugins/train/model/_base/model.py @@ -11,11 +11,9 @@ import time import typing as T -from collections import OrderedDict - -import numpy as np import keras +from lib.logger import parse_class_init from lib.serializer import get_serializer from lib.model.nn_blocks import set_config as set_nnblock_config from lib.utils import FaceswapError @@ -65,9 +63,7 @@ def __init__(self, model_dir: str, arguments: argparse.Namespace, predict: bool = False) -> None: - logger.debug("Initializing ModelBase (%s): (model_dir: '%s', arguments: %s, predict: %s)", - self.__class__.__name__, model_dir, arguments, predict) - + logger.debug(parse_class_init(locals())) # Input shape must be set within the plugin after initializing self.input_shape: tuple[int, ...] = () self.trainer = "original" # Override for plugin specific trainer @@ -75,7 +71,7 @@ def __init__(self, self._args = arguments self._is_predict = predict - self._model: keras.models.Model | None = None + self._model: keras.Model | None = None self._configfile = arguments.configfile if hasattr(arguments, "configfile") else None self._load_config() @@ -105,8 +101,8 @@ def __init__(self, logger.debug("Initialized ModelBase (%s)", self.__class__.__name__) @property - def model(self) -> keras.models.Model: - """:class:`Keras.models.Model`: The compiled model for this plugin. """ + def model(self) -> keras.Model: + """:class:`keras.Model`: The compiled model for this plugin. """ return self._model @property @@ -292,7 +288,7 @@ def _get_inputs(self) -> list[keras.layers.Input]: logger.debug("inputs: %s", inputs) return inputs - def build_model(self, inputs: list[keras.layers.Input]) -> keras.models.Model: + def build_model(self, inputs: list[keras.layers.Input]) -> keras.Model: """ Override for Model Specific autoencoder builds. Parameters @@ -303,7 +299,7 @@ def build_model(self, inputs: list[keras.layers.Input]) -> keras.models.Model: Returns ------- - :class:`keras.models.Model` + :class:`keras.Model` See Keras documentation for the correct structure, but note that parameter :attr:`name` is a required rather than an optional argument in Faceswap. You should assign this to the attribute ``self.name`` that is automatically generated from the plugin's filename. @@ -390,9 +386,7 @@ def __init__(self, model_name: str, config_changeable_items: dict, no_logs: bool) -> None: - logger.debug("Initializing %s: (model_dir: '%s', model_name: '%s', " - "config_changeable_items: '%s', no_logs: %s", self.__class__.__name__, - model_dir, model_name, config_changeable_items, no_logs) + logger.debug(parse_class_init(locals())) self._serializer = get_serializer("json") filename = f"{model_name}_state.{self._serializer.file_extension}" self._filename = os.path.join(model_dir, filename) @@ -737,169 +731,252 @@ class Inference(): Parameters ---------- - saved_model: :class:`keras.models.Model` + saved_model: :class:`keras.Model` The saved trained Faceswap model switch_sides: bool ``True`` if the swap should be performed "B" > "A" ``False`` if the swap should be "A" > "B" """ - def __init__(self, saved_model: keras.models.Model, switch_sides: bool) -> None: - logger.debug("Initializing: %s (saved_model: %s, switch_sides: %s)", - self.__class__.__name__, saved_model, switch_sides) - self._config = saved_model.get_config() + def __init__(self, saved_model: keras.Model, switch_sides: bool) -> None: + logger.debug(parse_class_init(locals())) + + self._layers: list[keras.Layer] = [lyr for lyr in saved_model.layers + if not isinstance(lyr, keras.layers.InputLayer)] + """list[:class:`keras.layers.Layer]: All the layers that exist within the model excluding + input layers """ + + self._input = self._get_model_input(saved_model, switch_sides) + """:class:`keras.KerasTensor`: The correct input for the inference model """ - self._input_idx = 1 if switch_sides else 0 - self._output_idx = 0 if switch_sides else 1 + self._name = f"{saved_model.name}_inference" + """str: The name for the final inference model""" - self._input_names = [inp[0] for inp in self._config["input_layers"]] - self._model = self._make_inference_model(saved_model) + self._model = self._build() logger.debug("Initialized: %s", self.__class__.__name__) @property - def model(self) -> keras.models.Model: - """ :class:`keras.models.Model`: The Faceswap model, compiled for inference. """ + def model(self) -> keras.Model: + """ :class:`keras.Model`: The Faceswap model, compiled for inference. """ return self._model - def _get_nodes(self, nodes: np.ndarray) -> list[tuple[str, int]]: - """ Given in input list of nodes from a :attr:`keras.models.Model.get_config` dictionary, - filters the layer name(s) and output index of the node, splitting to the correct output - index in the event of multiple inputs. + def _get_model_input(self, model: keras.Model, switch_sides: bool) -> list[keras.KerasTensor]: + """ Obtain the inputs for the requested swap direction. Parameters ---------- - nodes: list - A node entry from the :attr:`keras.models.Model.get_config` dictionary + saved_model: :class:`keras.Model` + The saved trained Faceswap model + switch_sides: bool + ``True`` if the swap should be performed "B" > "A" ``False`` if the swap should be + "A" > "B" Returns ------- - list - The (node name, output index) for each node passed in + list[]:class:`keras.KerasTensor`] + The input tensor to feed the model for the requested swap direction """ - anodes = np.array(nodes, dtype="object")[..., :3] - num_layers = anodes.shape[0] - anodes = anodes[self._output_idx] if num_layers == 2 else anodes[0] - - # Probably better checks for this, but this occurs when DNY preset is used and learn - # mask is enabled (i.e. the mask is created in fully connected layers) - anodes = anodes.squeeze() if anodes.ndim == 3 else anodes - - retval = [(node[0], node[2]) for node in anodes] - return retval - - def _make_inference_model(self, saved_model: keras.models.Model) -> keras.models.Model: - """ Extract the sub-models from the saved model that are required for inference. + inputs: list[keras.KerasTensor] = model.input + assert len(inputs) == 2, "Faceswap models should have exactly 2 inputs" + idx = 0 if switch_sides else 1 + retval = inputs[idx] + logger.debug("model inputs: %s, idx: %s, inference_input: '%s'", + [(i.name, i.shape[1:]) for i in inputs], idx, retval.name) + return [retval] + + def _get_candidates(self, input_tensors: list[keras.KerasTensor | keras.Layer] + ) -> T.Generator[tuple[keras.Layer, list[keras.src.ops.node.KerasHistory]], + None, None]: + """ Given a list of input tensors, get all layers from the main model which have the given + input tensors marked as Inbound nodes for the model Parameters ---------- - saved_model: :class:`keras.models.Model` - The saved trained Faceswap model + input_tensors: list[:class:`keras.KerasTensor` | :class:`keras.Layer`] + List of Tensors that act as an input to a layer within the model - Returns - ------- - :class:`keras.models.Model` - The model compiled for inference + Yields + ------ + tuple[:class:`keras.KerasLayer`, list[:class:`keras.src.ops.node.KerasHistory'] + Any layer in the main model that use the given input tensors as an input along with the + corresponding keras inbound history """ - logger.debug("Compiling inference model. saved_model: %s", saved_model) - struct = self._get_filtered_structure() - model_inputs = self._get_inputs(saved_model.inputs) - compiled_layers: dict[str, keras.layers.Layer] = {} - for layer in saved_model.layers: - if layer.name not in struct: - logger.debug("Skipping unused layer: '%s'", layer.name) + unique_input_names = set(i.name for i in input_tensors) + for layer in self._layers: + + history = [tensor._keras_history # pylint:disable=protected-access + for node in layer._inbound_nodes # pylint:disable=protected-access + for parent in node.parent_nodes + for tensor in parent.outputs] + + unique_inbound_names = set(h.operation.name for h in history) + if not unique_input_names.issubset(unique_inbound_names): + logger.debug("%s: Skipping candidate '%s' unmatched inputs: %s", + unique_input_names, layer.name, unique_inbound_names) continue - inbound = struct[layer.name] - logger.debug("Processing layer '%s': (layer: %s, inbound_nodes: %s)", - layer.name, layer, inbound) - if not inbound: - model = model_inputs - logger.debug("Adding model inputs %s: %s", layer.name, model) - else: - layer_inputs = [] - for inp in inbound: - inbound_layer = compiled_layers[inp[0]] - if isinstance(inbound_layer, list) and len(inbound_layer) > 1: - # Multi output inputs - inbound_output_idx = inp[1] - next_input = inbound_layer[inbound_output_idx] - logger.debug("Selecting output index %s from multi output inbound layer: " - "%s (using: %s)", inbound_output_idx, inbound_layer, - next_input) - else: - next_input = inbound_layer - - layer_inputs.append(next_input) - - logger.debug("Compiling layer '%s': layer inputs: %s", layer.name, layer_inputs) - model = layer(layer_inputs) - compiled_layers[layer.name] = model - retval = keras.models.Model(model_inputs, model, name=f"{saved_model.name}_inference") - logger.debug("Compiled inference model '%s': %s", retval.name, retval) - return retval - def _get_filtered_structure(self) -> OrderedDict: - """ Obtain the structure of the inference model. + logger.debug("%s: Yielding candidate '%s'. History: %s", + unique_input_names, layer.name, [(h.operation.name, h.node_index) + for h in history]) + yield layer, history + + @T.overload + def _group_inputs(self, layer: keras.Layer, inputs: list[tuple[keras.Layer, int]] + ) -> list[list[tuple[keras.Layer, int]]]: + ... - This parses the model config (in reverse) to obtain the required layers for an inference - model. + @T.overload + def _group_inputs(self, layer: keras.Layer, inputs: list[keras.src.ops.node.KerasHistory] + ) -> list[list[keras.src.ops.node.KerasHistory]]: + ... + + def _group_inputs(self, layer, inputs): + """ Layers can have more than one input. In these instances we need to group the inputs + and the layers' inbound nodes to correspond to inputs per instance. + + Parameters + ---------- + layer: :class:`keras.Layer` + The current layer being processed + inputs: list[:class:`keras.KerasTensor`] | list[:class:`keras.src.ops.node.KerasHistory`] + List of input tensors or inbound keras histories to be grouped per layer input Returns ------- - :class:`collections.OrderedDict` - The layer name as key with the input name and output index as value. + list[list[tuple[:class:`keras.Layer`, int]]] | + list[list[:class:`keras.src.ops.node.KerasHistory`] + A list of list of input layers and the corresponding node index or inbound keras + histories """ - # Filter output layer - out = np.array(self._config["output_layers"], dtype="object") - if out.ndim == 2: - out = np.expand_dims(out, axis=1) # Needs to be expanded for _get_nodes - outputs = self._get_nodes(out) - - # Iterate backwards from the required output to get the reversed model structure - current_layers = [outputs[0]] - next_layers = [] - struct = OrderedDict() - drop_input = self._input_names[abs(self._input_idx - 1)] - switch_input = self._input_names[self._input_idx] - while True: - layer_info = current_layers.pop(0) - current_layer = next(lyr for lyr in self._config["layers"] - if lyr["name"] == layer_info[0]) - inbound = current_layer["inbound_nodes"] + layer_inputs = 1 if isinstance(layer.input, keras.KerasTensor) else len(layer.input) + num_inputs = len(inputs) - if not inbound: - break + total_calls = num_inputs / layer_inputs + assert total_calls.is_integer() + total_calls = int(total_calls) - inbound_info = self._get_nodes(inbound) + retval = [inputs[i * layer_inputs: i * layer_inputs + layer_inputs] + for i in range(total_calls)] - if any(inb[0] == drop_input for inb in inbound_info): # Switch inputs - inbound_info = [(switch_input if inb[0] == drop_input else inb[0], inb[1]) - for inb in inbound_info] - struct[layer_info[0]] = inbound_info - next_layers.extend(inbound_info) + return retval - if not current_layers: - current_layers = next_layers - next_layers = [] + def _layers_from_inputs(self, + input_tensors: list[keras.KerasTensor | keras.Layer], + node_indices: list[int] + ) -> tuple[list[keras.Layer], + list[keras.src.ops.node.KerasHistory], + list[int]]: + """ Given a list of input tensors and their corresponding inbound node ids, return all of + the layers for the model that uses the given nodes as their input - struct[switch_input] = [] # Add the input layer - logger.debug("Model structure: %s", struct) - return struct + Parameters + ---------- + input_tensors: list[:class:`keras.KerasTensor` | :class:`keras.Layer`] + List of Tensors that act as an input to a layer within the model + node_indices: list[int] + The list of node indices corresponding to the inbound node index of the given layers - def _get_inputs(self, inputs: list) -> list: - """ Obtain the inputs for the requested swap direction. + Returns + ------- + list[:class:`keras.layers.Layer`] + Any layers from the model that use the given inputs as its input. Empty list if there + are no matches + list[:class:`keras.src.ops.node.KerasHistory`] + The keras inbound history for the layers + list[int] + The output node index for the layer, used for the inbound node index of the next layer + """ + retval: tuple[list[keras.Layer], + list[keras.src.ops.node.KerasHistory], + list[int]] = ([], [], []) + for layer, history in self._get_candidates(input_tensors): + grp_inputs = self._group_inputs(layer, list(zip(input_tensors, node_indices))) + grp_hist = self._group_inputs(layer, history) + + for input_group in grp_inputs: # pylint:disable=not-an-iterable + have = [(i[0].name, i[1]) for i in input_group] + for out_idx, hist in enumerate(grp_hist): + requires = [(h.operation.name, h.node_index) for h in hist] + if sorted(have) != sorted(requires): + logger.debug("%s: Skipping '%s'. Requires %s. Output node index: %s", + have, layer.name, requires, out_idx) + continue + retval[0].append(layer) + retval[1].append(hist) + retval[2].append(out_idx) + + logger.debug("Got layers %s for input_tensors: %s", + [x.name for x in retval[0]], [t.name for t in input_tensors]) + return retval + + def _build_layers(self, + layers: list[keras.Layer], + history: list[keras.src.ops.node.KerasHistory], + inputs: list[keras.KerasTensor]) -> list[keras.KerasTensor]: + """ Compile the given layers with the given inputs Parameters ---------- - inputs: list - The full list of input tensors to the saved faceswap training model + layers: list[:class:`keras.Layer`] + The layers to be called with the given inputs + history: list[:class:`keras.src.ops.node.KerasHistory`] + The corresponding keras inbound history for the layers + inputs: list[:class:`keras.KerasTensor] + The inputs for the given layers Returns ------- - list - List of input tensors to feed the model for the requested swap direction + list[:class:`keras.KerasTensor`] + The list of compiled layers """ - input_split = len(inputs) // 2 - start_idx = input_split * self._input_idx - retval = inputs[start_idx: start_idx + input_split] - logger.debug("model inputs: %s, input_split: %s, start_idx: %s, inference_inputs: %s", - inputs, input_split, start_idx, retval) + retval = [] + given_order = [i._keras_history.operation.name # pylint:disable=protected-access + for i in inputs] + for layer, hist in zip(layers, history): + layer_input = [inputs[given_order.index(h.operation.name)] + for h in hist if h.operation.name in given_order] + if layer_input != inputs: + logger.debug("Sorted layer inputs %s to %s", + given_order, + [i._keras_history.operation.name # pylint:disable=protected-access + for i in layer_input]) + built = layer(layer_input) + built = built if isinstance(built, list) else [built] + + logger.debug( + "Compiled layer '%s' from inputs %s", + layer.name, + [i._keras_history.operation.name # pylint:disable=protected-access + for i in layer_input]) + retval.extend(built) + + logger.debug( + "Compiled layers %s from input %s", + [x._keras_history.operation.name for x in retval], # pylint:disable=protected-access + [x._keras_history.operation.name for x in inputs]) # pylint:disable=protected-access + return retval + + def _build(self): + """ Extract the sub-models from the saved model that are required for inference. + + Returns + ------- + :class:`keras.Model` + The model compiled for inference + """ + logger.debug("Compiling inference model") + + layers = self._input + node_index = [0] + built = layers + + while True: + layers, history, node_index = self._layers_from_inputs(layers, node_index) + if not layers: + break + + built = self._build_layers(layers, history, built) + + retval = keras.Model(inputs=self._input, outputs=built, name=self._name) + logger.debug("Compiled inference model '%s': %s", retval.name, retval) + return retval diff --git a/tools/model/model.py b/tools/model/model.py index 9cb58fdda1b..c78c350f152 100644 --- a/tools/model/model.py +++ b/tools/model/model.py @@ -131,7 +131,9 @@ def _get_output_file(self, model_dir: str) -> tuple[str, str]: str The full path to the inference model save location """ - model_name = next(fname for fname in os.listdir(model_dir) if fname.endswith(".keras")) + model_name = next(fname for fname in os.listdir(model_dir) + if fname.endswith(".keras") + and not fname.endswith("_inference.keras")) in_path = os.path.join(model_dir, model_name) logger.debug("Model input path: '%s'", in_path)