
Commit

Simplify search module
PabloLec committed Feb 17, 2024
1 parent 4eef76b commit 4b593e7
Showing 4 changed files with 59 additions and 84 deletions.
55 changes: 0 additions & 55 deletions recoverpy/lib/search/result_filter.py

This file was deleted.
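
A rough reconstruction of the removed ResultFilter, inferred from the logic this commit inlines into SearchEngine below; the exact original structure, and any name other than filter_results, get_dd_output, decode_result and get_inode, is an assumption:

from queue import Queue
from typing import List

from recoverpy.lib.helper import decode_result, get_dd_output, get_inode


class ResultFilter:
    """Hypothetical sketch of the deleted class, based on its replacement code."""

    def __init__(self, search_params) -> None:
        self.search_params = search_params

    def filter_results(self, queue_object: Queue) -> List[str]:
        # Drain the raw grep queue and decode every result.
        raw_results: List[bytes] = list(queue_object.queue)
        queue_object.queue.clear()
        decoded = [decode_result(r) for r in raw_results]
        if not self.search_params.is_multi_line:
            return decoded
        # For multi-line searches, keep only results whose surrounding
        # disk blocks contain every searched line.
        return [r for r in decoded if self._is_valid(r)]

    def _is_valid(self, result: str) -> bool:
        inode = int(get_inode(result))
        block_factor = self.search_params.block_size * 8
        block_index = inode // block_factor
        combined = decode_result(
            get_dd_output(self.search_params.partition, block_factor, block_index)
        ) + decode_result(
            get_dd_output(self.search_params.partition, block_factor, block_index + 1)
        )
        return all(line in combined for line in self.search_params.searched_lines)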

80 changes: 57 additions & 23 deletions recoverpy/lib/search/search_engine.py
@@ -14,15 +14,13 @@
from io import BufferedReader
from queue import Queue
from subprocess import Popen
from time import sleep
from typing import List

from recoverpy.lib.helper import get_dd_output, decode_result
from recoverpy.lib.search.result_filter import ResultFilter
from recoverpy.lib.helper import get_dd_output, decode_result, get_inode
from recoverpy.lib.search.thread_factory import (
start_grep_process,
start_progress_monitoring_thread,
start_result_formatter_thread,
start_result_converter_thread,
start_grep_stdout_consumer_thread,
)
from recoverpy.log.logger import log
@@ -43,16 +41,10 @@ class SearchEngine:
_seen_inodes: set[int] = set()

def __init__(self, partition: str, searched_string: str):
self._initialize_search_components(partition, searched_string)
self.raw_grep_results_queue: Queue[bytes] = Queue()
self.formatted_results_queue: AsyncQueue[GrepResult] = AsyncQueue()

def _initialize_search_components(
self, partition: str, searched_string: str
) -> None:
self.search_params = SearchParams(partition, searched_string)
self.search_progress = SearchProgress()
self.result_filter = ResultFilter(self.search_params)
self.raw_grep_results_queue: Queue[bytes] = Queue()
self.formatted_results_queue: AsyncQueue[GrepResult] = AsyncQueue()

async def start_search(self) -> None:
self._start_grep_process()
@@ -71,33 +63,75 @@ def _start_auxiliary_threads(self) -> None:
start_grep_stdout_consumer_thread(
_consume_grep_stdout, self._grep_process, self.raw_grep_results_queue
)
start_result_formatter_thread(self._format_results)
start_result_converter_thread(self._convert_results)
start_progress_monitoring_thread(self._grep_process, self.search_progress)

def _format_results(self) -> None:
def _convert_results(self) -> None:
"""Initiate formatting and filtering of raw grep results.
A new event loop is created to avoid blocking the main App loop."""
A new event loop is created to interact with the async queue."""
loop = new_event_loop()
while True:
results = self.result_filter.filter_results(self.raw_grep_results_queue)
results = self._decode_new_results(self.raw_grep_results_queue)
self._process_new_results(results, loop)
log.debug(f"search_engine - Dequeued {len(results)} results")
sleep(0.1)

def _decode_new_results(self, queue_object: Queue[bytes]) -> List[str]:
"""Consume raw grep results, filter out false positives if multiline and return decoded results."""
queue_list: List[bytes] = list(queue_object.queue)
queue_size = len(queue_list)
queue_object.queue.clear()
if queue_size == 0:
return []

decoded_results: List[str] = [decode_result(r) for r in queue_list]

if self.search_params.is_multi_line:
return self._filter_multiline_results(decoded_results)
return decoded_results

def _filter_multiline_results(self, results: List[str]) -> List[str]:
"""Filter out false positives from multiline results."""
return [result for result in results if self._is_result_valid(result)]

def _is_result_valid(self, result: str) -> bool:
"""Check if result contains all searched lines."""
inode = int(get_inode(result))
block_factor = self.search_params.block_size * 8

both_block_output = self._get_combined_block_output(inode, block_factor)
return all(
line in both_block_output for line in self.search_params.searched_lines
)

def _get_combined_block_output(self, inode: int, block_factor: int) -> str:
"""Get combined output of current and next block."""
block_index = inode // block_factor
block_output = get_dd_output(
self.search_params.partition, block_factor, block_index
)
next_block_output = get_dd_output(
self.search_params.partition, block_factor, block_index + 1
)

return decode_result(block_output) + decode_result(next_block_output)

def _process_new_results(self, results: List[str], loop: AbstractEventLoop) -> None:
"""Consumes filtered grep results, convert them into GrepResult objects
"""Consumes grep result strings, convert them into GrepResult objects
and enqueues them into the formatted results queue for UI."""
for result in results:
grep_result = self._create_grep_result(
result, self.search_progress.result_count
)

if grep_result.inode not in self._seen_inodes:
self._seen_inodes.add(grep_result.inode)
loop.run_until_complete(self.formatted_results_queue.put(grep_result))
self.search_progress.result_count += 1
if grep_result.inode in self._seen_inodes:
continue

self._seen_inodes.add(grep_result.inode)
loop.run_until_complete(self.formatted_results_queue.put(grep_result))
self.search_progress.result_count += 1

def _create_grep_result(self, result: str, result_index: int) -> GrepResult:
"""Create a GrepResult object from a raw grep result string."""
grep_result = GrepResult(result)
self._configure_grep_result(grep_result, result_index)
log.debug(f"search_engine - Created grep result {grep_result.inode}")
@@ -129,4 +163,4 @@ def _fix_inode(self, inode: int) -> int:
def _fix_line_start(self, line: str) -> str:
"""Remove unnecessary characters from the start of the line to improve readability."""
result_index: int = line.find(self.search_params.searched_lines[0])
return line[min(result_index, 15) :]
return line[min(result_index, 15) :]
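
A minimal usage sketch of the simplified engine, based only on the interface visible in this diff (the constructor, the start_search coroutine and formatted_results_queue); the partition path and the endless consumption loop are illustrative assumptions:

from asyncio import run

from recoverpy.lib.search.search_engine import SearchEngine


async def main() -> None:
    engine = SearchEngine("/dev/sdb1", "searched text")
    await engine.start_search()
    # GrepResult objects arrive on the async queue as the converter thread
    # decodes, filters and deduplicates raw grep output in the background.
    while True:
        grep_result = await engine.formatted_results_queue.get()
        print(grep_result.inode)


run(main())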
4 changes: 2 additions & 2 deletions recoverpy/lib/search/thread_factory.py
@@ -36,8 +36,8 @@ def start_grep_stdout_consumer_thread(
).start()


def start_result_formatter_thread(format_function: Callable[[], None]) -> None:
log.debug("thread_factory - Starting result formatter thread")
def start_result_converter_thread(format_function: Callable[[], None]) -> None:
log.debug("thread_factory - Starting result converter thread")
Thread(
target=format_function,
daemon=True,
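
The factory keeps threading details out of the engine: it only wraps the supplied callable in a daemon thread, exactly as the _start_auxiliary_threads wiring above expects. A standalone sketch of the pattern; the queue and converter loop below are illustrative stand-ins, not recoverpy code:

from queue import Queue
from threading import Thread
from time import sleep
from typing import Callable


def start_result_converter_thread(convert: Callable[[], None]) -> None:
    # Daemon thread so the converter loop never blocks interpreter exit.
    Thread(target=convert, daemon=True).start()


raw_queue: Queue[str] = Queue()


def convert_loop() -> None:
    # Stand-in for SearchEngine._convert_results: drain, transform, hand off.
    while True:
        while not raw_queue.empty():
            print(raw_queue.get().upper())
        sleep(0.1)


start_result_converter_thread(convert_loop)
raw_queue.put("raw grep result")
sleep(0.3)  # give the daemon thread a moment to process before exiting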
4 changes: 0 additions & 4 deletions tests/conftest.py
@@ -19,10 +19,6 @@ def system_calls_mock(session_mocker):
"recoverpy.lib.search.search_engine.start_grep_process",
new=mock_grep_process.mock_start_grep_process,
)
session_mocker.patch(
"recoverpy.lib.search.result_filter.get_dd_output",
side_effect=mock_dd_output.mock_dd_string_output,
)
session_mocker.patch(
"recoverpy.lib.search.search_engine.get_dd_output",
side_effect=mock_dd_output.mock_dd_string_output,
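
With result_filter.py gone, the suite only needs the single dd patch point left above. A hedged sketch of an equivalent standalone patch outside the session fixture; the fake block content is an illustrative assumption:

from unittest.mock import patch

FAKE_BLOCK = b"first searched line\nsecond searched line\n"

# Patch the one remaining dd entry point used by the inlined multiline filter.
with patch(
    "recoverpy.lib.search.search_engine.get_dd_output",
    return_value=FAKE_BLOCK,
):
    ...  # exercise the multiline filtering path of SearchEngine here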
