Clustering utilities #770

Open · wants to merge 10 commits into base: main
11 changes: 6 additions & 5 deletions src/graphnet/models/graphs/nodes/nodes.py
@@ -9,7 +9,7 @@
 from graphnet.utilities.decorators import final
 from graphnet.models import Model
 from graphnet.models.graphs.utils import (
-    cluster_summarize_with_percentiles,
+    cluster_and_pad,
     identify_indices,
     lex_sort,
     ice_transparency,
@@ -198,13 +198,14 @@ def _construct_nodes(self, x: torch.Tensor) -> Data:
         x = x.numpy()
         # Construct clusters with percentile-summarized features
         if hasattr(self, "_summarization_indices"):
-            array = cluster_summarize_with_percentiles(
-                x=x,
+            cluster_class = cluster_and_pad(
+                x=x, cluster_columns=self._cluster_indices
+            )
+            array = cluster_class.add_percentile_summary(
                 summarization_indices=self._summarization_indices,
-                cluster_indices=self._cluster_indices,
                 percentiles=self._percentiles,
-                add_counts=self._add_counts,
             )
+            array = cluster_class.add_counts()
Collaborator:
Looks like the resulting node definition will always have counts added as a node feature - but in the arguments to PercentileCluster we allow the user to specify if they want this. The new implementation should respect this.

Would recommend a simple change such as:

if self._add_counts:
    array = cluster_class.add_counts()

         else:
             self.error(
                 f"""{self.__class__.__name__} was not instantiated with
254 changes: 253 additions & 1 deletion src/graphnet/models/graphs/utils.py
@@ -1,6 +1,6 @@
 """Utility functions for construction of graphs."""
 
-from typing import List, Tuple, Optional
+from typing import List, Tuple, Optional, Union
 import os
 import numpy as np
 import pandas as pd
@@ -113,6 +113,8 @@ def identify_indices(
    return cluster_indices, summarization_indices, features_for_summarization


# TODO Remove this function as it is superseded by
# cluster_and_pad which has the same functionality
def cluster_summarize_with_percentiles(
    x: np.ndarray,
    summarization_indices: List[int],
@@ -149,6 +151,11 @@ def cluster_summarize_with_percentiles(
    Returns:
        Percentile-summarized array
    """
    print(
        "This function is deprecated and will be removed,",
        "use the class cluster_and_pad with add_percentile_summary",
        "instead for the same functionality",
    )
@pweigel (Collaborator) commented on Nov 25, 2024:
While it's good to warn the user that this function is deprecated, this will print for every call (lots of printing). I am not sure what a good solution is here. Maybe it can just be removed immediately since the nodes no longer use it?

Collaborator:
Great catch! I missed that.
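
For reference, the usual lightweight alternative is the standard library's warnings module, which deduplicates warnings per call site by default; a minimal sketch of what the print could become (not what the PR currently does):

import warnings

# Possible replacement for the print call above; DeprecationWarning is
# reported once per call site by default, so repeated calls stay quiet.
warnings.warn(
    "cluster_summarize_with_percentiles is deprecated and will be removed; "
    "use cluster_and_pad with add_percentile_summary instead.",
    DeprecationWarning,
    stacklevel=2,
)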

    pct_dict = {}
    for feature_idx in summarization_indices:
        summarized_array, column_offset, counts = gather_cluster_sequence(
@@ -172,6 +179,251 @@
    return array


class cluster_and_pad:
"""Cluster and pad the data for further summarization."""
Collaborator:
Let's expand the docstring here to make it clear that the class stores a clustered tensor that is manipulated when the public methods are called. Here's a suggestion:

"""
Cluster the input data according to a set of specified columns and 
compute aggregate statistics of cluster attributes.

The method will cluster the input data according to unique combinations of values in 
the specified `cluster_columns` and store the clustered tensor. The clustered tensor can
be manipulated further by calling public methods that add statistical quantities to the 
tensor of the values in each cluster. The order in which the public methods are called
determines the order of manipulations. 

Example:
# Build cluster of single event on first three columns
cluster_class = cluster_and_pad(x = single_event_as_array, cluster_columns = [0,1,2])

# Summarize columns 3 and 4 from original array on each cluster with 10th, 50th and 90th percentiles
clustered_array_with_percentile_summary = cluster_class.percentile_summarize(summarization_indices =[3,4], percentiles = [10, 50, 90])
# Add the std of the 5th column to the array above
clustered_array_with_percentile_summary_and_std = cluster_class.add_std(column = 5)
"""


    def __init__(self, x: np.ndarray, cluster_columns: List[int]) -> None:
        """Initialize the class with the data and cluster columns.

        Args:
            x: Array to be clustered
            cluster_columns: List of column indices on which the clusters
                are constructed.
        Returns: None
        Adds:
            clustered_x: Added to the class
            _counts: Added to the class
            _padded_x: Added to the class
        """
        x = lex_sort(x=x, cluster_columns=cluster_columns)

        unique_sensors, self._counts = np.unique(
            x[:, cluster_columns], axis=0, return_counts=True
        )

        contingency_table = np.concatenate(
            [unique_sensors, self._counts.reshape(-1, 1)], axis=1
        )

        contingency_table = lex_sort(
            x=contingency_table, cluster_columns=cluster_columns
        )

        self.clustered_x = contingency_table[:, 0 : unique_sensors.shape[1]]
        self._counts = (
            contingency_table[:, self.clustered_x.shape[1] :]
            .flatten()
            .astype(int)
        )

        self._padded_x = np.empty(
            (len(self._counts), max(self._counts), x.shape[1])
        )
        self._padded_x.fill(np.nan)

        for i in range(len(self._counts)):
            self._padded_x[i, : self._counts[i]] = x[: self._counts[i]]
            x = x[self._counts[i] :]
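
To make the stored layout concrete, a small usage sketch (assuming cluster_and_pad is importable from graphnet.models.graphs.utils; the numbers are made up):

import numpy as np

from graphnet.models.graphs.utils import cluster_and_pad

# Four rows; columns 0-2 act as the cluster key (e.g. sensor position),
# and the first two rows share the same key.
x = np.array(
    [
        [0.0, 0.0, 1.0, 10.0, 0.5],
        [0.0, 0.0, 1.0, 12.0, 0.7],
        [0.0, 1.0, 1.0, 11.0, 0.2],
        [1.0, 0.0, 2.0, 13.0, 0.9],
    ]
)
cc = cluster_and_pad(x=x, cluster_columns=[0, 1, 2])
print(cc.clustered_x.shape)  # (3, 3): one row per unique cluster key
print(cc._padded_x.shape)    # (3, 2, 5): clusters padded with NaN to size 2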

    def _add_column(
        self, column: np.ndarray, location: Optional[int] = None
    ) -> None:
        """Add a column to the clustered tensor.

        Args:
            column: Column to be added to the tensor
            location: Location to insert the column in the clustered tensor
        Returns: None
        Altered:
            clustered_x: The column is added to the clustered tensor
        """
        if location is None:
            self.clustered_x = np.column_stack([self.clustered_x, column])
        else:
Collaborator:
The location parameter could potentially overwrite information, right? E.g. if column 6 already contains some aggregate statistics and the user specifies location=6. Maybe a warning/error here would be useful?

            self.clustered_x = np.insert(
                self.clustered_x, location, column, axis=1
            )
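
Along the lines of the comment above, a guard could flag insertions at occupied positions. This is a hypothetical sketch, not code from the PR:

# Hypothetical guard inside _add_column (assumes `import warnings` at the
# top of utils.py). np.insert shifts existing columns to higher indices,
# which can silently invalidate previously recorded column locations.
if location is not None and location < self.clustered_x.shape[1]:
    warnings.warn(
        f"Inserting at column {location} shifts existing columns, "
        "including any aggregate statistics already added."
    )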

    def add_charge_threshold_summary(
        self,
        summarization_indices: List[int],
        percentiles: List[int],
        charge_index: int,
        location: Optional[int] = None,
    ) -> np.ndarray:
        """Summarize features through percentiles on charge of sensor.

        Args:
            summarization_indices: List of column indices that define the
                features to be summarized with percentiles.
            percentiles: Percentiles used to summarize `x`. E.g. [10, 50, 90].
            charge_index: Index of the charge column in the padded tensor.
            location: Location to insert the summarized features in the
                clustered tensor; defaults to adding at the end.
        Returns:
            clustered_x: The clustered tensor with the summarized features
                added.
        Adds:
            _charge_sum: Added to the class
            _charge_weights: Added to the class
        Altered:
            _padded_x: Charge is altered to be the cumulative sum
                of the charge divided by the total charge
            clustered_x: The summarized features are added at the end
                of the tensor
        """
        # convert the charge to the cumulative sum of the charge divided
        # by the total charge
        self._charge_weights = self._padded_x[:, :, charge_index]

        self._padded_x[:, :, charge_index] = self._padded_x[
            :, :, charge_index
        ].cumsum(axis=1)

        # add the charge sum to the class if it does not already exist
        if not hasattr(self, "_charge_sum"):
            self._charge_sum = np.nanmax(
                self._padded_x[:, :, charge_index], axis=1
            )

        self._charge_weights = (
            self._charge_weights / self._charge_sum[:, np.newaxis]
        )

        self._padded_x[:, :, charge_index] = (
            self._padded_x[:, :, charge_index]
            / self._charge_sum[:, np.newaxis]
        )

        # Summarize the charge at different percentiles
        selections = np.argmax(
            self._padded_x[:, :, charge_index][:, :, np.newaxis]
            >= (np.array(percentiles) / 100),
            axis=1,
        )

        selections += (np.arange(len(self._counts)) * self._padded_x.shape[1])[
            :, np.newaxis
        ]

        selections = self._padded_x[:, :, summarization_indices].reshape(
            -1, len(summarization_indices)
        )[selections]
        selections = selections.transpose(0, 2, 1).reshape(
            len(self.clustered_x), -1
        )
        self._add_column(selections, location)
        return self.clustered_x
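
A usage sketch of this method, reusing the hypothetical array x from the __init__ example above, with column 4 acting as charge:

cc = cluster_and_pad(x=x, cluster_columns=[0, 1, 2])
out = cc.add_charge_threshold_summary(
    summarization_indices=[3, 4],
    percentiles=[10, 50, 90],
    charge_index=4,
)
# out now has 3 cluster-key columns plus 2 features x 3 charge
# thresholds = 9 columns; rows are picked where the normalized
# cumulative charge first reaches 10%, 50% and 90%.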

    def add_percentile_summary(
        self,
        summarization_indices: List[int],
        percentiles: List[int],
        method: str = "linear",
        location: Optional[int] = None,
    ) -> np.ndarray:
        """Summarize the features of the sensors using percentiles.

        Args:
            summarization_indices: List of column indices that define the
                features to be summarized with percentiles.
            percentiles: Percentiles used to summarize `x`. E.g. [10, 50, 90].
            method: Method to summarize the features. E.g. "linear"
            location: Location to insert the summarized features in the
                clustered tensor; defaults to adding at the end.
        Returns:
            clustered_x: The clustered tensor with the summarized features
                added.
        Altered:
            clustered_x: The summarized features are added at the end of
                the tensor
        """
        percentiles_x = np.nanpercentile(
            self._padded_x[:, :, summarization_indices],
            percentiles,
            axis=1,
            method=method,
        )

        percentiles_x = percentiles_x.transpose(1, 2, 0).reshape(
            len(self.clustered_x), -1
        )
        self._add_column(percentiles_x, location)
        return self.clustered_x
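
A corresponding sketch for the percentile summary, again reusing the hypothetical x from above:

cc = cluster_and_pad(x=x, cluster_columns=[0, 1, 2])
out = cc.add_percentile_summary(
    summarization_indices=[3, 4], percentiles=[10, 50, 90]
)
print(out.shape)  # (3, 9): 3 cluster keys + 2 features x 3 percentiles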

    def calculate_charge_sum(self, charge_index: int) -> np.ndarray:
        """Calculate the sum of the charge."""
        assert not hasattr(
            self, "_charge_sum"
        ), "Charge sum has already been calculated, \
            re-calculation is not allowed"
        self._charge_sum = self._padded_x[:, :, charge_index].sum(axis=1)
        return self._charge_sum

    def calculate_charge_weights(self, charge_index: int) -> np.ndarray:
        """Calculate the weights of the charge."""
        assert not hasattr(
            self, "_charge_weights"
        ), "Charge weights have already been calculated, \
            re-calculation is not allowed"
        assert hasattr(
            self, "_charge_sum"
        ), "Charge sum has not been calculated, \
            please run calculate_charge_sum"
        self._charge_weights = (
            self._padded_x[:, :, charge_index]
            / self._charge_sum[:, np.newaxis]
        )
        return self._charge_weights
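
The asserts make these two helpers order-dependent and single-use: calculate_charge_sum must run before calculate_charge_weights, and neither may be called twice. A minimal sketch, with x and the charge column as assumed above:

cc = cluster_and_pad(x=x, cluster_columns=[0, 1, 2])
charge_sum = cc.calculate_charge_sum(charge_index=4)
weights = cc.calculate_charge_weights(charge_index=4)  # requires the sum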

    def add_counts(self, location: Optional[int] = None) -> np.ndarray:
        """Add the log10 of the sensor counts to the summarization features."""
        self._add_column(np.log10(self._counts), location)
        return self.clustered_x

    def add_sum_charge(self, location: Optional[int] = None) -> np.ndarray:
        """Add the sum of the charge to the summarization features."""
        assert hasattr(
            self, "_charge_sum"
        ), "Charge sum has not been calculated, \
            please run calculate_charge_sum"
        self._add_column(self._charge_sum, location)
        return self.clustered_x

    def add_std(
        self,
        column: int,
        location: Optional[int] = None,
        weights: Union[np.ndarray, int] = 1,
    ) -> np.ndarray:
        """Add the standard deviation of the column.

        Args:
            column: Index of the column in the padded tensor from which to
                calculate the standard deviation.
            location: Location to insert the standard deviation in the
                clustered tensor; defaults to adding at the end.
            weights: Optional weights applied to the values before the
                standard deviation is calculated.
        """
        self._add_column(
            np.nanstd(self._padded_x[:, :, column] * weights, axis=1), location
        )
        return self.clustered_x

    def add_mean(
        self,
        column: int,
        location: Optional[int] = None,
        weights: Union[np.ndarray, int] = 1,
    ) -> np.ndarray:
        """Add the mean of the column."""
        self._add_column(
            np.nanmean(self._padded_x[:, :, column] * weights, axis=1),
            location,
        )
        return self.clustered_x
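
Finally, a sketch combining the two aggregates, with the same assumed array; note that the weights multiply the values before the statistic is taken:

cc = cluster_and_pad(x=x, cluster_columns=[0, 1, 2])
cc.calculate_charge_sum(charge_index=4)
w = cc.calculate_charge_weights(charge_index=4)
out = cc.add_mean(column=3, weights=w)  # charge-weighted values, then mean
out = cc.add_std(column=3)              # unweighted std of column 3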


def ice_transparency(
    z_offset: Optional[float] = None, z_scaling: Optional[float] = None
) -> Tuple[interp1d, interp1d]: