diff --git a/bazos/__init__.py b/bazos/__init__.py index e39dbab..309d049 100644 --- a/bazos/__init__.py +++ b/bazos/__init__.py @@ -1,8 +1,9 @@ import argparse +from pathlib import Path import sys from typing import Dict, Any -from bazos.main import bazos_main as bz, BazosScrapper +from bazos.main import BazosScrapper, BazosUser __version__ = "0.1.0" __apiversion__ = "0.1.0" @@ -12,10 +13,11 @@ def parse_cli_argument() -> Dict[str, Any]: parser = argparse.ArgumentParser() + # true/false parser.add_argument('--login', action='store_true', help='Login to bazos') - parser.add_argument('-b', '--bazos', + parser.add_argument('--bazos', action='store_true', help='Use bazos') parser.add_argument('--add-only', @@ -24,15 +26,34 @@ def parse_cli_argument() -> Dict[str, Any]: parser.add_argument('--print-rubrics', action='store_true', help='Print rubrics') + parser.add_argument("--verbose", + action='store_true', + default=True, + help='Verbose') + parser.add_argument("--delete-all", + action='store_true', + default=True, + help='Verbose') + parser.add_argument("--create-all", + action='store_true', + default=True, + help='Verbose') + # ? + parser.add_argument('--items-path', + type=Path, + required=True, + nargs='?', + help='Path to products directory') + parser.add_argument('--credentials-path', + type=Path, + required=True, + nargs='?', + help='Path to products directory') + # + parser.add_argument('--country', nargs="+", help="What bazos country to use", default=['cz', 'sk']) - parser.add_argument('-p', '--path', - help='Path to products directory') - parser.add_argument("--update-credentials", - action='store_true', - help='Update credentials') cli_args = vars(parser.parse_args()) return cli_args @@ -40,17 +61,50 @@ def parse_cli_argument() -> Dict[str, Any]: def main(): cli_args = parse_cli_argument() + if cli_args['verbose']: + # print(f"==> CLI args: {cli_args}") + print(' '.join(sys.argv)) + if cli_args['login']: - bazos_scrapper = BazosScrapper(country=cli_args['country'][0], cli_args=cli_args) - # bazos_scrapper.check_user_files_available() - bazos_scrapper.save_authentication() - bazos_scrapper.load_page_with_cookies() - bazos_scrapper.check_authentication() - return + bazos_user = BazosUser( + country='cz', items_path=cli_args['items_path'], + credentials_path=cli_args['credentials_path'] + ) + bazos_user.authenticate() + bazos_user.save_user_credentials() if cli_args['bazos']: - bz(cli_args=cli_args) + for country in cli_args['country']: + if cli_args['verbose']: + print(f"==> Processing country: {country}") + + # Check if user credentials exists + user = BazosUser( + country='cz', items_path=cli_args['items_path'], + credentials_path=cli_args['credentials_path'] + ) + try: + user.exists_user_credentials(raise_exception=True) + except FileNotFoundError as e: + print(e) + sys.exit(1) + + if cli_args['print_rubrics']: + bazos_scrapper = BazosScrapper(country=country, cli_args=cli_args, user=user) + # bazos_scrapper.check_user_files_available() + bazos_scrapper.load_page_with_cookies() + # bazos_scrapper.check_authentication() + bazos_scrapper.print_all_rubrics_and_categories() + return + + bazos_scrapper = BazosScrapper(country=country, cli_args=cli_args, user=user) + bazos_scrapper.load_page_with_cookies() + # Restore advertisements + if cli_args['delete_all']: + bazos_scrapper.delete_advertisements() + if cli_args['create_all']: + bazos_scrapper.create_advertisements() sys.exit() diff --git a/bazos/core/settings.py b/bazos/core/settings.py index d1e2a33..3ff813e 100644 --- a/bazos/core/settings.py +++ b/bazos/core/settings.py @@ -1,5 +1,4 @@ import os -from os import path from pathlib import Path ROOT_DIR = Path(os.getcwd()) @@ -11,8 +10,8 @@ os.makedirs(TOKENS_DIR) -COOKIES_FILE = path.join(TOKENS_DIR, 'cookies') -LOCAL_STORAGE_FILE = path.join(TOKENS_DIR, 'local_storage') +COOKIES_FILE = 'cookies' +LOCAL_STORAGE_FILE = 'local_storage' if __name__ == '__main__': print('run') diff --git a/bazos/main.py b/bazos/main.py index 3103d66..981ef49 100644 --- a/bazos/main.py +++ b/bazos/main.py @@ -1,5 +1,6 @@ from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.chrome.options import Options +from pathlib import Path import os import pickle # nosec import sys @@ -14,7 +15,6 @@ from bazos.core import settings from bazos.info.product import Product, get_all_products -from bazos.info.user import User from bazos.shared.utils import wait_random_time from bazos.info.rubric_category import get_rubric, get_category from bazos.shared.utils import parse_yaml @@ -50,11 +50,10 @@ class XPathsBazos: class BazosDriver: - def __init__(self, country: str, cli_args: dict): + def __init__(self, country: str): self.country = country - self.cli_args = cli_args - self.url_bazos = f"https://bazos.{country}" - self.url_moje_inzeraty = path.join(self.url_bazos, 'moje-inzeraty.php') + self.bazos_base_url = f"https://bazos.{country}" + self.bazos_moje_inzeraty_url = path.join(self.bazos_base_url, 'moje-inzeraty.php') self.driver = self.get_driver() def set_chrome_options(self) -> Options: @@ -91,81 +90,83 @@ def __del__(self): self.driver.close() -class BazosUser: - def __init__(self, country: str, products_path: str): - self.country = country - if len(sys.argv) > 2 and path.isdir(sys.argv[1]): - products_path = sys.argv[1] - - user_info = parse_yaml(filename=path.join( - products_path, f"user_{country}.yml")) - - self.name = user_info['name'] - self.phone_number = user_info['phone_number'] - self.email = user_info['email'] - self.password = user_info['password'] - self.psc = user_info['psc'] - self.products_path = products_path - - def check_user_files_available(self) -> None: - if (not os.path.isfile(f"{settings.COOKIES_FILE}_{self.bazos_country}.pkl") # nosec - or not os.path.isfile(f"{settings.LOCAL_STORAGE_FILE}_{self.bazos_country}.pkl")): # nosec - print("==> User files not found, please login - login flag: --login") - # self.save_authentication(user=self.user) - - def check_authentication(self) -> None: - user_input = self.driver.find_elements( - By.XPATH, XPathsBazos.user_inputs) - # tel_input = self.driver.find_element(By.XPATH, XPathsBazos.auth_phone_input) +class BazosUser(BazosDriver): + def __init__(self, country: str, items_path: Path, credentials_path: Path): + super().__init__(country) + self.items_path = items_path + self.credentaials_path = credentials_path + self.required_keys = ['name', 'phone_number', 'email', 'password', 'psc'] + try: + for k, v in parse_yaml(filename=path.join(items_path, f"user_{country}.yml")).items(): + setattr(self, k, v) + + # Check that all keys are present + if not all([hasattr(self, k) for k in self.required_keys]): + raise KeyError( + 'Some of keys are missing, check `user_{country}.yml` file, ' + 'if all keys are present (name, phone_number, email, password, psc)' + ) + except KeyError as e: + print(f"KeyError: {e}") + print(f"Please provide correct `user_{country}.yml` file in `{items_path}`") + sys.exit(1) + except FileNotFoundError: + print(f"FileNotFoundError: Please provide `user_{country}.yml` file in `{items_path}`") + sys.exit(1) + + def exists_user_credentials(self, raise_exception: bool = False) -> None: + cookies_file = self.credentaials_path / f"{settings.COOKIES_FILE}_{self.country}.pkl" + local_storage_file = self.credentaials_path / f"{settings.LOCAL_STORAGE_FILE}_{self.country}.pkl" + if not os.path.isfile(cookies_file or not os.path.isfile(local_storage_file)): # nosec + if raise_exception: + raise FileNotFoundError("User files not found, please login - login flag: --login") + + def is_authentication(self) -> bool: + user_input = self.driver.find_elements(By.XPATH, XPathsBazos.user_inputs) if len(user_input) != 3: # Mean is Authenticated, because there is not "Overit" button - self.save_authentication(user=self.user) + return False - def save_authentication(self) -> None: - self.driver.get(self.url_moje_inzeraty) + def authenticate(self) -> None: + self.driver.get(self.bazos_moje_inzeraty_url) # Prepare authentication - telefon_input = self.driver.find_element( - By.XPATH, XPathsBazos.auth_phone_input) + telefon_input = self.driver.find_element(By.XPATH, XPathsBazos.auth_phone_input) telefon_input.clear() - telefon_input.send_keys(self.user.phone_number) - self.driver.find_element( - By.XPATH, XPathsBazos.auth_phone_check_submit).click() # Submit + phone_number = getattr(self, 'phone_number') + telefon_input.send_keys(phone_number) + self.driver.find_element(By.XPATH, XPathsBazos.auth_phone_check_submit).click() # Submit # Authenticate - code_input = self.driver.find_element( - By.XPATH, XPathsBazos.auth_code_input) + code_input = self.driver.find_element(By.XPATH, XPathsBazos.auth_code_input) code_input.clear() - code_input.send_keys( - input('Please provide authentification code sended to your phone: ')) - self.driver.find_element( - By.XPATH, XPathsBazos.auth_code_submit).click() # Submit + code_input.send_keys(input('Please provide authentification code sended to your phone: ')) + self.driver.find_element(By.XPATH, XPathsBazos.auth_code_submit).click() # Submit - # Save cookies - pickle.dump(self.driver.get_cookies(), - file=open(f"{settings.COOKIES_FILE}_{self.bazos_country}.pkl", "wb")) # nosec - # Save Local Storage - pickle.dump(self.driver.execute_script("return window.localStorage;"), - file=open(f"{settings.LOCAL_STORAGE_FILE}_{self.bazos_country}.pkl", "wb")) # nosec + def save_user_credentials(self) -> None: + cookies_file = self.credentaials_path / f"{settings.COOKIES_FILE}_{self.country}.pkl" + local_storage_file = self.credentaials_path / f"{settings.LOCAL_STORAGE_FILE}_{self.country}.pkl" + cookies = self.driver.get_cookies() + local_storage = self.driver.execute_script("return window.localStorage;") + pickle.dump(cookies, file=open(cookies_file.__str__(), "wb")) # nosec + pickle.dump(local_storage, file=open(local_storage_file, "wb")) # nosec class BazosScrapper(BazosDriver): - def __init__(self, country: str, cli_args: dict): - super().__init__(country, cli_args) + def __init__(self, country: str, cli_args: dict, user: BazosUser): + super().__init__(country) + self.cli_args = cli_args options = self.set_chrome_options() webdriver_manager = ChromeDriverManager().install() service = Service(executable_path=webdriver_manager) self.driver = webdriver.Chrome(service=service, options=options) # URLs - self.user = User(country=country, products_path=cli_args['path']) - self.bazos_country = country + self.user = user + self.country = country self.advertisements: int self.url_bazos = f"https://bazos.{country}" self.url_moje_inzeraty = path.join(self.url_bazos, 'moje-inzeraty.php') - # def __del__(self): - # self.driver.close() - def print_all_rubrics_and_categories(self): self.driver.find_element( By.CLASS_NAME, 'pridati').click() # go to add page @@ -195,99 +196,98 @@ def print_all_rubrics_and_categories(self): def load_page_with_cookies(self) -> None: self.driver.get(self.url_moje_inzeraty) - for cookie_dict in pickle.load(open(f"{settings.COOKIES_FILE}_{self.bazos_country}.pkl", 'rb')): # nosec + cookies_file = self.user.credentaials_path / f"{settings.COOKIES_FILE}_{self.country}.pkl" + for cookie_dict in pickle.load(open(cookies_file, 'rb')): # nosec self.driver.add_cookie(cookie_dict) self.driver.get(self.url_moje_inzeraty) - def remove_advertisment(self, user: User): + def remove_advertisment(self): self.driver.find_element(By.CLASS_NAME, 'inzeratydetdel').find_element( By.TAG_NAME, 'a').click() pwd_input = self.driver.find_element( By.XPATH, XPathsBazos.delete_pwd_input) pwd_input.clear() - pwd_input.send_keys(user.password) - self.driver.find_element( - By.XPATH, XPathsBazos.delete_submit).click() # Submit-Delete + pwd_input.send_keys(getattr(self.user, 'password')) + self.driver.find_element(By.XPATH, XPathsBazos.delete_submit).click() # Submit-Delete - def remove_advertisements(self, user: User): - self.advertisements = len( - self.driver.find_elements(By.CLASS_NAME, 'nadpis')) + def delete_advertisements(self): + self.advertisements = len(self.driver.find_elements(By.CLASS_NAME, 'nadpis')) - print("==> Removing old advertisements") + if self.cli_args['verbose']: + print("==> Removing old advertisements") for i in range(self.advertisements): element = self.driver.find_element(By.CLASS_NAME, 'nadpis') - print(f"Removing[{i}/{self.advertisements}]: {element.text}") + if self.cli_args['verbose']: + print(f"Removing[{i}/{self.advertisements}]: {element.text}") # wait_random_time() element.find_element(By.TAG_NAME, 'a').click() wait_random_time() - self.remove_advertisment(user=user) + self.remove_advertisment() wait_random_time() - def add_advertisement(self, product: Product, user: User): + def add_advertisement(self, product: Product): # Rubrik select_rubrik = Select(self.driver.find_element( By.XPATH, XPathsBazos.product_rubric)) select_rubrik.select_by_visible_text( - get_rubric(self.bazos_country, product.rubric)) + get_rubric(self.country, product.rubric)) # Product select_category = Select(self.driver.find_element( By.XPATH, XPathsBazos.product_category)) select_category.select_by_visible_text(get_category( - self.bazos_country, product.rubric, product.category)) + self.country, product.rubric, product.category)) wait_random_time() self.driver.find_element(By.ID, 'nadpis').send_keys(product.title) self.driver.find_element(By.ID, 'popis').send_keys(product.description) self.driver.find_element(By.ID, 'cena').send_keys( - product.get_location_price(self.bazos_country)) + product.get_location_price(self.country)) wait_random_time() self.driver.find_element(By.ID, 'lokalita').clear() - self.driver.find_element(By.ID, 'lokalita').send_keys(user.psc) + self.driver.find_element(By.ID, 'lokalita').send_keys(getattr(self.user, 'psc')) self.driver.find_element(By.ID, 'jmeno').clear() - self.driver.find_element(By.ID, 'jmeno').send_keys(user.name) + self.driver.find_element(By.ID, 'jmeno').send_keys(getattr(self.user, 'name')) self.driver.find_element(By.ID, 'telefoni').clear() self.driver.find_element( - By.ID, 'telefoni').send_keys(user.phone_number) + By.ID, 'telefoni').send_keys(getattr(self.user, 'phone_number')) self.driver.find_element(By.ID, 'maili').clear() - self.driver.find_element(By.ID, 'maili').send_keys(user.email) + self.driver.find_element(By.ID, 'maili').send_keys(getattr(self.user, 'email')) self.driver.find_element(By.ID, 'heslobazar').clear() - self.driver.find_element(By.ID, 'heslobazar').send_keys(user.password) + self.driver.find_element(By.ID, 'heslobazar').send_keys(getattr(self.user, 'password')) wait_random_time() self.driver.find_element(By.CLASS_NAME, 'ovse').click() - self.driver.find_element(By.XPATH, XPathsBazos.product_img_input).send_keys( - '\n'.join(product.images)) + self.driver.find_element(By.XPATH, XPathsBazos.product_img_input).send_keys('\n'.join(product.images)) wait_random_time() self.driver.find_element(By.XPATH, XPathsBazos.product_submit).click() - def add_advertisements(self, user: User) -> None: - products = get_all_products( - products_path=user.products_path, country=self.bazos_country) + def create_advertisements(self) -> None: + products = get_all_products(products_path=getattr(self, "items_path"), country=self.country) self.advertisements = len(products) - print("==> Adding advertisements") + if self.cli_args['verbose']: + print("==> Adding advertisements") for idx, product in enumerate(products): if self.product_already_advertised(product): - print( - f"Skipping[{idx}/{self.advertisements}]: {product.product_path}") + if self.cli_args['verbose']: + print(f"Skipping[{idx}/{self.advertisements}]: {product.product_path}") continue - print( - f"Adding[{idx}/{self.advertisements}]: {product.product_path}") + if self.cli_args['verbose']: + print(f"Adding[{idx}/{self.advertisements}]: {product.product_path}") # product not advertised ADD them wait_random_time() - self.driver.find_element( - By.CLASS_NAME, 'pridati').click() # go to add page + self.driver.find_element(By.CLASS_NAME, 'pridati').click() # go to add page wait_random_time() self.driver.find_elements(By.CLASS_NAME, 'iconstblcell')[0].click() - self.add_advertisement(product=product, user=user) + self.add_advertisement(product=product) def product_already_advertised(self, product: Product) -> bool: self.load_page_with_cookies() @@ -298,29 +298,3 @@ def product_already_advertised(self, product: Product) -> bool: def __exit__(self, exc_type, exc_val, exc_tb): self.driver.quit() - - -def bazos_main(cli_args: vars) -> None: - for country in cli_args['country']: - print(f"==> Processing country: {country}") - if cli_args['print_rubrics']: - bazos_scrapper = BazosScrapper(country=country, cli_args=cli_args) - bazos_scrapper.check_user_files_available() - bazos_scrapper.load_page_with_cookies() - bazos_scrapper.check_authentication() - bazos_scrapper.print_all_rubrics_and_categories() - return - - bazos_scrapper = BazosScrapper(country=country, cli_args=cli_args) - bazos_scrapper.check_user_files_available() - bazos_scrapper.load_page_with_cookies() - bazos_scrapper.check_authentication() - - # Restore advertisements - if '--add-only' not in sys.argv: - bazos_scrapper.remove_advertisements(user=bazos_scrapper.user) - bazos_scrapper.add_advertisements(user=bazos_scrapper.user) - - -if __name__ == '__main__': - bazos_main({})