Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dados] br_camara_dados_abertos.proposicao #622

Merged
merged 29 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7464543
structure de pipeline br_camara_dados_abertos.proposicao
tricktx Jan 6, 2024
93d5fe8
Merge branch 'main' into staging/camara_proposicao
mergify[bot] Jan 8, 2024
c114643
refactoring code to map
tricktx Jan 11, 2024
654b8ba
pipeline test without check if data
tricktx Jan 11, 2024
e6fc1d5
refactoring: taking check for updates
tricktx Jan 12, 2024
3186bfc
fix: Fix file path
tricktx Jan 12, 2024
3675ba2
Merge branch 'main' into staging/camara_proposicao
mergify[bot] Jan 12, 2024
ff0db7f
Merge branch 'main' into staging/camara_proposicao
mergify[bot] Jan 12, 2024
f86c51d
Merge branch 'main' into staging/camara_proposicao
mergify[bot] Jan 12, 2024
dad88d3
feat: Add output_path_list task to generate a list of output paths
tricktx Jan 12, 2024
eedf2dc
fix: Add support for dynamic year in ANOS constant
tricktx Jan 15, 2024
348b039
fix: update constants
tricktx Jan 15, 2024
266e955
update: update task rename to_csv
tricktx Jan 15, 2024
28a3fb7
Merge branch 'main' into staging/camara_proposicao
mergify[bot] Jan 15, 2024
ee5acd7
fix: file naming issue in save_data_proposicao function
tricktx Jan 16, 2024
ce0f212
feat: add list_dict_to_materialization in task
tricktx Jan 16, 2024
393678c
Merge branch 'main' into staging/camara_proposicao
mergify[bot] Jan 17, 2024
38e1e09
feat: test in cloud
tricktx Jan 18, 2024
41e016d
feat: add list in update metadata
tricktx Jan 18, 2024
9aaa4c6
fix: bq_project basedosdados-dev
tricktx Jan 18, 2024
7a9d36d
feat: add upstream_tasks
tricktx Jan 18, 2024
a61c0a8
feat: setting upstream_tasks in unmapped
tricktx Jan 18, 2024
18b6414
feat: add date_column_name in update metadata
tricktx Jan 18, 2024
60c2e42
feat: add time_delta in update metadata
tricktx Jan 18, 2024
77fd8a7
fix: add parameters in update metadata
tricktx Jan 19, 2024
0410c35
fix: if the URL does not exist, get the previous year
tricktx Jan 19, 2024
4f1f3a9
add schedules
tricktx Jan 19, 2024
7765518
feat: add year in name
tricktx Jan 19, 2024
a2e7bd1
fix: name bq_project
tricktx Jan 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pipelines/datasets/br_camara_dados_abertos/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from enum import Enum

from dateutil.relativedelta import relativedelta


