diff --git a/.env.example b/.env.example index 96dd446..55d6a09 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,4 @@ # required environment variables MONGO_CONNECTION_URI=mongodb://localhost:27017 -OPEN_DATA_SERICE_KEY=some_key +OPEN_DATA_SERVICE_KEY=some_key MONGO_DATABASE=council \ No newline at end of file diff --git a/API/candidate.py b/API/candidate.py index 804ec27..c00afda 100644 --- a/API/candidate.py +++ b/API/candidate.py @@ -68,28 +68,36 @@ def fetch_all_data( parser.add_argument( "--drop-columns", type=str, - default="num,huboid,hanjaName,status", + default="num,huboid,hanjaName,status,gihoSangse", help="제거할 열 이름을 ','로 구분하여 입력하세요", ) parser.add_argument( - "--save-method", - type=str, - # TODO: Add MongoDB support - # choices=["excel", "mongo"], - choices=["excel"], - default="excel", - help="데이터 저장 방식: 'excel' (현재는 excel만 지원)", + "-m", "--update-mongo", help="API 요청 결과를 MongoDB에 업데이트", action="store_true" + ) + parser.add_argument( + "-o", "--output-store", help="API 요청 결과를 로컬에 저장", action="store_true" + ) + parser.add_argument("--output-path", help="API 요청 결과 저장 경로", default="output") + + args = vars(parser.parse_args()) + print(args) + sgIds = args.get("sgIds").split(",") + if args.get("drop_columns"): + drop_columns = args.get("drop_columns").split(",") + else: + drop_columns = [] + print(drop_columns) + + data_list = fetch_all_data( + sgIds, args.get("sgTypecodes"), drop_columns=drop_columns ) - args = parser.parse_args() - sgIds = args.sgIds.split(",") - drop_columns = args.drop_columns.split(",") if args.drop_columns else [] - - data_list = fetch_all_data(sgIds, args.sgTypecodes, drop_columns=drop_columns) - for sgTypecode in args.sgTypecodes.split(","): + for sgTypecode in args.get("sgTypecodes").split(","): if sgTypecode not in SG_TYPECODE: raise ValueError(f"Invalid sgTypecode: {sgTypecode}") - if args.save_method == "excel": - save_to_excel(data_list, sgTypecode, is_elected=False) - elif args.save_method == "mongo": + + if args.get("update_mongo"): save_to_mongo(data_list, sgTypecode, CANDIDATE_TYPECODE_TYPE[sgTypecode]) + + if args.get("output_store"): + save_to_excel(data_list, sgTypecode, is_elected=False) diff --git a/API/elected.py b/API/elected.py index db5f502..5e13669 100644 --- a/API/elected.py +++ b/API/elected.py @@ -73,22 +73,30 @@ def fetch_all_data( help="제거할 열 이름을 ','로 구분하여 입력하세요", ) parser.add_argument( - "--save-method", - type=str, - choices=["excel", "mongo"], - default="excel", - help="데이터 저장 방식: 'excel', 'mongo'", + "-m", "--update-mongo", help="API 요청 결과를 MongoDB에 업데이트", action="store_true" + ) + parser.add_argument( + "-o", "--output-store", help="API 요청 결과를 로컬에 저장", action="store_true" ) + parser.add_argument("--output-path", help="API 요청 결과 저장 경로", default="output") - args = parser.parse_args() - sgIds = args.sgIds.split(",") - drop_columns = args.drop_columns.split(",") if args.drop_columns else [] + args = vars(parser.parse_args()) + sgIds = args.get("sgIds").split(",") + if args.get("drop_columns"): + drop_columns = args.get("drop_columns").split(",") + else: + drop_columns = [] + + data_list = fetch_all_data( + sgIds, args.get("sgTypecodes"), drop_columns=drop_columns + ) - data_list = fetch_all_data(sgIds, args.sgTypecodes, drop_columns=drop_columns) - for sgTypecode in args.sgTypecodes.split(","): + for sgTypecode in args.get("sgTypecodes").split(","): if sgTypecode not in SG_TYPECODE: raise ValueError(f"Invalid sgTypecode: {sgTypecode}") - if args.save_method == "excel": - save_to_excel(data_list, sgTypecode, is_elected=True) - elif args.save_method == "mongo": + + if args.get("update_mongo"): save_to_mongo(data_list, sgTypecode, ELECTED_TYPECODE_TYPE[sgTypecode]) + + if args.get("output_store"): + save_to_excel(data_list, sgTypecode, is_elected=True) diff --git a/API/utils.py b/API/utils.py index b8b1dc8..f16738a 100644 --- a/API/utils.py +++ b/API/utils.py @@ -72,7 +72,7 @@ def save_to_mongo(data: List[dict], sgTypecode: str, where: str) -> None: upsert=True, ) else: - raise NotImplementedError("현재 구시군의회의원(6) 및 기초의원비례대표(9)만 구현되어 있습니다.") + raise NotImplementedError(f"아직 구현되지 않은 sgTypecode: {sgTypecode}") print(f"데이터를 성공적으로 MongoDB '{main_collection.name}' 컬렉션에 저장하였습니다.") diff --git a/configurations/secrets.py b/configurations/secrets.py index cd4ef62..63b0c09 100644 --- a/configurations/secrets.py +++ b/configurations/secrets.py @@ -26,7 +26,7 @@ class OpenDataPortalSecrets: 공공데이터포털(data.go.kr) API 호출에 필요한 서비스 키를 정의합니다. """ - service_key = str(os.getenv("OPEN_DATA_SERICE_KEY") or "") + service_key = str(os.getenv("OPEN_DATA_SERVICE_KEY") or "") class EmailSecrets: diff --git a/scrap/utils/export.py b/scrap/utils/data_io.py similarity index 69% rename from scrap/utils/export.py rename to scrap/utils/data_io.py index 3c1795b..44f928b 100644 --- a/scrap/utils/export.py +++ b/scrap/utils/data_io.py @@ -3,7 +3,8 @@ from dataclasses import asdict from typing import Dict -from scrap.utils.types import ScrapResult, ScrapBasicArgument +from scrap.utils.types import ScrapResult +from db.types import Councilor def export_results_to_json( @@ -38,3 +39,21 @@ def export_results_to_txt( for cid, councilors in results.items(): councilors = "\n".join([c.to_txt() for c in councilors]) f.write(f"| {cid} | {councilors}\n") + + +def import_results_from_json( + input_path: str, council_type: str +) -> Dict[str, ScrapResult]: + with open(input_path, "r", encoding="utf-8") as f: + results = json.load(f) + + results = { + k: ScrapResult( + council_id=k, + council_type=council_type, + councilors=[Councilor(**c) for c in v], + ) + for k, v in results.items() + } + + return results diff --git a/scrap/utils/runner.py b/scrap/utils/runner.py index da63e9e..cfee3da 100644 --- a/scrap/utils/runner.py +++ b/scrap/utils/runner.py @@ -12,7 +12,11 @@ from configurations.secrets import WebhookSecrets -from scrap.utils.export import export_results_to_json, export_results_to_txt +from scrap.utils.data_io import ( + export_results_to_json, + export_results_to_txt, + import_results_from_json, +) from scrap.utils.database import save_to_database from scrap.utils.types import ScrapResult, ScrapBasicArgument from scrap.utils.spreadsheet import read_record_from_spreadsheet @@ -250,26 +254,36 @@ def main(args: Dict[str, str]) -> None: where = args.get("where") current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M") - runner_kwargs = args | {"current_time": current_time} - runner = ScraperFactory(where, runner_kwargs).create_scraper() + json_import_path = args.get("import_from_json") + if json_import_path: + if not args.get("update_mongo"): + raise ValueError( + "JSON 파일에서 가져온 결과를 MongoDB에 업데이트하려면 --update-mongo (-m) 옵션을 사용해야 합니다." + ) - cids_to_run = parse_cids(args.get("cids"), where) - enable_webhook = args.get("disable-webhook") - if cids_to_run: - results = runner.run(cids_to_run, enable_webhook) + print("JSON 파일에서 결과를 가져옵니다. 다른 스크랩 관련 옵션은 무시됩니다.") + results = import_results_from_json(json_import_path, where) else: - results = runner.run() + runner_kwargs = args | {"current_time": current_time} + runner = ScraperFactory(where, runner_kwargs).create_scraper() + + cids_to_run = parse_cids(args.get("cids"), where) + enable_webhook = args.get("disable_webhook") + if cids_to_run: + results = runner.run(cids_to_run, enable_webhook) + else: + results = runner.run() - if args.get("update-mongo"): + if args.get("update_mongo"): for result in results.values(): save_to_database(result) - if args.get("output-store"): - if args.get("output-format") == "json": - export_results_to_json(results, args.get("output-path"), current_time) - elif args.get("output-format") == "txt": - export_results_to_txt(results, args.get("output-path"), current_time) + if args.get("output_store"): + if args.get("output_format") == "json": + export_results_to_json(results, args.get("output_path"), current_time) + elif args.get("output_format") == "txt": + export_results_to_txt(results, args.get("output_path"), current_time) def parse_cids(cids_str: Optional[str], where: str) -> Optional[Iterable[int]]: @@ -294,6 +308,9 @@ def parse_cids(cids_str: Optional[str], where: str) -> Optional[Iterable[int]]: choices=["local", "metro", "national", "leaders"], default="local", ) + parser.add_argument( + "--import-from-json", help="경로에서 JSON 파일을 읽어와 결과를 받아옴", default=None + ) parser.add_argument( "--data-source", help="사용할 데이터 소스 ('google_sheets', 'mongodb')",