Skip to content

Commit

Permalink
Add more docs to search module
Browse files Browse the repository at this point in the history
  • Loading branch information
PabloLec committed Feb 17, 2024
1 parent 0fc95b6 commit 4eef76b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 16 deletions.
4 changes: 2 additions & 2 deletions recoverpy/lib/search/progress_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def monitor_search_progress(grep_pid: int, progress: SearchProgress) -> None:
while True:
output: str = get_progress_output(grep_pid)
output: str = _get_progress_output(grep_pid)

if not output:
progress.progress_percent = 100.0
Expand All @@ -22,7 +22,7 @@ def monitor_search_progress(grep_pid: int, progress: SearchProgress) -> None:
sleep(0.5)


def get_progress_output(grep_pid: int) -> str:
def _get_progress_output(grep_pid: int) -> str:
    """Query the external `progress` tool for the grep process and return its stdout.

    stderr is silenced so the poller stays quiet when the PID has exited.
    """
    command = ["progress", "-p", str(grep_pid)]
    raw_output: bytes = check_output(command, stderr=DEVNULL)
    return raw_output.decode("utf8")
35 changes: 25 additions & 10 deletions recoverpy/lib/search/search_engine.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
"""
SearchEngine initiates the search process upon request from the UI.
A Grep process is started with the requested parameters along with subthreads acting as workers.
The first thread consumes the standard output from Grep and simply enqueues each line into a first queue.
A second thread consumes this queue, filters and formats the raw strings into objects ready to be integrated
into the UI and finally stores these objects in an asynchronous queue to be consumed by the UI.
"""

from __future__ import annotations

from asyncio import AbstractEventLoop
Expand Down Expand Up @@ -36,15 +44,15 @@ class SearchEngine:

def __init__(self, partition: str, searched_string: str):
self._initialize_search_components(partition, searched_string)
self.results_queue: Queue[bytes] = Queue()
self.list_items_queue: AsyncQueue[GrepResult] = AsyncQueue()
self.raw_grep_results_queue: Queue[bytes] = Queue()
self.formatted_results_queue: AsyncQueue[GrepResult] = AsyncQueue()

def _initialize_search_components(
self, partition: str, searched_string: str
) -> None:
self.search_params = SearchParams(partition, searched_string)
self.search_progress = SearchProgress()
self.result_processor = ResultFilter(self.search_params)
self.result_filter = ResultFilter(self.search_params)

async def start_search(self) -> None:
self._start_grep_process()
Expand All @@ -61,28 +69,32 @@ def _start_grep_process(self) -> None:

def _start_auxiliary_threads(self) -> None:
start_grep_stdout_consumer_thread(
_consume_grep_stdout, self._grep_process, self.results_queue
_consume_grep_stdout, self._grep_process, self.raw_grep_results_queue
)
start_result_formatter_thread(self._format_results)
start_progress_monitoring_thread(self._grep_process, self.search_progress)

def _format_results(self) -> None:
"""Initiate formatting and filtering of raw grep results.
A new event loop is created to avoid blocking the main App loop."""
loop = new_event_loop()
while True:
results = self.result_processor.filter_results(self.results_queue)
results = self.result_filter.filter_results(self.raw_grep_results_queue)
self._process_new_results(results, loop)
log.debug(f"search_engine - Dequeued {len(results)} results")
sleep(0.1)

def _process_new_results(self, results: List[str], loop: AbstractEventLoop) -> None:
"""Consumes filtered grep results, convert them into GrepResult objects
and enqueues them into the formatted results queue for UI."""
for result in results:
grep_result = self._create_grep_result(
result, self.search_progress.result_count
)

if grep_result.inode not in self._seen_inodes:
self._seen_inodes.add(grep_result.inode)
loop.run_until_complete(self.list_items_queue.put(grep_result))
loop.run_until_complete(self.formatted_results_queue.put(grep_result))
self.search_progress.result_count += 1

def _create_grep_result(self, result: str, result_index: int) -> GrepResult:
Expand All @@ -94,17 +106,15 @@ def _create_grep_result(self, result: str, result_index: int) -> GrepResult:
def _configure_grep_result(
self, grep_result: GrepResult, result_index: int
) -> None:
"""Fix inode number and line start then sets the CSS class for the UI."""
grep_result.inode = self._fix_inode(grep_result.inode)
grep_result.line = self._fix_line_start(grep_result.line)
grep_result.css_class = (
"grep-result-odd" if result_index % 2 == 0 else "grep-result-even"
)

def _fix_line_start(self, line: str) -> str:
result_index: int = line.find(self.search_params.searched_lines[0])
return line[min(result_index, 15) :]

def _fix_inode(self, inode: int) -> int:
"""Shift inode to the first block containing the searched string."""
inode //= self.search_params.block_size

for _ in range(10):
Expand All @@ -115,3 +125,8 @@ def _fix_inode(self, inode: int) -> int:
return inode
inode += 1
return inode

def _fix_line_start(self, line: str) -> str:
"""Remove unnecessary characters from the start of the line to improve readability."""
result_index: int = line.find(self.search_params.searched_lines[0])
return line[min(result_index, 15) :]
4 changes: 3 additions & 1 deletion recoverpy/ui/screens/screen_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ async def _wait_for_grep_list_focus(self) -> None:
async def _start_search_engine(self) -> None:
await self.search_engine.start_search()
ensure_future(
self._grep_result_list.start_consumer(self.search_engine.list_items_queue)
self._grep_result_list.start_consumer(
self.search_engine.formatted_results_queue
)
)
ensure_future(self._update_progress_labels())

Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_search_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_search_progress_after_search(search_engine):


def test_list_items_queue_size(search_engine):
assert search_engine.list_items_queue.qsize() == GREP_RESULT_COUNT
assert search_engine.formatted_results_queue.qsize() == GREP_RESULT_COUNT


def test_list_items_queue_content(search_engine):
Expand All @@ -40,8 +40,8 @@ def test_list_items_queue_content(search_engine):
]

results = []
while not search_engine.list_items_queue.empty():
results.append(search_engine.list_items_queue.get_nowait())
while not search_engine.formatted_results_queue.empty():
results.append(search_engine.formatted_results_queue.get_nowait())

for i, result in enumerate(results):
assert expected_list_items[i][0] == result.inode
Expand Down

0 comments on commit 4eef76b

Please sign in to comment.