Skip to content

Commit

Permalink
refactoring: taking check for updates
Browse files Browse the repository at this point in the history
  • Loading branch information
tricktx committed Jan 12, 2024
1 parent 654b8ba commit e6fc1d5
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 63 deletions.
6 changes: 3 additions & 3 deletions pipelines/datasets/br_camara_dados_abertos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class constants(Enum):
# ------------------------------------------------------------> PROPOSIÇÃO

TABLE_LIST_PROPOSICAO = {
"microdados": "proposicoes",
"autor": "proposicoesAutores",
"tema": "proposicoesTemas",
"proposicao_microdados": "proposicoes",
"proposicao_autor": "proposicoesAutores",
"proposicao_tema": "proposicoesTemas",
}
23 changes: 9 additions & 14 deletions pipelines/datasets/br_camara_dados_abertos/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from pipelines.datasets.br_camara_dados_abertos.tasks import (
download_files_and_get_max_date,
download_files_and_get_max_date_deputados,
download_files_and_get_max_date_proposicao,
make_partitions,
save_data_proposicao,
treat_and_save_table,
Expand Down Expand Up @@ -628,20 +627,16 @@
update_metadata = Parameter("update_metadata", default=True, required=False)

filepath_proposicao_microdados = save_data_proposicao.map(
table_id=[
"proposicao_microdados",
"proposicao_autor",
"proposicao_tema",
],
table_id=table_id,
upstream_tasks=[rename_flow_run],
)

wait_upload_table = create_table_and_upload_to_gcs.map(
data_path=filepath_proposicao_microdados,
dataset_id=dataset_id,
dataset_id=unmapped(dataset_id),
table_id=table_id,
dump_mode="append",
wait=filepath_proposicao_microdados,
dump_mode=unmapped("append"),
wait=unmapped(filepath_proposicao_microdados),
)

with case(materialize_after_dump, True):
Expand All @@ -651,17 +646,17 @@
flow_name=unmapped(utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value),
project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value),
parameters={
"dataset_id": dataset_id,
"dataset_id": unmapped(dataset_id),
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
"mode": unmapped(materialization_mode),
"dbt_alias": unmapped(dbt_alias),
},
labels=unmapped(current_flow_labels),
run_name=unmapped(f"Materialize {dataset_id}.{table_id[0]}"),
run_name=unmapped(f"Materialize {dataset_id}.{table_id}"),
)

wait_for_materialization = wait_for_flow_run.map(
unmapped(materialization_flow),
materialization_flow,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
Expand Down
18 changes: 3 additions & 15 deletions pipelines/datasets/br_camara_dados_abertos/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
constants as constants_camara,
)
from pipelines.datasets.br_camara_dados_abertos.utils import (
download_and_read_data_proposicao,
get_data,
get_data_deputados,
get_data_proposicao,
read_and_clean_camara_dados_abertos,
read_and_clean_data_deputados,
read_data_proposicao,
)
from pipelines.utils.utils import log, to_partitions

Expand Down Expand Up @@ -94,22 +93,11 @@ def treat_and_save_table(table_id):
# -------------------------------------------------------------------> PROPOSIÇÃO


@task
def download_files_and_get_max_date_proposicao():
    """
    Return the most recent ``dataApresentacao`` found in the proposição data.

    The table is obtained from ``get_data_proposicao()``; its
    ``dataApresentacao`` column is converted in place with
    ``pd.to_datetime`` and the maximum of that column is returned.

    Returns:
        The maximum value of the converted ``dataApresentacao`` column.
    """
    proposicoes = get_data_proposicao()
    proposicoes["dataApresentacao"] = pd.to_datetime(
        proposicoes["dataApresentacao"]
    )
    return proposicoes["dataApresentacao"].max()


@task
def save_data_proposicao(table_id: str):
    """
    Download one proposição table and save it as ``<table_id>/data.csv``.

    Parameters:
        table_id (str): Identifier of the table to download and save; it is
            forwarded to ``download_and_read_data_proposicao`` and used as the
            name of the output sub-folder.

    Returns:
        The base output path (``OUTPUT_PATH``), not the per-table folder.
    """
    import os

    # The helper declares ``table_id`` as a required parameter, so it has to
    # be forwarded; calling it with no arguments raises TypeError.
    df = download_and_read_data_proposicao(table_id=table_id)

    output_dir = f"{constants_camara.OUTPUT_PATH.value}{table_id}"
    # DataFrame.to_csv does not create missing parent directories.
    os.makedirs(output_dir, exist_ok=True)
    df.to_csv(f"{output_dir}/data.csv", sep=",", index=False)

    # Use the same constants object for the returned path as for the write.
    # NOTE(review): the base folder is returned rather than the per-table
    # folder -- confirm this is what the upload task expects as data_path.
    return constants_camara.OUTPUT_PATH.value
44 changes: 13 additions & 31 deletions pipelines/datasets/br_camara_dados_abertos/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def get_data_deputados():
# ----------------------------------------------------------------------------------- > Proposição


def download_csvs_camara_proposicao() -> None:
def download_csvs_camara_proposicao(table_id: str) -> None:
"""
Downloads CSV files from the Camara de Proposicao API.
Expand All @@ -183,41 +183,23 @@ def download_csvs_camara_proposicao() -> None:
os.makedirs(constants.INPUT_PATH.value)

for anos in constants.ANOS.value:
for key, valor in constants.TABLE_LIST_PROPOSICAO.value.items():
url_2 = f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{anos}.csv"

response = requests.get(url_2)
if response.status_code == 200:
with open(f"{constants.INPUT_PATH.value}{valor}.csv", "wb") as f:
f.write(response.content)
print(f"download complete {valor}")

elif response.status_code >= 400 and response.status_code <= 599:
raise Exception(
f"Erro de requisição: status code {response.status_code}"
)
valor = constants.TABLE_LIST_PROPOSICAO.value[table_id]
url_2 = (
f"http://dadosabertos.camara.leg.br/arquivos/{valor}/csv/{valor}-{anos}.csv"
)

response = requests.get(url_2)
if response.status_code == 200:
with open(f"{constants.INPUT_PATH.value}{valor}.csv", "wb") as f:
f.write(response.content)
print(f"download complete {valor}")

def get_data_proposicao() -> pd.DataFrame:
"""
Reads the data for a given table_id from the Camara dos Deputados dataset.
elif response.status_code >= 400 and response.status_code <= 599:
raise Exception(f"Erro de requisição: status code {response.status_code}")

Parameters:
table_id (str): The ID of the table to read.

Returns:
pd.DataFrame: The DataFrame containing the data.
"""
def download_and_read_data_proposicao(table_id: str) -> pd.DataFrame:
download_csvs_camara_proposicao()
df = pd.read_csv(
f'{constants.INPUT_PATH.value}{constants.TABLE_LIST_PROPOSICAO.value["microdados"]}.csv',
sep=";",
)

return df


def read_data_proposicao(table_id: str) -> pd.DataFrame:
df = pd.read_csv(
f"{constants.INPUT_PATH.value}{constants.TABLE_LIST_PROPOSICAO.value[table_id]}.csv",
sep=";",
Expand Down

0 comments on commit e6fc1d5

Please sign in to comment.