Skip to content

Commit

Permalink
Merge pull request #441 from openvinotoolkit/fix-bug-in-async-video-p…
Browse files Browse the repository at this point in the history
…rocessor

Fix bug in `AsyncVideoProcessor`
  • Loading branch information
ljcornel authored Jun 19, 2024
2 parents e189f6a + ab05c74 commit 3908abe
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 31 deletions.
52 changes: 28 additions & 24 deletions geti_sdk/demos/video_helpers/async_video_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def __init__(
"""
self.deployment = deployment
self.processing_function = processing_function

# Try to set min buffer size based on cpu count
if min_buffer_size is None and os.cpu_count() is not None:
min_buffer_size = 2 * os.cpu_count()
logging.debug(
Expand All @@ -74,6 +76,21 @@ def __init__(
"This may result in video frames not being processed in order. It"
"is recommended to increase the minimum buffer size."
)
# Make sure min_buffer_size is not larger than max_buffer_size
if min_buffer_size is not None and max_buffer_size is not None:
target_min_buffer_size = min_buffer_size
min_buffer_size = max(2, min(target_min_buffer_size, max_buffer_size - 1))
if min_buffer_size < target_min_buffer_size:
logging.info(
f"Minimum buffer size `{target_min_buffer_size}` was set larger "
f"than maximum size `{max_buffer_size}`. Limiting minimum buffer "
f"size to {min_buffer_size}."
)
# Ultimately, fall back to 2 as absolute minimum
if min_buffer_size is None or min_buffer_size < 2:
logging.info("Setting minimum buffer size to 2")
min_buffer_size = 2

self.buffer = OrderedResultBuffer(
maxsize=max_buffer_size, minsize=min_buffer_size
)
Expand Down Expand Up @@ -143,7 +160,9 @@ def process_items():
"""
while True:
if self._should_stop_now:
logging.debug("Stopping processing thread immediately")
logging.debug(
"AsyncVideoProcessor: Stopping processing thread immediately"
)
return

# Check if all frames have been inferred and put to the buffer already
Expand All @@ -153,30 +172,19 @@ def process_items():
self.buffer.total_items_buffered == num_frames - 1
)

# In case of the `should_stop` signal, check if the buffer has reached
# it's minimum size yet. If so, we can start to empty it
processed_up_to_min_buffer = False
if self._should_stop:
with (
self._current_index.get_lock(),
self._processed_count.get_lock(),
):
if (
self._current_index.value - self._processed_count.value
<= self._min_buffer_size
):
processed_up_to_min_buffer = True
empty_buffer = processed_up_to_min_buffer or all_frames_inferred

empty_buffer = self._should_stop or all_frames_inferred
# Get the item from the buffer
try:
item = self.buffer.get(empty_buffer=empty_buffer)
item = self.buffer.get(empty_buffer=empty_buffer, timeout=1e-6)
except Empty:
with (
self._processed_count.get_lock(),
self._current_index.get_lock(),
):
if self._processed_count.value == self._current_index.value:
if (
self._processed_count.value == self._current_index.value
and self._should_stop
):
# Buffer is empty, all items have been processed.
# Stop thread
return
Expand Down Expand Up @@ -231,14 +239,10 @@ def await_all(self):
if not self._is_running:
return
self._should_stop = True
logging.info("AsyncVideoProcessor: Awaiting processing of all frames in buffer")
self._worker.join()
self._is_running = False
self._should_stop = False
while True:
if self.buffer.is_empty and not self._worker.is_alive():
# Buffer should be empty and worker thread should have stopped
break
time.sleep(1e-9)
logging.debug(
logging.info(
"AsyncVideoProcessor: All frames processed. Processing thread stopped"
)
6 changes: 3 additions & 3 deletions geti_sdk/demos/video_helpers/ordered_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def put(
with self._put_count.get_lock():
self._put_count.value += 1

def get(self, timeout: int = 0, empty_buffer: bool = False) -> IndexedResult:
def get(self, timeout: float = 0, empty_buffer: bool = False) -> IndexedResult:
"""
Get the next item from the buffer
Expand All @@ -107,11 +107,10 @@ def get(self, timeout: int = 0, empty_buffer: bool = False) -> IndexedResult:
t_start = time.time()
item: Optional[IndexedResult] = None
if not empty_buffer:
while timeout == 0 or t_start - time.time() < timeout:
while timeout == 0 or time.time() - t_start < timeout:
if self._queue.qsize() > self._minsize:
try:
item = self._queue.get(block=False)
self._queue.task_done()
except Empty:
time.sleep(1e-9)
continue
Expand All @@ -121,6 +120,7 @@ def get(self, timeout: int = 0, empty_buffer: bool = False) -> IndexedResult:
item = self._queue.get(block=True, timeout=timeout + 1e-9)
if item is None:
raise Empty
self._queue.task_done()
with self._get_count.get_lock():
self._get_count.value += 1
return item
Expand Down
11 changes: 7 additions & 4 deletions notebooks/014_asynchronous_inference.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,9 @@
"source": [
"You should see clearly now that the frames are not processed in the order of their original index.\n",
"\n",
"Let's set up the `AsyncVideoProcessor` to do the same experiment, and have a look at the processing order again."
"Let's set up the `AsyncVideoProcessor` to do the same experiment, and have a look at the processing order again.\n",
"\n",
"The cell below initializes the AsyncVideoProcessor. "
]
},
{
Expand All @@ -475,11 +477,12 @@
"\n",
"# Initialize the processor\n",
"video_processor = AsyncVideoProcessor(\n",
" deployment=deployment, processing_function=inference_callback\n",
" deployment=deployment, # Deployment that is used for inference\n",
" processing_function=inference_callback, # Processing function to apply to each frame, once it is inferred\n",
")\n",
"\n",
"indices_async_vp: List[int] = []\n",
"video_processor.start()\n",
"video_processor.start() # Start the video_processor. This will create a worker thread that listens for video frames to process\n",
"tstart_async_vp = time.time()\n",
"for img_index, image_path in enumerate(coco_image_filepaths):\n",
" img = cv2.imread(image_path)\n",
Expand Down Expand Up @@ -547,7 +550,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "d38db933-ccfb-4731-97b1-7da4479e77e1",
"id": "68a904d0-6736-4deb-b69a-e7447965d225",
"metadata": {},
"outputs": [],
"source": []
Expand Down

0 comments on commit 3908abe

Please sign in to comment.