Skip to content

Commit

Permalink
Feature/fld calibrator (#131)
Browse files Browse the repository at this point in the history
* add bugfixes and notebook

* feat: Refactor calibration model and enhance contour detection

* style fixes by ruff

---------

Co-authored-by: AtomScott <[email protected]>
  • Loading branch information
AtomScott and AtomScott authored Nov 1, 2023
1 parent 181764d commit 9bd7348
Show file tree
Hide file tree
Showing 9 changed files with 1,017 additions and 38 deletions.
700 changes: 700 additions & 0 deletions notebooks/01_get_started/tracking_with_bells_and_whistles.ipynb

Large diffs are not rendered by default.

161 changes: 144 additions & 17 deletions notebooks/03_core_components/calibration_model.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion sportslabkit/calibration_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from sportslabkit.calibration_model.base import BaseCalibrationModel
from sportslabkit.calibration_model.dummy import DummyCalibrationModel
from sportslabkit.calibration_model.fld import LineBasedCalibrator
from sportslabkit.calibration_model.fld import SimpleContourCalibrator, FLDCalibrator
from sportslabkit.logger import logger


Expand Down
86 changes: 74 additions & 12 deletions sportslabkit/calibration_model/fld.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@
from .base import BaseCalibrationModel


class LineBasedCalibrator(BaseCalibrationModel):
class SimpleContourCalibrator(BaseCalibrationModel):
def __init__(
self, min_line_length=50, line_distance_threshold=50, line_thickness=15, morph_size=15, dst_points=None
self,
morph_open_size=15,
morph_close_size=15,
morph_dilate_size=15,
morph_erode_size=15,
morph_iters=1,
dst_points=None
):
"""Initialize the line-based calibrator with given parameters."""
self.min_line_length = min_line_length
self.line_distance_threshold = line_distance_threshold
self.line_thickness = line_thickness
self.morph_size = morph_size
self.morph_open_size = morph_open_size
self.morph_close_size = morph_close_size
self.morph_dilate_size = morph_dilate_size
self.morph_erode_size = morph_erode_size
self.morph_iters = morph_iters

# If destination points are not provided, default to a standard soccer pitch
if dst_points is None:
# Using the dimensions of a standard soccer pitch (105m x 68m)
Expand All @@ -23,16 +31,26 @@ def _preprocess_image(self, image):
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
gray = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY)[1]
gray = gray.astype(np.uint8)
kernel = np.ones((self.morph_size, self.morph_size), np.uint8)
gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)
open_kernel = np.ones((self.morph_open_size, self.morph_open_size), np.uint8)
close_kernel = np.ones((self.morph_close_size, self.morph_close_size), np.uint8)
dilate_kernel = np.ones((self.morph_dilate_size, self.morph_dilate_size), np.uint8)
erode_kernel = np.ones((self.morph_erode_size, self.morph_erode_size), np.uint8)
for i in range(self.morph_iters):
gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, close_kernel)
gray = cv2.morphologyEx(gray, cv2.MORPH_OPEN, open_kernel)
gray = cv2.morphologyEx(gray, cv2.MORPH_DILATE, dilate_kernel)
gray = cv2.morphologyEx(gray, cv2.MORPH_ERODE, erode_kernel)
return gray