class constants(Enum):
TABLE_NAME_ARCHITECTURE = {
Expand All @@ -14,7 +17,9 @@ class constants(Enum):
INPUT_PATH = "/tmp/input/"
OUTPUT_PATH = "/tmp/output/"

ANOS = [2023]
ANOS = (datetime.now() - relativedelta(years=1)).year

ANOS_ATUAL = (datetime.now()).year

TABLE_LIST = {
"votacao_microdados": "votacoes",
Expand Down Expand Up @@ -53,3 +58,11 @@ class constants(Enum):
"deputado_ocupacao": "https://docs.google.com/spreadsheets/d/1Cj6WE3jk63p21IjrINeaYKoMSOGoDDf1XpY3UH8sct4/edit#gid=0",
"deputado_profissao": "https://docs.google.com/spreadsheets/d/12R2OY7eqUKxuojcpYYBsCiHyzUOLBBdObnkuv2JUMNI/edit#gid=0",
}

# ------------------------------------------------------------> PROPOSIÇÃO

TABLE_LIST_PROPOSICAO = {
"proposicao_microdados": "proposicoes",
"proposicao_autor": "proposicoesAutores",
"proposicao_tema": "proposicoesTemas",
}
105 changes: 104 additions & 1 deletion pipelines/datasets/br_camara_dados_abertos/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@

from datetime import timedelta

from prefect import Parameter, case
from prefect import Parameter, case, unmapped
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

from pipelines.constants import constants
from pipelines.datasets.br_camara_dados_abertos.constants import (
constants as constants_camara,
)
from pipelines.datasets.br_camara_dados_abertos.schedules import (
every_day_camara_dados_abertos,
every_day_camara_dados_abertos_deputados,
)
from pipelines.datasets.br_camara_dados_abertos.tasks import (
dict_list_parameters,
download_files_and_get_max_date,
download_files_and_get_max_date_deputados,
make_partitions,
output_path_list,
save_data_proposicao,
treat_and_save_table,
)
from pipelines.utils.constants import constants as utils_constants
Expand Down Expand Up @@ -584,3 +590,100 @@
br_camara_deputado.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_camara_deputado.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_camara_deputado.schedule = every_day_camara_dados_abertos_deputados


# ---------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------

# ------------------------------ TABLES PROPOSIÇÃO ---------------------------------------

with Flow(
    name="br_camara_dados_abertos.proposicao", code_owners=["trick"]
) as br_camara_proposicao:
    # Parameters
    dataset_id = Parameter(
        "dataset_id", default="br_camara_dados_abertos", required=True
    )
    table_id = Parameter(
        "table_id",
        default=[
            "proposicao_microdados",
            "proposicao_autor",
            "proposicao_tema",
        ],
        required=True,
    )
    materialization_mode = Parameter(
        "materialization_mode", default="dev", required=False
    )
    materialize_after_dump = Parameter(
        "materialize_after_dump", default=True, required=False
    )
    dbt_alias = Parameter("dbt_alias", default=True, required=False)

    rename_flow_run = rename_current_flow_run_dataset_table(
        prefix="Dump: ",
        dataset_id=dataset_id,
        table_id="Proposição",
        wait=table_id,
    )

    update_metadata = Parameter("update_metadata", default=True, required=False)

    # Download/clean one CSV per table (mapped over the three table ids).
    filepath = save_data_proposicao.map(
        table_id=table_id,
        upstream_tasks=[unmapped(rename_flow_run)],
    )
    # NOTE(review): result renamed to `output_paths` — the original assignment
    # (`output_path_list = output_path_list(table_id)`) shadowed the imported
    # `output_path_list` task with its own result.
    output_paths = output_path_list(table_id)
    wait_upload_table = create_table_and_upload_to_gcs.map(
        data_path=output_paths,
        dataset_id=unmapped(dataset_id),
        table_id=table_id,
        dump_mode=unmapped("append"),
        wait=unmapped(output_paths),
        # unmapped(filepath) makes every upload wait for ALL save tasks, not
        # just its own — conservative but correct ordering.
        upstream_tasks=[unmapped(filepath)],
    )
    parameters = dict_list_parameters(dataset_id, materialization_mode, dbt_alias)
    with case(materialize_after_dump, True):
        # Trigger DBT flow run
        current_flow_labels = get_current_flow_labels()
        materialization_flow = create_flow_run.map(
            flow_name=unmapped(utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value),
            project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value),
            parameters=parameters,
            labels=unmapped(current_flow_labels),
            # The f-string is evaluated at flow-build time, so wrapping
            # `dataset_id` in unmapped() inside it (as the original did) embeds
            # the unmapped-wrapper repr in the run name instead of the value.
            # Plain constants are automatically treated as unmapped by .map().
            run_name=f"Materialize {dataset_id}.{table_id}",
            upstream_tasks=[unmapped(wait_upload_table)],
        )

        wait_for_materialization = wait_for_flow_run.map(
            materialization_flow,
            stream_states=unmapped(True),
            stream_logs=unmapped(True),
            raise_final_state=unmapped(True),
        )
        wait_for_materialization.max_retries = (
            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
        )
        wait_for_materialization.retry_delay = timedelta(
            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
        )

        with case(update_metadata, True):
            # One metadata update per table; the positional lists below are
            # aligned with the order of `table_id` (microdados, autor, tema).
            update_django_metadata.map(
                dataset_id=unmapped(dataset_id),
                table_id=table_id,
                date_format=["%Y-%m-%d", "%Y-%m-%d", "%Y-%m-%d"],
                date_column_name=[{"date": "data"}, {"date": "data"}, {"date": "data"}],
                coverage_type=["part_bdpro", "all_free", "all_free"],
                prefect_mode=unmapped(materialization_mode),
                time_delta=[{"months": 6}, {"months": 6}, {"months": 6}],
                bq_project=unmapped("basedosdados-dev"),
                historical_database=[True, False, False],
                upstream_tasks=[unmapped(wait_for_materialization)],
            )

