diff --git a/Pose2Sim/Demo_Batch/Config.toml b/Pose2Sim/Demo_Batch/Config.toml
index 0936ebd0..a8401146 100644
--- a/Pose2Sim/Demo_Batch/Config.toml
+++ b/Pose2Sim/Demo_Batch/Config.toml
@@ -34,7 +34,16 @@ exclude_from_batch = [] # List of trials to be excluded from batch analysis, ['<
 [pose]
-vid_img_extension = 'mp4' # any video or image extension
+rotation = 0 # Rotation applied to the image, in multiples of 90 degrees. Use a list like [90, 90, ...] to set a different angle per source.
+vid_img_extension = 'mp4' # File extension for videos or images, or 'webcam' for webcam input.
+
+webcam_ids = 0 # Webcam ID to use. Default is 0. For multiple webcams, use a list like [0, 1, ...].
+capture_mode = 'continuous' # Only used for webcams. 'continuous' captures frames whenever buffer space is available; 'alternating' captures a batch and waits for it to be processed before capturing the next one.
+webcam_recording = false # If true, records the raw webcam streams while analyzing them, so that frames are not lost to processing delays.
+
+combined_frames = true # Speeds up processing by combining the frames of all sources into a single image for inference. (Beta feature)
+multi_workers = false # Enables multiple worker processes for parallel frame analysis. (Not recommended)
+max_simultaneous_sources = 9 # Limits the number of sources processed simultaneously. Recommended values: 4, 9, 16, ... (a perfect square).
 pose_model = 'Body_with_feet' #With RTMLib:
 # - Body_with_feet (default HALPE_26 model),
diff --git a/Pose2Sim/Demo_MultiPerson/Config.toml b/Pose2Sim/Demo_MultiPerson/Config.toml
index 266e748a..ac0a1000 100644
--- a/Pose2Sim/Demo_MultiPerson/Config.toml
+++ b/Pose2Sim/Demo_MultiPerson/Config.toml
@@ -34,7 +34,16 @@ exclude_from_batch = [] # List of trials to be excluded from batch analysis, ['<
 [pose]
-vid_img_extension = 'mp4' # any video or image extension
+rotation = 0 # Rotation applied to the image, in multiples of 90 degrees. Use a list like [90, 90, ...] to set a different angle per source.
+vid_img_extension = 'mp4' # File extension for videos or images, or 'webcam' for webcam input.
+
+webcam_ids = 0 # Webcam ID to use. Default is 0. For multiple webcams, use a list like [0, 1, ...].
+capture_mode = 'continuous' # Only used for webcams. 'continuous' captures frames whenever buffer space is available; 'alternating' captures a batch and waits for it to be processed before capturing the next one.
+webcam_recording = false # If true, records the raw webcam streams while analyzing them, so that frames are not lost to processing delays.
+
+combined_frames = true # Speeds up processing by combining the frames of all sources into a single image for inference. (Beta feature)
+multi_workers = false # Enables multiple worker processes for parallel frame analysis. (Not recommended)
+max_simultaneous_sources = 9 # Limits the number of sources processed simultaneously. Recommended values: 4, 9, 16, ... (a perfect square).
 pose_model = 'Body_with_feet' #With RTMLib:
 # - Body_with_feet (default HALPE_26 model),
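For reference, the same [pose] block is added to each demo Config.toml, and these keys are read back in Pose2Sim/poseEstimation.py from the per-trial config dictionary. A hedged sketch of how a webcam run could be configured programmatically; the values are illustrative, and the keys this diff leaves unchanged (pose_model, mode, det_frequency, backend, device, ...) are omitted:

# Illustrative only: shows the keys introduced by this diff with example values.
# estimate_pose_all() additionally expects the usual [project] and [pose] keys
# (pose_model, mode, det_frequency, backend, device, ...), not listed here.
config_dict = {
    'project': {
        'project_dir': 'Demo_SinglePerson',   # example path
        'multi_person': False,
        'frame_rate': 'auto',                 # lowest source FPS is auto-detected
        'frame_range': [],
    },
    'pose': {
        'vid_img_extension': 'webcam',        # capture from webcams instead of files
        'webcam_ids': [0, 1],                 # one entry per webcam
        'capture_mode': 'continuous',
        'webcam_recording': False,
        'combined_frames': True,              # mosaic all sources into one inference call
        'multi_workers': False,
        'rotation': 0,                        # or a list for per-source rotation
        'save_video': ['to_video'],
        'display_detection': True,
        'overwrite_pose': True,
    },
}
# Pose2Sim.poseEstimation.estimate_pose_all(config_dict)  # after filling in the remaining keys
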
diff --git a/Pose2Sim/Demo_SinglePerson/Config.toml b/Pose2Sim/Demo_SinglePerson/Config.toml
index cef36b91..0e68213c 100644
--- a/Pose2Sim/Demo_SinglePerson/Config.toml
+++ b/Pose2Sim/Demo_SinglePerson/Config.toml
@@ -34,7 +34,16 @@ exclude_from_batch = [] # List of trials to be excluded from batch analysis, ['<
 [pose]
-vid_img_extension = 'mp4' # any video or image extension
+rotation = 0 # Rotation applied to the image, in multiples of 90 degrees. Use a list like [90, 90, ...] to set a different angle per source.
+vid_img_extension = 'mp4' # File extension for videos or images, or 'webcam' for webcam input.
+
+webcam_ids = 0 # Webcam ID to use. Default is 0. For multiple webcams, use a list like [0, 1, ...].
+capture_mode = 'continuous' # Only used for webcams. 'continuous' captures frames whenever buffer space is available; 'alternating' captures a batch and waits for it to be processed before capturing the next one.
+webcam_recording = false # If true, records the raw webcam streams while analyzing them, so that frames are not lost to processing delays.
+
+combined_frames = true # Speeds up processing by combining the frames of all sources into a single image for inference. (Beta feature)
+multi_workers = false # Enables multiple worker processes for parallel frame analysis. (Not recommended)
+max_simultaneous_sources = 9 # Limits the number of sources processed simultaneously. Recommended values: 4, 9, 16, ... (a perfect square).
 pose_model = 'Body_with_feet' #With RTMLib:
 # - Body_with_feet (default HALPE_26 model),
@@ -93,7 +102,7 @@ output_format = 'openpose' # 'openpose', 'mmpose', 'deeplabcut', 'none' or a lis
 [synchronization]
-display_sync_plots = true # true or false (lowercase)
+display_sync_plots = false # true or false (lowercase)
 keypoints_to_consider = 'all' # 'all' if all points should be considered, for example if the participant did not perform any particicular sharp movement. In this case, the capture needs to be 5-10 seconds long at least
 # ['RWrist', 'RElbow'] list of keypoint names if you want to specify keypoints with a sharp vertical motion.
 approx_time_maxspeed = 'auto' # 'auto' if you want to consider the whole capture (default, slower if long sequences)
diff --git a/Pose2Sim/poseEstimation.py b/Pose2Sim/poseEstimation.py
index aca1f64b..6f10b5a4 100644
--- a/Pose2Sim/poseEstimation.py
+++ b/Pose2Sim/poseEstimation.py
@@ -7,19 +7,19 @@ ## POSE ESTIMATION ##
 ###########################################################################
-    Estimate pose from a video file or a folder of images and
+    Estimate pose from a video file or a folder of images and
     write the results to JSON files, videos, and/or images.
     Results can optionally be displayed in real time.
     Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands), COCO_17 (body)
-    Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you
-    need nother detection or pose models)
+    Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you
+    need other detection or pose models)
     Optionally gives consistent person ID across frames (slower but good for 2D analysis)
-    Optionally runs detection every n frames and inbetween tracks points (faster but less accurate).
+    Optionally runs detection every n frames and in-between tracks points (faster but less accurate).
-    If a valid cuda installation is detected, uses the GPU with the ONNXRuntime backend. Otherwise,
-    uses the CPU with the OpenVINO backend.
+    If a valid CUDA installation is detected, uses the GPU with the ONNXRuntime backend.
+    Otherwise, uses the CPU with the OpenVINO backend.
INPUTS: - videos or image folders from the video directory @@ -27,27 +27,36 @@ OUTPUTS: - JSON files with the detected keypoints and confidence scores in the OpenPose format - - Optionally, videos and/or image files with the detected keypoints + - Optionally, videos and/or image files with the detected keypoints ''' ## INIT import os -import glob -import json +import time +import math import re -import logging +import json +import glob import ast +import logging +import queue +import multiprocessing +import psutil +import cv2 import numpy as np + +from datetime import datetime +from pathlib import Path from functools import partial +from multiprocessing import shared_memory from tqdm import tqdm from anytree.importer import DictImporter -import cv2 from rtmlib import PoseTracker, BodyWithFeet, Wholebody, Body, Hand, Custom, draw_skeleton + from deep_sort_realtime.deepsort_tracker import DeepSort -from Pose2Sim.common import natural_sort_key, sort_people_sports2d, sort_people_deepsort, sort_people_rtmlib,\ - colors, thickness, draw_bounding_box, draw_keypts, draw_skel +from Pose2Sim.common import natural_sort_key, sort_people_sports2d, sort_people_deepsort, sort_people_rtmlib, colors, thickness, draw_bounding_box, draw_keypts, draw_skel from Pose2Sim.skeletons import * @@ -63,39 +72,84 @@ ## FUNCTIONS -def setup_pose_tracker(ModelClass, det_frequency, mode, tracking, backend, device): - ''' - Set up the RTMLib pose tracker with the appropriate model and backend. - If CUDA is available, use it with ONNXRuntime backend; else use CPU with openvino - - INPUTS: - - ModelClass: class. The RTMlib model class to use for pose detection (Body, BodyWithFeet, Wholebody) - - det_frequency: int. The frequency of pose detection (every N frames) - - mode: str. The mode of the pose tracker ('lightweight', 'balanced', 'performance') - - tracking: bool. Whether to track persons across frames with RTMlib tracker - - backend: str. The backend to use for pose detection (onnxruntime, openvino, opencv) - - device: str. The device to use for pose detection (cpu, cuda, rocm, mps) - - OUTPUTS: - - pose_tracker: PoseTracker. 
The initialized pose tracker object - ''' - - backend, device = setup_backend_device(backend=backend, device=device) - - # Initialize the pose tracker with Halpe26 model - pose_tracker = PoseTracker( - ModelClass, - det_frequency=det_frequency, - mode=mode, - backend=backend, - device=device, - tracking=tracking, - to_openpose=False) - - return pose_tracker +def get_formatted_timestamp(): + dt = datetime.now() + ms = dt.microsecond // 1000 + return dt.strftime("%Y%m%d_%H%M%S_") + f"{ms:03d}" + + +def transform(frame, desired_w, desired_h, full, rotation = 0): + rotation = rotation % 360 + if rotation == 90: + frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) + elif rotation == 180: + frame = cv2.rotate(frame, cv2.ROTATE_180) + elif rotation == 270: + frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE) + + rotated_h, rotated_w = frame.shape[:2] + + scale = min(desired_w / rotated_w, desired_h / rotated_h) + new_w = int(rotated_w * scale) + new_h = int(rotated_h * scale) + if scale != 0: + frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA) + if full: + canvas = np.zeros((desired_h, desired_w, 3), dtype=np.uint8) + x_offset = (desired_w - new_w) // 2 + y_offset = (desired_h - new_h) // 2 + canvas[y_offset:y_offset+new_h, x_offset:x_offset+new_w] = frame + bottom_left_offset_y = desired_h - (y_offset + new_h) + transform_info = { + 'rotation': rotation, + 'scale': scale, + 'x_offset': x_offset, + 'y_offset': bottom_left_offset_y, + 'rotated_size': (rotated_w, rotated_h), + 'canvas_size': (desired_w, desired_h) + } + return canvas, transform_info + else: + return frame, None + + +def inverse_transform_keypoints(keypoints, transform_info): + desired_w, desired_h = transform_info['canvas_size'] + scale = transform_info['scale'] + x_offset = transform_info['x_offset'] + y_offset = transform_info['y_offset'] + rotation = transform_info['rotation'] + rotated_size = transform_info['rotated_size'] + new_keypoints = [] + for person in keypoints: + new_person = [] + for (x, y) in person: + + y_bl = desired_h - y + x_bl = desired_w - x + + X = (x_bl - x_offset) / scale + Y = (y_bl - y_offset) / scale + + if rotation % 360 == 0: + orig_x, orig_y = X, Y + elif rotation % 360 == 90: + orig_x = rotated_size[0] - Y + orig_y = X + elif rotation % 360 == 180: + orig_x = rotated_size[0] - X + orig_y = rotated_size[1] - Y + elif rotation % 360 == 270: + orig_x = Y + orig_y = rotated_size[1] - X + else: + orig_x, orig_y = X, Y + new_person.append([orig_x, orig_y]) + new_keypoints.append(np.array(new_person)) + return new_keypoints -def setup_backend_device(backend='auto', device='auto'): +def init_backend_device(backend='auto', device='auto'): ''' Set up the backend and device for the pose tracker based on the availability of hardware acceleration. TensorRT is not supported by RTMLib yet: https://github.com/Tau-J/rtmlib/issues/12 @@ -107,45 +161,37 @@ def setup_backend_device(backend='auto', device='auto'): 4. CPU with OpenVINO backend (default fallback) ''' - if device!='auto' and backend!='auto': - device = device.lower() - backend = backend.lower() - - if device=='auto' or backend=='auto': - if device=='auto' and backend!='auto' or device!='auto' and backend=='auto': - logging.warning(f"If you set device or backend to 'auto', you must set the other to 'auto' as well. 
Both device and backend will be determined automatically.") + if device == 'auto' or backend == 'auto': + if device != 'auto' or backend != 'auto': + logging.warning("If you set device or backend to 'auto', you must set the other to 'auto' as well. Both device and backend will be determined automatically.") try: import torch import onnxruntime as ort - if torch.cuda.is_available() == True and 'CUDAExecutionProvider' in ort.get_available_providers(): - device = 'cuda' - backend = 'onnxruntime' - logging.info(f"\nValid CUDA installation found: using ONNXRuntime backend with GPU.") - elif torch.cuda.is_available() == True and 'ROCMExecutionProvider' in ort.get_available_providers(): - device = 'rocm' - backend = 'onnxruntime' - logging.info(f"\nValid ROCM installation found: using ONNXRuntime backend with GPU.") + if torch.cuda.is_available() and 'CUDAExecutionProvider' in ort.get_available_providers(): + logging.info("Valid CUDA installation found: using ONNXRuntime backend with GPU.") + return 'onnxruntime', 'cuda' + elif torch.cuda.is_available() and 'ROCMExecutionProvider' in ort.get_available_providers(): + logging.info("Valid ROCM installation found: using ONNXRuntime backend with GPU.") + return 'onnxruntime', 'rocm' else: - raise + raise except: try: import onnxruntime as ort - if 'MPSExecutionProvider' in ort.get_available_providers() or 'CoreMLExecutionProvider' in ort.get_available_providers(): - device = 'mps' - backend = 'onnxruntime' - logging.info(f"\nValid MPS installation found: using ONNXRuntime backend with GPU.") + if ('MPSExecutionProvider' in ort.get_available_providers() or 'CoreMLExecutionProvider' in ort.get_available_providers()): + logging.info("Valid MPS installation found: using ONNXRuntime backend with GPU.") + return 'onnxruntime', 'mps' else: raise except: - device = 'cpu' - backend = 'openvino' - logging.info(f"\nNo valid CUDA installation found: using OpenVINO backend with CPU.") - - return backend, device + logging.info("No valid CUDA installation found: using OpenVINO backend with CPU.") + return 'openvino', 'cpu' + else: + return backend.lower(), device.lower() -def save_to_openpose(json_file_path, keypoints, scores): +def save_keypoints_to_openpose(json_file_path, all_keypoints, all_scores): ''' Save the keypoints and scores to a JSON file in the OpenPose format @@ -158,278 +204,648 @@ def save_to_openpose(json_file_path, keypoints, scores): - JSON file with the detected keypoints and confidence scores in the OpenPose format ''' - # Prepare keypoints with confidence scores for JSON output - nb_detections = len(keypoints) - # print('results: ', keypoints, scores) detections = [] - for i in range(nb_detections): # nb of detected people + + for idx_person in range(len(all_keypoints)): keypoints_with_confidence_i = [] - for kp, score in zip(keypoints[i], scores[i]): + for (kp, score) in zip(all_keypoints[idx_person], all_scores[idx_person]): keypoints_with_confidence_i.extend([kp[0].item(), kp[1].item(), score.item()]) detections.append({ - "person_id": [-1], - "pose_keypoints_2d": keypoints_with_confidence_i, - "face_keypoints_2d": [], - "hand_left_keypoints_2d": [], - "hand_right_keypoints_2d": [], - "pose_keypoints_3d": [], - "face_keypoints_3d": [], - "hand_left_keypoints_3d": [], - "hand_right_keypoints_3d": [] - }) - + "person_id": [-1], + "pose_keypoints_2d": keypoints_with_confidence_i, + "face_keypoints_2d": [], + "hand_left_keypoints_2d": [], + "hand_right_keypoints_2d": [], + "pose_keypoints_3d": [], + "face_keypoints_3d": [], + 
"hand_left_keypoints_3d": [], + "hand_right_keypoints_3d": [] + }) + # Create JSON output structure json_output = {"version": 1.3, "people": detections} - - # Save JSON output for each frame - json_output_dir = os.path.abspath(os.path.join(json_file_path, '..')) - if not os.path.isdir(json_output_dir): os.makedirs(json_output_dir) - with open(json_file_path, 'w') as json_file: - json.dump(json_output, json_file) - - -def process_video(video_path, pose_tracker, pose_model, output_format, save_video, save_images, display_detection, frame_range, multi_person, tracking_mode, deepsort_tracker): - ''' - Estimate pose from a video file - - INPUTS: - - video_path: str. Path to the input video file - - pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib - - pose_model: str. The pose model to use for pose estimation (HALPE_26, COCO_133, COCO_17) - - output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut') - - save_video: bool. Whether to save the output video - - save_images: bool. Whether to save the output images - - display_detection: bool. Whether to show real-time visualization - - frame_range: list. Range of frames to process - - multi_person: bool. Whether to detect multiple people in the video - - tracking_mode: str. The tracking mode to use for person tracking (deepsort, sports2d) - - deepsort_tracker: DeepSort tracker object or None - - OUTPUTS: - - JSON files with the detected keypoints and confidence scores in the OpenPose format - - if save_video: Video file with the detected keypoints and confidence scores drawn on the frames - - if save_images: Image files with the detected keypoints and confidence scores drawn on the frames - ''' - try: - cap = cv2.VideoCapture(video_path) - cap.read() - if cap.read()[0] == False: - raise - except: - raise NameError(f"{video_path} is not a video. 
Images must be put in one subdirectory per camera.") - - pose_dir = os.path.abspath(os.path.join(video_path, '..', '..', 'pose')) - if not os.path.isdir(pose_dir): os.makedirs(pose_dir) - video_name_wo_ext = os.path.splitext(os.path.basename(video_path))[0] - json_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_json') - output_video_path = os.path.join(pose_dir, f'{video_name_wo_ext}_pose.mp4') - img_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_img') - - if save_video: # Set up video writer - fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for the output video - fps = round(cap.get(cv2.CAP_PROP_FPS)) # Get the frame rate from the raw video - W, H = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Get the width and height from the raw video - out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H)) # Create the output video file - - if display_detection: - cv2.namedWindow(f"Pose Estimation {os.path.basename(video_path)}", cv2.WINDOW_NORMAL + cv2.WINDOW_KEEPRATIO) - - frame_idx = 0 - cap = cv2.VideoCapture(video_path) - total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - f_range = [[total_frames] if frame_range==[] else frame_range][0] - with tqdm(total=total_frames, desc=f'Processing {os.path.basename(video_path)}') as pbar: - frame_count = 0 - while cap.isOpened(): - # print('\nFrame ', frame_idx) - success, frame = cap.read() - frame_count += 1 - if not success: + # Save JSON output for each frame + with open(json_file_path, 'w') as outfile: + json.dump(json_output, outfile) + + +class PoseEstimatorWorker(multiprocessing.Process): + def __init__(self, **kwargs): + super().__init__() + self.__dict__.update(kwargs) + self.stopped = False + self.prev_keypoints = {} + + def run(self): + model = self.ModelClass(mode=self.mode, + to_openpose=self.to_openpose, + backend=self.backend, + device=self.device) + + try: + self.det_model = model.det_model + except: # rtmo + self.det_model = None + self.pose_model = model.pose_model + + self.tracker_ready_event.set() + + while not self.stopped: + # If no new frames and no active sources remain, stop + if self.queue.empty() and self.active_sources.value == 0: + logging.info("Stopping worker as no active sources or items are in the queue.") + self.stop() break + + try: + item = self.queue.get_nowait() + self.process_frame(item) + except queue.Empty: + time.sleep(0.005) + + def process_frame(self, item): + # Unpack frame info + buffer_name, idx, frame_shape, frame_dtype, *others = item + # Convert shared memory to numpy + frame = np.ndarray(frame_shape, dtype=frame_dtype, buffer=self.buffers[buffer_name].buf) + + if not others[1]: + if self.det_model is not None: + # if self.frame_cnt % self.det_frequency == 0: + bboxes = self.det_model(frame) + # else: + # bboxes = self.bboxes_last_frame + keypoints, scores = self.pose_model(frame, bboxes=bboxes) + else: # rtmo + keypoints, scores = self.pose_model(frame) + else: + keypoints, scores = None, None - if frame_idx in range(*f_range): - # Detect poses - keypoints, scores = pose_tracker(frame) - - # Track poses across frames - if multi_person: - if tracking_mode == 'deepsort': - keypoints, scores = sort_people_deepsort(keypoints, scores, deepsort_tracker, frame, frame_count) - if tracking_mode == 'sports2d': - if 'prev_keypoints' not in locals(): prev_keypoints = keypoints - prev_keypoints, keypoints, scores = sort_people_sports2d(prev_keypoints, keypoints, scores=scores) + if multi_person: + if tracking_mode == 'deepsort': + keypoints, scores = 
sort_people_deepsort(keypoints, scores, deepsort_tracker, frame, frame_count) + if tracking_mode == 'sports2d': + if 'prev_keypoints' not in locals(): prev_keypoints = keypoints + prev_keypoints, keypoints, scores = sort_people_sports2d(prev_keypoints, keypoints, scores=scores) + + result = (buffer_name, idx, frame_shape, frame_dtype, others[0], others[1], keypoints, scores) + self.result_queue.put(result) + + def stop(self): + self.stopped = True + + +class BaseSynchronizer: + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + self.sync_data = {} + self.placeholder_map = {} + + self.video_writers = {} + self.stopped = False + + def get_frames_list(self): + if not self.sync_data: + return None + + for idx in sorted(self.sync_data.keys()): + group = self.sync_data[idx] + if len(group) == len(self.sources): + complete_group = self.sync_data.pop(idx) + frames_list = [] + for source in self.sources: + sid = source['id'] + buffer_name, frame_shape, frame_dtype, keypoints, scores, is_placeholder, transform_info = complete_group[sid] + frame = np.ndarray(frame_shape, dtype=np.dtype(frame_dtype), buffer=self.frame_buffers[buffer_name].buf).copy() + self.available_frame_buffers.put(buffer_name) + orig_w, orig_h = frame_shape[1], frame_shape[0] + if is_placeholder: + cv2.putText(frame, "Not connected", (50, 50), + cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 2) + frames_list.append((frame, sid, orig_w, orig_h, transform_info, keypoints, scores)) + return frames_list + return None + + def build_mosaic(self, frames_list): + target_w, target_h = self.frame_size + mosaic = np.zeros((self.mosaic_rows * target_h, self.mosaic_cols * target_w, 3), dtype=np.uint8) + subinfo = {} + for frame_tuple in frames_list: + frame, *others = frame_tuple + info = self.mosaic_subinfo[others[0]] + x_off = info["x_offset"] + y_off = info["y_offset"] + mosaic[y_off:y_off+target_h, x_off:x_off+target_w] = frame + subinfo[others[0]] = { + "x_offset": x_off, + "y_offset": y_off, + "scaled_w": target_w, + "scaled_h": target_h, + "orig_w": others[1], + "orig_h": others[2], + "transform_info": others[3] + } + return mosaic, subinfo + + def read_mosaic(self, mosaic_np, subinfo, keypoints, scores): + frames = {} + recovered_keypoints = {} + recovered_scores = {} + + for s in self.sources: + sid = s['id'] + if sid not in subinfo: + continue + info = subinfo[sid] + x_off = info["x_offset"] + y_off = info["y_offset"] + sc_w = info["scaled_w"] + sc_h = info["scaled_h"] + orig_w = info["orig_w"] + orig_h = info["orig_h"] + + frame_region = mosaic_np[y_off:y_off+sc_h, x_off:x_off+sc_w].copy() + frames[sid] = frame_region + + rec_kpts = [] + rec_scores = [] + for p in range(len(keypoints)): + kp_person = keypoints[p] + sc_person = scores[p] + + center_x = np.mean([x for (x, y) in kp_person]) + center_y = np.mean([y for (x, y) in kp_person]) + + if x_off <= center_x <= x_off + sc_w and y_off <= center_y <= y_off + sc_h: + local_kpts = [] + local_scores = [] - # Save to json - if 'openpose' in output_format: - json_file_path = os.path.join(json_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.json') - save_to_openpose(json_file_path, keypoints, scores) - - # Draw skeleton on the frame - if display_detection or save_video or save_images: - # try: - # # MMPose skeleton - # img_show = frame.copy() - # img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1) # maybe change this value if 0.1 is too low - # except: - # Sports2D skeleton - valid_X, valid_Y, valid_scores = [], [], [] - for person_keypoints, person_scores in 
zip(keypoints, scores): - person_X, person_Y = person_keypoints[:, 0], person_keypoints[:, 1] - valid_X.append(person_X) - valid_Y.append(person_Y) - valid_scores.append(person_scores) - img_show = frame.copy() - img_show = draw_bounding_box(img_show, valid_X, valid_Y, colors=colors, fontSize=2, thickness=thickness) - img_show = draw_keypts(img_show, valid_X, valid_Y, valid_scores, cmap_str='RdYlGn') - img_show = draw_skel(img_show, valid_X, valid_Y, pose_model) + for (xk, yk), scv in zip(kp_person, sc_person): + x_local = (xk - x_off) * (orig_w / float(sc_w)) + y_local = (yk - y_off) * (orig_h / float(sc_h)) + local_kpts.append([x_local, y_local]) + local_scores.append(scv) + rec_kpts.append(np.array(local_kpts)) + rec_scores.append(np.array(local_scores)) + recovered_keypoints[sid] = rec_kpts + recovered_scores[sid] = rec_scores + + return frames, recovered_keypoints, recovered_scores + + def stop(self): + self.stopped = True + + +class FrameQueueProcessor(multiprocessing.Process, BaseSynchronizer): + def __init__(self, frame_queue, pose_queue, **kwargs): + multiprocessing.Process.__init__(self) + BaseSynchronizer.__init__(self, **kwargs) + self.frame_queue = frame_queue + self.pose_queue = pose_queue + + def run(self): + while not self.stopped: + try: + buffer_name, idx, frame_shape, frame_dtype, sid, is_placeholder, _ = self.frame_queue.get_nowait() + if idx not in self.sync_data: + self.sync_data[idx] = {} + self.sync_data[idx][sid] = (buffer_name, frame_shape, frame_dtype, None, None, is_placeholder, None) - if display_detection: - cv2.imshow(f"Pose Estimation {os.path.basename(video_path)}", img_show) - if cv2.waitKey(1) & 0xFF == ord('q'): + if frames_list := self.get_frames_list(): + mosaic, subinfo = self.build_mosaic(frames_list) + pose_buf_name = self.available_pose_buffers.get_nowait() + np.ndarray(mosaic.shape, dtype=mosaic.dtype, buffer=self.pose_buffers[pose_buf_name].buf)[:] = mosaic + self.pose_queue.put((pose_buf_name, idx, mosaic.shape, mosaic.dtype.str, subinfo, False)) + except queue.Empty: + time.sleep(0.005) + + +class ResultQueueProcessor(multiprocessing.Process, BaseSynchronizer): + def __init__(self, result_queue, pose_model, **kwargs): + multiprocessing.Process.__init__(self) + BaseSynchronizer.__init__(self, **kwargs) + self.result_queue = result_queue + self.pose_model = pose_model + + def run(self): + self.init_video_writers() + + while not self.stopped: + try: + result_item = self.result_queue.get_nowait() + self.process_result_item(result_item) + except queue.Empty: + time.sleep(0.005) + + for vw in self.video_writers.values(): + vw.release() + cv2.destroyAllWindows() + + def init_video_writers(self): + if self.save_video and self.source_outputs: + for s in self.sources: + sid = s['id'] + out_dirs = self.source_outputs[sid] + output_video_path = out_dirs[-2] + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + self.video_writers[sid] = cv2.VideoWriter( + output_video_path, + fourcc, + self.frame_rate, + (self.frame_size[0], self.frame_size[1]) + ) + + def process_result_item(self, result_item): + buffer_name, idx, frame_shape, frame_dtype, info, is_placeholder, keypoints, scores = result_item + if self.combined_frames: + self.handle_combined_frames(buffer_name, frame_shape, frame_dtype, keypoints, scores, info, idx) + else: + self.handle_individual_frames(buffer_name, frame_shape, frame_dtype, info, is_placeholder, keypoints, scores, idx) + + def handle_combined_frames(self, buffer_name, frame_shape, frame_dtype, keypoints, scores, subinfo, idx): + shm = 
self.pose_buffers[buffer_name] + mosaic = np.ndarray(frame_shape, dtype=frame_dtype, buffer=shm.buf) + + if self.save_images or self.save_video or self.display_detection: + mosaic = draw_skeleton(mosaic, keypoints, scores) + if self.display_detection: + self.show_mosaic(mosaic) + + frames, recovered_keypoints, recovered_scores = self.read_mosaic(mosaic, subinfo, keypoints, scores) + self.available_pose_buffers.put(buffer_name) + + for sid in frames: + trans_info = subinfo[sid].get("transform_info", None) + self.handle_output(sid, frames[sid], recovered_keypoints[sid], recovered_scores[sid], idx, trans_info) + + def handle_individual_frames(self, buffer_name, frame_shape, frame_dtype, sid, is_placeholder, keypoints, scores, idx): + if idx not in self.sync_data: + self.sync_data[idx] = {} + self.sync_data[idx][sid] = (buffer_name, frame_shape, frame_dtype, keypoints, scores, is_placeholder, None) + frames_list = self.get_frames_list() + if frames_list: + annotated_frames = [] + for (frame, sid, orig_w, orig_h, transform_info, *others) in frames_list: + if others[0] is not None: + if self.save_images or self.save_video or self.display_detection: + frame = draw_skeleton(frame, others[0], others[1]) + annotated_frames.append((frame, sid, orig_w, orig_h, transform_info)) + self.handle_output(sid, frame, others[0], others[1], idx, transform_info) + + if self.display_detection: + mosaic, _ = self.build_mosaic(annotated_frames) + self.show_mosaic(mosaic) + + def draw_skeleton(self, frame, keypoints, scores): + # try: + # # MMPose skeleton + # frame = draw_skeleton(frame, keypoints, scores, kpt_thr=0.1) # maybe change this value if 0.1 is too low + # except: + # Sports2D skeleton + valid_X, valid_Y, valid_scores = [], [], [] + for person_keypoints, person_scores in zip(keypoints, scores): + person_X, person_Y = person_keypoints[:, 0], person_keypoints[:, 1] + valid_X.append(person_X) + valid_Y.append(person_Y) + valid_scores.append(person_scores) + if self.multi_person: frame = draw_bounding_box(frame, valid_X, valid_Y, colors=colors, fontSize=2, thickness=thickness) + frame = draw_keypts(frame, valid_X, valid_Y, valid_scores, cmap_str='RdYlGn') + frame = draw_skel(frame, valid_X, valid_Y, self.pose_model) + return frame + + def show_mosaic(self, mosaic): + desired_w, desired_h = 1280, 720 + mosaic = transform(mosaic, desired_w, desired_h, False)[0] + cv2.imshow("Display", mosaic) + key = cv2.waitKey(1) + if key in [ord('q'), 27]: + logging.info("User closed display.") + self.stop() + + def handle_output(self, sid, frame, keypoints, scores, idx, transform_info): + out_dirs = self.source_outputs[sid] + file_name = f"{out_dirs[0]}_{idx}" + + # Save to json + if transform_info is not None: + keypoints = inverse_transform_keypoints(keypoints, transform_info) + if 'openpose' in self.output_format: + json_path = os.path.join(out_dirs[2], f"{file_name}.json") + save_keypoints_to_openpose(json_path, keypoints, scores) + if self.save_images: + cv2.imwrite(os.path.join(out_dirs[1], f"{file_name}.jpg"), frame) + if self.save_video: + if sid in self.video_writers: + self.video_writers[sid].write(frame) + + +class CaptureCoordinator(multiprocessing.Process): + def __init__(self, **kwargs): + super().__init__() + self.__dict__.update(kwargs) + + self.min_interval = 1.0 / self.frame_rate if self.frame_rate > 0 else 0.0 + self.stopped = multiprocessing.Value('b', False) + + def run(self): + self.tracker_ready_event.wait() + last_capture_time = time.time() + while not self.stopped.value: + if 
all(self.source_ended[s['id']] for s in self.sources): + break + + now = time.time() + elapsed = now - last_capture_time + if elapsed < self.min_interval: + time.sleep(self.min_interval - elapsed) + + def is_ready_source(s): + sid = s['id'] + if self.source_ended[sid]: + return False + if s['type'] == 'webcam': + return bool(self.webcam_ready[sid]) + return True + + ready_srcs = [s for s in self.sources if is_ready_source(s)] + + if self.mode == 'continuous': + if self.available_frame_buffers.qsize() >= len(ready_srcs): + buffs = [self.available_frame_buffers.get() for _ in ready_srcs] + for i, src in enumerate(ready_srcs): + sid = src['id'] + self.command_queues[sid].put(("CAPTURE_FRAME", buffs[i])) + last_capture_time = time.time() + else: + time.sleep(0.005) + + elif self.mode == 'alternating': + chunk = self.available_frame_buffers.qsize() + if chunk <= 0: + time.sleep(0.005) + continue + + frames_sent = 0 + src_index = 0 + sources_requested = [] + + ready_count = len(ready_srcs) + while frames_sent < chunk and not all(self.source_ended[s['id']] for s in self.sources): + if ready_count == 0: break + src = ready_srcs[src_index] + sid = src['id'] + buf_name = self.available_frame_buffers.get() + self.command_queues[sid].put(("CAPTURE_FRAME", buf_name)) + sources_requested.append(sid) + frames_sent += 1 - if save_video: - out.write(img_show) + src_index = (src_index + 1) % ready_count - if save_images: - if not os.path.isdir(img_output_dir): os.makedirs(img_output_dir) - cv2.imwrite(os.path.join(img_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.jpg'), img_show) + last_capture_time = time.time() + if frames_sent > 0: + self.wait_until_processed(frames_sent, sources_requested) - frame_idx += 1 - pbar.update(1) + else: + time.sleep(0.01) - cap.release() - if save_video: - out.release() - logging.info(f"--> Output video saved to {output_video_path}.") - if save_images: - logging.info(f"--> Output images saved to {img_output_dir}.") - if display_detection: - cv2.destroyAllWindows() + self.stop() + def wait_until_processed(self, total_frames, sources_list): + needed_counts = {} + old_counts = {} -def process_images(image_folder_path, vid_img_extension, pose_tracker, pose_model, output_format, fps, save_video, save_images, display_detection, frame_range, multi_person, tracking_mode, deepsort_tracker): - ''' - Estimate pose estimation from a folder of images - - INPUTS: - - image_folder_path: str. Path to the input image folder - - vid_img_extension: str. Extension of the image files - - pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib - - pose_model: str. The pose model to use for pose estimation (HALPE_26, COCO_133, COCO_17) - - output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut') - - save_video: bool. Whether to save the output video - - save_images: bool. Whether to save the output images - - display_detection: bool. Whether to show real-time visualization - - frame_range: list. Range of frames to process - - multi_person: bool. Whether to detect multiple people in the video - - tracking_mode: str. 
The tracking mode to use for person tracking (deepsort, sports2d) - - deepsort_tracker: DeepSort tracker object or None + for sid in sources_list: + needed_counts[sid] = needed_counts.get(sid, 0) + 1 - OUTPUTS: - - JSON files with the detected keypoints and confidence scores in the OpenPose format - - if save_video: Video file with the detected keypoints and confidence scores drawn on the frames - - if save_images: Image files with the detected keypoints and confidence scores drawn on the frames - ''' - - pose_dir = os.path.abspath(os.path.join(image_folder_path, '..', '..', 'pose')) - if not os.path.isdir(pose_dir): os.makedirs(pose_dir) - json_output_dir = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_json') - output_video_path = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_pose.mp4') - img_output_dir = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_img') - - image_files = glob.glob(os.path.join(image_folder_path, '*'+vid_img_extension)) - sorted(image_files, key=natural_sort_key) - - if save_video: # Set up video writer - logging.warning('Using default framerate of 60 fps.') - fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for the output video - W, H = cv2.imread(image_files[0]).shape[:2][::-1] # Get the width and height from the first image (assuming all images have the same size) - out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H)) # Create the output video file - - if display_detection: - cv2.namedWindow(f"Pose Estimation {os.path.basename(image_folder_path)}", cv2.WINDOW_NORMAL) - - f_range = [[len(image_files)] if frame_range==[] else frame_range][0] - for frame_idx, image_file in enumerate(tqdm(image_files, desc=f'\nProcessing {os.path.basename(img_output_dir)}')): - if frame_idx in range(*f_range): - try: - frame = cv2.imread(image_file) - frame_idx += 1 - except: - raise NameError(f"{image_file} is not an image. 
Videos must be put in the video directory, not in subdirectories.") + for sid in needed_counts.keys(): + old_counts[sid] = self.shared_counts[sid]['processed'].value + + while not self.stopped.value: + done = 0 + for sid, needed_val in needed_counts.items(): + current_processed = self.shared_counts[sid]['processed'].value + if current_processed >= old_counts[sid] + needed_val: + done += needed_val + if done >= total_frames: + break + time.sleep(0.01) + + def stop(self): + self.stopped.value = True + for src in self.sources: + self.command_queues[src['id']].put(None) + + +class MediaSource(multiprocessing.Process): + def __init__(self, **kwargs): + super().__init__() + self.__dict__.update(kwargs) + + self.frame_idx = 0 + self.total_frames = 0 + self.cap = None + self.image_files = [] + self.stopped = False + + def run(self): + try: + # Open the specific source + if self.source['type'] == 'webcam': + self.open_webcam() + self.shared_counts[self.source['id']]['total'].value = 0 + if self.webcam_ready is not None: + self.webcam_ready[self.source['id']] = (self.cap is not None and self.cap.isOpened()) + + if self.webcam_recording and self.cap is not None and self.cap.isOpened() and self.output_raw_video_path: + raw_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + raw_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + self.raw_writer = cv2.VideoWriter(self.source_outputs[-1], fourcc, 30, (raw_width, raw_height)) - # Detect poses - keypoints, scores = pose_tracker(frame) - - # Track poses across frames - if multi_person: - if tracking_mode == 'deepsort': - keypoints, scores = sort_people_deepsort(keypoints, scores, deepsort_tracker, frame, frame_idx) - if tracking_mode == 'sports2d': - if 'prev_keypoints' not in locals(): prev_keypoints = keypoints - prev_keypoints, keypoints, scores = sort_people_sports2d(prev_keypoints, keypoints, scores=scores) - - # Extract frame number from the filename - if 'openpose' in output_format: - json_file_path = os.path.join(json_output_dir, f"{os.path.splitext(os.path.basename(image_file))[0]}_{frame_idx:06d}.json") - save_to_openpose(json_file_path, keypoints, scores) + elif self.source['type'] == 'video': + self.open_video() + if self.frame_ranges: + self.total_frames = len(self.frame_ranges) + else: + self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) + self.shared_counts[self.source['id']]['total'].value = self.total_frames - # Draw skeleton on the image - if display_detection or save_video or save_images: + elif self.source['type'] == 'images': + self.load_images() + self.shared_counts[self.source['id']]['total'].value = self.total_frames + + while not self.stopped: try: - # MMPose skeleton - img_show = frame.copy() - img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1) # maybe change this value if 0.1 is too low - except: - # Sports2D skeleton - valid_X, valid_Y, valid_scores = [], [], [] - for person_keypoints, person_scores in zip(keypoints, scores): - person_X, person_Y = person_keypoints[:, 0], person_keypoints[:, 1] - valid_X.append(person_X) - valid_Y.append(person_Y) - valid_scores.append(person_scores) - img_show = frame.copy() - img_show = draw_bounding_box(img_show, valid_X, valid_Y, colors=colors, fontSize=2, thickness=thickness) - img_show = draw_keypts(img_show, valid_X, valid_Y, valid_scores, cmap_str='RdYlGn') - img_show = draw_skel(img_show, valid_X, valid_Y, pose_model) - - if display_detection: - cv2.imshow(f"Pose Estimation 
{os.path.basename(image_folder_path)}", img_show) - if cv2.waitKey(1) & 0xFF == ord('q'): + cmd = self.command_queue.get(timeout=0.1) + except queue.Empty: + continue + + if cmd is None: break - if save_video: - out.write(img_show) + if isinstance(cmd, tuple): + if cmd[0] == "CAPTURE_FRAME": + buffer_name = cmd[1] + # Stop if we've reached total frames for video/images + if self.source['type'] in ('video', 'images'): + if self.frame_idx >= self.total_frames: + self.stop() + break - if save_images: - if not os.path.isdir(img_output_dir): os.makedirs(img_output_dir) - cv2.imwrite(os.path.join(img_output_dir, f'{os.path.splitext(os.path.basename(image_file))[0]}_{frame_idx:06d}.png'), img_show) + # Read frame and send it + frame, is_placeholder = self.read_frame() + if frame is None: + self.stop() + break - if save_video: - logging.info(f"--> Output video saved to {output_video_path}.") - if save_images: - logging.info(f"--> Output images saved to {img_output_dir}.") - if display_detection: - cv2.destroyAllWindows() + with self.shared_counts[self.source['id']]['processed'].get_lock(): + self.shared_counts[self.source['id']]['processed'].value += 1 + + frame, transform_info = transform(frame, self.frame_size[0], self.frame_size[1], True, self.rotation) + + self.send_frame(frame, buffer_name, is_placeholder, transform_info) + + elif cmd[0] == "STOP_CAPTURE": + self.stop() + break + else: + pass + + logging.info(f"MediaSource {self.source['id']} ended.") + + with self.active_sources.get_lock(): + self.active_sources.value -= 1 + + if self.source_ended is not None: + self.source_ended[self.source['id']] = True + + except Exception as e: + logging.error(f"MediaSource {self.source['id']} error: {e}") + self.stop() + with self.active_sources.get_lock(): + self.active_sources.value -= 1 + if self.source_ended is not None: + self.source_ended[self.source['id']] = True + + def open_webcam(self): + self.cap = cv2.VideoCapture(int(self.source['id']), cv2.CAP_DSHOW) + + if not self.cap or not self.cap.isOpened(): + logging.warning(f"Unable to open webcam {self.source['id']}.") + self.cap = None + else: + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_size[0]) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_size[1]) + logging.info(f"Webcam {self.source['id']} ready.") + time.sleep(1) + + def open_video(self): + self.cap = cv2.VideoCapture(self.source['path']) + if not self.cap.isOpened(): + logging.error(f"Cannot open video file: {self.source['path']}") + self.stop() + + def load_images(self): + pattern = os.path.join(self.source['path'], f"*{self.vid_img_extension}") + self.image_files = sorted(glob.glob(pattern), key=natural_sort_key) + self.total_frames = len(self.image_files) + + def read_frame(self): + if self.source['type'] == 'video': + if not self.cap: + return None, False + if self.frame_ranges and self.frame_idx not in self.frame_ranges: + self.frame_idx += 1 + return self.read_frame() + + ret, frame = self.cap.read() + if not ret or frame is None: + logging.info(f"Video finished: {self.source['path']}") + self.stop() + return None, False + + return frame, False + + elif self.source['type'] == 'images': + if self.frame_idx < len(self.image_files): + frame = cv2.imread(self.image_files[self.frame_idx]) + if frame is None: + return None, False + + self.frame_idx += 1 + return frame, False + else: + self.stop() + return None, False + + elif self.source['type'] == 'webcam': + if self.cap is None or not self.cap.isOpened(): + if self.webcam_ready is not None: + 
self.webcam_ready[self.source['id']] = False + + self.open_webcam() + + if self.cap is None or not self.cap.isOpened(): + placeholder_frame = np.zeros((self.frame_size[1], self.frame_size[0], 3), dtype=np.uint8) + return placeholder_frame, True + + ret, frame = self.cap.read() + if not ret or frame is None: + logging.warning(f"Failed to read from webcam {self.source['id']}. Reconnecting loop.") + self.cap.release() + self.cap = None + placeholder_frame = np.zeros((self.frame_size[1], self.frame_size[0], 3), dtype=np.uint8) + return placeholder_frame, True + + if self.webcam_ready is not None: + self.webcam_ready[self.source['id']] = True + + return frame, False + + return None, False + + def send_frame(self, frame, buffer_name, is_placeholder, transform_info): + shm = self.frame_buffers[buffer_name] + np_frame = np.ndarray(frame.shape, dtype=frame.dtype, buffer=shm.buf) + np_frame[:] = frame + + item = ( + buffer_name, + self.frame_idx if self.source['type'] != 'webcam' else get_formatted_timestamp(), + frame.shape, + frame.dtype.str, + self.source['id'], + is_placeholder, + transform_info + ) + self.frame_queue.put(item) + + if not is_placeholder: + with self.shared_counts[self.source['id']]['queued'].get_lock(): + self.shared_counts[self.source['id']]['queued'].value += 1 + self.frame_idx += 1 + + def stop(self): + self.stopped = True + if self.cap: + self.cap.release() def estimate_pose_all(config_dict): ''' - Estimate pose from a video file or a folder of images and - write the results to JSON files, videos, and/or images. - Results can optionally be displayed in real time. + Estimate pose from webcams, video files, or a folder of images, and write the results to JSON files, videos, and/or images. + Results can optionally be displayed in real-time. Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands), COCO_17 (body) - Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you - need nother detection or pose models) + Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you need another detection or pose models) Optionally gives consistent person ID across frames (slower but good for 2D analysis) - Optionally runs detection every n frames and inbetween tracks points (faster but less accurate). + Optionally runs detection every n frames and in between tracks points (faster but less accurate). - If a valid cuda installation is detected, uses the GPU with the ONNXRuntime backend. Otherwise, - uses the CPU with the OpenVINO backend. + If a valid CUDA installation is detected, uses the GPU with the ONNXRuntime backend. Otherwise, uses the CPU with the OpenVINO backend. 
INPUTS: - videos or image folders from the video directory @@ -437,92 +853,602 @@ def estimate_pose_all(config_dict): OUTPUTS: - JSON files with the detected keypoints and confidence scores in the OpenPose format - - Optionally, videos and/or image files with the detected keypoints + - Optionally, videos and/or image files with the detected keypoints ''' # Read config - project_dir = config_dict['project']['project_dir'] - # if batch - session_dir = os.path.realpath(os.path.join(project_dir, '..')) - # if single trial - session_dir = session_dir if 'Config.toml' in os.listdir(session_dir) else os.getcwd() - frame_range = config_dict.get('project').get('frame_range') - multi_person = config_dict.get('project').get('multi_person') - video_dir = os.path.join(project_dir, 'videos') - pose_dir = os.path.join(project_dir, 'pose') - - pose_model = config_dict['pose']['pose_model'] - mode = config_dict['pose']['mode'] # lightweight, balanced, performance + output_dir = config_dict['project']['project_dir'] + source_dir = os.path.join(output_dir, 'videos') + pose_dir = os.path.join(output_dir, 'pose') + overwrite_pose = config_dict['pose'].get('overwrite_pose', False) + + # Check for existing results + if os.path.exists(pose_dir) and not overwrite_pose: + logging.info('Skipping pose estimation as it has already been done. ' + 'Set overwrite_pose to true in Config.toml if you want to run it again.') + return + elif overwrite_pose: + logging.info("Overwriting existing pose estimation results...") + + logging.info("Starting pose estimation...") + display_detection = config_dict['pose'].get('display_detection', False) vid_img_extension = config_dict['pose']['vid_img_extension'] - - output_format = config_dict['pose']['output_format'] - save_video = True if 'to_video' in config_dict['pose']['save_video'] else False - save_images = True if 'to_images' in config_dict['pose']['save_video'] else False - display_detection = config_dict['pose']['display_detection'] - overwrite_pose = config_dict['pose']['overwrite_pose'] - det_frequency = config_dict['pose']['det_frequency'] - tracking_mode = config_dict.get('pose').get('tracking_mode') - if tracking_mode == 'deepsort' and multi_person: - deepsort_params = config_dict.get('pose').get('deepsort_params') - try: - deepsort_params = ast.literal_eval(deepsort_params) - except: # if within single quotes instead of double quotes when run with sports2d --mode """{dictionary}""" - deepsort_params = deepsort_params.strip("'").replace('\n', '').replace(" ", "").replace(",", '", "').replace(":", '":"').replace("{", '{"').replace("}", '"}').replace('":"/',':/').replace('":"\\',':\\') - deepsort_params = re.sub(r'"\[([^"]+)",\s?"([^"]+)\]"', r'[\1,\2]', deepsort_params) # changes "[640", "640]" to [640,640] - deepsort_params = json.loads(deepsort_params) - deepsort_tracker = DeepSort(**deepsort_params) + webcam_ids = config_dict['pose'].get('webcam_ids', []) + capture_mode = config_dict['pose'].get('capture_mode', 'continuous') + save_files = config_dict['pose'].get('save_video', []) + save_images = ('to_images' in save_files) + save_video = ('to_video' in save_files) + combined_frames = config_dict['pose'].get('combined_frames', False) + multi_workers = config_dict['pose'].get('multi_workers', False) + multi_person = config_dict['project'].get('multi_person', False) + output_format = config_dict['project'].get('output_format', 'openpose') + rotation = config_dict['pose'].get('rotation', 0) + webcam_recording = config_dict['pose'].get('webcam_recording', False) + + # 
Gather sources + sources = [] + if vid_img_extension == 'webcam': + sources.extend({ + 'type': 'webcam', + 'id': cam_id, + 'path': cam_id + } for cam_id in (webcam_ids if isinstance(webcam_ids, list) else [webcam_ids])) else: - deepsort_tracker = None - backend = config_dict['pose']['backend'] - device = config_dict['pose']['device'] + video_files = sorted([str(f) for f in Path(source_dir).rglob('*' + vid_img_extension) if f.is_file()]) + sources.extend({ + 'type': 'video', + 'id': idx, + 'path': video_path + } for idx, video_path in enumerate(video_files)) + + image_dirs = sorted([str(f) for f in Path(source_dir).iterdir() if f.is_dir()]) + sources.extend({ + 'type': 'images', + 'id': idx, + 'path': folder + } for idx, folder in enumerate(image_dirs, start=len(video_files))) + + if not sources: + raise FileNotFoundError(f"\nNo sources found in {source_dir} matching extension '{vid_img_extension}'.") + + logging.info(f"Sources: {sources}") + + # Pose tracker settings + ModelClass, det_frequency, mode, to_openpose, backend, device, pose_model, frame_size = determine_tracker_settings(config_dict) + + logging.info(f"Model input size: {frame_size[0]}x{frame_size[1]}") + + # Handle frame_range + def parse_frame_ranges(frame_range): + if not frame_range: + return None + if len(frame_range) == 2 and all(isinstance(x, int) for x in frame_range): + start, end = frame_range + return set(range(start, end + 1)) + return set(frame_range) + + frame_range = None + if vid_img_extension != 'webcam': + frame_range = parse_frame_ranges(config_dict['project'].get('frame_range', [])) + + # Determine FPS if 'auto' + frame_rate = config_dict['project'].get('frame_rate', 30) + if str(frame_rate).lower() == 'auto': + frame_rate = find_lowest_fps(sources) + logging.info(f"Auto-detected lowest frame rate: {frame_rate} fps") + else: + logging.info(f"Using user-defined frame rate: {frame_rate} fps") - # Determine frame rate - video_files = glob.glob(os.path.join(video_dir, '*'+vid_img_extension)) - frame_rate = config_dict.get('project').get('frame_rate') - if frame_rate == 'auto': - try: - cap = cv2.VideoCapture(video_files[0]) - if not cap.isOpened(): - raise FileNotFoundError(f'Error: Could not open {video_files[0]}. Check that the file exists.') - frame_rate = round(cap.get(cv2.CAP_PROP_FPS)) - if frame_rate == 0: - frame_rate = 30 - logging.warning(f'Error: Could not retrieve frame rate from {video_files[0]}. Defaulting to 30fps.') - except: - frame_rate = 30 + # Prepare shared memory + num_sources = len(sources) + + fw, fh = find_largest_frame_size(sources, vid_img_extension) + + if fw >= fh: + mosaic_cols = math.ceil(math.sqrt(num_sources)) + mosaic_rows = math.ceil(num_sources / mosaic_cols) + else: + mosaic_rows = math.ceil(math.sqrt(num_sources)) + mosaic_cols = math.ceil(num_sources / mosaic_rows) + + if combined_frames: + cell_w = frame_size[0] // mosaic_cols + cell_h = frame_size[1] // mosaic_rows - # Set detection frequency - if det_frequency>1: - logging.info(f'Inference run only every {det_frequency} frames. Inbetween, pose estimation tracks previously detected points.') - elif det_frequency==1: - logging.info(f'Inference run on every single frame.') + logging.info(f"Combined frames: {mosaic_rows} rows & {mosaic_cols} cols") + logging.info(f"Frame input size: {cell_w}x{cell_h}") + + frame_size = (cell_w, cell_h) else: - raise ValueError(f"Invalid det_frequency: {det_frequency}. 
Must be an integer greater or equal to 1.") + frame_size = frame_size + + mosaic_subinfo = {} + for i, s in enumerate(sources): + sid = s['id'] + r = i // mosaic_cols + c = i % mosaic_cols + if combined_frames: + x_off = c * cell_w + y_off = r * cell_h + mosaic_subinfo[sid] = {"x_offset": x_off, "y_offset": y_off, "scaled_w": cell_w, "scaled_h": cell_h} + else: + x_off = c * frame_size[0] + y_off = r * frame_size[1] + mosaic_subinfo[sid] = {"x_offset": x_off, "y_offset": y_off, "scaled_w": frame_size[0], "scaled_h": frame_size[1]} + + available_memory = psutil.virtual_memory().available + frame_bytes = frame_size[0] * frame_size[1] * 3 + n_buffers_total = int((available_memory / 2) / (frame_bytes * (num_sources / (num_sources + 1)))) + + frame_buffer_count = 0 + + if not combined_frames: + frame_buffer_count = n_buffers_total + + logging.info(f"Allocating {frame_buffer_count} buffers.") + else: + frame_buffer_count = num_sources * 3 + + logging.info(f"Allocating {frame_buffer_count} frame buffers.") + + pose_buffer_count = n_buffers_total - frame_buffer_count + + logging.info(f"Allocating {pose_buffer_count} pose buffers.") + + frame_queue = multiprocessing.Queue(maxsize=frame_buffer_count) + pose_queue = multiprocessing.Queue() + if combined_frames: + pose_queue = multiprocessing.Queue(maxsize=pose_buffer_count) + result_queue = multiprocessing.Queue() + + frame_buffers = {} + available_frame_buffers = multiprocessing.Queue() + + for i in range(frame_buffer_count): + buf_name = f"frame_{i}" + shm = shared_memory.SharedMemory(name=buf_name, create=True, size=frame_bytes) + frame_buffers[buf_name] = shm + available_frame_buffers.put(buf_name) + + pose_buffers = {} + available_pose_buffers = {} + + if combined_frames: + available_pose_buffers = multiprocessing.Queue() + for i in range(pose_buffer_count): + buf_name = f"pose_buffer_{i}" + shm = shared_memory.SharedMemory(name=buf_name, create=True, size=frame_bytes * num_sources) + pose_buffers[buf_name] = shm + available_pose_buffers.put(buf_name) + + # Prepare per-source counters + source_outputs = {} + shared_counts = {} + for s in sources: + shared_counts[s['id']] = { + 'queued': multiprocessing.Value('i', 0), + 'processed': multiprocessing.Value('i', 0), + 'total': multiprocessing.Value('i', 0) + } + + # Decide how many workers to start + cpu_count = multiprocessing.cpu_count() + + active_sources = multiprocessing.Value('i', len(sources)) + + if combined_frames: + initial_workers = max(1, cpu_count - len(sources) - 3) + else: + initial_workers = max(1, cpu_count - len(sources) - 2) + + if not multi_workers: + initial_workers = 1 + + logging.info(f"Starting {initial_workers} workers.") + + tracker_ready_event = multiprocessing.Event() + + def spawn_new_worker(): + worker = PoseEstimatorWorker( + queue=pose_queue if combined_frames else frame_queue, + result_queue=result_queue, + shared_counts=shared_counts, + ModelClass = ModelClass, + det_frequency=det_frequency, + mode=mode, + to_openpose=to_openpose, + backend=backend, + device=device, + buffers=pose_buffers if combined_frames else frame_buffers, + available_buffers=available_pose_buffers if combined_frames else available_frame_buffers, + active_sources=active_sources, + combined_frames=combined_frames, + tracker_ready_event=tracker_ready_event + ) + worker.start() + return worker + + + workers = [spawn_new_worker() for _ in range(initial_workers)] + + manager = multiprocessing.Manager() + webcam_ready = manager.dict() + source_ended = manager.dict() + command_queues = {} + + 
media_sources = [] + for s in sources: + source_ended[s['id']] = False + command_queues[s['id']] = manager.Queue() + out_dirs = create_output_folders(s['path'], pose_dir, save_images, webcam_recording) + source_outputs[s['id']] = out_dirs + + if s['type'] == 'webcam': + webcam_ready[s['id']] = False + + ms = MediaSource( + source = s, + frame_queue = frame_queue, + frame_buffers = frame_buffers, + shared_counts = shared_counts, + frame_size = frame_size, + active_sources = active_sources, + command_queue = command_queues[s['id']], + vid_img_extension = vid_img_extension, + frame_ranges = frame_range, + webcam_ready = webcam_ready, + source_ended = source_ended, + rotation = rotation, + webcam_recording = webcam_recording, + ) + ms.start() + media_sources.append(ms) + + result_processor = ResultQueueProcessor( + result_queue = result_queue, + pose_model = pose_model, + sources = sources, + frame_buffers = frame_buffers, + pose_buffers = pose_buffers, + available_frame_buffers = available_frame_buffers, + available_pose_buffers = available_pose_buffers, + vid_img_extension= vid_img_extension, + source_outputs = source_outputs, + shared_counts = shared_counts, + save_images = save_images, + save_video = save_video, + frame_size = frame_size, + frame_rate = frame_rate, + combined_frames = combined_frames, + multi_person = multi_person, + output_format = output_format, + display_detection= display_detection, + mosaic_cols=mosaic_cols, + mosaic_rows=mosaic_rows, + mosaic_subinfo=mosaic_subinfo + ) + result_processor.start() + + if combined_frames: + frame_processor = FrameQueueProcessor( + frame_queue = frame_queue, + pose_queue = pose_queue, + sources = sources, + frame_buffers = frame_buffers, + pose_buffers = pose_buffers, + available_frame_buffers = available_frame_buffers, + available_pose_buffers = available_pose_buffers, + vid_img_extension = vid_img_extension, + source_outputs = source_outputs, + shared_counts = shared_counts, + save_images = save_images, + save_video = save_video, + frame_size = frame_size, + frame_rate = frame_rate, + combined_frames = combined_frames, + multi_person = multi_person, + output_format = output_format, + display_detection = display_detection, + mosaic_cols=mosaic_cols, + mosaic_rows=mosaic_rows, + mosaic_subinfo=mosaic_subinfo + ) + frame_processor.start() + + # Start capture coordinator + capture_coordinator = CaptureCoordinator( + sources=sources, + command_queues=command_queues, + available_frame_buffers=available_frame_buffers, + shared_counts=shared_counts, + source_ended=source_ended, + webcam_ready=webcam_ready, + frame_rate=frame_rate if vid_img_extension == 'webcam' else 0, + mode=capture_mode, + tracker_ready_event=tracker_ready_event + ) + capture_coordinator.start() + + # Setup progress bars + progress_bars = {} + bar_ended_state = {} + bar_pos = 0 + + source_type_map = {s['id']: s['type'] for s in sources} + + if vid_img_extension == 'webcam': + cv2.namedWindow("StopWindow", cv2.WINDOW_NORMAL) + + for s in sources: + sid = s['id'] + if s['type'] == 'webcam': + pb = tqdm( + total=0, + desc=f"\033[32mWebcam {sid} (not connected)\033[0m", + position=bar_pos, + leave=True, + bar_format="{desc}" + ) + progress_bars[sid] = pb + bar_ended_state[sid] = False + else: + pb = tqdm( + total=0, + desc=f"\033[32mSource {sid}\033[0m", + position=bar_pos, + leave=True + ) + progress_bars[sid] = pb + bar_ended_state[sid] = False + + bar_pos += 1 + + frame_buffer_bar = tqdm( + total=frame_buffer_count, + desc='Frame Buffers Free', + position=bar_pos, + 
leave=True, + colour='blue' + ) + bar_pos += 1 + + if combined_frames: + pose_buffer_bar = tqdm( + total=pose_buffer_count, + desc='Pose Buffers Free', + position=bar_pos, + leave=True, + colour='blue' + ) + bar_pos += 1 + + if multi_workers: + worker_bar = tqdm( + total=len(workers), + desc='Active Workers', + position=bar_pos, + leave=True, + colour='blue' + ) + bar_pos += 1 + + previous_ended_count = 0 + try: + while True: + # Update progress bars + for sid, pb in progress_bars.items(): + s_type = source_type_map[sid] + cnts = shared_counts[sid] + pval = cnts['processed'].value + qval = cnts['queued'].value + tval = cnts['total'].value + + if s_type == 'webcam': + connected = webcam_ready[sid] + if connected: + pb.set_description_str( + f"\033[32mWebcam {sid}\033[0m : {pval}/{qval} processed/read" + ) + else: + pb.set_description_str( + f"\033[31mWebcam {sid} (not connected)\033[0m : {pval}/{qval}" + ) + pb.refresh() + + if source_ended[sid] and not bar_ended_state[sid]: + bar_ended_state[sid] = True + pb.set_description_str( + f"\033[31mWebcam {sid} (Ended)\033[0m : {pval}/{qval}" + ) + pb.refresh() + + if check_stop_window_open(): + stop_all_cameras_immediately(sources, command_queues) + break + + else: + if tval > 0: + pb.total = tval + pb.n = pval + pb.refresh() + + if source_ended[sid] and not bar_ended_state[sid]: + bar_ended_state[sid] = True + pb.set_description_str(f"\033[31mSource {sid} (Ended)\033[0m") + pb.refresh() + + frame_buffer_bar.n = available_frame_buffers.qsize() + frame_buffer_bar.refresh() + + if combined_frames: + pose_buffer_bar.n = available_pose_buffers.qsize() + pose_buffer_bar.refresh() + + # Check if user closed the display + if result_processor and result_processor.stopped: + logging.info("\nUser closed display. Stopping all streams.") + capture_coordinator.stop() + for ms_proc in media_sources: + ms_proc.stop() + break + + alive_workers = sum(w.is_alive() for w in workers) + + if multi_workers: + worker_bar.n = alive_workers + worker_bar.refresh() + + # Possibly spawn new worker(s) if new sources ended + current_ended_count = sum(1 for s in sources if source_ended[s['id']]) + ended_delta = current_ended_count - previous_ended_count + if ended_delta > 0: + for _ in range(ended_delta): + logging.info("Spawning a new PoseEstimatorWorker.") + new_w = spawn_new_worker() + workers.append(new_w) + worker_bar.total = len(workers) + previous_ended_count = current_ended_count + + # If all sources ended, queue empty, and no alive workers => done + all_ended = all(source_ended[s['id']] for s in sources) + if all_ended and frame_queue.empty() and pose_queue.empty() and alive_workers == 0: + logging.info("All sources ended, queues empty, and worker finished. 
Exiting loop.") + break + + time.sleep(0.05) + + except KeyboardInterrupt: + logging.info("User interrupted pose estimation.") + capture_coordinator.stop() + for ms_proc in media_sources: + ms_proc.stop() + + finally: + # Stop capture coordinator + capture_coordinator.stop() + capture_coordinator.join() + if capture_coordinator.is_alive(): + capture_coordinator.terminate() + + # Stop media sources + for s in sources: + command_queues[s['id']].put(None) + + for ms in media_sources: + ms.join(timeout=2) + if ms.is_alive(): + ms.terminate() + + # Stop workers + for w in workers: + w.join(timeout=2) + if w.is_alive(): + logging.warning(f"Forcibly terminating worker {w.pid}") + w.terminate() + + # Free shared memory + for shm in frame_buffers.values(): + shm.close() + shm.unlink() + for shm in pose_buffers.values(): + shm.close() + shm.unlink() + + if combined_frames: + frame_processor.stop() + frame_processor.join(timeout=2) + if frame_processor.is_alive(): + frame_processor.terminate() + + result_processor.stop() + result_processor.join(timeout=2) + if result_processor.is_alive(): + result_processor.terminate() + + # Final bar updates + frame_buffer_bar.n = available_frame_buffers.qsize() + frame_buffer_bar.refresh() + if combined_frames: + pose_buffer_bar.n = available_pose_buffers.qsize() + pose_buffer_bar.refresh() + + for sid, pb in progress_bars.items(): + pb.close() + frame_buffer_bar.close() + if combined_frames: + pose_buffer_bar.close() + if multi_workers: + worker_bar.close() + + logging.info("Pose estimation done. Exiting now.") + logging.shutdown() + + if display_detection: + cv2.destroyAllWindows() + + +def create_output_folders(source_path, output_dir, save_images, webcam_recording): + ''' + Set up output directories for saving images and JSON files. 
+
+    Returns:
+        tuple: (output_dir_name, img_output_dir, json_output_dir, output_video_path, output_record_path)
+    '''
+
+    if isinstance(source_path, int):
+        now = datetime.now().strftime("%Y%m%d_%H%M%S")
+        output_dir_name = f"webcam{source_path}_{now}"
+    else:
+        output_dir_name = os.path.basename(os.path.splitext(str(source_path))[0])
+
+    os.makedirs(output_dir, exist_ok=True)
+    json_output_dir = os.path.join(output_dir, f"{output_dir_name}_json")
+    os.makedirs(json_output_dir, exist_ok=True)
+
+    img_output_dir = None
+    if save_images:
+        img_output_dir = os.path.join(output_dir, f"{output_dir_name}_img")
+        os.makedirs(img_output_dir, exist_ok=True)
+
+    output_video_path = os.path.join(output_dir, f"{output_dir_name}_pose.avi")
+
+    output_record_path = None
+    if webcam_recording:
+        output_record_path = os.path.join(output_dir, f"{output_dir_name}_record.avi")
+
+    return output_dir_name, img_output_dir, json_output_dir, output_video_path, output_record_path
+
+
+def determine_tracker_settings(config_dict):
+    det_frequency = config_dict['pose']['det_frequency']
+    mode = config_dict['pose']['mode']
+    pose_model = config_dict['pose']['pose_model']
+    backend = config_dict['pose']['backend']
+    device = config_dict['pose']['device']
     # Select the appropriate model based on the model_type
-    logging.info('\nEstimating pose...')
     if pose_model.upper() in ('HALPE_26', 'BODY_WITH_FEET'):
         model_name = 'HALPE_26'
         ModelClass = BodyWithFeet # 26 keypoints(halpe26)
-        logging.info(f"Using HALPE_26 model (body and feet) for pose estimation.")
+        logging.info("Using HALPE_26 model (body and feet) for pose estimation.")
     elif pose_model.upper() in ('COCO_133', 'WHOLE_BODY', 'WHOLE_BODY_WRIST'):
         model_name = 'COCO_133'
         ModelClass = Wholebody
-        logging.info(f"Using COCO_133 model (body, feet, hands, and face) for pose estimation.")
+        logging.info("Using COCO_133 model (body, feet, hands, and face) for pose estimation.")
     elif pose_model.upper() in ('COCO_17', 'BODY'):
         model_name = 'COCO_17'
         ModelClass = Body
-        logging.info(f"Using COCO_17 model (body) for pose estimation.")
-    elif pose_model.upper() =='HAND':
+        logging.info("Using COCO_17 model (body) for pose estimation.")
+    elif pose_model.upper() == 'HAND':
         model_name = 'HAND_21'
         ModelClass = Hand
-        logging.info(f"Using HAND_21 model for pose estimation.")
+        logging.info("Using HAND_21 model for pose estimation.")
     elif pose_model.upper() =='FACE':
         model_name = 'FACE_106'
-        logging.info(f"Using FACE_106 model for pose estimation.")
-    elif pose_model.upper() =='ANIMAL':
+        logging.info("Using FACE_106 model for pose estimation.")
+    elif pose_model.upper() == 'ANIMAL':
         model_name = 'ANIMAL2D_17'
-        logging.info(f"Using ANIMAL2D_17 model for pose estimation.")
+        logging.info("Using ANIMAL2D_17 model for pose estimation.")
     else:
         model_name = pose_model.upper()
         logging.info(f"Using model {model_name} for pose estimation.")
@@ -538,7 +1464,7 @@ def estimate_pose_all(config_dict):
         raise NameError(f'{pose_model} not found in skeletons.py nor in Config.toml')
 
     # Select device and backend
-    backend, device = setup_backend_device(backend=backend, device=device)
+    backend, device = init_backend_device(backend=backend, device=device)
 
     # Manually select the models if mode is a dictionary rather than 'lightweight', 'balanced', or 'performance'
     if not mode in ['lightweight', 'balanced', 'performance'] or 'ModelClass' not in locals():
@@ -557,56 +1483,136 @@ def estimate_pose_all(config_dict):
             pose_input_size = mode.get('pose_input_size')
             ModelClass = partial(Custom,
-                det_class=det_class,
det=det, det_input_size=det_input_size, - pose_class=pose_class, pose=pose, pose_input_size=pose_input_size, - backend=backend, device=device) - + det_class=det_class, det=det, det_input_size=det_input_size, + pose_class=pose_class, pose=pose, pose_input_size=pose_input_size, + backend=backend, device=device) + except (json.JSONDecodeError, TypeError): logging.warning("\nInvalid mode. Must be 'lightweight', 'balanced', 'performance', or '''{dictionary}''' of parameters within triple quotes. Make sure input_sizes are within square brackets.") logging.warning('Using the default "balanced" mode.') mode = 'balanced' + logging.info(f'\nPose tracking set up for "{pose_model_name}" model.') + logging.info(f'Mode: {mode}.') - # Estimate pose - try: - pose_listdirs_names = next(os.walk(pose_dir))[1] - os.listdir(os.path.join(pose_dir, pose_listdirs_names[0]))[0] - if not overwrite_pose: - logging.info('Skipping pose estimation as it has already been done. Set overwrite_pose to true in Config.toml if you want to run it again.') - else: - logging.info('Overwriting previous pose estimation. Set overwrite_pose to false in Config.toml if you want to keep the previous results.') - raise - - except: - # Set up pose tracker - try: - pose_tracker = setup_pose_tracker(ModelClass, det_frequency, mode, False, backend, device) - except: - logging.error('Error: Pose estimation failed. Check in Config.toml that pose_model and mode are valid.') - raise ValueError('Error: Pose estimation failed. Check in Config.toml that pose_model and mode are valid.') - - if tracking_mode not in ['deepsort', 'sports2d']: - logging.warning(f"Tracking mode {tracking_mode} not recognized. Using sports2d method.") - tracking_mode = 'sports2d' - logging.info(f'\nPose tracking set up for "{pose_model_name}" model.') - logging.info(f'Mode: {mode}.') - logging.info(f'Tracking is done with {tracking_mode}{" " if not tracking_mode=="deepsort" else f" with parameters: {deepsort_params}"}.\n') - - video_files = sorted(glob.glob(os.path.join(video_dir, '*'+vid_img_extension))) - if not len(video_files) == 0: - # Process video files - logging.info(f'Found video files with {vid_img_extension} extension.') - for video_path in video_files: - pose_tracker.reset() - if tracking_mode == 'deepsort': deepsort_tracker.tracker.delete_all_tracks() - process_video(video_path, pose_tracker, pose_model, output_format, save_video, save_images, display_detection, frame_range, multi_person, tracking_mode, deepsort_tracker) + det_input_size = ModelClass.MODE[mode]['det_input_size'] - else: - # Process image folders - logging.info(f'Found image folders with {vid_img_extension} extension.') - image_folders = sorted([f for f in os.listdir(video_dir) if os.path.isdir(os.path.join(video_dir, f))]) - for image_folder in image_folders: - pose_tracker.reset() - image_folder_path = os.path.join(video_dir, image_folder) - if tracking_mode == 'deepsort': deepsort_tracker.tracker.delete_all_tracks() - process_images(image_folder_path, vid_img_extension, pose_tracker, pose_model, output_format, frame_rate, save_video, save_images, display_detection, frame_range, multi_person, tracking_mode, deepsort_tracker) + return (ModelClass, det_frequency, mode, False, backend, device, pose_model, det_input_size) + + +def find_largest_frame_size(sources, vid_img_extension): + ''' + If input_size is not specified, find the maximum (width, height) + among all sources (videos, images, webcams). 
+    '''
+
+    max_w, max_h = 0, 0
+
+    for s in sources:
+        # Handle each source type
+        if s['type'] == 'webcam':
+            cap_test = cv2.VideoCapture(int(s['id']), cv2.CAP_DSHOW)
+            if cap_test.isOpened():
+                cap_test.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
+                cap_test.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
+                time.sleep(0.2)
+                ret, frame_test = cap_test.read()
+                if ret and frame_test is not None:
+                    h, w = frame_test.shape[:2]
+                    max_w = max(max_w, w)
+                    max_h = max(max_h, h)
+            cap_test.release()
+
+        elif s['type'] == 'video':
+            cap_test = cv2.VideoCapture(s['path'])
+            if cap_test.isOpened():
+                w = int(cap_test.get(cv2.CAP_PROP_FRAME_WIDTH))
+                h = int(cap_test.get(cv2.CAP_PROP_FRAME_HEIGHT))
+                max_w = max(max_w, w)
+                max_h = max(max_h, h)
+            cap_test.release()
+
+        elif s['type'] == 'images':
+            pattern = os.path.join(s['path'], f"*{vid_img_extension}")
+            found = sorted(glob.glob(pattern), key=natural_sort_key)
+            if found:
+                # Just read the first image for dimension
+                im = cv2.imread(found[0])
+                if im is not None:
+                    h, w = im.shape[:2]
+                    max_w = max(max_w, w)
+                    max_h = max(max_h, h)
+
+    # If none found, default to (640,480)
+    if max_w == 0 or max_h == 0:
+        max_w, max_h = 640, 480
+    return max_w, max_h
+
+
+def measure_webcam_fps(cam_index, warmup_frames=20, measure_frames=50):
+    '''
+    Measure the effective FPS of a webcam by timing a burst of frame reads.
+    Returns 30 if the camera cannot be opened or read.
+    '''
+
+    cap = cv2.VideoCapture(cam_index, cv2.CAP_DSHOW)
+    if not cap.isOpened():
+        return 30
+
+    for _ in range(warmup_frames):
+        ret, _ = cap.read()
+        if not ret:
+            cap.release()
+            return 30
+
+    start = time.time()
+    count = 0
+    for _ in range(measure_frames):
+        ret, _ = cap.read()
+        if not ret:
+            break
+        count += 1
+
+    end = time.time()
+    cap.release()
+    if count < 1:
+        return 30
+
+    fps_measured = count / (end - start)
+    return fps_measured
+
+
+def find_lowest_fps(sources):
+    '''
+    Auto-detect the lowest FPS among all video/webcam sources.
+    If none is found or the value is invalid, default to 30.
+    '''
+
+    min_fps = 9999
+
+    for s in sources:
+        if s['type'] == 'video':
+            cap = cv2.VideoCapture(s['path'])
+            if cap.isOpened():
+                fps = round(cap.get(cv2.CAP_PROP_FPS))
+                if fps <= 0 or math.isnan(fps):
+                    fps = 30
+                min_fps = min(min_fps, fps)
+            cap.release()
+
+        elif s['type'] == 'webcam':
+            fps = measure_webcam_fps(int(s['id']))
+            min_fps = min(min_fps, fps)
+
+    return int(math.ceil(min_fps)) if min_fps < 9999 else 30
+
+
+def check_stop_window_open():
+    '''
+    Return True if the StopWindow has been closed or Esc was pressed.
+    '''
+
+    if cv2.getWindowProperty("StopWindow", cv2.WND_PROP_VISIBLE) < 1:
+        return True
+    key = cv2.waitKey(1)
+    if key == 27:
+        return True
+    return False
+
+
+def stop_all_cameras_immediately(sources, command_queues):
+    '''
+    Send a STOP_CAPTURE command to every webcam source.
+    '''
+
+    for s in sources:
+        if s['type'] == 'webcam':
+            command_queues[s['id']].put(("STOP_CAPTURE", None))
diff --git a/setup.cfg b/setup.cfg
index 4b556daf..176bc2f3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -38,6 +38,7 @@ install_requires =
     mpl_interactions
     # Pillow
     PyQt5
+    psutil
     tqdm
     anytree
     pandas>=1.5
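The workers, media sources, and queue processors added above exchange frames through named shared-memory blocks rather than pickling pixel data; only a buffer name plus a little metadata travels through the multiprocessing queues. The sketch below illustrates that handoff under simplified assumptions; write_frame and read_frame are hypothetical helpers, not the actual MediaSource or PoseEstimatorWorker API.

import numpy as np
from multiprocessing import shared_memory

def write_frame(frame, buf_name):
    # Producer side: copy the frame's pixels into the named shared-memory block.
    shm = shared_memory.SharedMemory(name=buf_name)
    np.ndarray(frame.shape, dtype=np.uint8, buffer=shm.buf)[:] = frame
    shm.close()
    return (buf_name, frame.shape)   # only this small tuple goes on the queue

def read_frame(item, available_buffers):
    # Consumer side: attach by name, copy the pixels out, then recycle the buffer name.
    buf_name, shape = item
    shm = shared_memory.SharedMemory(name=buf_name)
    frame = np.ndarray(shape, dtype=np.uint8, buffer=shm.buf).copy()
    shm.close()
    available_buffers.put(buf_name)
    return frame

Recycling the name through the available-buffer queue is what the "Frame Buffers Free" and "Pose Buffers Free" progress bars above track via qsize().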