Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add endpoint for vaccine reports in Vitacare API #216

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class constants(Enum):
ENDPOINT = {
"posicao": "/reports/pharmacy/stocks",
"movimento": "/reports/pharmacy/movements",
"vacina": "/reports/vacinas/listagemvacina",
}

DATASET_ID = "brutos_prontuario_vitacare"
19 changes: 10 additions & 9 deletions pipelines/datalake/extract_load/vitacare_api/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# pylint: disable= C0301. fixme
# pylint: disable= C0301. C0103
"""
Tasks for dump_api_vitacare
"""
Expand Down Expand Up @@ -85,7 +85,7 @@ def extract_data_from_api(
)

if response["status_code"] != 200:
raise ValueError(
raise FAIL(
f"Failed to extract data from API: {response['status_code']} - {response['body']}"
)

Expand All @@ -108,15 +108,15 @@ def extract_data_from_api(

else:
target_day = datetime.strptime(target_day, "%Y-%m-%d").date()
if endpoint == "movimento" and (
if endpoint in ("movimento", "vacina") and (
target_day.weekday() == 6
or prefect.context.task_run_count >= 2 # pylint: disable=no-member
):
logger.info("No data was retrieved for the target day")
return {"has_data": False}

logger.error("Failed Request: no data was retrieved")
raise ValueError(f"Empty response for ({cnes}, {target_day}, {endpoint})")
raise FAIL(f"Empty response for ({cnes}, {target_day}, {endpoint})")


@task
Expand Down Expand Up @@ -183,7 +183,8 @@ def transform_data(file_path: str, table_id: str) -> str:

add_load_date_column.run(input_path=csv_file_path, sep=";")

fix_payload_column_order(filepath=csv_file_path, table_id=table_id)
if table_id in ("estoque_posicao", "estoque_movimento"):
fix_payload_column_order(filepath=csv_file_path, table_id=table_id)

return csv_file_path

Expand Down Expand Up @@ -290,7 +291,7 @@ def create_parameter_list(
& (table_data["prontuario_estoque_tem_dado"] == "sim")
]

if endpoint in ["movimento", "posicao"]:
if endpoint in ["movimento", "posicao", "vacina"]:
results["data"] = convert_str_to_date(target_date)

else:
Expand All @@ -316,7 +317,7 @@ def create_parameter_list(

vitacare_flow_parameters = []

if endpoint in ["movimento", "posicao"]:
if endpoint in ["movimento", "posicao", "vacina"]:

results_tuples = results[["id_cnes", "data"]].apply(tuple, axis=1).tolist()

Expand Down Expand Up @@ -486,12 +487,12 @@ def get_flow_name(endpoint: str):
Returns:
str: The flow name.
"""
if endpoint in ["movimento", "posicao"]:
if endpoint in ["movimento", "posicao", "vacina"]:
flow_name = "DataLake - Extração e Carga de Dados - VitaCare"
elif endpoint == "backup_prontuario":
flow_name = "DataLake - Extração e Carga de Dados - VitaCare DB"
else:
err_msg = "Invalid endpoint. Expected 'movimento', 'posicao' or 'backup_prontuario'"
err_msg = "Invalid endpoint. Expected 'movimento', 'posicao', 'vacina' or 'backup_prontuario'" # noqa: E501
log(err_msg, level="error")
raise FAIL(err_msg)
return flow_name
Loading