diff --git a/pipelines/datasets/br_camara_dados_abertos/constants.py b/pipelines/datasets/br_camara_dados_abertos/constants.py
index f83d71d73..ab20ad2e8 100644
--- a/pipelines/datasets/br_camara_dados_abertos/constants.py
+++ b/pipelines/datasets/br_camara_dados_abertos/constants.py
@@ -1,6 +1,9 @@
 # -*- coding: utf-8 -*-
+from datetime import datetime
 from enum import Enum
 
+from dateutil.relativedelta import relativedelta
+
 
 class constants(Enum):
     TABLE_NAME_ARCHITECTURE = {
@@ -14,7 +17,9 @@ class constants(Enum):
     INPUT_PATH = "/tmp/input/"
     OUTPUT_PATH = "/tmp/output/"
 
-    ANOS = [2023]
+    ANOS = (datetime.now() - relativedelta(years=1)).year
+
+    ANOS_ATUAL = (datetime.now()).year
 
     TABLE_LIST = {
         "votacao_microdados": "votacoes",
@@ -53,3 +58,11 @@ class constants(Enum):
         "deputado_ocupacao": "https://docs.google.com/spreadsheets/d/1Cj6WE3jk63p21IjrINeaYKoMSOGoDDf1XpY3UH8sct4/edit#gid=0",
         "deputado_profissao": "https://docs.google.com/spreadsheets/d/12R2OY7eqUKxuojcpYYBsCiHyzUOLBBdObnkuv2JUMNI/edit#gid=0",
     }
+
+    # ------------------------------------------------------------> PROPOSIÇÃO
+
+    TABLE_LIST_PROPOSICAO = {
+        "proposicao_microdados": "proposicoes",
+        "proposicao_autor": "proposicoesAutores",
+        "proposicao_tema": "proposicoesTemas",
+    }
diff --git a/pipelines/datasets/br_camara_dados_abertos/flows.py b/pipelines/datasets/br_camara_dados_abertos/flows.py
index 09c9a2afd..56e89f7f5 100644
--- a/pipelines/datasets/br_camara_dados_abertos/flows.py
+++ b/pipelines/datasets/br_camara_dados_abertos/flows.py
@@ -4,20 +4,27 @@
 
 from datetime import timedelta
 
-from prefect import Parameter, case
+from prefect import Parameter, case, unmapped
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
 from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
 
 from pipelines.constants import constants
+from pipelines.datasets.br_camara_dados_abertos.constants import (
+    constants as constants_camara,
+)
 from pipelines.datasets.br_camara_dados_abertos.schedules import (
     every_day_camara_dados_abertos,
     every_day_camara_dados_abertos_deputados,
+    every_day_camara_dados_abertos_proposicao,
 )
 from pipelines.datasets.br_camara_dados_abertos.tasks import (
+    dict_list_parameters,
     download_files_and_get_max_date,
     download_files_and_get_max_date_deputados,
     make_partitions,
+    output_path_list,
+    save_data_proposicao,
     treat_and_save_table,
 )
 from pipelines.utils.constants import constants as utils_constants
@@ -584,3 +591,100 @@
 br_camara_deputado.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 br_camara_deputado.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 br_camara_deputado.schedule = every_day_camara_dados_abertos_deputados
+
+
+# ---------------------------------------------------------------------------------------
+# ---------------------------------------------------------------------------------------
+# ---------------------------------------------------------------------------------------
+
+# ------------------------------ TABLES PROPOSIÇÃO ---------------------------------------
+
+with Flow(
+    name="br_camara_dados_abertos.proposicao", code_owners=["trick"]
+) as br_camara_proposicao:
+    # Parameters
+    dataset_id = Parameter(
+        "dataset_id", default="br_camara_dados_abertos", required=True
+    )
+    table_id = Parameter(
+        "table_id",
+        default=[
+            "proposicao_microdados",
+            "proposicao_autor",
+            "proposicao_tema",
+        ],
+        required=True,
+    )
+    materialization_mode = Parameter(
+        "materialization_mode", default="dev", required=False
+    )
+    materialize_after_dump = Parameter(
+        "materialize_after_dump", default=True, required=False
+    )
+    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+
+    rename_flow_run = rename_current_flow_run_dataset_table(
+        prefix="Dump: ",
+        dataset_id=dataset_id,
+        table_id="Proposição",
+        wait=table_id,
+    )
+
+    update_metadata = Parameter("update_metadata", default=True, required=False)
+
+    filepath = save_data_proposicao.map(
+        table_id=table_id,
+        upstream_tasks=[unmapped(rename_flow_run)],
+    )
+    output_paths = output_path_list(table_id)
+    wait_upload_table = create_table_and_upload_to_gcs.map(
+        data_path=output_paths,
+        dataset_id=unmapped(dataset_id),
+        table_id=table_id,
+        dump_mode=unmapped("append"),
+        wait=unmapped(output_paths),
+        upstream_tasks=[unmapped(filepath)],
+    )
+    parameters = dict_list_parameters(dataset_id, materialization_mode, dbt_alias)
+    with case(materialize_after_dump, True):
+        # Trigger DBT flow run
+        current_flow_labels = get_current_flow_labels()
+        materialization_flow = create_flow_run.map(
+            flow_name=unmapped(utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value),
+            project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value),
+            parameters=parameters,
+            labels=unmapped(current_flow_labels),
+            run_name=unmapped("Materialize br_camara_dados_abertos proposições"),
+            upstream_tasks=[unmapped(wait_upload_table)],
+        )
+
+        wait_for_materialization = wait_for_flow_run.map(
+            materialization_flow,
+            stream_states=unmapped(True),
+            stream_logs=unmapped(True),
+            raise_final_state=unmapped(True),
+        )
+        wait_for_materialization.max_retries = (
+            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+        )
+        wait_for_materialization.retry_delay = timedelta(
+            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+        )
+
+        with case(update_metadata, True):
+            update_django_metadata.map(
+                dataset_id=unmapped(dataset_id),
+                table_id=table_id,
+                date_format=["%Y-%m-%d", "%Y-%m-%d", "%Y-%m-%d"],
+                date_column_name=[{"date": "data"}, {"date": "data"}, {"date": "data"}],
+                coverage_type=["part_bdpro", "all_free", "all_free"],
+                prefect_mode=unmapped(materialization_mode),
+                time_delta=[{"months": 6}, {"months": 6}, {"months": 6}],
+                bq_project=unmapped("basedosdados"),
+                historical_database=[True, False, False],
+                upstream_tasks=[unmapped(wait_for_materialization)],
+            )
+
+br_camara_proposicao.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+br_camara_proposicao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+br_camara_proposicao.schedule = every_day_camara_dados_abertos_proposicao
diff --git a/pipelines/datasets/br_camara_dados_abertos/schedules.py b/pipelines/datasets/br_camara_dados_abertos/schedules.py
index 252544482..d8cfc05e8 100644
--- a/pipelines/datasets/br_camara_dados_abertos/schedules.py
+++ b/pipelines/datasets/br_camara_dados_abertos/schedules.py
@@ -52,3 +52,25 @@
         ),
     ],
 )
