From 278bb0e49336c0b3669fa9ac9a300b2ebe7348c9 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Thu, 16 Jan 2025 14:33:50 -0300 Subject: [PATCH 1/8] update: data model for documentos_perfil_mensal --- pipelines/datasets/br_cvm_fi/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cvm_fi/tasks.py b/pipelines/datasets/br_cvm_fi/tasks.py index 6cf746147..e976c369d 100644 --- a/pipelines/datasets/br_cvm_fi/tasks.py +++ b/pipelines/datasets/br_cvm_fi/tasks.py @@ -470,7 +470,7 @@ def clean_data_make_partitions_perfil(diretorio, table_id): df_final[colunas_mapeamento] = df_final[colunas_mapeamento].applymap( lambda x: cvm_constants.MAPEAMENTO.value.get(x, x) ) - df_final["CNPJ_FUNDO"] = df_final["CNPJ_FUNDO"].str.replace(r"[/.-]", "") + df_final["CNPJ_FUNDO_CLASSE"] = df_final["CNPJ_FUNDO_CLASSE"].str.replace(r"[/.-]", "") df_final["CPF_CNPJ_COMITENTE_1"] = df_final["CPF_CNPJ_COMITENTE_1"].str.replace( r"[/.-]", "" ) From d4d7f5772f3435161e6b7daed8170dc3f3d15bdb Mon Sep 17 00:00:00 2001 From: Patrick Teixeira <105399231+tricktx@users.noreply.github.com> Date: Wed, 22 Jan 2025 09:43:32 -0300 Subject: [PATCH 2/8] [BugFix] br_cgu_beneficios_cidadao (#916) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix tables beneficios cidadao * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix path * add return in partition and fix path * add dataset in list * consertando a partição e alterando caminho * testando na cloud novo_bolsa_familia * diminuindo o valor do chunksize * movendo o read_csvc para dentro de task * colocando a partição dentro do tratamento * add append in parquet * concatenando os chunks * add flows and fix update_metadata * exclude files in cgu beneficios and add flows * remove breakpoint * fix partition and used dask * testando pipeline com polars * refactoring partition * remove dask and polars * fix input and output * deactivate licitacao_empenho * último commit * remove os * Revert "último commit" This reverts commit 9a22f118d044519ae52f2c8506dd76908d39aa4e. * consertando o último commit --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .../br_cgu_beneficios_cidadao/__init__.py | 0 .../br_cgu_beneficios_cidadao/constants.py | 115 ----- .../br_cgu_beneficios_cidadao/flows.py | 442 ++---------------- .../br_cgu_beneficios_cidadao/tasks.py | 131 ------ .../br_cgu_beneficios_cidadao/utils.py | 391 ---------------- .../br_cgu_licitacao_contrato/flows.py | 2 +- pipelines/utils/crawler_cgu/constants.py | 128 +++++ pipelines/utils/crawler_cgu/flows.py | 99 ++++ pipelines/utils/crawler_cgu/tasks.py | 104 ++++- pipelines/utils/crawler_cgu/utils.py | 56 ++- pipelines/utils/utils.py | 3 +- 11 files changed, 408 insertions(+), 1063 deletions(-) delete mode 100644 pipelines/datasets/br_cgu_beneficios_cidadao/__init__.py delete mode 100644 pipelines/datasets/br_cgu_beneficios_cidadao/constants.py delete mode 100644 pipelines/datasets/br_cgu_beneficios_cidadao/tasks.py delete mode 100644 pipelines/datasets/br_cgu_beneficios_cidadao/utils.py diff --git a/pipelines/datasets/br_cgu_beneficios_cidadao/__init__.py b/pipelines/datasets/br_cgu_beneficios_cidadao/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/pipelines/datasets/br_cgu_beneficios_cidadao/constants.py b/pipelines/datasets/br_cgu_beneficios_cidadao/constants.py deleted file mode 100644 index c870d8131..000000000 --- a/pipelines/datasets/br_cgu_beneficios_cidadao/constants.py +++ /dev/null @@ -1,115 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Constant values for the datasets projects -""" - -from enum import Enum - -import numpy as np - - -class constants(Enum): # pylint: disable=c0103 - """ - Constant values for the br_cgu_bolsa_familia project - """ - - # Prefect tasks retry policy - TASK_MAX_RETRIES = 5 - TASK_RETRY_DELAY = 10 - - CHROME_DRIVER = "https://chromedriver.storage.googleapis.com/114.0.5735.90/chromedriver_linux64.zip" - - PATH = "/tmp/data/br_cgu_beneficios_cidadao" - - TMP_DATA_DIR = "/tmp/data/br_cgu_beneficios_cidadao/tmp" - - ROOT_URL = ( - "https://portaldatransparencia.gov.br/download-de-dados/novo-bolsa-familia" - ) - ROOT_URL_GARANTIA_SAFRA = ( - "https://portaldatransparencia.gov.br/download-de-dados/garantia-safra" - ) - ROOT_URL_BPC = "https://portaldatransparencia.gov.br/download-de-dados/bpc" - - MAIN_URL_NOVO_BOLSA_FAMILIA = "https://dadosabertos-download.cgu.gov.br/PortalDaTransparencia/saida/novo-bolsa-familia/" - - MAIN_URL_GARANTIA_SAFRA = "https://dadosabertos-download.cgu.gov.br/PortalDaTransparencia/saida/garantia-safra/" - - MAIN_URL_BPC = ( - "https://dadosabertos-download.cgu.gov.br/PortalDaTransparencia/saida/bpc/" - ) - - DTYPES_NOVO_BOLSA_FAMILIA = { - "MÊS COMPETÊNCIA": str, - "MÊS REFERÊNCIA": str, - "UF": str, - "CÓDIGO MUNICÍPIO SIAFI": str, - "NOME MUNICÍPIO": str, - "CPF FAVORECIDO": str, - "NIS FAVORECIDO": str, - "NOME FAVORECIDO": str, - "VALOR PARCELA": np.float64, - } - DTYPES_GARANTIA_SAFRA = { - "MÊS REFERÊNCIA": str, - "UF": str, - "CÓDIGO MUNICÍPIO SIAFI": str, - "NOME MUNICÍPIO": str, - "NIS FAVORECIDO": str, - "NOME FAVORECIDO": str, - "VALOR PARCELA": np.float64, - } - - DTYPES_BPC = { - "MÊS COMPETÊNCIA": str, - "MÊS REFERÊNCIA": str, - "UF": str, - "CÓDIGO MUNICÍPIO SIAFI": str, - "NOME MUNICÍPIO": str, - "NIS BENEFICIÁRIO": str, - "CPF BENEFICIÁRIO": str, - "NOME BENEFICIÁRIO": str, - "NIS REPRESENTANTE LEGAL": str, - "CPF REPRESENTANTE LEGAL": str, - "NOME REPRESENTANTE LEGAL": str, - "NÚMERO BENEFÍCIO": str, - "BENEFÍCIO CONCEDIDO JUDICIALMENTE": str, - "VALOR PARCELA": np.float64, - } - - RENAMER_NOVO_BOLSA_FAMILIA = { - "MÊS COMPETÊNCIA": "mes_competencia", - "MÊS REFERÊNCIA": "mes_referencia", - "UF": "sigla_uf", - "CÓDIGO MUNICÍPIO SIAFI": "id_municipio_siafi", - "NOME MUNICÍPIO": "municipio", - "CPF FAVORECIDO": "cpf", - "NIS FAVORECIDO": "nis", - "NOME FAVORECIDO": "nome", - "VALOR PARCELA": "valor", - } - RENAMER_GARANTIA_SAFRA = { - "MÊS REFERÊNCIA": "mes_referencia", - "UF": "sigla_uf", - "CÓDIGO MUNICÍPIO SIAFI": "id_municipio_siafi", - "NOME MUNICÍPIO": "municipio", - "NIS FAVORECIDO": "nis", - "NOME FAVORECIDO": "nome", - "VALOR PARCELA": "valor", - } - RENAMER_BPC = { - "MÊS COMPETÊNCIA": "mes_competencia", - "MÊS REFERÊNCIA": "mes_referencia", - "UF": "sigla_uf", - "CÓDIGO MUNICÍPIO SIAFI": "id_municipio_siafi", - "NOME MUNICÍPIO": "municipio", - "NIS BENEFICIÁRIO": "nis", - "CPF BENEFICIÁRIO": "cpf", - "NOME BENEFICIÁRIO": "nome", - "NIS REPRESENTANTE LEGAL": "nis_representante", - "CPF REPRESENTANTE LEGAL": "cpf_representante", - "NOME REPRESENTANTE LEGAL": "nome_representante", - "NÚMERO BENEFÍCIO": "numero", - "BENEFÍCIO CONCEDIDO JUDICIALMENTE": "concedido_judicialmente", - "VALOR PARCELA": "valor", - } diff --git a/pipelines/datasets/br_cgu_beneficios_cidadao/flows.py b/pipelines/datasets/br_cgu_beneficios_cidadao/flows.py index ea44d3ef8..5f436c039 100644 --- a/pipelines/datasets/br_cgu_beneficios_cidadao/flows.py +++ b/pipelines/datasets/br_cgu_beneficios_cidadao/flows.py @@ -1,419 +1,35 @@ # -*- coding: utf-8 -*- -""" -Flows for br_cgu_bolsa_familia -""" -from datetime import timedelta - -from prefect import Parameter, case +from copy import deepcopy from prefect.run_configs import KubernetesRun from prefect.storage import GCS -from prefect.tasks.prefect import create_flow_run, wait_for_flow_run -from pipelines.datasets.br_cgu_beneficios_cidadao.constants import constants as constants_cgu from pipelines.constants import constants +from pipelines.utils.crawler_cgu.flows import flow_cgu_beneficios_cidadao from pipelines.datasets.br_cgu_beneficios_cidadao.schedules import ( - every_day_bpc, - every_day_garantia_safra, every_day_novo_bolsa_familia, -) -from pipelines.datasets.br_cgu_beneficios_cidadao.tasks import ( - get_source_max_date, - scrape_download_page, - get_updated_files, - crawler_beneficios_cidadao, -) -from pipelines.utils.constants import constants as utils_constants -from pipelines.utils.decorators import Flow -from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.metadata.tasks import ( - check_if_data_is_outdated, - update_django_metadata, - task_get_api_most_recent_date, -) -from pipelines.utils.tasks import ( # update_django_metadata, - create_table_and_upload_to_gcs, - get_current_flow_labels, - rename_current_flow_run_dataset_table, -) - -### ### -# NOVO BOLSA FAMÍLIA # -### ### -with Flow( - name="br_cgu_beneficios_cidadao.novo_bolsa_familia", - code_owners=[ - "equipe_pipelines", - ], -) as datasets_br_cgu_bolsa_familia_flow: - dataset_id = Parameter( - "dataset_id", default="br_cgu_beneficios_cidadao", required=False - ) - table_id = Parameter("table_id", default="novo_bolsa_familia", required=False) - - historical_data = Parameter("historical_data", default=True, required=False) - - update_metadata = Parameter("update_metadata", default=False, required=False) - url = Parameter("url", default=constants_cgu.MAIN_URL_NOVO_BOLSA_FAMILIA.value, required=False) - year = Parameter("year", default="2023", required=False) - - materialization_mode = Parameter( - "materialization_mode", default="dev", required=False - ) - - materialize_after_dump = Parameter( - "materialize_after_dump", default=True, required=False - ) - dbt_alias = Parameter("dbt_alias", default=True, required=False) - - rename_flow_run = rename_current_flow_run_dataset_table( - prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id - ) - - files_and_dates_dataframe = scrape_download_page(table_id= table_id) - - source_max_date = get_source_max_date(files_df=files_and_dates_dataframe, upstream_tasks=[files_and_dates_dataframe]) - - update = check_if_data_is_outdated( - dataset_id=dataset_id, - table_id=table_id, - data_source_max_date=source_max_date, - date_format="%Y-%m", - upstream_tasks=[source_max_date,files_and_dates_dataframe ], - ) - - with case(update, True): - table_last_date = task_get_api_most_recent_date(dataset_id = dataset_id, - table_id = table_id, - date_format ="%Y-%m", - upstream_tasks=[update]) - - download_files_list = get_updated_files(files_df = files_and_dates_dataframe, - table_last_date = table_last_date, - upstream_tasks=[table_last_date]) - - output_filepath = crawler_beneficios_cidadao( - table_id = table_id, - url = url, - files_df = files_and_dates_dataframe, - historical_data=historical_data, - files=download_files_list, - year=year, - upstream_tasks=[download_files_list,table_last_date], - ) - - wait_upload_table = create_table_and_upload_to_gcs( - data_path=output_filepath, - dataset_id=dataset_id, - table_id=table_id, - dump_mode="append", - source_format="parquet", - wait=output_filepath, - ) - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id, - "mode": materialization_mode, - "dbt_alias": dbt_alias, - "dbt_command": "run/test", - "disable_elementary": False, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - upstream_tasks = [wait_upload_table] - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id=table_id, - date_column_name={ - "year": "ano_competencia", - "month": "mes_competencia", - }, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], - ) - -datasets_br_cgu_bolsa_familia_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -datasets_br_cgu_bolsa_familia_flow.run_config = KubernetesRun( - image=constants.DOCKER_IMAGE.value -) -datasets_br_cgu_bolsa_familia_flow.schedule = every_day_novo_bolsa_familia - - -### ### -# GARANTIA SAFRA # -### ### - -with Flow( - name="br_cgu_beneficios_cidadao.garantia_safra", - code_owners=[ - "equipe_pipelines", - ], -) as datasets_br_cgu_garantia_safra_flow: - dataset_id = Parameter( - "dataset_id", default="br_cgu_beneficios_cidadao", required=False - ) - - table_id = Parameter("table_id", default="garantia_safra", required=False) - - historical_data = Parameter("historical_data", default=False, required=False) - - update_metadata = Parameter("update_metadata", default=False, required=False) - - materialization_mode = Parameter( - "materialization_mode", default="dev", required=False - ) - - materialize_after_dump = Parameter( - "materialize_after_dump", default=False, required=False - ) - url = Parameter("url", default=constants_cgu.MAIN_URL_GARANTIA_SAFRA.value, required=False) - dbt_alias = Parameter("dbt_alias", default=True, required=False) - - year = Parameter("year", default="2023", required=False) - - rename_flow_run = rename_current_flow_run_dataset_table( - prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id - ) - - files_and_dates_dataframe = scrape_download_page(table_id= table_id) - - source_max_date = get_source_max_date(files_df=files_and_dates_dataframe, upstream_tasks=[files_and_dates_dataframe]) - - update = check_if_data_is_outdated( - dataset_id=dataset_id, - table_id=table_id, - data_source_max_date=source_max_date, - date_format="%Y-%m", - upstream_tasks=[source_max_date,files_and_dates_dataframe ], - ) - - with case(update, True): - table_last_date = task_get_api_most_recent_date(dataset_id = dataset_id, - table_id = table_id, - date_format ="%Y-%m", - upstream_tasks=[update]) - - download_files_list = get_updated_files(files_df = files_and_dates_dataframe, - table_last_date = table_last_date, - upstream_tasks=[table_last_date]) - - output_filepath = crawler_beneficios_cidadao( - table_id = table_id, - url = url, - files_df = files_and_dates_dataframe, - historical_data=historical_data, - files=download_files_list, - year=year, - upstream_tasks=[download_files_list,table_last_date], - ) - - wait_upload_table = create_table_and_upload_to_gcs( - data_path=output_filepath, - dataset_id=dataset_id, - table_id=table_id, - dump_mode="append", - source_format="csv", - wait=output_filepath, - ) - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id, - "mode": materialization_mode, - "dbt_alias": dbt_alias, - "dbt_command": "run/test", - "disable_elementary": False, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - upstream_tasks = [wait_upload_table] - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id=table_id, - date_column_name={ - "year": "ano_referencia", - "month": "mes_referencia", - }, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - ) - -datasets_br_cgu_garantia_safra_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -datasets_br_cgu_garantia_safra_flow.run_config = KubernetesRun( - image=constants.DOCKER_IMAGE.value -) -datasets_br_cgu_garantia_safra_flow.schedule = every_day_garantia_safra -### ### -# BPC # -### ### - -with Flow( - name="br_cgu_beneficios_cidadao.bpc", - code_owners=[ - "equipe_pipelines", - ], -) as datasets_br_cgu_bpc_flow: - dataset_id = Parameter( - "dataset_id", default="br_cgu_beneficios_cidadao", required=False - ) - table_id = Parameter("table_id", default="bpc", required=False) - historical_data = Parameter("historical_data", default=False, required=False) - url = Parameter("url", default=constants_cgu.MAIN_URL_BPC.value, required=False) - update_metadata = Parameter("update_metadata", default=False, required=False) - materialization_mode = Parameter( - "materialization_mode", default="dev", required=False - ) - - materialize_after_dump = Parameter( - "materialize_after_dump", default=False, required=False - ) - - dbt_alias = Parameter("dbt_alias", default=True, required=False) - - year = Parameter("year", default="2023", required=False) - - rename_flow_run = rename_current_flow_run_dataset_table( - prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id - ) - - files_and_dates_dataframe = scrape_download_page(table_id= table_id) - - source_max_date = get_source_max_date(files_df=files_and_dates_dataframe, upstream_tasks=[files_and_dates_dataframe]) - - update = check_if_data_is_outdated( - dataset_id=dataset_id, - table_id=table_id, - data_source_max_date=source_max_date, - date_format="%Y-%m", - upstream_tasks=[source_max_date,files_and_dates_dataframe ], - ) - - with case(update, True): - table_last_date = task_get_api_most_recent_date(dataset_id = dataset_id, - table_id = table_id, - date_format ="%Y-%m", - upstream_tasks=[update]) - - download_files_list = get_updated_files(files_df = files_and_dates_dataframe, - table_last_date = table_last_date, - upstream_tasks=[table_last_date]) - - output_filepath = crawler_beneficios_cidadao( - table_id = table_id, - url = url, - files_df = files_and_dates_dataframe, - historical_data=historical_data, - files=download_files_list, - year=year, - upstream_tasks=[download_files_list,table_last_date], - ) - - wait_upload_table = create_table_and_upload_to_gcs( - data_path=output_filepath, - dataset_id=dataset_id, - table_id=table_id, - dump_mode="append", - source_format="csv", - wait=output_filepath, - ) - - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id, - "mode": materialization_mode, - "dbt_alias": dbt_alias, - "dbt_command": "run/test", - "disable_elementary": False, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - upstream_tasks = [wait_upload_table] - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id=table_id, - date_column_name={ - "year": "ano_competencia", - "month": "mes_competencia", - }, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], - ) - -datasets_br_cgu_bpc_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -datasets_br_cgu_bpc_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -datasets_br_cgu_bpc_flow.schedule = every_day_bpc + every_day_garantia_safra, + every_day_bpc, + ) + +# ! - > Flow: br_cgu_beneficios_cidadao__novo_bolsa_familia +br_cgu_beneficios_cidadao__novo_bolsa_familia = deepcopy(flow_cgu_beneficios_cidadao) +br_cgu_beneficios_cidadao__novo_bolsa_familia.name = "br_cgu_beneficios_cidadao.novo_bolsa_familia" +br_cgu_beneficios_cidadao__novo_bolsa_familia.code_owners = ["trick"] +br_cgu_beneficios_cidadao__novo_bolsa_familia.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_beneficios_cidadao__novo_bolsa_familia.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_beneficios_cidadao__novo_bolsa_familia.schedule = every_day_novo_bolsa_familia, + +# ! - > Flow: br_cgu_beneficios_cidadao__garantia_safra +br_cgu_beneficios_cidadao__garantia_safra = deepcopy(flow_cgu_beneficios_cidadao) +br_cgu_beneficios_cidadao__garantia_safra.name = "br_cgu_beneficios_cidadao.garantia_safra" +br_cgu_beneficios_cidadao__garantia_safra.code_owners = ["trick"] +br_cgu_beneficios_cidadao__garantia_safra.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_beneficios_cidadao__garantia_safra.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_beneficios_cidadao__garantia_safra.schedule = every_day_garantia_safra + +# ! - > br_cgu_beneficios_cidadao__bpc +br_cgu_beneficios_cidadao__bpc = deepcopy(flow_cgu_beneficios_cidadao) +br_cgu_beneficios_cidadao__bpc.name = "br_cgu_beneficios_cidadao.bpc" +br_cgu_beneficios_cidadao__bpc.code_owners = ["trick"] +br_cgu_beneficios_cidadao__bpc.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_beneficios_cidadao__bpc.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_beneficios_cidadao__bpc.schedule = every_day_bpc \ No newline at end of file diff --git a/pipelines/datasets/br_cgu_beneficios_cidadao/tasks.py b/pipelines/datasets/br_cgu_beneficios_cidadao/tasks.py deleted file mode 100644 index a4f8f8428..000000000 --- a/pipelines/datasets/br_cgu_beneficios_cidadao/tasks.py +++ /dev/null @@ -1,131 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Tasks for br_cgu_beneficios_cidadao -""" - -from datetime import timedelta - -import pandas as pd -from prefect import task -from pipelines.constants import constants as constants_root -from pipelines.datasets.br_cgu_beneficios_cidadao.constants import constants -from pipelines.datasets.br_cgu_beneficios_cidadao.utils import ( - download_unzip_csv, - extract_dates, - parquet_partition, -) -from pipelines.utils.utils import log -from typing import List, Union -from datetime import datetime, date - - -@task -def print_last_file(file): - log(f"Arquivo baixado --> {file}") - -@task( - max_retries=constants_root.TASK_MAX_RETRIES.value, - retry_delay=timedelta(seconds=constants_root.TASK_RETRY_DELAY.value), -) -def scrape_download_page(table_id): - dates = extract_dates(table=table_id) - - return dates - -@task -def get_updated_files(files_df, table_last_date): - files_df['ano_mes'] = files_df['ano'] + '-' + files_df['mes_numero'] + '-01' - files_df['ano_mes'] = files_df['ano_mes'].apply(lambda x: datetime.strptime(x, "%Y-%m-%d").date()) - return files_df[files_df['ano_mes'] > table_last_date]['urls'].to_list() - - -@task( - max_retries=constants_root.TASK_MAX_RETRIES.value, - retry_delay=timedelta(seconds=constants_root.TASK_RETRY_DELAY.value), -) -def get_source_max_date(files_df) -> list: - """ - Encontra a data mais recente em um DataFrame e retorna a data e a URL correspondente. - - Parâmetros: - - table_id (str): O identificador da tabela que contém os dados. - - Retorna: - Uma lista contendo a data mais recente e a URL correspondente. - - Exemplo de uso: - date, url = crawl_last_date("minha_tabela") - """ - #dates = extract_dates(table=table_id) - files_df["data"] = pd.to_datetime( - files_df["ano"].astype(str) + "-" + files_df["mes_numero"].astype(str) + "-01" - ) - max_date = files_df["data"].max() - - return max_date - - -@task( - max_retries=constants_root.TASK_MAX_RETRIES.value, - retry_delay=timedelta(seconds=constants_root.TASK_RETRY_DELAY.value), -) # noqa -def crawler_beneficios_cidadao(table_id: str, - url: str, - files_df: pd.DataFrame, - historical_data: bool, - files: Union[List[str], None] = None, - year: str = "2023" - ) -> str: - """ - Baixa e processa dados do portal da transparência relacionados aos benefícios do cidadão. - - Args: - table_id (str): ID da tabela para identificar os dados. - files_df (pd.DataFrame): DataFrame contendo informações sobre os arquivos a serem baixados. - historical_data (bool): Indica se os dados históricos devem ser baixados. - files (List[str], optional): Lista de URLs dos arquivos mais recentes, requeridos se historical_data for False. - year (str, optional): Ano dos dados históricos a serem baixados. - url (str): URL base para baixar os arquivos CSV. - - Returns: - str: Caminho do diretório onde os dados processados foram armazenados. - - Raises: - ValueError: Se historical_data for True e files for None. - - Note: - Esta função baixa arquivos CSV relacionados aos benefícios do cidadão, - opcionalmente filtrados por ano, e os processa em formato parquet. - - Example: - >>> crawler_beneficios_cidadao(table_id='my_table', - ... files_df=my_files_df, - ... historical_data=True, - ... url='https://example.com/beneficios_cidadao/') - - """ - if historical_data: - - endpoints = files_df[files_df["ano"] == year]["urls"].to_list() - - log("BAIXANDO DADOS HISTÓRICOS") - log(f"ENDPOINTS >> {endpoints}") - - download_unzip_csv( - url=url, - files=endpoints, - id=table_id, - ) - else: - log("BAIXANDO DADOS MAIS RECENTES") - download_unzip_csv( - url=url, - files=files, - id=table_id, - ) - - parquet_partition( - path=f"/tmp/data/br_cgu_beneficios_cidadao/{table_id}/input/", - table=table_id, - ) - return f"/tmp/data/br_cgu_beneficios_cidadao/{table_id}/output/" \ No newline at end of file diff --git a/pipelines/datasets/br_cgu_beneficios_cidadao/utils.py b/pipelines/datasets/br_cgu_beneficios_cidadao/utils.py deleted file mode 100644 index 26b20c83c..000000000 --- a/pipelines/datasets/br_cgu_beneficios_cidadao/utils.py +++ /dev/null @@ -1,391 +0,0 @@ -# -*- coding: utf-8 -*- -import os -import zipfile - -import pandas as pd -import requests -from bs4 import BeautifulSoup -from selenium import webdriver -from selenium.webdriver.chrome.service import Service as ChromeService -from selenium.webdriver.common.by import By -from selenium.webdriver.support.ui import Select -from tqdm import tqdm -from webdriver_manager.chrome import ChromeDriverManager - -from pipelines.datasets.br_cgu_beneficios_cidadao.constants import constants -from pipelines.utils.utils import log, to_partitions - - -def download_unzip_csv( - url: str, files, chunk_size: int = 128, mkdir: bool = True, id="teste" -) -> str: - """ - Downloads and unzips a .csv file from a given list of files and saves it to a local directory. - Parameters: - ----------- - url: str - The base URL from which to download the files. - files: list or str - The .zip file names or a single .zip file name to download the csv file from. - chunk_size: int, optional - The size of each chunk to download in bytes. Default is 128 bytes. - mkdir: bool, optional - Whether to create a new directory for the downloaded file. Default is False. - Returns: - -------- - str - The path to the directory where the downloaded file was saved. - """ - - if mkdir: - os.makedirs(f"/tmp/data/br_cgu_beneficios_cidadao/{id}/input/", exist_ok=True) - - request_headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36", - } - - if isinstance(files, list): - for file in files: - log(f"Baixando o arquivo {file}") - download_url = f"{url}{file}" - save_path = f"/tmp/data/br_cgu_beneficios_cidadao/{id}/input/{file}" - - r = requests.get( - download_url, headers=request_headers, stream=True, timeout=50 - ) - with open(save_path, "wb") as fd: - for chunk in tqdm(r.iter_content(chunk_size=chunk_size)): - fd.write(chunk) - - try: - with zipfile.ZipFile(save_path) as z: - z.extractall(f"/tmp/data/br_cgu_beneficios_cidadao/{id}/input") - log("Dados extraídos com sucesso!") - - except zipfile.BadZipFile: - log(f"O arquivo {file} não é um arquivo ZIP válido.") - - os.system( - f'cd /tmp/data/br_cgu_beneficios_cidadao/{id}/input; find . -type f ! -iname "*.csv" -delete' - ) - - elif isinstance(files, str): - log(f"Baixando o arquivo {files}") - download_url = f"{url}{files}" - save_path = f"/tmp/data/br_cgu_beneficios_cidadao/{id}/input/{files}" - - r = requests.get(download_url, headers=request_headers, stream=True, timeout=10) - with open(save_path, "wb") as fd: - for chunk in tqdm(r.iter_content(chunk_size=chunk_size)): - fd.write(chunk) - - try: - with zipfile.ZipFile(save_path) as z: - z.extractall(f"/tmp/data/br_cgu_beneficios_cidadao/{id}/input") - log("Dados extraídos com sucesso!") - - except zipfile.BadZipFile: - log(f"O arquivo {files} não é um arquivo ZIP válido.") - - os.system( - f'cd /tmp/data/br_cgu_beneficios_cidadao/{id}/input; find . -type f ! -iname "*.csv" -delete' - ) - - else: - raise ValueError("O argumento 'files' possui um tipo inadequado.") - - return f"/tmp/data/br_cgu_beneficios_cidadao/{id}/input/" - - -def extract_dates(table: str) -> pd.DataFrame: - """ - Extrai datas e URLs de download do site do Portal da Transparência e retorna um DataFrame. - - Parâmetros: - - table (str): O nome da tabela de dados desejada (possíveis valores: "novo_bolsa_familia", "garantia_safra", "bpc"). - - Retorna: - - Um DataFrame contendo as colunas "ano", "mes_numero", "mes_nome" e "urls". - - Exemplo de uso: - data_frame = extract_dates("novo_bolsa_familia") - """ - if not os.path.exists(constants.PATH.value): - os.makedirs(constants.PATH.value, exist_ok=True) - - if not os.path.exists(constants.TMP_DATA_DIR.value): - os.makedirs(constants.TMP_DATA_DIR.value, exist_ok=True) - options = webdriver.ChromeOptions() - - # https://github.com/SeleniumHQ/selenium/issues/11637 - prefs = { - "download.default_directory": constants.TMP_DATA_DIR.value, - "download.prompt_for_download": False, - "download.directory_upgrade": True, - "safebrowsing.enabled": True, - } - options.add_experimental_option( - "prefs", - prefs, - ) - - options.add_argument("--headless") - options.add_argument("--test-type") - options.add_argument("--disable-gpu") - options.add_argument("--no-first-run") - options.add_argument("--no-sandbox") - options.add_argument("--disable-dev-shm-usage") - options.add_argument("--no-default-browser-check") - options.add_argument("--ignore-certificate-errors") - options.add_argument("--start-maximized") - options.add_argument( - "user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36" - ) - - driver = webdriver.Chrome( - service=ChromeService(ChromeDriverManager().install()), options=options - ) - - if table == "novo_bolsa_familia": - driver.get(constants.ROOT_URL.value) - driver.implicitly_wait(15) - elif table == "garantia_safra": - driver.get(constants.ROOT_URL_GARANTIA_SAFRA.value) - driver.implicitly_wait(15) - else: - driver.get(constants.ROOT_URL_BPC.value) - driver.implicitly_wait(15) - - page_source = driver.page_source - BeautifulSoup(page_source, "html.parser") - - select_anos = Select(driver.find_element(By.ID, "links-anos")) - anos_meses = {} - - # Iterar sobre as opções dentro do select de anos - for option_ano in select_anos.options: - valor_ano = option_ano.get_attribute("value") - ano = option_ano.text - - select_anos.select_by_value(valor_ano) - - driver.implicitly_wait(10) - - select_meses = Select(driver.find_element(By.ID, "links-meses")) - - meses_dict = {} - - # Iterar sobre as opções dentro do select de meses - for option_mes in select_meses.options: - valor_mes = option_mes.get_attribute("value") - nome_mes = option_mes.text.capitalize() - meses_dict[valor_mes] = nome_mes - - anos_meses[ano] = meses_dict - - driver.quit() - - anos = [] - meses_numeros = [] - meses_nomes = [] - - # Iterar sobre o dicionário para extrair os dados - for ano, meses in anos_meses.items(): - for mes_numero, mes_nome in meses.items(): - anos.append(ano) - meses_numeros.append(mes_numero) - meses_nomes.append(mes_nome) - - # Criar o DataFrame - data = {"ano": anos, "mes_numero": meses_numeros, "mes_nome": meses_nomes} - df = pd.DataFrame(data) - df["urls"] = None - for index, row in df.iterrows(): - if table == "novo_bolsa_familia": - df["urls"][index] = f"{row.ano}{row.mes_numero}_NovoBolsaFamilia.zip" - elif table == "garantia_safra": - df["urls"][index] = f"{row.ano}{row.mes_numero}_GarantiaSafra.zip" - else: - df["urls"][index] = f"{row.ano}{row.mes_numero}_BPC.zip" - return df - - -def parquet_partition_2(path: str, table: str): - # dfs = [] - for nome_arquivo in os.listdir(path): - if nome_arquivo.endswith(".csv"): - log(f"Carregando o arquivo: {nome_arquivo}") - - df = None - if table == "novo_bolsa_familia": - with pd.read_csv( - f"{path}{nome_arquivo}", - sep=";", - encoding="latin-1", - dtype=constants.DTYPES_NOVO_BOLSA_FAMILIA.value, - chunksize=1000000, - decimal=",", - ) as reader: - for chunk in tqdm(reader): - chunk.rename( - columns=constants.RENAMER_NOVO_BOLSA_FAMILIA.value, - inplace=True, - ) - if df is None: - df = chunk - else: - df = pd.concat([df, chunk], axis=0) - elif table == "garantia_safra": - with pd.read_csv( - f"{path}{nome_arquivo}", - sep=";", - encoding="latin-1", - dtype=constants.DTYPES_GARANTIA_SAFRA.value, - chunksize=1000000, - decimal=",", - ) as reader: - for chunk in tqdm(reader): - chunk.rename( - columns=constants.RENAMER_GARANTIA_SAFRA.value, - inplace=True, - ) - if df is None: - df = chunk - else: - df = pd.concat([df, chunk], axis=0) - else: - with pd.read_csv( - f"{path}{nome_arquivo}", - sep=";", - encoding="latin-1", - dtype=constants.DTYPES_BPC.value, - chunksize=1000000, - decimal=",", - na_values="", - ) as reader: - for chunk in tqdm(reader): - chunk.rename( - columns=constants.RENAMER_BPC.value, - inplace=True, - ) - if df is None: - df = chunk - else: - df = pd.concat([df, chunk], axis=0) - - log("Lendo dataset") - os.makedirs( - f"/tmp/data/br_cgu_beneficios_cidadao/{table}/output/", exist_ok=True - ) - if table == "novo_bolsa_familia": - to_partitions( - df, - partition_columns=["mes_competencia", "sigla_uf"], - savepath=f"/tmp/data/br_cgu_beneficios_cidadao/{table}/output/", - file_type="parquet", - ) - elif table == "bpc": - to_partitions( - df, - partition_columns=["mes_competencia"], - savepath=f"/tmp/data/br_cgu_beneficios_cidadao/{table}/output/", - file_type="csv", - ) - else: - to_partitions( - df, - partition_columns=["mes_referencia"], - savepath=f"/tmp/data/br_cgu_beneficios_cidadao/{table}/output/", - file_type="parquet", - ) - - log("Partição feita.") - - return f"/tmp/data/br_cgu_beneficios_cidadao/{table}/output/" - - -def parquet_partition(path: str, table: str) -> str: - """ - Carrega arquivos CSV, realiza transformações e cria partições em um formato específico, retornando o caminho de saída. - - Parâmetros: - - path (str): O caminho para os arquivos a serem processados. - - table (str): O nome da tabela (possíveis valores: "novo_bolsa_familia", "garantia_safra", "bpc"). - - Retorna: - - str: O caminho do diretório de saída onde as partições foram criadas. - - Exemplo de uso: - output_path = parquet_partition("/caminho/para/arquivos/", "novo_bolsa_familia") - """ - - output_directory = f"/tmp/data/br_cgu_beneficios_cidadao/{table}/output/" - - for nome_arquivo in os.listdir(path): - if nome_arquivo.endswith(".csv"): - log(f"Carregando o arquivo: {nome_arquivo}") - - df = None - with pd.read_csv( - f"{path}{nome_arquivo}", - sep=";", - encoding="latin-1", - chunksize=1000000, - decimal=",", - na_values="" if table != "bpc" else None, - dtype=( - constants.DTYPES_NOVO_BOLSA_FAMILIA.value - if table == "novo_bolsa_familia" - else ( - constants.DTYPES_GARANTIA_SAFRA.value - if table == "garantia_safra" - else constants.DTYPES_BPC.value - ) - ), - ) as reader: - for chunk in tqdm(reader): - chunk.rename( - columns=( - constants.RENAMER_NOVO_BOLSA_FAMILIA.value - if table == "novo_bolsa_familia" - else ( - constants.RENAMER_GARANTIA_SAFRA.value - if table == "garantia_safra" - else constants.RENAMER_BPC.value - ) - ), - inplace=True, - ) - if df is None: - df = chunk - else: - df = pd.concat([df, chunk], axis=0) - - log("Lendo dataset") - os.makedirs(output_directory, exist_ok=True) - - if table == "novo_bolsa_familia": - to_partitions( - df, - partition_columns=["mes_competencia", "sigla_uf"], - savepath=output_directory, - file_type="parquet", - ) - elif table == "bpc": - to_partitions( - df, - partition_columns=["mes_competencia"], - savepath=output_directory, - file_type="csv", - ) - else: - to_partitions( - df, - partition_columns=["mes_referencia"], - savepath=output_directory, - file_type="parquet", - ) - - log("Partição feita.") - - return output_directory diff --git a/pipelines/datasets/br_cgu_licitacao_contrato/flows.py b/pipelines/datasets/br_cgu_licitacao_contrato/flows.py index a410a9bb4..f50c9fadd 100644 --- a/pipelines/datasets/br_cgu_licitacao_contrato/flows.py +++ b/pipelines/datasets/br_cgu_licitacao_contrato/flows.py @@ -64,7 +64,7 @@ br_cgu_licitacao_contrato__licitacao_empenho.code_owners = ["trick"] br_cgu_licitacao_contrato__licitacao_empenho.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_cgu_licitacao_contrato__licitacao_empenho.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -br_cgu_licitacao_contrato__licitacao_empenho.schedule = (every_day_licitacao_empenho) +# br_cgu_licitacao_contrato__licitacao_empenho.schedule = (every_day_licitacao_empenho) # ! ------------------ Licitação Item ------------------ diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py index 0f31a39fd..85db92438 100644 --- a/pipelines/utils/crawler_cgu/constants.py +++ b/pipelines/utils/crawler_cgu/constants.py @@ -4,6 +4,7 @@ """ from enum import Enum +import numpy as np class constants(Enum): # pylint: disable=c0103 @@ -176,3 +177,130 @@ class constants(Enum): # pylint: disable=c0103 }, } + TABELA_BENEFICIOS_CIDADAO = { + "novo_bolsa_familia" : { + "INPUT" : "/tmp/input/novo_bolsa_familia/", + "OUTPUT" : "/tmp/output/novo_bolsa_familia/", + "URL" : "https://portaldatransparencia.gov.br/download-de-dados/novo-bolsa-familia/" + }, + + "bpc" : { + "INPUT" : "/tmp/input/bpc/", + "OUTPUT" : "/tmp/output/bpc/", + "URL" : "https://portaldatransparencia.gov.br/download-de-dados/bpc/" + }, + + "garantia_safra" : { + "INPUT" : "/tmp/input/garantia_safra/", + "OUTPUT" : "/tmp/output/garantia_safra/", + "URL" : "https://portaldatransparencia.gov.br/download-de-dados/garantia-safra/"} + } + + DTYPES_NOVO_BOLSA_FAMILIA = { + "MÊS COMPETÊNCIA": str, + "MÊS REFERÊNCIA": str, + "UF": str, + "CÓDIGO MUNICÍPIO SIAFI": str, + "NOME MUNICÍPIO": str, + "CPF FAVORECIDO": str, + "NIS FAVORECIDO": str, + "NOME FAVORECIDO": str, + "VALOR PARCELA": np.float64, + } + DTYPES_GARANTIA_SAFRA = { + "MÊS REFERÊNCIA": str, + "UF": str, + "CÓDIGO MUNICÍPIO SIAFI": str, + "NOME MUNICÍPIO": str, + "NIS FAVORECIDO": str, + "NOME FAVORECIDO": str, + "VALOR PARCELA": np.float64, + } + + DTYPES_BPC = { + "MÊS COMPETÊNCIA": str, + "MÊS REFERÊNCIA": str, + "UF": str, + "CÓDIGO MUNICÍPIO SIAFI": str, + "NOME MUNICÍPIO": str, + "NIS BENEFICIÁRIO": str, + "CPF BENEFICIÁRIO": str, + "NOME BENEFICIÁRIO": str, + "NIS REPRESENTANTE LEGAL": str, + "CPF REPRESENTANTE LEGAL": str, + "NOME REPRESENTANTE LEGAL": str, + "NÚMERO BENEFÍCIO": str, + "BENEFÍCIO CONCEDIDO JUDICIALMENTE": str, + "VALOR PARCELA": np.float64, + } + + RENAMER_NOVO_BOLSA_FAMILIA = { + "MÊS COMPETÊNCIA": "mes_competencia", + "MÊS REFERÊNCIA": "mes_referencia", + "UF": "sigla_uf", + "CÓDIGO MUNICÍPIO SIAFI": "id_municipio_siafi", + "NOME MUNICÍPIO": "municipio", + "CPF FAVORECIDO": "cpf", + "NIS FAVORECIDO": "nis", + "NOME FAVORECIDO": "nome", + "VALOR PARCELA": "valor", + } + RENAMER_GARANTIA_SAFRA = { + "MÊS REFERÊNCIA": "mes_referencia", + "UF": "sigla_uf", + "CÓDIGO MUNICÍPIO SIAFI": "id_municipio_siafi", + "NOME MUNICÍPIO": "municipio", + "NIS FAVORECIDO": "nis", + "NOME FAVORECIDO": "nome", + "VALOR PARCELA": "valor", + } + RENAMER_BPC = { + "MÊS COMPETÊNCIA": "mes_competencia", + "MÊS REFERÊNCIA": "mes_referencia", + "UF": "sigla_uf", + "CÓDIGO MUNICÍPIO SIAFI": "id_municipio_siafi", + "NOME MUNICÍPIO": "municipio", + "NIS BENEFICIÁRIO": "nis", + "CPF BENEFICIÁRIO": "cpf", + "NOME BENEFICIÁRIO": "nome", + "NIS REPRESENTANTE LEGAL": "nis_representante", + "CPF REPRESENTANTE LEGAL": "cpf_representante", + "NOME REPRESENTANTE LEGAL": "nome_representante", + "NÚMERO BENEFÍCIO": "numero", + "BENEFÍCIO CONCEDIDO JUDICIALMENTE": "concedido_judicialmente", + "VALOR PARCELA": "valor", + } + + DICT_FOR_TABLE = { + "novo_bolsa_familia": { + "dataset_id":"br_cgu_beneficios_cidadao", + "table_id": "novo_bolsa_familia", + "date_column_name": {"year": "ano_competencia", "month": "mes_competencia"}, + "date_format": "%Y-%m", + "coverage_type": "part_bdpro", + "time_delta": {"months": 6}, + "prefect_mode": "prod", + "bq_project": "basedosdados" + }, + "safra_garantia": { + "dataset_id":"br_cgu_beneficios_cidadao", + "table_id": "safra_garantia", + "date_column_name": {"year": "ano_referencia", "month": "mes_referencia", + }, + "date_format": "%Y-%m", + "coverage_type": "part_bdpro", + "time_delta": {"months": 6}, + "prefect_mode": "prod", + "bq_project": "basedosdados" + }, + "bpc": { + "dataset_id":"br_cgu_beneficios_cidadao", + "table_id": "bpc", + "date_column_name": {"year": "ano_competencia", "month": "mes_competencia"}, + "date_format": "%Y-%m", + "coverage_type": "part_bdpro", + "time_delta": {"months": 6}, + "prefect_mode": "prod", + "bq_project": "basedosdados", + } + } \ No newline at end of file diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index 99d28f291..60b0532ae 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -15,7 +15,10 @@ partition_data, get_current_date_and_download_file, verify_all_url_exists_to_download, + read_and_partition_beneficios_cidadao, + dict_for_table ) +from pipelines.utils.crawler_cgu.constants import constants as cgu_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.metadata.tasks import update_django_metadata, check_if_data_is_outdated @@ -319,3 +322,99 @@ flow_cgu_licitacao_contrato.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +# ! ================================== CGU - Benefícios Cidadão ===================================== + +with Flow(name="CGU - Benefícios Cidadão", code_owners=["trick"]) as flow_cgu_beneficios_cidadao: + + dataset_id = Parameter( + "dataset_id", default="br_cgu_beneficios_cidadao", required=True + ) + table_id = Parameter( + "table_id", required=True + ) + #### + # Relative_month = 1 means that the data will be downloaded for the current month + #### + relative_month = Parameter("relative_month", default=1, required=False) + materialization_mode = Parameter("materialization_mode", default="dev", required=False) + materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False) + dbt_alias = Parameter("dbt_alias", default=True, required=False) + update_metadata = Parameter("update_metadata", default=False, required=False) + rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id) + + data_source_max_date = get_current_date_and_download_file( + table_id=table_id, + dataset_id=dataset_id, + relative_month=relative_month, + ) + + dados_desatualizados = check_if_data_is_outdated( + dataset_id=dataset_id, + table_id=table_id, + data_source_max_date=data_source_max_date, + date_format="%Y-%m", + upstream_tasks=[data_source_max_date], + ) + + with case(dados_desatualizados, True): + filepath = read_and_partition_beneficios_cidadao( + table_id=table_id, + upstream_tasks=[data_source_max_date], + ) + + wait_upload_table = create_table_and_upload_to_gcs( + data_path=filepath, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=filepath, + upstream_tasks=[filepath], + ) + + with case(materialize_after_dump, True): + + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + "dbt_command": "run", + "disable_elementary": False, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + upstream_tasks=[materialization_flow], + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + with case(update_metadata, True): + update_django_metadata( + dataset_id=dataset_id, + table_id=table_id, + date_column_name=dict_for_table(table_id), + date_format="%Y-%m", + coverage_type="part_bdpro", + time_delta={"months": 6}, + prefect_mode=materialization_mode, + bq_project="basedosdados", + upstream_tasks=[wait_for_materialization], + ) + +flow_cgu_beneficios_cidadao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +flow_cgu_beneficios_cidadao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 99b4a47ee..55da697d7 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -8,6 +8,8 @@ import basedosdados as bd import requests import pandas as pd +from tqdm import tqdm +import gc from dateutil.relativedelta import relativedelta from pipelines.utils.utils import log, to_partitions, download_and_unzip_file from pipelines.utils.metadata.utils import get_api_most_recent_date, get_url @@ -16,6 +18,7 @@ last_date_in_metadata, read_and_clean_csv, build_urls, + partition_data_beneficios_cidadao, ) from pipelines.utils.crawler_cgu.constants import constants from pipelines.utils.crawler_cgu.utils import download_file @@ -41,9 +44,8 @@ def partition_data(table_id: str, dataset_id : str) -> str: if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato"]: log("---------------------------- Read data ----------------------------") df = read_csv(dataset_id = dataset_id, table_id = table_id) - log(df.head()) if dataset_id == "br_cgu_cartao_pagamento": - log("---------------------------- Partiting data -----------------------") + log(" ---------------------------- Partiting data -----------------------") to_partitions( data = df, partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], @@ -78,6 +80,93 @@ def partition_data(table_id: str, dataset_id : str) -> str: return constants.TABELA_SERVIDORES.value[table_id]['OUTPUT'] +@task +# https://stackoverflow.com/questions/26124417/how-to-convert-a-csv-file-to-parquet +def read_and_partition_beneficios_cidadao(table_id): + + """ + Carrega arquivos CSV, realiza transformações e cria partições em um formato específico, retornando o caminho de saída. + + Parâmetros: + - path (str): O caminho para os arquivos a serem processados. + - table (str): O nome da tabela (possíveis valores: "novo_bolsa_familia", "garantia_safra", "bpc"). + + Retorna: + - str: O caminho do diretório de saída onde as partições foram criadas. + + Exemplo de uso: + output_path = parquet_partition("/caminho/para/arquivos/", "novo_bolsa_familia") + """ + constants_cgu_beneficios_cidadao = constants.TABELA_BENEFICIOS_CIDADAO.value[table_id] + for nome_arquivo in os.listdir(constants_cgu_beneficios_cidadao['INPUT']): + for nome_arquivo in os.listdir(constants_cgu_beneficios_cidadao['INPUT']): + if nome_arquivo.endswith(".csv"): + log(f"Carregando o arquivo: {nome_arquivo}") + + df = None + with pd.read_csv( + f"{constants_cgu_beneficios_cidadao['INPUT']}{nome_arquivo}", + sep=";", + encoding="latin-1", + chunksize=500000, + decimal=",", + na_values="" if table_id != "bpc" else None, + dtype=( + constants.DTYPES_NOVO_BOLSA_FAMILIA.value + if table_id == "novo_bolsa_familia" + else ( + constants.DTYPES_GARANTIA_SAFRA.value + if table_id == "garantia_safra" + else constants.DTYPES_BPC.value + ) + ), + ) as reader: + number = 0 + for chunk in tqdm(reader): + chunk.rename( + columns=( + constants.RENAMER_NOVO_BOLSA_FAMILIA.value + if table_id == "novo_bolsa_familia" + else ( + constants.RENAMER_GARANTIA_SAFRA.value + if table_id == "garantia_safra" + else constants.RENAMER_BPC.value + ) + ), + inplace=True, + ) + os.makedirs(constants_cgu_beneficios_cidadao['OUTPUT'], exist_ok=True) + number += 1 + log(f"Chunk {number} carregando.") + if table_id == "novo_bolsa_familia": + partition_data_beneficios_cidadao(table_id = table_id, + df = chunk, + coluna1 = "mes_competencia", + coluna2 = "sigla_uf", + counter = number) + elif table_id == "bpc": + to_partitions( + df, + partition_columns=["mes_competencia"], + savepath=constants_cgu_beneficios_cidadao['OUTPUT'], + file_type="csv", + ) + else: + to_partitions( + df, + partition_columns=["mes_referencia"], + savepath=constants_cgu_beneficios_cidadao['OUTPUT'], + file_type="parquet", + ) + + del chunk + gc.collect() + + log("Partição feita.") + + return constants_cgu_beneficios_cidadao['OUTPUT'] + + @task def get_current_date_and_download_file(table_id : str, dataset_id : str, @@ -147,3 +236,14 @@ def verify_all_url_exists_to_download(dataset_id, table_id, relative_month) -> b log(f"A URL {url=} existe!") return True + +@task +def dict_for_table(table_id: str) -> dict: + + DICT_FOR_TABLE = { + "novo_bolsa_familia": {"year": "ano_competencia", "month": "mes_competencia"}, + "safra_garantia": {"year": "ano_referencia", "month": "mes_referencia"}, + "bpc": {"year": "ano_competencia", "month": "mes_competencia"}, + } + + return DICT_FOR_TABLE[table_id] \ No newline at end of file diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index ac243adbf..bf5c072dc 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -53,7 +53,7 @@ def build_urls(dataset_id: str, url: str, year: int, month: int, table_id: str) log(f"{dataset_id=}") - if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato"]: + if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato", "br_cgu_beneficios_cidadao"]: log(f"{url}{year}{str(month).zfill(2)}/") return f"{url}{year}{str(month).zfill(2)}/" @@ -65,6 +65,7 @@ def build_urls(dataset_id: str, url: str, year: int, month: int, table_id: str) list_url.append(url_completa) return list_url + def build_input(table_id): """ Builds a list of input directories based on the given table ID. @@ -111,13 +112,15 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ str: The last date in the API if the URL is not found. """ - if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato"]: + if dataset_id in ["br_cgu_cartao_pagamento", "br_cgu_licitacao_contrato", "br_cgu_beneficios_cidadao"]: if dataset_id == "br_cgu_cartao_pagamento": value_constants = constants.TABELA.value[table_id] # ! CGU - Cartão de Pagamento elif dataset_id == "br_cgu_licitacao_contrato": value_constants = constants.TABELA_LICITACAO_CONTRATO.value[table_id] - + elif dataset_id == "br_cgu_beneficios_cidadao": + value_constants = constants.TABELA_BENEFICIOS_CIDADAO.value[table_id] input = value_constants["INPUT"] + if not os.path.exists(input): os.makedirs(input) @@ -156,8 +159,7 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ return last_date_in_api elif dataset_id == "br_cgu_servidores_executivo_federal": - - constants_cgu_servidores = constants.TABELA_SERVIDORES.value[table_id] # ! CGU - Servidores Públicos do Executivo Federal + constants_cgu = constants.TABELA_SERVIDORES.value[table_id] # ! CGU - Servidores Públicos do Executivo Federal url = build_urls( dataset_id, @@ -171,18 +173,17 @@ def download_file(dataset_id: str, table_id: str, year: int, month: int, relativ log(input_dirs) for urls, input_dir in zip(url, input_dirs): if requests.get(urls).status_code == 200: - destino = f"{constants_cgu_servidores['INPUT']}/{input_dir}" + destino = f"{constants_cgu['INPUT']}/{input_dir}" download_and_unzip_file(urls, destino) last_date_in_api, next_date_in_api = last_date_in_metadata( - dataset_id="br_cgu_servidores_executivo_federal", + dataset_id=dataset_id, table_id=table_id, relative_month=relative_month, ) return next_date_in_api - # Função para carregar o dataframe @lru_cache(maxsize=1) # Cache para evitar recarregar a tabela def load_municipio() -> None: @@ -279,6 +280,7 @@ def read_csv( return df + def last_date_in_metadata( dataset_id: str, table_id: str, relative_month ) -> datetime.date: @@ -455,3 +457,41 @@ def get_source(table_name: str, source: str) -> str: } return ORIGINS[table_name][source] + +def partition_data_beneficios_cidadao(table_id: str, df, coluna1: str, coluna2: str, counter) -> str: + if table_id == "novo_bolsa_familia": + unique_anos = df[coluna1].unique().tolist() + unique_meses = df[coluna2].unique().tolist() + + for ano_completencia in unique_anos: + for mes_completencia in unique_meses: + path_partition = f"{constants.TABELA_BENEFICIOS_CIDADAO.value[table_id]['OUTPUT']}/{coluna1}={ano_completencia}/{coluna2}={mes_completencia}" + + if not os.path.exists(path_partition): + os.makedirs(path_partition) + + # Mudando o filter do Polars para o loc do Pandas + df_partition = df.loc[(df[coluna1] == ano_completencia) & (df[coluna2] == mes_completencia)] + # Usando drop com axis=1 para remover colunas no Pandas + df_partition = df_partition.drop([coluna1, coluna2], axis=1) + + # Usando to_parquet do Pandas ao invés do write_parquet do Polars + df_partition.to_parquet(f"{path_partition}/data_{counter}.parquet") + + else: + unique_meses = df[coluna1].unique().tolist() + + for mes in unique_meses: + path_partition = f"{constants.TABELA_BENEFICIOS_CIDADAO.value[table_id]['OUTPUT']}/{coluna1}={mes}/" + + if not os.path.exists(path_partition): + os.makedirs(path_partition) + + # Mudando o filter do Polars para o loc do Pandas + df_partition = df.loc[df[coluna1] == mes] + # Usando drop com axis=1 para remover colunas no Pandas + df_partition = df_partition.drop([coluna1], axis=1) + + # Usando to_parquet do Pandas ao invés do write_parquet do Polars + df_partition.to_parquet(f"{path_partition}/data_{counter}.parquet") + diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index 707ed7faf..68a9b3c4c 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -4,7 +4,6 @@ """ import base64 import json - # pylint: disable=too-many-arguments import logging import re @@ -459,7 +458,7 @@ def to_partitions( # append data to parquet file_filter_save_path = Path(filter_save_path) / "data.parquet" df_filter.to_parquet( - file_filter_save_path, index=False, compression="gzip" + file_filter_save_path, index=False, compression="gzip", ) else: raise BaseException("Data need to be a pandas DataFrame") From a186c1ca57818be0597cb47cf07e4aa54c1dfcbd Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Tue, 21 Jan 2025 12:59:20 -0300 Subject: [PATCH 3/8] refactor: cafir crawler --- pipelines/datasets/br_rf_cafir/constants.py | 15 ++- pipelines/datasets/br_rf_cafir/flows.py | 26 +++-- pipelines/datasets/br_rf_cafir/tasks.py | 30 +++--- pipelines/datasets/br_rf_cafir/utils.py | 101 ++++++++++---------- 4 files changed, 96 insertions(+), 76 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/constants.py b/pipelines/datasets/br_rf_cafir/constants.py index 8fb220ba6..99bb28315 100644 --- a/pipelines/datasets/br_rf_cafir/constants.py +++ b/pipelines/datasets/br_rf_cafir/constants.py @@ -8,7 +8,20 @@ class constants(Enum): - URL = ["https://dadosabertos.rfb.gov.br/dados/cafir/"] + URL = ["https://arquivos.receitafederal.gov.br/cafir/"] + + HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "pt-BR,pt;q=0.8,en-US;q=0.5,en;q=0.3", + "Sec-GPC": "1", + "Upgrade-Insecure-Requests": "1", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "same-origin", + "Sec-Fetch-User": "?1", + "Priority": "u=0, i", + } PATH = [ "/tmp/input/br_rf_cafir", diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index e6adaaf00..90d31541e 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -13,7 +13,11 @@ from pipelines.constants import constants from pipelines.datasets.br_rf_cafir.constants import constants as br_rf_cafir_constants from pipelines.datasets.br_rf_cafir.schedules import schedule_br_rf_cafir_imoveis_rurais -from pipelines.datasets.br_rf_cafir.tasks import parse_data, parse_files_parse_date +from pipelines.datasets.br_rf_cafir.tasks import ( + decide_files_to_download, + parse_api_metadata, + parse_data +) from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants @@ -46,15 +50,22 @@ prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id ) - info = parse_files_parse_date(url=br_rf_cafir_constants.URL.value[0]) - log_task("Checando se os dados estão desatualizados") + df_metadata = parse_api_metadata( + url=br_rf_cafir_constants.URL.value[0], + headers=br_rf_cafir_constants.HEADERS.value + ) + + arquivos, data_atualizacao = decide_files_to_download( + df=df_metadata, + upstream_tasks=[df_metadata], + ) is_outdated = check_if_data_is_outdated( dataset_id=dataset_id, table_id=table_id, - data_source_max_date=info[0], + data_source_max_date=data_atualizacao, date_format="%Y-%m-%d", - upstream_tasks=[info], + upstream_tasks=[arquivos], ) with case(is_outdated, False): @@ -65,8 +76,9 @@ file_path = parse_data( url=br_rf_cafir_constants.URL.value[0], - other_task_output=info, - upstream_tasks=[info, is_outdated], + file_list=arquivos, + data_atualizacao=data_atualizacao, + upstream_tasks=[arquivos, is_outdated], ) wait_upload_table = create_table_and_upload_to_gcs( diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index 1af1c1b1f..b96aad371 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -13,9 +13,10 @@ from pipelines.datasets.br_rf_cafir.constants import constants as br_rf_cafir_constants from pipelines.datasets.br_rf_cafir.utils import ( download_csv_files, - parse_date_parse_files, preserve_zeros, remove_accent, + parse_api_metadata, + decide_files_to_download, remove_non_ascii_from_df, strip_string, ) @@ -27,37 +28,32 @@ max_retries=2, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def parse_files_parse_date(url) -> tuple[list[datetime], list[str]]: - """Extrai os nomes dos arquivos e a data de disponibilização dos dados no FTP +def parse_api_metadata(url: str, headers:dict) -> pd.DataFrame: + return parse_api_metadata(url=url, headers=headers) - Args: - url (string): URL do FTP - - Returns: - Tuple: Retorna uma tupla com duas listas. A primeira contém uma lista de datas de atualização dos dados e a segunda contém uma lista com os nomes dos arquivos. - """ - log("######## download_files_parse_data ########") - - date_files = parse_date_parse_files(url) - - return date_files +@task( + max_retries=2, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def decide_files_to_download(df: pd.DataFrame, data_especifica: datetime.date = None, data_maxima: bool = True) -> tuple[list[str],list[datetime]]: + return decide_files_to_download(df=df, data_especifica=data_especifica, data_maxima=data_maxima) @task( max_retries=3, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def parse_data(url: str, other_task_output: tuple[list[datetime], list[str]]) -> str: +def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date]) -> str: """Essa task faz o download dos arquivos do FTP, faz o parse dos dados e salva os arquivos em um diretório temporário. Returns: str: Caminho do diretório temporário """ - date = other_task_output[0] + date = data_atualizacao log(f"###### Extraindo dados para data: {date}") - files_list = other_task_output[1] + files_list = file_list log(f"###### Extraindo files: {files_list}") # inicializa counter para ser usado na nomeação dos arquivos repetindo o padrão de divulgação dos dados diff --git a/pipelines/datasets/br_rf_cafir/utils.py b/pipelines/datasets/br_rf_cafir/utils.py index 776a6cfb7..f3c84a29c 100644 --- a/pipelines/datasets/br_rf_cafir/utils.py +++ b/pipelines/datasets/br_rf_cafir/utils.py @@ -7,7 +7,7 @@ import unicodedata from datetime import datetime import time - +from typing import Tuple import pandas as pd import requests from bs4 import BeautifulSoup @@ -105,67 +105,66 @@ def remove_accent(input_str: pd.DataFrame, pattern: str = "all") -> pd.DataFrame return input_str -def parse_date_parse_files(url: str, retries: int = 3, backoff_factor: int = 2) -> Tuple[datetime, List[str]]: - """ - Faz o parse da data de atualização e dos links de download dos arquivos. +def parse_api_metadata(url: str, headers: dict = None) -> pd.DataFrame: - Args: - url (str): A URL do FTP do CAFIR da Receita Federal. - retries (int): Número de tentativas em caso de falha por timeout. - backoff_factor (int): Fator de espera exponencial entre tentativas. + log('Fazendo request para a url: ', url) - Returns: - Tuple[datetime, List[str]]: Retorna uma tupla com a data de atualização e os nomes dos arquivos. - """ - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", - "Accept-Language": "pt-BR,pt;q=0.8,en-US;q=0.5,en;q=0.3", - "Sec-GPC": "1", - "Upgrade-Insecure-Requests": "1", - "Sec-Fetch-Dest": "document", - "Sec-Fetch-Mode": "navigate", - "Sec-Fetch-Site": "same-origin", - "Sec-Fetch-User": "?1", - "Priority": "u=0, i", - } + response = requests.get(url, headers=headers) + response.raise_for_status() - xpath_release_date = "tr td:nth-of-type(3)" - attempt = 0 + soup = BeautifulSoup(response.text, "html.parser") + elementos = soup.find_all("a") - while attempt < retries: - try: - response = requests.get(url, headers=headers, timeout=(10, 30)) + log('Extraindo nomes de arquivos e datas de atualização') - # Checa se a requisição foi bem-sucedida - if response.status_code == 200: - soup = BeautifulSoup(response.text, "html.parser") + nomes_arquivos = [arquivo.find_parent('td') for arquivo in elementos if arquivo.has_attr('href') and 'csv' in arquivo['href']] + data_atualizacao_arquivos = [data_atualizacao.find_next_sibling('td') for data_atualizacao in nomes_arquivos if data_atualizacao.find_next_sibling('td').get('align') == 'right'] + data_atualizacao_arquivos_formatada = [datetime.strptime(a.text.strip(), "%Y-%m-%d %H:%M").date() for a in data_atualizacao_arquivos] - # 1. Parsing da data de atualização - td_elements = soup.select(xpath_release_date) - td_texts = [td.get_text(strip=True)[:10] for td in td_elements] - release_data = datetime.strptime(td_texts[3], "%Y-%m-%d").date() - log(f"Release date: {release_data}") - # 2. Parsing dos nomes dos arquivos - a_elements = soup.find_all("a") - href_values = [a["href"] for a in a_elements if a.has_attr("href")] - files_name = [href for href in href_values if "csv" in href] - log(f"Files name: {files_name}") + if len(nomes_arquivos_text) != len(data_atualizacao_arquivos_formatada): + raise ValueError( + f"A quantidade de arquivos ({len(nomes_arquivos_text)}) difere da quantidade de datas ({len(data_atualizacao_arquivos_formatada)}). Verifique o FTP da Receita Federal {url}" + ) - return release_data, files_name + df = pd.DataFrame( + { + 'nome_arquivo':nomes_arquivos, + 'data_atualizacao':data_atualizacao_arquivos_formatada + } + ) + log('Extração finalizada') + return df + +def decide_files_to_download(df: pd.DataFrame, data_especifica: datetime.date = None, data_maxima: bool = True) -> tuple[list[str],list[datetime]]: + """ + Decide quais arquivos baixar a depender da necessidade de atualização - else: - log(f"Erro na requisição: {response.status_code}") - raise requests.RequestException(f"HTTP {response.status_code}") + Parâmetros: + df (pd.DataFrame): DataFrame contendo informações dos arquivos, incluindo a data de atualização e o nome do arquivo. + data_especifica (datetime.date, opcional): Data específica para filtrar os arquivos. + data_maxima (bool): Se True, retorna os arquivos com a data de atualização mais recente. Padrão é True. - except (requests.Timeout, requests.ConnectionError) as e: - attempt += 1 - wait_time = backoff_factor ** attempt - log(f"Erro: {e}. Tentando novamente em {wait_time} segundos...") - time.sleep(wait_time) + Retorna: + tuple: Uma tupla contendo uma lista de nomes de arquivos que atendem aos critérios fornecidos e a data correspondente. - raise TimeoutError("Falha após várias tentativas de conectar ao servidor.") + Levanta: + ValueError: Se não houver arquivos disponíveis para a data específica fornecida. + """ + #TODO: Aqui caberia um check da qte de arquivos seleciondos. As vezes alguns arquiivos são atualizados depois de outrs + if data_maxima: + max_date = df['data_atualizacao'].max() + log(f"Os arquivos serão selecionados utiliozando a data de atualização mais recente: {max_date}") + return df[df['data_atualizacao'] == max_date]['nome_arquivo'].tolist(), max_date + + elif data_especifica: + filtered_df = df[df['data_atualizacao'] == data_especifica] + if filtered_df.empty: + raise ValueError(f"Não há arquivos disponíveis para a data {data_especifica}. Verifique o FTP da Receita Federal.") + return filtered_df['nome_arquivo'].tolist(), data_especifica + + else: + raise ValueError("Critérios inválidos: deve-se selecionar pelo menos um dos parâmetros 'data_maxima', 'dados_historicos' ou 'data_especifica'.") From 97b48c2874124cf8cd29a90a99493b82047f29e6 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 22 Jan 2025 11:46:26 -0300 Subject: [PATCH 4/8] fix: out of flow context error --- pipelines/datasets/br_rf_cafir/flows.py | 8 ++++---- pipelines/datasets/br_rf_cafir/tasks.py | 4 ++-- pipelines/datasets/br_rf_cafir/utils.py | 13 ++++++++----- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 90d31541e..b13186032 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -14,8 +14,8 @@ from pipelines.datasets.br_rf_cafir.constants import constants as br_rf_cafir_constants from pipelines.datasets.br_rf_cafir.schedules import schedule_br_rf_cafir_imoveis_rurais from pipelines.datasets.br_rf_cafir.tasks import ( - decide_files_to_download, - parse_api_metadata, + task_decide_files_to_download, + task_parse_api_metadata, parse_data ) from pipelines.utils.constants import constants as utils_constants @@ -50,12 +50,12 @@ prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id ) - df_metadata = parse_api_metadata( + df_metadata = task_parse_api_metadata( url=br_rf_cafir_constants.URL.value[0], headers=br_rf_cafir_constants.HEADERS.value ) - arquivos, data_atualizacao = decide_files_to_download( + arquivos, data_atualizacao = task_decide_files_to_download( df=df_metadata, upstream_tasks=[df_metadata], ) diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index b96aad371..b04d0f366 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -28,14 +28,14 @@ max_retries=2, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def parse_api_metadata(url: str, headers:dict) -> pd.DataFrame: +def task_parse_api_metadata(url: str, headers:dict) -> pd.DataFrame: return parse_api_metadata(url=url, headers=headers) @task( max_retries=2, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def decide_files_to_download(df: pd.DataFrame, data_especifica: datetime.date = None, data_maxima: bool = True) -> tuple[list[str],list[datetime]]: +def task_decide_files_to_download(df: pd.DataFrame, data_especifica: datetime.date = None, data_maxima: bool = True) -> tuple[list[str],list[datetime]]: return decide_files_to_download(df=df, data_especifica=data_especifica, data_maxima=data_maxima) diff --git a/pipelines/datasets/br_rf_cafir/utils.py b/pipelines/datasets/br_rf_cafir/utils.py index f3c84a29c..a3a8a8f89 100644 --- a/pipelines/datasets/br_rf_cafir/utils.py +++ b/pipelines/datasets/br_rf_cafir/utils.py @@ -107,7 +107,7 @@ def remove_accent(input_str: pd.DataFrame, pattern: str = "all") -> pd.DataFrame def parse_api_metadata(url: str, headers: dict = None) -> pd.DataFrame: - log('Fazendo request para a url: ', url) + log(f'Fazendo request para a url: {url}') response = requests.get(url, headers=headers) response.raise_for_status() @@ -117,14 +117,16 @@ def parse_api_metadata(url: str, headers: dict = None) -> pd.DataFrame: log('Extraindo nomes de arquivos e datas de atualização') - nomes_arquivos = [arquivo.find_parent('td') for arquivo in elementos if arquivo.has_attr('href') and 'csv' in arquivo['href']] - data_atualizacao_arquivos = [data_atualizacao.find_next_sibling('td') for data_atualizacao in nomes_arquivos if data_atualizacao.find_next_sibling('td').get('align') == 'right'] + linhas_arquivos_datas = [arquivo.find_parent('td') for arquivo in elementos if arquivo.has_attr('href') and 'csv' in arquivo['href']] + nomes_arquivos = [arquivo.find('a').get('href') for arquivo in linhas_arquivos_datas] + + data_atualizacao_arquivos = [data_atualizacao.find_next_sibling('td') for data_atualizacao in linhas_arquivos_datas if data_atualizacao.find_next_sibling('td').get('align') == 'right'] data_atualizacao_arquivos_formatada = [datetime.strptime(a.text.strip(), "%Y-%m-%d %H:%M").date() for a in data_atualizacao_arquivos] - if len(nomes_arquivos_text) != len(data_atualizacao_arquivos_formatada): + if len(nomes_arquivos) != len(data_atualizacao_arquivos_formatada): raise ValueError( - f"A quantidade de arquivos ({len(nomes_arquivos_text)}) difere da quantidade de datas ({len(data_atualizacao_arquivos_formatada)}). Verifique o FTP da Receita Federal {url}" + f"A quantidade de arquivos ({len(nomes_arquivos)}) difere da quantidade de datas ({len(data_atualizacao_arquivos_formatada)}). Verifique o FTP da Receita Federal {url}" ) df = pd.DataFrame( @@ -133,6 +135,7 @@ def parse_api_metadata(url: str, headers: dict = None) -> pd.DataFrame: 'data_atualizacao':data_atualizacao_arquivos_formatada } ) + log(df) log('Extração finalizada') return df From 08da069afa92b8944286e11a4975ca89ec79d705 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 22 Jan 2025 15:41:35 -0300 Subject: [PATCH 5/8] feat: refactor parse_data function --- pipelines/datasets/br_rf_cafir/flows.py | 1 + pipelines/datasets/br_rf_cafir/tasks.py | 63 +++++---------- pipelines/datasets/br_rf_cafir/utils.py | 102 ++++++------------------ 3 files changed, 48 insertions(+), 118 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index b13186032..47087ab33 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -77,6 +77,7 @@ file_path = parse_data( url=br_rf_cafir_constants.URL.value[0], file_list=arquivos, + headers=br_rf_cafir_constants.HEADERS.value, data_atualizacao=data_atualizacao, upstream_tasks=[arquivos, is_outdated], ) diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index b04d0f366..21e90469b 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -14,10 +14,8 @@ from pipelines.datasets.br_rf_cafir.utils import ( download_csv_files, preserve_zeros, - remove_accent, parse_api_metadata, decide_files_to_download, - remove_non_ascii_from_df, strip_string, ) from pipelines.utils.utils import log @@ -43,7 +41,7 @@ def task_decide_files_to_download(df: pd.DataFrame, data_especifica: datetime.da max_retries=3, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date]) -> str: +def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date], headers: dict) -> str: """Essa task faz o download dos arquivos do FTP, faz o parse dos dados e salva os arquivos em um diretório temporário. Returns: @@ -51,30 +49,29 @@ def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date]) """ date = data_atualizacao - log(f"###### Extraindo dados para data: {date}") + log(f"------ Extraindo dados para data: {date}") files_list = file_list - log(f"###### Extraindo files: {files_list}") + log(f"------ Os seguintes arquivos foram selecionados para download: {files_list}") - # inicializa counter para ser usado na nomeação dos arquivos repetindo o padrão de divulgação dos dados - counter = 0 - log(f"###### -----COUNTER: {counter}") - list_n_cols = [] + files_list = files_list[1:3] for file in files_list: - counter += 1 - log(f"###### X-----COUNTER: {counter}") + log(f"Baixando arquivo: {file} de {url}") # monta url complete_url = url + file + + # baixa arquivo download_csv_files( file_name=file, url=complete_url, + headers=headers, download_directory=br_rf_cafir_constants.PATH.value[0], ) @@ -91,58 +88,42 @@ def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date]) converters={ col: preserve_zeros for col in br_rf_cafir_constants.COLUMN_NAMES.value }, - encoding="latin-1", + encoding="ISO-8859-1", ) - list_n_cols.append(df.shape[1]) - - # remove acentos - df["nome"] = df["nome"].apply(remove_accent) - df["endereco"] = df["endereco"].apply(remove_accent) - - # remove não ascii - df = remove_non_ascii_from_df(df) - # tira os espacos em branco df = df.applymap(strip_string) - log(f"Saving file: {file}") + log(f"Salvando arquivo: {file}") # constroi diretório os.makedirs( br_rf_cafir_constants.PATH.value[1] + f"/imoveis_rurais/data={date}/", exist_ok=True, ) + + #NOTE: Com modificação do formato de divulgação do FTP os arquivos passaram a ser divulgados csvs particionados por UF + #A partir de 2025, a nomenclaruta dos no Storage arquivos mudou para: "imoveis_rurais_uf_numero.csv" no lugar de "imoveris_rurais_numero.csv" + save_path = ( br_rf_cafir_constants.PATH.value[1] + f"/imoveis_rurais/data={date}/" + "imoveis_rurais_" - + str(counter) + #extrai uf e numeração do nome do arquivo + + file.split(".")[-2] + ".csv" ) - log(f"save path: {save_path}") - # save new file as csv - df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8",escapechar='\\') - # resolve ASCII 0 no momento da leitura do BQ. Ler e salvar de novo. - df = pd.read_csv(save_path, dtype=str) df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8",escapechar='\\') - log(f"----- Removendo o arquivo: {os.listdir(br_rf_cafir_constants.PATH.value[0])}") - - # remove o arquivo de input - os.system("rm -rf " + br_rf_cafir_constants.PATH.value[0] + "/" + "*") + log(f"Arquivo salvo: {save_path.split('/')[-1]}") + del df - log(f"list_n_cols: O NUMERO DE COLUNAS É {list_n_cols}") + log(f"----- Removendo o arquivo: {os.listdir(br_rf_cafir_constants.PATH.value[0])} do diretório de input") - # gera paths - files_path = ( - br_rf_cafir_constants.PATH.value[1] - + "/" - + br_rf_cafir_constants.TABLE.value[0] - + "/" - ) + # remove o arquivo de input + os.remove(os.path.join(br_rf_cafir_constants.PATH.value[0], file)) - return files_path + return br_rf_cafir_constants.PATH.value[1] + f"/imoveis_rurais/data={date}/" diff --git a/pipelines/datasets/br_rf_cafir/utils.py b/pipelines/datasets/br_rf_cafir/utils.py index a3a8a8f89..525ee2ac7 100644 --- a/pipelines/datasets/br_rf_cafir/utils.py +++ b/pipelines/datasets/br_rf_cafir/utils.py @@ -31,82 +31,18 @@ def strip_string(x: pd.DataFrame) -> pd.DataFrame: return x -def remove_non_ascii_from_df(df: pd.DataFrame) -> pd.DataFrame: - """Remove caracteres não ascii de um dataframe codificando a coluna e decodificando em seguida - - Returns: - pd.DataFrame: Um dataframe +def parse_api_metadata(url: str, headers: dict = None) -> pd.DataFrame: """ - - return df.applymap( - lambda x: ( - x.encode("ascii", "ignore").decode("ascii") if isinstance(x, str) else x - ) - ) - - -def remove_accent(input_str: pd.DataFrame, pattern: str = "all") -> pd.DataFrame: - """Remove acentos e caracteres especiais do encoding LATIN-1 para a coluna selecionada - #creditos para -> https://stackoverflow.com/questions/39148759/remove-accents-from-a-dataframe-column-in-r - + Faz uma requisição para a URL fornecida e extrai metadados de arquivos CSV. + Args: + url (str): A URL da API para fazer a requisição. + headers (dict, opcional): Cabeçalhos HTTP para incluir na requisição. Padrão é None. Returns: - pd.Dataframe: Dataframe com coluna sem acentos e caracteres especiais + pd.DataFrame: Um DataFrame contendo os nomes dos arquivos e suas respectivas datas de atualização. + Raises: + ValueError: Se a quantidade de arquivos extraídos for diferente da quantidade de datas de atualização. """ - if not isinstance(input_str, str): - input_str = str(input_str) - - patterns = set(pattern) - - if "Ç" in patterns: - patterns.remove("Ç") - patterns.add("ç") - - symbols = { - "acute": "áéíóúÁÉÍÓÚýÝ", - "grave": "àèìòùÀÈÌÒÙ", - "circunflex": "âêîôûÂÊÎÔÛ", - "tilde": "ãõÃÕñÑ", - "umlaut": "äëïöüÄËÏÖÜÿ", - "cedil": "çÇ", - } - - nude_symbols = { - "acute": "aeiouAEIOUyY", - "grave": "aeiouAEIOU", - "circunflex": "aeiouAEIOU", - "tilde": "aoAOnN", - "umlaut": "aeiouAEIOUy", - "cedil": "cC", - } - - accent_types = ["´", "`", "^", "~", "¨", "ç"] - - if any( - pattern in {"all", "al", "a", "todos", "t", "to", "tod", "todo"} - for pattern in patterns - ): - return "".join( - [ - c if unicodedata.category(c) != "Mn" else "" - for c in unicodedata.normalize("NFD", input_str) - ] - ) - - for p in patterns: - if p in accent_types: - input_str = "".join( - [ - c if c not in symbols[p] else nude_symbols[p][symbols[p].index(c)] - for c in input_str - ] - ) - - return input_str - - -def parse_api_metadata(url: str, headers: dict = None) -> pd.DataFrame: - log(f'Fazendo request para a url: {url}') response = requests.get(url, headers=headers) @@ -135,7 +71,7 @@ def parse_api_metadata(url: str, headers: dict = None) -> pd.DataFrame: 'data_atualizacao':data_atualizacao_arquivos_formatada } ) - log(df) + log('Extração finalizada') return df @@ -154,7 +90,7 @@ def decide_files_to_download(df: pd.DataFrame, data_especifica: datetime.date = Levanta: ValueError: Se não houver arquivos disponíveis para a data específica fornecida. """ - #TODO: Aqui caberia um check da qte de arquivos seleciondos. As vezes alguns arquiivos são atualizados depois de outrs + if data_maxima: max_date = df['data_atualizacao'].max() log(f"Os arquivos serão selecionados utiliozando a data de atualização mais recente: {max_date}") @@ -167,11 +103,23 @@ def decide_files_to_download(df: pd.DataFrame, data_especifica: datetime.date = return filtered_df['nome_arquivo'].tolist(), data_especifica else: - raise ValueError("Critérios inválidos: deve-se selecionar pelo menos um dos parâmetros 'data_maxima', 'dados_historicos' ou 'data_especifica'.") + raise ValueError("Critérios inválidos: deve-se selecionar pelo menos um dos parâmetros: 'data_maxima' ou 'data_especifica'.") -def download_csv_files(url, file_name, download_directory): +def download_csv_files(url:str, file_name:str, download_directory:str, headers:dict) -> None: + """ + Faz o download de um arquivo CSV a partir de uma URL e salva em um diretório especificado. + + Args: + url (str): A URL do arquivo CSV a ser baixado. + file_name (str): O nome do arquivo a ser salvo. + download_directory (str): O diretório onde o arquivo será salvo. + headers (dict): Cabeçalhos HTTP a serem enviados com a requisição. + + Returns: + None + """ # cria diretório os.makedirs(download_directory, exist_ok=True) @@ -182,7 +130,7 @@ def download_csv_files(url, file_name, download_directory): file_path = os.path.join(download_directory, file_name) # faz request - response = requests.get(url) + response = requests.get(url, headers=headers) if response.status_code == 200: # Salva no diretório especificado From 237d5e5effcf4878ffd59bb4cb782a0d37045e19 Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 22 Jan 2025 17:04:50 -0300 Subject: [PATCH 6/8] update file selection --- pipelines/datasets/br_rf_cafir/tasks.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index 21e90469b..e3318b4cf 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -54,9 +54,6 @@ def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date], files_list = file_list log(f"------ Os seguintes arquivos foram selecionados para download: {files_list}") - - files_list = files_list[1:3] - for file in files_list: @@ -65,8 +62,6 @@ def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date], # monta url complete_url = url + file - - # baixa arquivo download_csv_files( file_name=file, @@ -114,7 +109,6 @@ def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date], + ".csv" ) - df.to_csv(save_path, index=False, sep=",", na_rep="", encoding="utf-8",escapechar='\\') log(f"Arquivo salvo: {save_path.split('/')[-1]}") From b8fb73f2a08d472f4225ba03f6e6c072bdb0a0bb Mon Sep 17 00:00:00 2001 From: folhesgabriel Date: Wed, 22 Jan 2025 21:54:00 -0300 Subject: [PATCH 7/8] fix: create_table_and_upload_to_gcs input dir --- pipelines/datasets/br_rf_cafir/flows.py | 4 ++-- pipelines/datasets/br_rf_cafir/tasks.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 47087ab33..8b41ac622 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -16,7 +16,7 @@ from pipelines.datasets.br_rf_cafir.tasks import ( task_decide_files_to_download, task_parse_api_metadata, - parse_data + task_download_files ) from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow @@ -74,7 +74,7 @@ with case(is_outdated, True): log_task("Existem atualizações! A run será inciada") - file_path = parse_data( + file_path = task_download_files( url=br_rf_cafir_constants.URL.value[0], file_list=arquivos, headers=br_rf_cafir_constants.HEADERS.value, diff --git a/pipelines/datasets/br_rf_cafir/tasks.py b/pipelines/datasets/br_rf_cafir/tasks.py index e3318b4cf..9c8239218 100644 --- a/pipelines/datasets/br_rf_cafir/tasks.py +++ b/pipelines/datasets/br_rf_cafir/tasks.py @@ -41,7 +41,7 @@ def task_decide_files_to_download(df: pd.DataFrame, data_especifica: datetime.da max_retries=3, retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), ) -def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date], headers: dict) -> str: +def task_download_files(url: str, file_list: list[str], data_atualizacao:[datetime.date], headers: dict) -> str: """Essa task faz o download dos arquivos do FTP, faz o parse dos dados e salva os arquivos em um diretório temporário. Returns: @@ -120,4 +120,4 @@ def parse_data(url: str, file_list: list[str], data_atualizacao:[datetime.date], # remove o arquivo de input os.remove(os.path.join(br_rf_cafir_constants.PATH.value[0], file)) - return br_rf_cafir_constants.PATH.value[1] + f"/imoveis_rurais/data={date}/" + return br_rf_cafir_constants.PATH.value[1] + f"/imoveis_rurais" From 579ce37b8ca51495786a5621731834725eb7060d Mon Sep 17 00:00:00 2001 From: Pedro Castro Date: Thu, 23 Jan 2025 14:54:46 -0300 Subject: [PATCH 8/8] basedosdados 2.0.0b26 (#928) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- poetry.lock | 110 ++++++++++++++++++++++++++----------------------- pyproject.toml | 2 +- 2 files changed, 59 insertions(+), 53 deletions(-) diff --git a/poetry.lock b/poetry.lock index 9097e35bc..8c62f2749 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. [[package]] name = "aioftp" @@ -247,30 +247,36 @@ files = [ [[package]] name = "basedosdados" -version = "2.0.0b5" +version = "2.0.0b26" description = "Organizar e facilitar o acesso a dados brasileiros através de tabelas públicas no BigQuery." optional = false -python-versions = ">=3.8,<3.11" +python-versions = "<4,>=3.9" files = [ - {file = "basedosdados-2.0.0b5-py3-none-any.whl", hash = "sha256:a15fdbaa41d621094dabde3f2f14bcc4842b87f59f16a7a6c7a293351f841be5"}, - {file = "basedosdados-2.0.0b5.tar.gz", hash = "sha256:e4bdf3a0018f7208aa57adac9bb9340d07076a3c18d875e80c488cb4b80f0959"}, + {file = "basedosdados-2.0.0b26-py3-none-any.whl", hash = "sha256:49356b5402ce6622750615687a442ca16517dc8f856bda264b9dfd1a8092c958"}, + {file = "basedosdados-2.0.0b26.tar.gz", hash = "sha256:ddb73added39ac0bdad0fdaecb282a6ddc904c8f1f8fbfde9e3be48d0858461b"}, ] [package.dependencies] -google-api-python-client = ">=2.86.0,<3.0.0" -google-cloud-bigquery = ">=3.10.0,<4.0.0" -google-cloud-bigquery-connection = ">=1.12.0,<2.0.0" -google-cloud-bigquery-storage = ">=2.19.1,<3.0.0" -google-cloud-storage = ">=2.9.0,<3.0.0" -gql = ">=3.4.1,<4.0.0" +google-api-python-client = ">=2.86,<3.0" +google-cloud-bigquery = ">=3.10,<4.0" +google-cloud-bigquery-connection = ">=1.12,<2.0" +google-cloud-bigquery-storage = ">=2.19,<3.0" +google-cloud-storage = ">=2.9,<3.0" +gql = {version = ">=3.4,<4.0", optional = true, markers = "extra == \"all\" or extra == \"upload\""} loguru = ">=0.7.0,<0.8.0" -pandas = ">=2.0.1,<3.0.0" -pandas-gbq = ">=0.19.2,<0.20.0" -pandavro = ">=1.7.2,<2.0.0" -pydata-google-auth = ">=1.8.0,<2.0.0" -requests-toolbelt = ">=1.0.0,<2.0.0" -tomlkit = ">=0.11.8,<0.12.0" -tqdm = ">=4.65.0,<5.0.0" +numpy = "<2.0.0" +pandas = ">=2.0,<3.0" +pandas-gbq = ">=0.19,<0.20" +pandavro = {version = ">=1.7,<2.0", optional = true, markers = "extra == \"all\" or extra == \"avro\""} +pydata-google-auth = ">=1.8,<2.0" +requests-toolbelt = {version = ">=1,<2", optional = true, markers = "extra == \"all\" or extra == \"upload\""} +tomlkit = ">=0.11,<0.12" +tqdm = ">=4,<5" + +[package.extras] +all = ["gql (>=3.4,<4.0)", "pandavro (>=1.7,<2.0)", "requests-toolbelt (>=1,<2)"] +avro = ["pandavro (>=1.7,<2.0)"] +upload = ["gql (>=3.4,<4.0)", "requests-toolbelt (>=1,<2)"] [[package]] name = "beautifulsoup4" @@ -1052,42 +1058,42 @@ files = [ [[package]] name = "fastavro" -version = "1.9.7" +version = "1.10.0" description = "Fast read/write of AVRO files" optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" files = [ - {file = "fastavro-1.9.7-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:cc811fb4f7b5ae95f969cda910241ceacf82e53014c7c7224df6f6e0ca97f52f"}, - {file = "fastavro-1.9.7-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fb8749e419a85f251bf1ac87d463311874972554d25d4a0b19f6bdc56036d7cf"}, - {file = "fastavro-1.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b2f9bafa167cb4d1c3dd17565cb5bf3d8c0759e42620280d1760f1e778e07fc"}, - {file = "fastavro-1.9.7-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:e87d04b235b29f7774d226b120da2ca4e60b9e6fdf6747daef7f13f218b3517a"}, - {file = "fastavro-1.9.7-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:b525c363e267ed11810aaad8fbdbd1c3bd8837d05f7360977d72a65ab8c6e1fa"}, - {file = "fastavro-1.9.7-cp310-cp310-win_amd64.whl", hash = "sha256:6312fa99deecc319820216b5e1b1bd2d7ebb7d6f221373c74acfddaee64e8e60"}, - {file = "fastavro-1.9.7-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:ec8499dc276c2d2ef0a68c0f1ad11782b2b956a921790a36bf4c18df2b8d4020"}, - {file = "fastavro-1.9.7-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:76d9d96f98052615ab465c63ba8b76ed59baf2e3341b7b169058db104cbe2aa0"}, - {file = "fastavro-1.9.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:919f3549e07a8a8645a2146f23905955c35264ac809f6c2ac18142bc5b9b6022"}, - {file = "fastavro-1.9.7-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:9de1fa832a4d9016724cd6facab8034dc90d820b71a5d57c7e9830ffe90f31e4"}, - {file = "fastavro-1.9.7-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1d09227d1f48f13281bd5ceac958650805aef9a4ef4f95810128c1f9be1df736"}, - {file = "fastavro-1.9.7-cp311-cp311-win_amd64.whl", hash = "sha256:2db993ae6cdc63e25eadf9f93c9e8036f9b097a3e61d19dca42536dcc5c4d8b3"}, - {file = "fastavro-1.9.7-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:4e1289b731214a7315884c74b2ec058b6e84380ce9b18b8af5d387e64b18fc44"}, - {file = "fastavro-1.9.7-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:eac69666270a76a3a1d0444f39752061195e79e146271a568777048ffbd91a27"}, - {file = "fastavro-1.9.7-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9be089be8c00f68e343bbc64ca6d9a13e5e5b0ba8aa52bcb231a762484fb270e"}, - {file = "fastavro-1.9.7-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:d576eccfd60a18ffa028259500df67d338b93562c6700e10ef68bbd88e499731"}, - {file = "fastavro-1.9.7-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:ee9bf23c157bd7dcc91ea2c700fa3bd924d9ec198bb428ff0b47fa37fe160659"}, - {file = "fastavro-1.9.7-cp312-cp312-win_amd64.whl", hash = "sha256:b6b2ccdc78f6afc18c52e403ee68c00478da12142815c1bd8a00973138a166d0"}, - {file = "fastavro-1.9.7-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:7313def3aea3dacface0a8b83f6d66e49a311149aa925c89184a06c1ef99785d"}, - {file = "fastavro-1.9.7-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:536f5644737ad21d18af97d909dba099b9e7118c237be7e4bd087c7abde7e4f0"}, - {file = "fastavro-1.9.7-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2af559f30383b79cf7d020a6b644c42ffaed3595f775fe8f3d7f80b1c43dfdc5"}, - {file = "fastavro-1.9.7-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:edc28ab305e3c424de5ac5eb87b48d1e07eddb6aa08ef5948fcda33cc4d995ce"}, - {file = "fastavro-1.9.7-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:ec2e96bdabd58427fe683329b3d79f42c7b4f4ff6b3644664a345a655ac2c0a1"}, - {file = "fastavro-1.9.7-cp38-cp38-win_amd64.whl", hash = "sha256:3b683693c8a85ede496ebebe115be5d7870c150986e34a0442a20d88d7771224"}, - {file = "fastavro-1.9.7-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:58f76a5c9a312fbd37b84e49d08eb23094d36e10d43bc5df5187bc04af463feb"}, - {file = "fastavro-1.9.7-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:56304401d2f4f69f5b498bdd1552c13ef9a644d522d5de0dc1d789cf82f47f73"}, - {file = "fastavro-1.9.7-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2fcce036c6aa06269fc6a0428050fcb6255189997f5e1a728fc461e8b9d3e26b"}, - {file = "fastavro-1.9.7-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:17de68aae8c2525f5631d80f2b447a53395cdc49134f51b0329a5497277fc2d2"}, - {file = "fastavro-1.9.7-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:7c911366c625d0a997eafe0aa83ffbc6fd00d8fd4543cb39a97c6f3b8120ea87"}, - {file = "fastavro-1.9.7-cp39-cp39-win_amd64.whl", hash = "sha256:912283ed48578a103f523817fdf0c19b1755cea9b4a6387b73c79ecb8f8f84fc"}, - {file = "fastavro-1.9.7.tar.gz", hash = "sha256:13e11c6cb28626da85290933027cd419ce3f9ab8e45410ef24ce6b89d20a1f6c"}, + {file = "fastavro-1.10.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:1a9fe0672d2caf0fe54e3be659b13de3cad25a267f2073d6f4b9f8862acc31eb"}, + {file = "fastavro-1.10.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:86dd0410770e0c99363788f0584523709d85e57bb457372ec5c285a482c17fe6"}, + {file = "fastavro-1.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:190e80dc7d77d03a6a8597a026146b32a0bbe45e3487ab4904dc8c1bebecb26d"}, + {file = "fastavro-1.10.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:bf570d63be9155c3fdc415f60a49c171548334b70fff0679a184b69c29b6bc61"}, + {file = "fastavro-1.10.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:e07abb6798e95dccecaec316265e35a018b523d1f3944ad396d0a93cb95e0a08"}, + {file = "fastavro-1.10.0-cp310-cp310-win_amd64.whl", hash = "sha256:37203097ed11d0b8fd3c004904748777d730cafd26e278167ea602eebdef8eb2"}, + {file = "fastavro-1.10.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:d183c075f527ab695a27ae75f210d4a86bce660cda2f85ae84d5606efc15ef50"}, + {file = "fastavro-1.10.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7a95a2c0639bffd7c079b59e9a796bfc3a9acd78acff7088f7c54ade24e4a77"}, + {file = "fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0a678153b5da1b024a32ec3f611b2e7afd24deac588cb51dd1b0019935191a6d"}, + {file = "fastavro-1.10.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:67a597a5cfea4dddcf8b49eaf8c2b5ffee7fda15b578849185bc690ec0cd0d8f"}, + {file = "fastavro-1.10.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1fd689724760b17f69565d8a4e7785ed79becd451d1c99263c40cb2d6491f1d4"}, + {file = "fastavro-1.10.0-cp311-cp311-win_amd64.whl", hash = "sha256:4f949d463f9ac4221128a51e4e34e2562f401e5925adcadfd28637a73df6c2d8"}, + {file = "fastavro-1.10.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:cfe57cb0d72f304bd0dcc5a3208ca6a7363a9ae76f3073307d095c9d053b29d4"}, + {file = "fastavro-1.10.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:74e517440c824cb65fb29d3e3903a9406f4d7c75490cef47e55c4c82cdc66270"}, + {file = "fastavro-1.10.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:203c17d44cadde76e8eecb30f2d1b4f33eb478877552d71f049265dc6f2ecd10"}, + {file = "fastavro-1.10.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:6575be7f2b5f94023b5a4e766b0251924945ad55e9a96672dc523656d17fe251"}, + {file = "fastavro-1.10.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:fe471deb675ed2f01ee2aac958fbf8ebb13ea00fa4ce7f87e57710a0bc592208"}, + {file = "fastavro-1.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:567ff515f2a5d26d9674b31c95477f3e6022ec206124c62169bc2ffaf0889089"}, + {file = "fastavro-1.10.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:82263af0adfddb39c85f9517d736e1e940fe506dfcc35bc9ab9f85e0fa9236d8"}, + {file = "fastavro-1.10.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:566c193109ff0ff84f1072a165b7106c4f96050078a4e6ac7391f81ca1ef3efa"}, + {file = "fastavro-1.10.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e400d2e55d068404d9fea7c5021f8b999c6f9d9afa1d1f3652ec92c105ffcbdd"}, + {file = "fastavro-1.10.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:9b8227497f71565270f9249fc9af32a93644ca683a0167cfe66d203845c3a038"}, + {file = "fastavro-1.10.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:8e62d04c65461b30ac6d314e4197ad666371e97ae8cb2c16f971d802f6c7f514"}, + {file = "fastavro-1.10.0-cp313-cp313-win_amd64.whl", hash = "sha256:86baf8c9740ab570d0d4d18517da71626fe9be4d1142bea684db52bd5adb078f"}, + {file = "fastavro-1.10.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5bccbb6f8e9e5b834cca964f0e6ebc27ebe65319d3940b0b397751a470f45612"}, + {file = "fastavro-1.10.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b0132f6b0b53f61a0a508a577f64beb5de1a5e068a9b4c0e1df6e3b66568eec4"}, + {file = "fastavro-1.10.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ca37a363b711202c6071a6d4787e68e15fa3ab108261058c4aae853c582339af"}, + {file = "fastavro-1.10.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:cf38cecdd67ca9bd92e6e9ba34a30db6343e7a3bedf171753ee78f8bd9f8a670"}, + {file = "fastavro-1.10.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:f4dd10e0ed42982122d20cdf1a88aa50ee09e5a9cd9b39abdffb1aa4f5b76435"}, + {file = "fastavro-1.10.0-cp39-cp39-win_amd64.whl", hash = "sha256:aaef147dc14dd2d7823246178fd06fc5e477460e070dc6d9e07dd8193a6bc93c"}, + {file = "fastavro-1.10.0.tar.gz", hash = "sha256:47bf41ac6d52cdfe4a3da88c75a802321321b37b663a900d12765101a5d6886f"}, ] [package.extras] @@ -5574,4 +5580,4 @@ heapdict = "*" [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.11" -content-hash = "6a7b387c35a33e52abe28d92b14e7846f303c84443f693ab8574017ee9cfebb0" +content-hash = "dc24520a262709e84e08da4dfff76b86ab93bfe6477b19f8f6c0e93be1f42215" diff --git a/pyproject.toml b/pyproject.toml index 84d1c13e5..813aff1d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ Jinja2 = "3.0.3" MarkupSafe = "2.0.1" PyYAML = "6.0" Unidecode = "^1.3.4" -basedosdados = "2.0.0b5" +basedosdados = {version = "2.0.0b26", extras = ["all"]} beautifulsoup4 = "4.11.1" cachetools = "4.2.4" certifi = "2021.10.8"