def _get_largest_contour(self, image):
"""Extract and return the largest contour from the binary image."""
binary = self._preprocess_image(image)
contours, _ = cv2.findContours(binary, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
max_contour = max(contours, key=cv2.contourArea)
hull = cv2.convexHull(max_contour)
return max_contour

def _approximate_hull(self, contour):
hull = cv2.convexHull(contour)
return hull

def _farthest_point_from(self, reference_point, point_list):
Expand All @@ -46,7 +64,7 @@ def _farthest_point_from(self, reference_point, point_list):
farthest_point = point[0]
return farthest_point

def _approximate_contour(self, hull):
def _approximate_quad(self, hull):
"""Approximate a convex hull to a quadrilateral by considering most distant points."""
first_point = hull[0][0]
second_point = self._farthest_point_from(first_point, hull)
Expand Down Expand Up @@ -90,7 +108,8 @@ def find_quadrilateral(self, image):
"""

contour = self._get_largest_contour(image)
quadrilateral = self._approximate_contour(contour)
hull = self._approximate_hull(contour)
quadrilateral = self._approximate_quad(hull)
return self.order_points(quadrilateral)

def order_points(self, pts):
Expand Down Expand Up @@ -120,7 +139,50 @@ def forward(self, image):
"""

contour = self._get_largest_contour(image)
quadrilateral = self._approximate_contour(contour)
quadrilateral = self._approximate_quad(contour)

homography_matrix = self._calculate_homography(quadrilateral, self.dst_points)
return homography_matrix


class FLDCalibrator(SimpleContourCalibrator):
def __init__(self, length_threshold=50, distance_threshold=50, canny_th1=50, canny_th2=150, canny_aperture_size=3, do_merge=True, dst_points=None):
"""Initialize the line-based calibrator with given parameters."""
self.fld = cv2.ximgproc.createFastLineDetector(_length_threshold=self.length_threshold,
_distance_threshold=self.distance_threshold,
_canny_th1=self.canny_th1, _canny_th2=self.canny_th2,
_canny_aperture_size=self.canny_aperture_size, _do_merge=self.do_merge)
if dst_points is None:
# Using the dimensions of a standard soccer pitch (105m x 68m)
self.dst_points = np.array([[0, 0], [105, 0], [105, 68], [0, 68]], dtype=np.float32)

def _preprocess_image(self, image):
"""Convert the image to grayscale and apply thresholding and morphological operations."""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
gray = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY)[1]
gray = gray.astype(np.uint8)
kernel = np.ones((self.morph_size, self.morph_size), np.uint8)
gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)
return gray

def _get_lines(self, image):
"""Detect lines in the image using Fast Line Detector."""

lines = self.fld.detect(image)
return lines

def _get_largest_contour(self, image):
"""Extract and return the largest contour from the binary image."""
binary = self._preprocess_image(image)
lines = self._get_lines(binary)

# Creating an empty canvas to draw lines on
line_image = np.zeros_like(binary)
for line in lines:
x0, y0, x1, y1 = map(int, line[0])
cv2.line(line_image, (x0, y0), (x1, y1), 255, self.line_thickness)

