Skip to content

Commit 1ae4df1

Browse files
authored
Merge branch 'master' into staging/dump_alertario
2 parents 1f36510 + 59e5402 commit 1ae4df1

File tree

4 files changed

+87
-56
lines changed

4 files changed

+87
-56
lines changed

pipelines/rj_smtr/br_rj_riodejaneiro_recursos/flows.py

+58-45
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
1010
from prefect import Parameter, case, task
1111
from prefect.tasks.control_flow import merge
12-
12+
from prefect.utilities.edges import unmapped
1313

1414
# EMD Imports #
1515

@@ -33,9 +33,7 @@
3333
# CAPTURA DOS TICKETS #
3434

3535
sppo_recurso_captura = deepcopy(default_capture_flow)
36-
sppo_recurso_captura.name = (
37-
"SMTR: Subsídio Recursos Viagens Individuais - Captura (subflow)"
38-
)
36+
sppo_recurso_captura.name = "SMTR: Subsídio Recursos - Captura (subflow)"
3937
sppo_recurso_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
4038
sppo_recurso_captura.run_config = KubernetesRun(
4139
image=emd_constants.DOCKER_IMAGE.value,
@@ -47,9 +45,7 @@
4745
)
4846
# RECAPTURA DOS TICKETS #
4947
sppo_recurso_recaptura = deepcopy(default_capture_flow)
50-
sppo_recurso_recaptura.name = (
51-
"SMTR: Subsídio Recursos Viagens Individuais - Recaptura (subflow)"
52-
)
48+
sppo_recurso_recaptura.name = "SMTR: Subsídio Recursos - Recaptura (subflow)"
5349
sppo_recurso_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
5450
sppo_recurso_recaptura.run_config = KubernetesRun(
5551
image=emd_constants.DOCKER_IMAGE.value,
@@ -64,9 +60,7 @@
6460
# MATERIALIZAÇÃO DOS TICKETS #
6561

6662
sppo_recurso_materializacao = deepcopy(default_materialization_flow)
67-
sppo_recurso_materializacao.name = (
68-
"SMTR: Subsídio Recursos Viagens Individuais - Materialização (subflow)"
69-
)
63+
sppo_recurso_materializacao.name = "SMTR: Subsídio Recursos - Materialização (subflow)"
7064
sppo_recurso_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
7165
sppo_recurso_materializacao.run_config = KubernetesRun(
7266
image=emd_constants.DOCKER_IMAGE.value,
@@ -79,66 +73,86 @@
7973
)
8074

8175
with Flow(
82-
"SMTR: Subsídio Recursos Viagens Individuais - Captura/Tratamento",
76+
"SMTR: Subsídio Recursos - Captura/Tratamento",
8377
code_owners=["carolinagomes", "rafaelpinheiro"],
8478
) as subsidio_sppo_recurso:
8579
capture = Parameter("capture", default=True)
8680
materialize = Parameter("materialize", default=True)
8781
recapture = Parameter("recapture", default=True)
8882
data_recurso = Parameter("data_recurso", default=None)
83+
table_id = Parameter("table_id", default=None)
8984
interval_minutes = Parameter("interval_minutes", default=1440)
9085
timestamp = get_current_timestamp(data_recurso, return_str=True)
86+
exclude = Parameter("exclude", default=None)
9187

9288
rename_flow_run = rename_current_flow_run_now_time(
9389
prefix=subsidio_sppo_recurso.name + " ",
9490
now_time=timestamp,
9591
)
96-
recurso_capture_parameters = {
97-
"data_recurso": timestamp,
98-
**constants.SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS.value["extract_params"],
99-
}
10092

10193
LABELS = get_current_flow_labels()
10294

95+
recursos_capture_parameters = [
96+
{
97+
"table_id": v,
98+
"extract_params": {
99+
"data_recurso": timestamp,
100+
**constants.SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS.value[
101+
"extract_params"
102+
],
103+
},
104+
}
105+
for v in constants.SUBSIDIO_SPPO_RECURSO_TABLE_CAPTURE_PARAMS.value
106+
]
107+
108+
table_params = task(
109+
lambda tables, exclude: [t for t in tables if t["table_id"] not in exclude]
110+
if exclude is not None
111+
else tables,
112+
checkpoint=False,
113+
name="get_tables_to_run",
114+
)(tables=constants.SUBSIDIO_SPPO_RECURSOS_TABLE_IDS.value, exclude=exclude)
115+
103116
# Captura dos dados #
104117
with case(capture, True):
105-
run_captura = create_flow_run(
106-
flow_name=sppo_recurso_captura.name,
107-
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
108-
parameters={"extract_params": recurso_capture_parameters},
109-
labels=LABELS,
118+
run_captura = create_flow_run.map(
119+
flow_name=unmapped(sppo_recurso_captura.name),
120+
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
121+
parameters=recursos_capture_parameters,
122+
labels=unmapped(LABELS),
110123
)
111124

112-
wait_captura_true = wait_for_flow_run(
125+
wait_captura_true = wait_for_flow_run.map(
113126
run_captura,
114-
stream_states=True,
115-
stream_logs=True,
116-
raise_final_state=True,
127+
stream_states=unmapped(True),
128+
stream_logs=unmapped(True),
129+
raise_final_state=unmapped(True),
117130
)
118131

119132
with case(capture, False):
120133
wait_captura_false = task(
121-
lambda: [None], checkpoint=False, name="assign_none_to_previous_runs"
134+
lambda: [None],
135+
checkpoint=False,
136+
name="assign_none_to_previous_runs",
122137
)()
123138

124139
wait_captura = merge(wait_captura_true, wait_captura_false)
125140

126141
# Recaptura dos dados #
127142

128143
with case(recapture, True):
129-
run_recaptura = create_flow_run(
130-
flow_name=sppo_recurso_recaptura.name,
131-
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
132-
labels=LABELS,
144+
run_recaptura = create_flow_run.map(
145+
flow_name=unmapped(sppo_recurso_recaptura.name),
146+
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
147+
parameters=recursos_capture_parameters,
148+
labels=unmapped(LABELS),
133149
)
134150

135-
run_recaptura.set_upstream(wait_captura)
136-
137-
wait_recaptura_true = wait_for_flow_run(
151+
wait_recaptura_true = wait_for_flow_run.map(
138152
run_recaptura,
139-
stream_states=True,
140-
stream_logs=True,
141-
raise_final_state=True,
153+
stream_states=unmapped(True),
154+
stream_logs=unmapped(True),
155+
raise_final_state=unmapped(True),
142156
)
143157

144158
with case(recapture, False):
@@ -151,20 +165,19 @@
151165
# Materialização dos dados #
152166

153167
with case(materialize, True):
154-
run_materializacao = create_flow_run(
155-
flow_name=sppo_recurso_materializacao.name,
156-
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
157-
labels=LABELS,
168+
run_materializacao = create_flow_run.map(
169+
flow_name=unmapped(sppo_recurso_materializacao.name),
170+
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
171+
labels=unmapped(LABELS),
172+
parameters=table_params,
158173
upstream_tasks=[wait_captura],
159174
)
160175

161-
run_materializacao.set_upstream(wait_recaptura)
162-
163-
wait_materializacao_true = wait_for_flow_run(
176+
wait_materializacao_true = wait_for_flow_run.map(
164177
run_materializacao,
165-
stream_states=True,
166-
stream_logs=True,
167-
raise_final_state=True,
178+
stream_states=unmapped(True),
179+
stream_logs=unmapped(True),
180+
raise_final_state=unmapped(True),
168181
)
169182

170183
with case(materialize, False):

pipelines/rj_smtr/constants.py

+17-9
Original file line numberDiff line numberDiff line change
@@ -645,23 +645,26 @@ class constants(Enum): # pylint: disable=c0103
645645
},
646646
]
647647

648-
# SUBSÍDIO RECURSOS VIAGENS INDIVIDUAIS
648+
# SUBSÍDIO RECURSOS
649+
650+
SUBSIDIO_SPPO_RECURSO_TABLE_CAPTURE_PARAMS = {
651+
"recursos_sppo_viagens_individuais": "Viagem Individual",
652+
"recursos_sppo_bloqueio_via": "Bloqueio da via",
653+
"recursos_sppo_reprocessamento": "Reprocessamento",
654+
}
649655

650656
SUBSIDIO_SPPO_RECURSOS_DATASET_ID = "br_rj_riodejaneiro_recursos"
651657
SUBSIDIO_SPPO_RECURSO_API_BASE_URL = "https://api.movidesk.com/public/v1/tickets"
652658
SUBSIDIO_SPPO_RECURSO_API_SECRET_PATH = "sppo_subsidio_recursos_api"
653-
SUBSIDIO_SPPO_RECURSO_SERVICE = (
654-
"serviceFirstLevel eq 'Viagem Individual - Recurso Viagens Subsídio'"
655-
)
656659
SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS = {
657660
"partition_date_only": True,
658-
"table_id": "recursos_sppo_viagens_individuais",
659661
"dataset_id": SUBSIDIO_SPPO_RECURSOS_DATASET_ID,
660662
"extract_params": {
661663
"token": "",
662664
"$select": "id,protocol,createdDate,lastUpdate",
663-
"$filter": "{service} and (lastUpdate ge {start} and lastUpdate lt {end} \
664-
or createdDate ge {start} and createdDate lt {end})",
665+
"$filter": "serviceFirstLevel eq '{service} - Recurso Viagens Subsídio' \
666+
and (lastUpdate ge {start} and lastUpdate lt {end} or createdDate ge {start} \
667+
and createdDate lt {end})",
665668
"$expand": "customFieldValues,customFieldValues($expand=items)",
666669
"$orderby": "createdDate asc",
667670
},
@@ -670,9 +673,14 @@ class constants(Enum): # pylint: disable=c0103
670673
"primary_key": ["protocol"],
671674
}
672675

676+
SUBSIDIO_SPPO_RECURSOS_TABLE_IDS = [
677+
{"table_id": "recursos_sppo_viagens_individuais"},
678+
{"table_id": "recursos_sppo_bloqueio_via"},
679+
{"table_id": "recursos_sppo_reprocessamento"},
680+
]
681+
673682
SUBSIDIO_SPPO_RECURSOS_MATERIALIZACAO_PARAMS = {
674-
"dataset_id": SUBSIDIO_SPPO_RECURSOS_DATASET_ID,
675-
"table_id": SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS["table_id"],
683+
"dataset_id": "br_rj_riodejaneiro_recursos",
676684
"upstream": True,
677685
"dbt_vars": {
678686
"date_range": {

pipelines/rj_smtr/tasks.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from typing import Dict, List, Union, Iterable, Any
1212
import io
1313

14+
1415
from basedosdados import Storage, Table
1516
import basedosdados as bd
1617
from dbt_client import DbtClient
@@ -761,6 +762,7 @@ def create_request_params(
761762
request_params = {"zip_filename": extract_params["filename"]}
762763

763764
elif dataset_id == constants.SUBSIDIO_SPPO_RECURSOS_DATASET_ID.value:
765+
request_params = {}
764766
data_recurso = extract_params.get("data_recurso", timestamp)
765767
if isinstance(data_recurso, str):
766768
data_recurso = datetime.fromisoformat(data_recurso)
@@ -772,12 +774,17 @@ def create_request_params(
772774
)
773775
end = datetime.strftime(data_recurso, "%Y-%m-%dT%H:%M:%S.%MZ")
774776
log(f" Start date {start}, end date {end}")
777+
778+
service = constants.SUBSIDIO_SPPO_RECURSO_TABLE_CAPTURE_PARAMS.value[table_id]
779+
775780
recurso_params = {
776781
"start": start,
777782
"end": end,
778-
"service": constants.SUBSIDIO_SPPO_RECURSO_SERVICE.value,
783+
"service": service,
779784
}
785+
780786
extract_params["$filter"] = extract_params["$filter"].format(**recurso_params)
787+
781788
request_params = extract_params
782789

783790
request_url = constants.SUBSIDIO_SPPO_RECURSO_API_BASE_URL.value

pipelines/rj_smtr/utils.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,10 @@ def read_raw_data(filepath: str, reader_args: dict = None) -> tuple[str, pd.Data
10621062
return error, data
10631063

10641064

1065-
def get_raw_recursos(request_url: str, request_params: dict) -> tuple[str, str, str]:
1065+
def get_raw_recursos(
1066+
request_url: str,
1067+
request_params: dict,
1068+
) -> tuple[str, str, str]:
10661069
"""
10671070
Returns recursos data from the Movidesk API as a tuple of three strings.
10681071
"""

0 commit comments

Comments
 (0)