br_camara_proposicao.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_camara_proposicao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
# br_camara_proposicao.schedule = every_day_camara_dados_abertos_deputados
77 changes: 77 additions & 0 deletions pipelines/datasets/br_camara_dados_abertos/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
constants as constants_camara,
)
from pipelines.datasets.br_camara_dados_abertos.utils import (
download_and_read_data_proposicao,
get_data,
get_data_deputados,
read_and_clean_camara_dados_abertos,
Expand Down Expand Up @@ -87,3 +88,79 @@ def treat_and_save_table(table_id):
log(f"{constants_camara.OUTPUT_PATH.value}{table_id}/data.csv")

return f"{constants_camara.OUTPUT_PATH.value}{table_id}/data.csv"


# -------------------------------------------------------------------> PROPOSIÇÃO


@task
def save_data_proposicao(table_id: str) -> None:
    """Download, clean and persist one proposição table as CSV.

    Fetches the raw data for ``table_id`` via
    ``download_and_read_data_proposicao``, applies table-specific cleaning for
    ``proposicao_microdados`` and writes the result to
    ``{OUTPUT_PATH}{table_id}/{api_name}_{ANOS}.csv``.

    Args:
        table_id: key of ``constants_camara.TABLE_LIST_PROPOSICAO``.
    """
    df = download_and_read_data_proposicao(table_id)
    valor = constants_camara.TABLE_LIST_PROPOSICAO.value[table_id]
    output_dir = f"{constants_camara.OUTPUT_PATH.value}{table_id}"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    if table_id == "proposicao_microdados":
        # Free-text columns may contain the separator and line breaks;
        # normalize them so the output stays one record per line.
        for col in ("ultimoStatus_despacho", "ementa"):
            df[col] = df[col].apply(
                lambda x: str(x).replace(";", ",").replace("\n", "").replace("\r", "")
            )
        # Some rows carry ano == 0; fall back to the year prefix of
        # dataApresentacao (assumes an ISO-like date string — TODO confirm).
        df["ano"] = df.apply(
            lambda x: x["dataApresentacao"][0:4] if x["ano"] == 0 else x["ano"], axis=1
        )

    # The original duplicated an identical to_csv call in both branches of the
    # if/else; hoisted here so the write happens exactly once.
    df.to_csv(
        f"{output_dir}/{valor}_{constants_camara.ANOS.value}.csv",
        sep=",",
        index=False,
    )


@task
def output_path_list(table_id_list):
    """Build the output directory path for each table id.

    Returns one ``{OUTPUT_PATH}{table_id}/`` string per element of
    ``table_id_list``, preserving order.
    """
    base = constants_camara.OUTPUT_PATH.value
    return [f"{base}{table_id}/" for table_id in table_id_list]


@task
def dict_list_parameters(dataset_id, materialization_mode, dbt_alias):
    """Build one DBT-materialization parameter dict per proposição table.

    Args:
        dataset_id: dataset the tables belong to.
        materialization_mode: target mode (e.g. ``"dev"``/``"prod"``).
        dbt_alias: whether to use the dbt alias.

    Returns:
        list[dict]: parameter dicts in the fixed order
        microdados, autor, tema — matching the ``table_id`` order used by the
        proposição flow.
    """
    table_ids = [
        "proposicao_microdados",
        "proposicao_autor",
        "proposicao_tema",
    ]
    # The original repeated the same dict literal three times, differing only
    # in table_id; build them in one pass instead.
    return [
        dict(
            dataset_id=dataset_id,
            table_id=table_id,
            mode=materialization_mode,
            dbt_alias=dbt_alias,
            dbt_command="run and test",
        )
        for table_id in table_ids
    ]
73 changes: 62 additions & 11 deletions pipelines/datasets/br_camara_dados_abertos/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
# -------------------------------------------------------------------------------------> VOTACAO
def download_csvs_camara() -> None:
"""
Docs:
This function does download all csvs from archives of camara dos deputados.
The csvs saved in conteiners of docker.
Downloads CSV files from the Camara de Proposicao API.

This function iterates over the years and table list of chamber defined in the constants module,
and downloads the corresponding CSV files from the Camara de Proposicao API. The downloaded files are
saved in the input path specified in the constants module.

Raises:
Exception: If there is an error in the request, such as a non-successful status code.

return:
None
"""
log("Downloading csvs from camara dos deputados")
if not os.path.exists(constants.INPUT_PATH.value):
Expand Down Expand Up @@ -102,14 +105,17 @@ def read_and_clean_camara_dados_abertos(

def download_csvs_camara_deputado() -> None:
"""
Docs:
This function does download all csvs from archives of camara dos deputados.
The csvs saved in conteiners of docker.
Downloads CSV files from the Camara de Proposicao API.

This function iterates over the years and table list of congressperson defined in the constants module,
and downloads the corresponding CSV files from the Camara de Proposicao API. The downloaded files are
saved in the input path specified in the constants module.

Raises:
Exception: If there is an error in the request, such as a non-successful status code.

return:
None
"""
print("Downloading csvs from camara dos deputados")
log("Downloading csvs from camara dos deputados")
if not os.path.exists(constants.INPUT_PATH.value):
os.makedirs(constants.INPUT_PATH.value)

Expand Down Expand Up @@ -155,3 +161,48 @@ def get_data_deputados():
)

return df


# ----------------------------------------------------------------------------------- > Proposição


def download_csvs_camara_proposicao(table_id: str) -> None:
    """Download the CSV for one proposição table from the Câmara open-data API.

    Tries the file for the current year first; if the server answers with a
    4xx/5xx status, falls back to the previous year's file. The download is
    saved as ``{INPUT_PATH}{api_name}.csv``.

    Args:
        table_id: key of ``constants.TABLE_LIST_PROPOSICAO``.

    Raises:
        requests.HTTPError: if the fallback download also fails.
    """
    # log() instead of print() for consistency with the other download helpers
    # in this module.
    log("Downloading csvs from camara de proposição")
    if not os.path.exists(constants.INPUT_PATH.value):
        os.makedirs(constants.INPUT_PATH.value)

    valor = constants.TABLE_LIST_PROPOSICAO.value[table_id]
    url = f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{constants.ANOS_ATUAL.value}.csv"
    response = requests.get(url)
    if response.status_code == 200:
        with open(f"{constants.INPUT_PATH.value}{valor}.csv", "wb") as f:
            f.write(response.content)
        log(f"download complete {valor}-{constants.ANOS_ATUAL.value}")

    elif response.status_code >= 400 and response.status_code <= 599:
        # Current-year file not published yet (or server error): fall back to
        # the previous year's file.
        url_2 = f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{constants.ANOS.value}.csv"
        response = requests.get(url_2)
        # The original wrote response.content unconditionally, so a failed
        # fallback would save an error page as the CSV and only surface later
        # as a confusing parse error. Fail loudly here instead.
        response.raise_for_status()
        with open(f"{constants.INPUT_PATH.value}{valor}.csv", "wb") as f:
            f.write(response.content)
        log(f"download complete {valor}-{constants.ANOS.value}")


def download_and_read_data_proposicao(table_id: str) -> pd.DataFrame:
    """Download the CSV for ``table_id`` and load it into a DataFrame.

    The raw file is fetched by ``download_csvs_camara_proposicao`` into the
    input path and then read using ``;`` as the field separator.
    """
    download_csvs_camara_proposicao(table_id)
    valor = constants.TABLE_LIST_PROPOSICAO.value[table_id]
    filepath = f"{constants.INPUT_PATH.value}{valor}.csv"
    return pd.read_csv(filepath, sep=";")
Loading