contours, _ = cv2.findContours(line_image, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
max_contour = max(contours, key=cv2.contourArea)
hull = cv2.convexHull(max_contour)
return hull
11 changes: 8 additions & 3 deletions sportslabkit/matching/motion_visual.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,22 @@ def compute_cost_matrix(self, trackers: Sequence[Tracklet], detections: Sequence
return np.array([])

# Compute motion cost
motion_cost = motion_metric_beta * self.motion_metric(trackers, detections)
motion_cost = self.motion_metric(trackers, detections)
# print(motion_cost.mean(), motion_cost.std())

# Gate elements of motion cost matrix to infinity
motion_cost[motion_cost > self.motion_metric_gate] = np.inf

# Compute visual cost
visual_cost = visual_metric_beta * self.visual_metric(trackers, detections)
visual_cost = self.visual_metric(trackers, detections)
# print(visual_cost.mean(),visual_cost.std())

# Gate elements of visual cost matrix to infinity
visual_cost[visual_cost > self.visual_metric_gate] = np.inf

# Compute total cost
cost_matrix = motion_cost + visual_cost
inf_mask = (motion_cost == np.inf) | (visual_cost == np.inf)
cost_matrix = motion_metric_beta * motion_cost + visual_metric_beta * visual_cost

cost_matrix[inf_mask] = np.inf
return cost_matrix
18 changes: 14 additions & 4 deletions sportslabkit/mot/teamtrack.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ def __init__(
self.first_matching_fn = first_matching_fn
self.second_matching_fn = second_matching_fn
self.detection_score_threshold = detection_score_threshold
self.homographies = []

def predict_single_tracklet_motion(self, tracklet):
# x = self.tracklet_to_points(tracklet, H)
y = self.motion_model(tracklet).squeeze(0).numpy()
y = self.motion_model(tracklet).squeeze().numpy()
return y

def predict_multi_tracklet_motion(self, tracklets):
Expand Down Expand Up @@ -101,9 +102,20 @@ def update(self, current_frame, tracklets):

# calculate 2d pitch coordinates
H = self.calibration_model(current_frame)
self.homographies.append(H)

dets_ids_to_remove = []
for i, det in enumerate(detections):
det.pt = self.detection_to_points(det, H)

# remove detections that are outside the pitch
# add other sports
if det.pt[0] < 0 or det.pt[0] > 105 or det.pt[1] < 0 or det.pt[1] > 68:
dets_ids_to_remove.append(i)

for i in sorted(dets_ids_to_remove, reverse=True):
del detections[i]

##############################
# Motion prediction
##############################
Expand Down Expand Up @@ -187,9 +199,7 @@ def update(self, current_frame, tracklets):
##############################

# Second association between unassigned tracklets and low confidence detections
matches_second, cost_matrix_second = self.second_matching_fn(
unassigned_tracklets, low_confidence_detections, True
)
matches_second, cost_matrix_second = self.second_matching_fn(unassigned_tracklets, low_confidence_detections, True)

# [Second] assigned tracklets: update
for match in matches_second:
Expand Down
3 changes: 2 additions & 1 deletion sportslabkit/motion_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
from sportslabkit.motion_model.base import BaseMotionModel
from sportslabkit.motion_model.models import ExponentialMovingAverage, KalmanFilter
from sportslabkit.motion_model.tune import tune_motion_model

from sportslabkit.motion_model.groupcast import GCLinear

__all__ = [
"tune_motion_model",
"ExponentialMovingAverage",
"KalmanFilter",
"BaseMotionModel",
"GCLinear"
]


Expand Down
72 changes: 72 additions & 0 deletions sportslabkit/motion_model/groupcast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import Any

import numpy as np
import torch
from torch import nn

from sportslabkit.motion_model.base import BaseMotionModel


# TODO: Refactor GroupCast out of slk code
class Linear(nn.Module):
def __init__(self, obs_steps: int):
"""Simple linear model that predicts the next position based on the last `obs_steps`, using a constant velocity model."""
super().__init__()
self.obs_steps = obs_steps

def forward(self, x):
# assume x is (batch_size, seq_len, 2)
assert x.dim() == 3 or x.dim() == 2
if x.dim() == 2:
# If only one observation, add a batch dimension
x = x.unsqueeze(0)

if x.shape[1] == 1:
# If only one observation, just return it
return x

# Estimate the velocity
v = x[:, -self.obs_steps :].diff(dim=1).mean(dim=1) # (batch_size, 2)
y_pred = x[:, -1] + v # (batch_size, 2)
return y_pred

def roll_out(self, x, n_steps, y_gt=None):
y_pred = []

for i in range(n_steps):
y_pred_i = self.forward(x)
y_pred.append(y_pred_i)

if y_gt is not None:
# use the ground truth position as the next input
x = torch.cat([x[:, 1:, :], y_gt[:, i, :].unsqueeze(1)], dim=1)
else:
# use the predicted position as the next input
x = torch.cat([x[:, 1:, :], y_pred_i.unsqueeze(1)], dim=1)

return torch.stack(y_pred, dim=1)


class GCLinear(BaseMotionModel):
""" """

hparam_search_space: dict[str, dict[str, object]] = {}
required_observation_types = ["pt"]
required_state_types = []

def __init__(self, obs_steps: int = 25):
"""
Initialize the ExponentialMovingAverage motion model.
"""
super().__init__()
self.model = Linear(obs_steps=obs_steps)

def predict(
self,
observations: dict[str, Any],
states: dict[str, float | np.ndarray[Any, Any]] = ...,
) -> tuple[np.ndarray[Any, Any], dict[str, float | np.ndarray[Any, Any]]]:
x = torch.tensor(observations.get("pt", None))
y = self.model(x)
return y, states
2 changes: 2 additions & 0 deletions sportslabkit/motion_model/models/kf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any

import numpy as np
import torch
from filterpy.kalman import predict, update
from numpy import ndarray

Expand Down Expand Up @@ -138,4 +139,5 @@ def predict(
states["H"],
)
pred = new_states["x"][:4]
pred = torch.tensor(pred).unsqueeze(0)
return pred, new_states

0 comments on commit 9bd7348

Please sign in to comment.