Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

screen detector for stationary setups #2341

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 174 additions & 1 deletion pupil_src/shared_modules/square_marker_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,93 @@ def correct_gradient(gray_img, r):
return True


def sortCorners(corners, center):
"""
corners : list of points
center : point
"""
top = [corner for corner in corners if corner[1] < center[1]]
bot = [corner for corner in corners if corner[1] >= center[1]]

corners = np.zeros(shape=(4, 2))

if (len(top) == 2) and (len(bot) == 2):
tl, tr = sorted(top, key=lambda p: p[0])
bl, br = sorted(bot, key=lambda p: p[0])

corners[0] = np.array(tl)
corners[1] = np.array(tr)
corners[2] = np.array(br)
corners[3] = np.array(bl)

return corners


criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 0.001)


def detect_screen_corners_as_markers(gray_img):
edges = cv2.adaptiveThreshold(
gray_img, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 25, -5
)

*_, contours, hierarchy = cv2.findContours(
edges, mode=cv2.RETR_TREE, method=cv2.CHAIN_APPROX_SIMPLE, offset=(0, 0)
) # TC89_KCOS

hierarchy = hierarchy[0]
contours = np.array(contours, dtype=object)
contours = contours[np.logical_and(hierarchy[:, 3] >= 0, hierarchy[:, 2] >= 0)]

# logging.info(np. __version__)
# 1.26.1

# logging.info(cv2. __version__)
# 4.8.1

# logging.info(str(contours))
contours = np.array(
[c for c in contours if cv2.contourArea(c) > (20 * 2500)], dtype=object
)

screen_corners = []
if len(contours) > 0:
contours = contours[0].astype(np.int32)

epsilon = cv2.arcLength(contours, True) * 0.1
aprox_contours = [cv2.approxPolyDP(contours, epsilon, True)]
rect_cand = [r for r in aprox_contours if r.shape[0] == 4]

for count, r in enumerate(rect_cand):
r = np.float32(r)
cv2.cornerSubPix(gray_img, r, (3, 3), (-1, -1), criteria)
corners = np.array([r[0][0], r[1][0], r[2][0], r[3][0]])
centroid = corners.sum(axis=0, dtype="float64") * 0.25
centroid.shape = 2

corners = sortCorners(corners, centroid)
r[0][0], r[1][0], r[2][0], r[3][0] = (
corners[0],
corners[1],
corners[2],
corners[3],
)

corner = {
"id": 32 + count,
"verts": r.tolist(),
"perimeter": cv2.arcLength(r, closed=True),
"centroid": centroid.tolist(),
"frames_since_true_detection": 0,
"id_confidence": 1.0,
"soft_id": 32 + count,
}

screen_corners.append(corner)

return screen_corners


