-
Notifications
You must be signed in to change notification settings - Fork 2
/
sampler.py
91 lines (78 loc) · 3.41 KB
/
sampler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""
Some code in this file is borrowed from PatchCore (https://github.com/amazon-science/patchcore-inspection) (Apache-2.0 License)
"""
import torch
from tqdm import tqdm
import numpy as np
from torch.nn import Linear
class ApproximateGreedyCoresetSampler:
def __init__(
self,
ratio: float,
num_starting_points: int,
mapper: Linear,
max_sampling_time: int = None,
):
if not 0 < ratio <= 1:
raise ValueError("ratio value not in (0,1].")
self.ratio = ratio
self.num_starting_points = num_starting_points
self.mapper = mapper
self.max_sampling_time = max_sampling_time
def run(self, patches: torch.Tensor) -> torch.Tensor:
"""Subsamples patches using Greedy Coreset.
Args:
patches: [N x D]
"""
if self.ratio == 1:
return patches
sample_indices = self._compute_greedy_coreset_indices(self.mapper(patches))
return patches[sample_indices]
@staticmethod
def _compute_batchwise_differences(
matrix_a: torch.Tensor, matrix_b: torch.Tensor
) -> torch.Tensor:
"""Computes batchwise Euclidean distances using PyTorch."""
a_times_a = matrix_a.unsqueeze(1).bmm(matrix_a.unsqueeze(2)).reshape(-1, 1)
b_times_b = matrix_b.unsqueeze(1).bmm(matrix_b.unsqueeze(2)).reshape(1, -1)
a_times_b = matrix_a.mm(matrix_b.T)
return (-2 * a_times_b + a_times_a + b_times_b).clamp(0, None).sqrt()
def _compute_greedy_coreset_indices(self, patches: torch.Tensor) -> np.ndarray:
"""Runs approximate iterative greedy coreset selection.
This greedy coreset implementation does not require computation of the
full N x N distance matrix and thus requires a lot less memory, however
at the cost of increased sampling times.
Args:
patches: [NxD] input feature bank to sample.
"""
start_points = np.random.choice(
len(patches), self.num_starting_points, replace=False
).tolist()
approximate_distance_matrix = self._compute_batchwise_differences(
patches, patches[start_points]
)
approximate_coreset_anchor_distances = torch.mean(
approximate_distance_matrix, axis=-1
).reshape(-1, 1)
coreset_indices = []
num_coreset_samples = int(len(patches) * self.ratio)
with tqdm(range(num_coreset_samples), desc="Subsampling...") as t:
for _ in t:
if (
self.max_sampling_time is not None
and t.format_dict["elapsed"] > self.max_sampling_time
):
raise RuntimeError("Max sampling time reached")
select_idx = torch.argmax(approximate_coreset_anchor_distances).item()
coreset_indices.append(select_idx)
coreset_select_distance = self._compute_batchwise_differences(
patches, patches[select_idx : select_idx + 1] # noqa: E203
)
approximate_coreset_anchor_distances = torch.cat(
[approximate_coreset_anchor_distances, coreset_select_distance],
dim=-1,
)
approximate_coreset_anchor_distances = torch.min(
approximate_coreset_anchor_distances, dim=1
).values.reshape(-1, 1)
return np.array(coreset_indices)