Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

690 #693

Merged
merged 2 commits into from
Oct 31, 2024
Merged

690 #693

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions src/ai_hawk/linkedIn_easy_applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,20 @@

import src.utils as utils
from src.logging import logger
from src.job import Job

def question_already_exists_in_data(question: str, data: List[dict]) -> bool:
"""
Check if a question already exists in the data list.

Args:
question: The question text to search for
data: List of question dictionaries to search through

Returns:
bool: True if question exists, False otherwise
"""
return any(item['question'] == question for item in data)

class AIHawkEasyApplier:
def __init__(self, driver: Any, resume_dir: Optional[str], set_old_answers: List[Tuple[str, str, str]],
Expand Down Expand Up @@ -79,7 +92,7 @@ def check_for_premium_redirect(self, job: Any, max_attempts=3):
raise Exception(
f"Redirected to AIHawk Premium page and failed to return after {max_attempts} attempts. Job application aborted.")

def apply_to_job(self, job: Any) -> None:
def apply_to_job(self, job: Job) -> None:
"""
Starts the process of applying to a job.
:param job: A job object with the job details.
Expand All @@ -93,7 +106,7 @@ def apply_to_job(self, job: Any) -> None:
logger.error(f"Failed to apply to job: {job.title}, error: {str(e)}")
raise e

def job_apply(self, job: Any):
def job_apply(self, job: Job):
logger.debug(f"Starting job application for job: {job}")

try:
Expand Down Expand Up @@ -844,10 +857,9 @@ def _save_questions_to_json(self, question_data: dict) -> None:
logger.error("JSON decoding failed")
data = []

# Use the new function to check for existing questions
question_exists = self._question_exists_in_data(question_data['question'], data)
should_be_saved: bool = not question_already_exists_in_data(question_data['question'], data) and not self.answer_contians_company_name(question_data['answer'])

if not question_exists:
if should_be_saved:
logger.debug("New question found, appending to JSON")
data.append(question_data)
f.seek(0)
Expand All @@ -866,19 +878,6 @@ def _save_questions_to_json(self, question_data: dict) -> None:
logger.error(f"Error saving questions data to JSON file: {tb_str}")
raise Exception(f"Error saving questions data to JSON file: \nTraceback:\n{tb_str}")

def _question_exists_in_data(self, question: str, data: List[dict]) -> bool:
"""
Check if a question already exists in the data list.

Args:
question: The question text to search for
data: List of question dictionaries to search through

Returns:
bool: True if question exists, False otherwise
"""
return any(item['question'] == question for item in data)

def _sanitize_text(self, text: str) -> str:
sanitized_text = text.lower().strip().replace('"', '').replace('\\', '')
sanitized_text = re.sub(r'[\x00-\x1F\x7F]', '', sanitized_text).replace('\n', ' ').replace('\r', '').rstrip(',')
Expand All @@ -891,5 +890,6 @@ def _find_existing_answer(self, question_text):
return item
return None


def answer_contians_company_name(self,answer:Any)->bool:
return isinstance(answer,str) and not self.current_job.company is None and self.current_job.company in answer

Loading