+
+every_day_camara_dados_abertos_proposicao = Schedule(
+    clocks=[
+        CronClock(
+            cron="0 10 * * *",  # every day at 10:00 UTC
+            start_date=datetime(2021, 1, 1),
+            labels=[constants.BASEDOSDADOS_PROD_AGENT_LABEL.value],
+            parameter_defaults={
+                "update_metadata": True,
+                "dbt_alias": True,
+                "materialize_after_dump": True,
+                "materialization_mode": "prod",
+                "table_id": [
+                    "proposicao_microdados",
+                    "proposicao_autor",
+                    "proposicao_tema",
+                ],
+                "dataset_id": "br_camara_dados_abertos",
+            },
+        ),
+    ],
+)
diff --git a/pipelines/datasets/br_camara_dados_abertos/tasks.py b/pipelines/datasets/br_camara_dados_abertos/tasks.py
index f3502bcf6..f639b8f52 100644
--- a/pipelines/datasets/br_camara_dados_abertos/tasks.py
+++ b/pipelines/datasets/br_camara_dados_abertos/tasks.py
@@ -3,6 +3,7 @@
 from datetime import timedelta
 
 import pandas as pd
+import requests
 from prefect import task
 
 from pipelines.constants import constants
@@ -10,6 +11,7 @@
     constants as constants_camara,
 )
 from pipelines.datasets.br_camara_dados_abertos.utils import (
+    download_and_read_data_proposicao,
     get_data,
     get_data_deputados,
     read_and_clean_camara_dados_abertos,
@@ -87,3 +89,117 @@ def treat_and_save_table(table_id):
     log(f"{constants_camara.OUTPUT_PATH.value}{table_id}/data.csv")
 
     return f"{constants_camara.OUTPUT_PATH.value}{table_id}/data.csv"
+
+
+# -------------------------------------------------------------------> PROPOSIÇÃO
+
+
+@task
+def save_data_proposicao(table_id: str):
+    df = download_and_read_data_proposicao(table_id)
+    valor = constants_camara.TABLE_LIST_PROPOSICAO.value[table_id]
+    if not os.path.exists(f"{constants_camara.OUTPUT_PATH.value}{table_id}"):
+        os.makedirs(f"{constants_camara.OUTPUT_PATH.value}{table_id}")
+
+    if table_id == "proposicao_microdados":
+        valor = constants_camara.TABLE_LIST_PROPOSICAO.value[table_id]
+        url = f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{constants_camara.ANOS_ATUAL.value}.csv"
+        response = requests.get(url)
+        if response.status_code == 200:
+            df["ultimoStatus_despacho"] = df["ultimoStatus_despacho"].apply(
+                lambda x: str(x).replace(";", ",").replace("\n", "").replace("\r", "")
+            )
+            df["ementa"] = df["ementa"].apply(
+                lambda x: str(x).replace(";", ",").replace("\n", "").replace("\r", "")
+            )
+            df["ano"] = df.apply(
+                lambda x: x["dataApresentacao"][0:4] if x["ano"] == 0 else x["ano"],
+                axis=1,
+            )
+            df.to_csv(
+                f"{constants_camara.OUTPUT_PATH.value}{table_id}/{valor}_{constants_camara.ANOS_ATUAL.value}.csv",
+                sep=",",
+                index=False,
+                encoding="utf-8",
+            )
+        elif response.status_code >= 400 and response.status_code <= 599:
+            url_2 = f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{constants_camara.ANOS.value}.csv"
+            response = requests.get(url_2)
+
+            df["ultimoStatus_despacho"] = df["ultimoStatus_despacho"].apply(
+                lambda x: str(x).replace(";", ",").replace("\n", "").replace("\r", "")
+            )
+            df["ementa"] = df["ementa"].apply(
+                lambda x: str(x).replace(";", ",").replace("\n", "").replace("\r", "")
+            )
+            df["ano"] = df.apply(
+                lambda x: x["dataApresentacao"][0:4] if x["ano"] == 0 else x["ano"],
+                axis=1,
+            )
+            df.to_csv(
+                f"{constants_camara.OUTPUT_PATH.value}{table_id}/{valor}_{constants_camara.ANOS.value}.csv",
+                sep=",",
+                index=False,
+                encoding="utf-8",
+            )
+
+    else:
+        valor = constants_camara.TABLE_LIST_PROPOSICAO.value[table_id]
+        url = f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{constants_camara.ANOS_ATUAL.value}.csv"
+        response = requests.get(url)
+        if response.status_code == 200:
+            df.to_csv(
+                f"{constants_camara.OUTPUT_PATH.value}{table_id}/{valor}_{constants_camara.ANOS_ATUAL.value}.csv",
+                sep=",",
+                index=False,
+                encoding="utf-8",
+            )
+        elif response.status_code >= 400 and response.status_code <= 599:
+            df.to_csv(
+                f"{constants_camara.OUTPUT_PATH.value}{table_id}/{valor}_{constants_camara.ANOS.value}.csv",
+                sep=",",
+                index=False,
+                encoding="utf-8",
+            )
+
+
+@task
+def output_path_list(table_id_list):
+    output_path_list = []
+    for table_id in table_id_list:
+        output_path_list.append(f"{constants_camara.OUTPUT_PATH.value}{table_id}/")
+    return output_path_list
+
+
+@task
+def dict_list_parameters(dataset_id, materialization_mode, dbt_alias):
+    table_id = [
+        "proposicao_microdados",
+        "proposicao_autor",
+        "proposicao_tema",
+    ]
+
+    parameters = [
+        dict(
+            dataset_id=dataset_id,
+            table_id=table_id[0],
+            mode=materialization_mode,
+            dbt_alias=dbt_alias,
+            dbt_command="run and test",
+        ),
+        dict(
+            dataset_id=dataset_id,
+            table_id=table_id[1],
+            mode=materialization_mode,
+            dbt_alias=dbt_alias,
+            dbt_command="run and test",
+        ),
+        dict(
+            dataset_id=dataset_id,
+            table_id=table_id[2],
+            mode=materialization_mode,
+            dbt_alias=dbt_alias,
+            dbt_command="run and test",
+        ),
+    ]
+    return parameters
diff --git a/pipelines/datasets/br_camara_dados_abertos/utils.py b/pipelines/datasets/br_camara_dados_abertos/utils.py
index 6fb677dce..aece1f382 100644
--- a/pipelines/datasets/br_camara_dados_abertos/utils.py
+++ b/pipelines/datasets/br_camara_dados_abertos/utils.py
@@ -14,12 +14,15 @@ # -------------------------------------------------------------------------------------> VOTACAO
 def download_csvs_camara() -> None:
     """
-    Docs:
-    This function does download all csvs from archives of camara dos deputados.
-    The csvs saved in conteiners of docker.
+    Downloads CSV files from the Câmara dos Deputados open-data archive.
+
+    This function iterates over the years and table list of chamber defined in the constants module,
+    and downloads the corresponding CSV files from the Câmara dos Deputados open-data archive. The downloaded files are
+    saved in the input path specified in the constants module.
+
+    Raises:
+        Exception: If there is an error in the request, such as a non-successful status code.
+
-
-    return:
-    None
     """
     log("Downloading csvs from camara dos deputados")
     if not os.path.exists(constants.INPUT_PATH.value):
         os.makedirs(constants.INPUT_PATH.value)
@@ -102,14 +105,17 @@ def read_and_clean_camara_dados_abertos(
 
 
 def download_csvs_camara_deputado() -> None:
     """
-    Docs:
-    This function does download all csvs from archives of camara dos deputados.
-    The csvs saved in conteiners of docker.
+    Downloads CSV files from the Câmara dos Deputados open-data archive.
+
+    This function iterates over the years and table list of congressperson defined in the constants module,
+    and downloads the corresponding CSV files from the Câmara dos Deputados open-data archive. The downloaded files are
+    saved in the input path specified in the constants module.
+
+    Raises:
+        Exception: If there is an error in the request, such as a non-successful status code.
+
-
-    return:
-    None
     """
-    print("Downloading csvs from camara dos deputados")
+    log("Downloading csvs from camara dos deputados")
     if not os.path.exists(constants.INPUT_PATH.value):
         os.makedirs(constants.INPUT_PATH.value)
@@ -155,3 +161,47 @@
     )
 
     return df
+
+
+# ----------------------------------------------------------------------------------- > Proposição
+
+
+def download_csvs_camara_proposicao(table_id: str) -> None:
+    """
+    Downloads CSV files from the Camara de Proposicao API.
+
+    This function iterates over the years and table list of propositions defined in the constants module,
+    and downloads the corresponding CSV files from the Camara de Proposicao API. The downloaded files are
+    saved in the input path specified in the constants module.
+
+    Raises:
+        Exception: If there is an error in the request, such as a non-successful status code.
+
+    """
+    if not os.path.exists(constants.INPUT_PATH.value):
+        os.makedirs(constants.INPUT_PATH.value)
+
+    valor = constants.TABLE_LIST_PROPOSICAO.value[table_id]
+    url = f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{constants.ANOS_ATUAL.value}.csv"
+    response = requests.get(url)
+    if response.status_code == 200:
+        with open(f"{constants.INPUT_PATH.value}{valor}.csv", "wb") as f:
+            f.write(response.content)
+        log(f"download complete {valor}-{constants.ANOS_ATUAL.value}")
+
+    elif response.status_code >= 400 and response.status_code <= 599:
+        url_2 = f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{constants.ANOS.value}.csv"
+        response = requests.get(url_2)
+        with open(f"{constants.INPUT_PATH.value}{valor}.csv", "wb") as f:
+            f.write(response.content)
+        log(f"download complete {valor}-{constants.ANOS.value}")
+
+
+def download_and_read_data_proposicao(table_id: str) -> pd.DataFrame:
+    download_csvs_camara_proposicao(table_id)
+    df = pd.read_csv(
+        f"{constants.INPUT_PATH.value}{constants.TABLE_LIST_PROPOSICAO.value[table_id]}.csv",
+        sep=";",
+    )
+
+    return df