diff --git a/pipelines/datasets/br_camara_dados_abertos/constants.py b/pipelines/datasets/br_camara_dados_abertos/constants.py index 32f1367a9..45bbaf2ff 100644 --- a/pipelines/datasets/br_camara_dados_abertos/constants.py +++ b/pipelines/datasets/br_camara_dados_abertos/constants.py @@ -57,7 +57,7 @@ class constants(Enum): # ------------------------------------------------------------> PROPOSIÇÃO TABLE_LIST_PROPOSICAO = { - "microdados": "proposicoes", - "autor": "proposicoesAutores", - "tema": "proposicoesTemas", + "proposicao_microdados": "proposicoes", + "proposicao_autor": "proposicoesAutores", + "proposicao_tema": "proposicoesTemas", } diff --git a/pipelines/datasets/br_camara_dados_abertos/flows.py b/pipelines/datasets/br_camara_dados_abertos/flows.py index aa4c9e98e..3633ce6e2 100644 --- a/pipelines/datasets/br_camara_dados_abertos/flows.py +++ b/pipelines/datasets/br_camara_dados_abertos/flows.py @@ -17,7 +17,6 @@ from pipelines.datasets.br_camara_dados_abertos.tasks import ( download_files_and_get_max_date, download_files_and_get_max_date_deputados, - download_files_and_get_max_date_proposicao, make_partitions, save_data_proposicao, treat_and_save_table, @@ -628,20 +627,16 @@ update_metadata = Parameter("update_metadata", default=True, required=False) filepath_proposicao_microdados = save_data_proposicao.map( - table_id=[ - "proposicao_microdados", - "proposicao_autor", - "proposicao_tema", - ], + table_id=table_id, upstream_tasks=[rename_flow_run], ) wait_upload_table = create_table_and_upload_to_gcs.map( data_path=filepath_proposicao_microdados, - dataset_id=dataset_id, + dataset_id=unmapped(dataset_id), table_id=table_id, - dump_mode="append", - wait=filepath_proposicao_microdados, + dump_mode=unmapped("append"), + wait=unmapped(filepath_proposicao_microdados), ) with case(materialize_after_dump, True): @@ -651,17 +646,17 @@ flow_name=unmapped(utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value), project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value), parameters={ - "dataset_id": dataset_id, + "dataset_id": unmapped(dataset_id), "table_id": table_id, - "mode": materialization_mode, - "dbt_alias": dbt_alias, + "mode": unmapped(materialization_mode), + "dbt_alias": unmapped(dbt_alias), }, labels=unmapped(current_flow_labels), - run_name=unmapped(f"Materialize {dataset_id}.{table_id[0]}"), + run_name=unmapped(f"Materialize {dataset_id}.{table_id}"), ) wait_for_materialization = wait_for_flow_run.map( - unmapped(materialization_flow), + materialization_flow, stream_states=unmapped(True), stream_logs=unmapped(True), raise_final_state=unmapped(True), diff --git a/pipelines/datasets/br_camara_dados_abertos/tasks.py b/pipelines/datasets/br_camara_dados_abertos/tasks.py index 2752229d2..9c9589612 100644 --- a/pipelines/datasets/br_camara_dados_abertos/tasks.py +++ b/pipelines/datasets/br_camara_dados_abertos/tasks.py @@ -10,12 +10,11 @@ constants as constants_camara, ) from pipelines.datasets.br_camara_dados_abertos.utils import ( + download_and_read_data_proposicao, get_data, get_data_deputados, - get_data_proposicao, read_and_clean_camara_dados_abertos, read_and_clean_data_deputados, - read_data_proposicao, ) from pipelines.utils.utils import log, to_partitions @@ -94,22 +93,11 @@ def treat_and_save_table(table_id): # -------------------------------------------------------------------> PROPOSIÇÃO -@task -def download_files_and_get_max_date_proposicao(): - df = get_data_proposicao() - df["dataApresentacao"] = pd.to_datetime(df["dataApresentacao"]) - max_data_proposicao = df["dataApresentacao"].max() - - return max_data_proposicao - - @task def save_data_proposicao(table_id: str): - df = read_data_proposicao(table_id=table_id) + df = download_and_read_data_proposicao() df.to_csv( - f"{constants.OUTPUT_PATH.value}{table_id}_{constants.ANOS.value}.csv", - index=False, - sep=",", + f"{constants_camara.OUTPUT_PATH.value}{table_id}/data.csv", sep=",", index=False ) return constants.OUTPUT_PATH.value diff --git a/pipelines/datasets/br_camara_dados_abertos/utils.py b/pipelines/datasets/br_camara_dados_abertos/utils.py index 54c35f3e0..4dea7cabd 100644 --- a/pipelines/datasets/br_camara_dados_abertos/utils.py +++ b/pipelines/datasets/br_camara_dados_abertos/utils.py @@ -166,7 +166,7 @@ def get_data_deputados(): # ----------------------------------------------------------------------------------- > Proposição -def download_csvs_camara_proposicao() -> None: +def download_csvs_camara_proposicao(table_id: str) -> None: """ Downloads CSV files from the Camara de Proposicao API. @@ -183,41 +183,23 @@ def download_csvs_camara_proposicao() -> None: os.makedirs(constants.INPUT_PATH.value) for anos in constants.ANOS.value: - for key, valor in constants.TABLE_LIST_PROPOSICAO.value.items(): - url_2 = f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{anos}.csv" - - response = requests.get(url_2) - if response.status_code == 200: - with open(f"{constants.INPUT_PATH.value}{valor}.csv", "wb") as f: - f.write(response.content) - print(f"download complete {valor}") - - elif response.status_code >= 400 and response.status_code <= 599: - raise Exception( - f"Erro de requisição: status code {response.status_code}" - ) + valor = constants.TABLE_LIST_PROPOSICAO.value[table_id] + url_2 = ( + f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{anos}.csv" + ) + response = requests.get(url_2) + if response.status_code == 200: + with open(f"{constants.INPUT_PATH.value}{valor}.csv", "wb") as f: + f.write(response.content) + print(f"download complete {valor}") -def get_data_proposicao() -> pd.DataFrame: - """ - Reads the data for a given table_id from the Camara dos Deputados dataset. + elif response.status_code >= 400 and response.status_code <= 599: + raise Exception(f"Erro de requisição: status code {response.status_code}") - Parameters: - table_id (str): The ID of the table to read. - Returns: - pd.DataFrame: The DataFrame containing the data. - """ +def download_and_read_data_proposicao(table_id: str) -> pd.DataFrame: download_csvs_camara_proposicao() - df = pd.read_csv( - f'{constants.INPUT_PATH.value}{constants.TABLE_LIST_PROPOSICAO.value["microdados"]}.csv', - sep=";", - ) - - return df - - -def read_data_proposicao(table_id: str) -> pd.DataFrame: df = pd.read_csv( f"{constants.INPUT_PATH.value}{constants.TABLE_LIST_PROPOSICAO.value[table_id]}.csv", sep=";",