diff --git a/examples/plot_hsfa.py b/examples/plot_hsfa.py
index e0880cc..28fcb2c 100644
--- a/examples/plot_hsfa.py
+++ b/examples/plot_hsfa.py
@@ -36,11 +36,11 @@
 test_gt = ground_truth[split_idx:]
 
 # Preparing the HSFA-network:
-# - each layer needs a 5-tuple for configuration
-# - each 5-tuple contains (kernel_width, kernel_height, stride_width, stride_height, n_features)
+# - each layer needs a 6-tuple for configuration
+# - each 6-tuple contains (kernel_width, kernel_height, stride_width, stride_height, n_features, expansion_degree)
 # The final layer will always be a full connected SFA layer
-layer_configurations = [(8, 8, 8, 8, 8),
-                        (2, 2, 2, 2, 8)]
+layer_configurations = [(8, 8, 8, 8, 8, 1),
+                        (2, 2, 2, 2, 8, 2)]
 
 hsfa = HSFA(n_components=2,
             input_shape=data.shape[1:],
diff --git a/sksfa/_hsfa.py b/sksfa/_hsfa.py
index 4fd4b70..b501b0c 100644
--- a/sksfa/_hsfa.py
+++ b/sksfa/_hsfa.py
@@ -45,7 +45,7 @@ def transform(self, X):
 class HSFA:
     """Hierarchical Slow Feature Analysis (HSFA).
 
-    A network of quadratic SFA estimators interlaced with receptive field transformers
+    A network of SFA estimators interlaced with receptive field transformers
     and linear SFA estimators for intermediate pre-expansion dimensionality reduction.
     This can deal with high-dimensional image time-series significantly better than
     standard (non-linear) SFA by using receptive fields to slice the images in a way
@@ -66,15 +66,16 @@ class HSFA:
     that.
     ----------
-    def __init__(self, n_components, input_shape, layer_configurations, internal_batch_size=50, noise_std=0.05, verbose=False):
     n_components : int
         Number of features extracted by the complete network.
+    final_degree : int, default=2
+        The degree of the final layer's polynomial expansion.
     input_shape : tuple (int)
         The shape of a single input (i.e., without sample-dimension) to the input layer.
-    layer_configurations : list of 5-tuples
+    layer_configurations : list of 6-tuples
         A list of tuples to configure the intermediate layers.
         Each tuple needs to contain:
-        (field_width, field_height, stride_width, stride_height, n_intermediate_components)
+        (field_width, field_height, stride_width, stride_height, n_intermediate_components, polynomial_degree)
     internal_batch_size : int, default=50
         The size of mini-batches used internally. This should not be chosen
         too small as the SFA nodes at this point do not respect connections
         between batches.
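Taken together, the configuration change is easiest to see end to end. The following is a minimal sketch, assuming HSFA is re-exported at the package root (as examples/plot_hsfa.py implies) and using random frames purely as a stand-in for real image time-series:

    import numpy as np
    from sksfa import HSFA  # assumed package-level export, as used by examples/plot_hsfa.py

    # Illustrative input only: 1000 frames of 16x16 single-channel noise.
    frames = np.random.normal(size=(1000, 16, 16, 1))

    # (field_w, field_h, stride_w, stride_h, n_features, expansion_degree)
    layer_configurations = [(8, 8, 8, 8, 8, 1),  # degree 1: plain linear SFA, no expansion step
                            (2, 2, 2, 2, 8, 2)]  # degree 2: quadratic expansion
    hsfa = HSFA(n_components=2,
                input_shape=frames.shape[1:],
                layer_configurations=layer_configurations,
                final_degree=2)  # degree of the fully connected final layer
    slow_features = hsfa.fit(frames).transform(frames)  # fit returns self, per the doctest

A degree of 1 in the first tuple mirrors the old behavior, where the first layer was always a single linear SFA.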
@@ -117,13 +118,13 @@ def __init__(self, n_components, input_shape, layer_configurations, internal_bat
     >>> ordered_cosines = np.cos(t)
     >>> mixed_cosines = np.dot(ordered_cosines, np.random.normal(0, 1, (dimension, dimension)))
     >>> mixed_cosines = mixed_cosines.reshape(n_samples, image_width, image_height, 1)
-    >>> layer_configurations = [(5, 5, 5, 5, 4)]
+    >>> layer_configurations = [(5, 5, 5, 5, 4, 1)]
     >>>
     >>> hsfa = HSFA(2, mixed_cosines.shape[1:], layer_configurations, noise_std=0.1)
     >>> hsfa = hsfa.fit(mixed_cosines)
     >>> unmixed_cosines = hsfa.transform(mixed_cosines)
     """
-    def __init__(self, n_components, input_shape, layer_configurations, internal_batch_size=50, noise_std=0.05, verbose=False):
+    def __init__(self, n_components, input_shape, layer_configurations, final_degree=2, internal_batch_size=50, noise_std=0.05, verbose=False):
         self.layer_configurations = layer_configurations
         self.verbose = verbose
         self.input_shape = input_shape
@@ -132,24 +133,31 @@ def __init__(self, n_components, input_shape, layer_configurations, internal_bat
         self.noise_std = noise_std
         self.sequence = []
         self.layer_outputs = []
+        self.final_degree = final_degree
         self.initialize_layers()
 
     def initialize_layers(self):
         # First layer does not need reconstructor
-        field_w, field_h, stride_w, stride_h, n_components = self.layer_configurations[0]
+        field_w, field_h, stride_w, stride_h, n_components, poly_degree = self.layer_configurations[0]
         try:
             slicer = ReceptiveSlicer(input_shape=self.input_shape, field_size=(field_w, field_h), strides=(stride_w, stride_h))
         except AssertionError:
             raise ValueError(f"Layer 1: Field ({field_w}, {field_h}) with stride ({stride_w}, {stride_h}) does not fit data dimension ({self.input_shape[0]}, {self.input_shape[1]})")
         self.sequence.append(slicer)
-        sfa = SFA(n_components, batch_size=self.internal_batch_size)
-        self.sequence.append(sfa)
+        if poly_degree > 1:
+            sfa = SFA(n_components, batch_size=self.internal_batch_size, fill_mode=None)
+            self.sequence.append(sfa)
+            expansion = PolynomialFeatures(poly_degree)
+            expansion.partial = expansion.fit
+            self.sequence.append(expansion)
+        post_expansion_sfa = SFA(n_components, batch_size=self.internal_batch_size, fill_mode=None)
+        self.sequence.append(post_expansion_sfa)
         reconstructor = ReceptiveRebuilder((slicer.reconstruction_shape))
         if self.verbose:
             print(slicer.reconstruction_shape)
         self.layer_outputs.append(slicer.reconstruction_shape)
         self.sequence.append(reconstructor)
-        for build_idx, (field_w, field_h, stride_w, stride_h, n_components) in enumerate(self.layer_configurations[1:]):
+        for build_idx, (field_w, field_h, stride_w, stride_h, n_components, poly_degree) in enumerate(self.layer_configurations[1:]):
             if (field_w == field_h == -1):
                 field_w = slicer.reconstruction_shape[0]
                 field_h = slicer.reconstruction_shape[1]
@@ -161,11 +169,12 @@ def initialize_layers(self):
                 print(slicer.reconstruction_shape)
             self.layer_outputs.append(slicer.reconstruction_shape)
             self.sequence.append(slicer)
-            pre_expansion_sfa = SFA(n_components, batch_size=self.internal_batch_size, fill_mode=None)
-            self.sequence.append(pre_expansion_sfa)
-            expansion = PolynomialFeatures(2)
-            expansion.partial = expansion.fit
-            self.sequence.append(expansion)
+            if poly_degree > 1:
+                pre_expansion_sfa = SFA(n_components, batch_size=self.internal_batch_size, fill_mode=None)
+                self.sequence.append(pre_expansion_sfa)
+                expansion = PolynomialFeatures(poly_degree)
+                expansion.partial = expansion.fit
+                self.sequence.append(expansion)
             self.sequence.append(AdditiveNoise(self.noise_std))
             post_expansion_sfa = SFA(n_components, batch_size=self.internal_batch_size, fill_mode=None)
             self.sequence.append(post_expansion_sfa)
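The poly_degree > 1 guard is what makes degree 1 a genuine no-op: a degree-1 PolynomialFeatures would only prepend a bias column, yet the old code still paid for a pre-expansion SFA in front of it. Assuming the PolynomialFeatures used in _hsfa.py is scikit-learn's (the expansion.partial = expansion.fit aliasing works because its fit only records the input feature count, so repeating it per mini-batch is harmless), degree d maps n inputs to C(n + d, d) monomials. A quick sketch of that growth:

    import numpy as np
    from sklearn.preprocessing import PolynomialFeatures  # assumed to be the class imported by _hsfa.py

    x = np.random.normal(size=(100, 8))  # e.g. 8 intermediate SFA components per receptive field
    for degree in (1, 2, 3):
        print(degree, PolynomialFeatures(degree).fit_transform(x).shape[1])
    # degree 1 ->   9: bias column plus the 8 inputs, no real expansion
    # degree 2 ->  45: all monomials up to degree 2
    # degree 3 -> 165: grows as C(8 + d, d), hence per-layer control over the degree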
@@ -173,11 +182,12 @@ def initialize_layers(self):
             reconstructor = ReceptiveRebuilder((slicer.reconstruction_shape))
             self.sequence.append(reconstructor)
         self.sequence.append(Flatten())
-        pre_expansion_sfa = SFA(n_components, batch_size=self.internal_batch_size, fill_mode=None)
-        self.sequence.append(pre_expansion_sfa)
-        expansion = PolynomialFeatures(2)
-        expansion.partial = expansion.fit
-        self.sequence.append(expansion)
+        if self.final_degree > 1:
+            pre_expansion_sfa = SFA(n_components, batch_size=self.internal_batch_size, fill_mode=None)
+            self.sequence.append(pre_expansion_sfa)
+            expansion = PolynomialFeatures(self.final_degree)
+            expansion.partial = expansion.fit
+            self.sequence.append(expansion)
         self.sequence.append(AdditiveNoise(self.noise_std))
         post_expansion_sfa = SFA(self.n_components, batch_size=self.internal_batch_size, fill_mode=None)
         if self.verbose:
@@ -249,9 +259,9 @@ def summary(self):
         print()
         print("Input Layer:")
         print(f"\tinput shape: \t\t{self.input_shape}")
-        for layer_idx, (field_w, field_h, stride_w, stride_h, n_components) in enumerate(self.layer_configurations):
+        for layer_idx, (field_w, field_h, stride_w, stride_h, n_components, poly_degree) in enumerate(self.layer_configurations):
             print(f"Layer {layer_idx + 1}:")
-            print(f"\treceptive field: \t({field_w}, {field_h})\n\tstrides: \t\t({stride_w}, {stride_h})")
+            print(f"\treceptive field: \t({field_w}, {field_h})\n\tstrides: \t\t({stride_w}, {stride_h})\n\texpansion degree: \t{poly_degree}")
             output_shape = self.layer_outputs[layer_idx]
             print(f"\toutput shape: \t\t{output_shape + (n_components,)}")
         print(f"Final Layer:")
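With the guard applied in all three places (first layer, intermediate layers, final layer), setting every degree to 1 should yield a network containing no expansion step at all. A small sanity check, sketched under the same import assumptions as above and relying on the self.sequence list that initialize_layers builds:

    from sklearn.preprocessing import PolynomialFeatures
    from sksfa import HSFA  # assumed package-level export

    linear_net = HSFA(n_components=2,
                      input_shape=(16, 16, 1),
                      layer_configurations=[(8, 8, 8, 8, 8, 1), (2, 2, 2, 2, 8, 1)],
                      final_degree=1)
    n_expansions = sum(isinstance(step, PolynomialFeatures) for step in linear_net.sequence)
    assert n_expansions == 0  # the built pipeline contains no polynomial expansion
    linear_net.summary()  # per the summary() change, now also prints the expansion degree per layer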