def detect_markers(
gray_img, grid_size, min_marker_perimeter=40, aperture=11, visualize=False
):
Expand All @@ -173,7 +260,7 @@ def detect_markers(

# remove extra encapsulation
hierarchy = hierarchy[0]
contours = np.array(contours)
contours = np.array(contours, dtype=object)
# keep only contours with parents and children
contained_contours = contours[
np.logical_and(hierarchy[:, 3] >= 0, hierarchy[:, 2] >= 0)
Expand Down Expand Up @@ -382,6 +469,92 @@ def m_screen_to_marker(marker):
tick = 0


def detect_screen_robust(
gray_img,
prev_markers,
true_detect_every_frame=1,
invert_image=False,
):
global prev_img

if invert_image:
gray_img = 255 - gray_img

global tick
if tick == 0:
tick = true_detect_every_frame
new_markers = detect_screen_corners_as_markers(gray_img)
else:
new_markers = []
tick -= 1

if prev_img is not None and prev_img.shape == gray_img.shape and prev_markers:
new_ids = [m["id"] for m in new_markers]

# any old markers not found in the new list?
not_found = [m for m in prev_markers if m["id"] not in new_ids and m["id"] >= 0]
if not_found:
prev_pts = np.array(
[np.array(m["verts"], dtype=np.float32) for m in not_found]
)
prev_pts = np.vstack(prev_pts)
new_pts, flow_found, err = cv2.calcOpticalFlowPyrLK(
prev_img, gray_img, prev_pts, None, minEigThreshold=0.01, **lk_params
)
for marker_idx in range(flow_found.shape[0] // 4):
m = not_found[marker_idx]
m_slc = slice(marker_idx * 4, marker_idx * 4 + 4)
if flow_found[m_slc].sum() >= 4:
found, _ = np.where(flow_found[m_slc])
# calculate differences
old_verts = prev_pts[m_slc][found, :]
new_verts = new_pts[m_slc][found, :]
vert_difs = new_verts - old_verts
# calc mean dif
mean_dif = vert_difs.mean(axis=0)
# take n-1 closest difs
dist_variance = np.linalg.norm(mean_dif - vert_difs, axis=1)
if max(np.abs(dist_variance).flatten()) > 5:
m["frames_since_true_detection"] = 100
else:
closest_mean_dif = np.argsort(dist_variance, axis=0)[:-1, 0]
# recalc mean dif
mean_dif = vert_difs[closest_mean_dif].mean(axis=0)
# apply mean dif
proj_verts = prev_pts[m_slc] + mean_dif
m["verts"] = new_verts.tolist()
m["centroid"] = new_verts.sum(axis=0) / 4.0
m["centroid"].shape = 2
m["centroid"] = m["centroid"].tolist()
m["frames_since_true_detection"] += 1
# m['opf_vel'] = mean_dif
else:
m["frames_since_true_detection"] = 100

# cocatenating like this will favour older markers in the doublication deletion process
markers = [
m for m in not_found if m["frames_since_true_detection"] < 5
] + new_markers
if markers: # del double detected markers
min_distace = min(m["perimeter"] for m in markers) / 4.0
# min_distace = 50
if len(markers) > 1:
remove = set()
close_markers = get_close_markers(markers, min_distance=min_distace)
for f, s in close_markers.T:
# remove the markers further down in the list
remove.add(s)
remove = list(remove)
remove.sort(reverse=True)
for i in remove:
del markers[i]
else:
markers = new_markers

prev_img = gray_img.copy()
return markers


def detect_markers_robust(
gray_img,
grid_size,
Expand Down
39 changes: 39 additions & 0 deletions pupil_src/shared_modules/surface_tracker/surface_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

@enum.unique
class Surface_Marker_Type(enum.Enum):
SCREEN = "screen"
SQUARE = "legacy"
APRILTAG_V3 = "apriltag_v3"

Expand Down Expand Up @@ -185,6 +186,42 @@ def tag_id(self) -> Surface_Marker_TagID:
return Surface_Marker_TagID(int(self.raw_id))


class _Screen_Marker_Detection(_Square_Marker_Detection_Raw, Surface_Base_Marker):
__slots__ = ()

marker_type = Surface_Marker_Type.SCREEN

@staticmethod
def from_tuple(state: tuple) -> "_Screen_Marker_Detection":
cls = _Screen_Marker_Detection
expected_marker_type = cls.marker_type

assert len(state) > 0
assert isinstance(state[-1], str)

try:
real_marker_type = Surface_Marker_Type(state[-1])
except ValueError:
real_marker_type = expected_marker_type
state = (*state, real_marker_type.value)

assert real_marker_type == expected_marker_type
return cls(*state)

def to_tuple(self) -> tuple:
return tuple(self)

@property
def uid(self) -> Surface_Marker_UID:
return create_surface_marker_uid(
marker_type=self.marker_type, tag_family=None, tag_id=self.tag_id
)

@property
def tag_id(self) -> Surface_Marker_TagID:
return Surface_Marker_TagID(int(self.raw_id))


_Apriltag_V3_Marker_Detection_Raw = collections.namedtuple(
"Apriltag_V3_Marker_Detection",
[
Expand Down Expand Up @@ -312,6 +349,8 @@ def from_tuple(state: tuple) -> "Surface_Marker":
marker_type = state[-1]
if marker_type == _Square_Marker_Detection.marker_type.value:
raw_marker = _Square_Marker_Detection.from_tuple(state)
elif marker_type == _Screen_Marker_Detection.marker_type.value:
raw_marker = _Screen_Marker_Detection.from_tuple(state)
elif marker_type == _Apriltag_V3_Marker_Detection.marker_type.value:
raw_marker = _Apriltag_V3_Marker_Detection.from_tuple(state)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

@enum.unique
class MarkerType(enum.Enum):
SCREEN_MARKER = "screen_marker"
SQUARE_MARKER = "square_marker"
APRILTAG_MARKER = "apriltag_marker"

Expand All @@ -51,17 +52,22 @@ class MarkerDetectorMode(T.NamedTuple):

@classmethod
def all_supported_cases(cls) -> T.Set["MarkerDetectorMode"]:
all_screen = {cls(MarkerType.SCREEN_MARKER, None)}
all_square = {cls(MarkerType.SQUARE_MARKER, None)}
all_apriltag = {
cls(MarkerType.APRILTAG_MARKER, family) for family in ApriltagFamily
}
return all_square | all_apriltag
return all_square | all_apriltag | all_screen

@classmethod
def from_marker(cls, marker: Surface_Marker) -> "MarkerDetectorMode":
marker_type = marker.marker_type
if marker_type == Surface_Marker_Type.SQUARE:
return cls(MarkerType.SQUARE_MARKER, None)

if marker_type == Surface_Marker_Type.SCREEN:
return cls(MarkerType.SCREEN_MARKER, None)

if marker_type == Surface_Marker_Type.APRILTAG_V3:
return cls(
MarkerType.APRILTAG_MARKER, ApriltagFamily(marker.raw_marker.tag_family)
Expand All @@ -74,6 +80,8 @@ def from_marker(cls, marker: Surface_Marker) -> "MarkerDetectorMode":
def label(self) -> str:
if self.marker_type == MarkerType.SQUARE_MARKER:
return "Legacy square markers"
if self.marker_type == MarkerType.SCREEN_MARKER:
return "Screen marker"
if self.marker_type == MarkerType.APRILTAG_MARKER:
return f"Apriltag ({self.family.value})"
raise ValueError(f"Unlabeled surface marker mode: {self}")
Expand Down Expand Up @@ -189,6 +197,84 @@ def detect_markers_iter(
return markers


class Screen_Marker_Detector(Surface_Base_Marker_Detector):
def __init__(
self,
marker_min_perimeter: int = ...,
square_marker_inverted_markers: bool = ...,
square_marker_use_online_mode: bool = ...,
):
self.__marker_min_perimeter = (
marker_min_perimeter if marker_min_perimeter is not ... else 60
)
self.__inverted_markers = (
square_marker_inverted_markers
if square_marker_inverted_markers is not ...
else False
)
self.__previous_raw_markers = []
self.__previous_frame_index = -1
self.use_online_mode = (
square_marker_use_online_mode
if square_marker_use_online_mode is not ...
else False
)

@property
def inverted_markers(self) -> bool:
return self.__inverted_markers

@inverted_markers.setter
def inverted_markers(self, value: bool):
self.__inverted_markers = value

@property
def marker_min_perimeter(self) -> int:
return self.__marker_min_perimeter

@marker_min_perimeter.setter
def marker_min_perimeter(self, value: int):
self.__marker_min_perimeter = value

def _surface_marker_filter(self, marker: Surface_Marker) -> bool:
return self.marker_min_perimeter <= marker.perimeter

def detect_markers_iter(
self, gray_img, frame_index: int
) -> T.Iterable[Surface_Marker]:
if self.use_online_mode:
true_detect_every_frame = 3
else:
true_detect_every_frame = 1
# in offline mode we can get non-monotonic data,
# in which case the previous data is invalid
if frame_index != self.__previous_frame_index + 1:
self.__previous_raw_markers = []
# TODO: Does this mean that seeking in the recording while the
# surface is being detected will essentially compromise the data? As
# in these cases we cannot use the previous frame data for inferring
# better marker positions. But if we would not have seeked we could
# have used this information! This looks like an inconsistency!

grid_size = 5
aperture = 9
min_perimeter = self.marker_min_perimeter

markers = square_marker_detect.detect_screen_robust(
gray_img=gray_img,
prev_markers=self.__previous_raw_markers,
true_detect_every_frame=true_detect_every_frame,
invert_image=self.__inverted_markers,
)
# Robust marker detection requires previous markers to be in a different
# format than the surface tracker.
self.__previous_raw_markers = markers
self.__previous_frame_index = frame_index
markers = map(Surface_Marker.from_square_tag_detection, markers)
markers = filter(self._surface_marker_filter, markers)
return markers


class Surface_Apriltag_V3_Marker_Detector_Params:
def __init__(
self,
Expand Down Expand Up @@ -303,6 +389,12 @@ def init_detector(self):
f"\tapriltag_decode_sharpening={self._apriltag_decode_sharpening}\n"
")"
)
elif self._marker_detector_mode.marker_type == MarkerType.SCREEN_MARKER:
self.__detector = Screen_Marker_Detector(
marker_min_perimeter=self._marker_min_perimeter,
square_marker_inverted_markers=self._square_marker_inverted_markers,
square_marker_use_online_mode=self._square_marker_use_online_mode,
)
elif self._marker_detector_mode.marker_type == MarkerType.SQUARE_MARKER:
self.__detector = Surface_Square_Marker_Detector(
marker_min_perimeter=self._marker_min_perimeter,
Expand Down