Skip to content

Commit b9ee094

Browse files
authored
Merge branch 'master' into staging/sme_frequencia
2 parents 453d34b + 87ffa65 commit b9ee094

File tree

3 files changed

+85
-1
lines changed

3 files changed

+85
-1
lines changed

pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py

+54
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,25 @@
6363

6464
bilhetagem_transacao_captura.schedule = every_minute
6565

66+
67+
bilhetagem_transacao_riocard_captura = deepcopy(default_capture_flow)
68+
bilhetagem_transacao_riocard_captura.name = (
69+
"SMTR: Bilhetagem Transação RioCard - Captura"
70+
)
71+
bilhetagem_transacao_riocard_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
72+
bilhetagem_transacao_riocard_captura.run_config = KubernetesRun(
73+
image=emd_constants.DOCKER_IMAGE.value,
74+
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
75+
)
76+
77+
bilhetagem_transacao_riocard_captura = set_default_parameters(
78+
flow=bilhetagem_transacao_riocard_captura,
79+
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value
80+
| constants.BILHETAGEM_TRANSACAO_RIOCARD_CAPTURE_PARAMS.value,
81+
)
82+
83+
bilhetagem_transacao_riocard_captura.schedule = every_minute
84+
6685
# BILHETAGEM FISCALIZAÇÃO - CAPTURA A CADA 5 MINUTOS #
6786

6887
bilhetagem_fiscalizacao_captura = deepcopy(default_capture_flow)
@@ -179,6 +198,26 @@
179198
default_parameters=bilhetagem_materializacao_transacao_parameters,
180199
)
181200

201+
202+
bilhetagem_materializacao_transacao_riocard = deepcopy(default_materialization_flow)
203+
bilhetagem_materializacao_transacao_riocard.name = (
204+
"SMTR: Bilhetagem Transação RioCard - Materialização"
205+
)
206+
bilhetagem_materializacao_transacao_riocard.storage = GCS(
207+
emd_constants.GCS_FLOWS_BUCKET.value
208+
)
209+
bilhetagem_materializacao_transacao_riocard.run_config = KubernetesRun(
210+
image=emd_constants.DOCKER_IMAGE.value,
211+
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
212+
)
213+
214+
bilhetagem_materializacao_transacao_riocard = set_default_parameters(
215+
flow=bilhetagem_materializacao_transacao_riocard,
216+
default_parameters=constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_RIOCARD_PARAMS.value,
217+
)
218+
219+
bilhetagem_materializacao_transacao_riocard.schedule = every_day_hour_five
220+
182221
# Ordem Pagamento
183222

184223
bilhetagem_materializacao_ordem_pagamento = deepcopy(default_materialization_flow)
@@ -318,6 +357,21 @@
318357
raise_final_state=True,
319358
)
320359

360+
run_recaptura_transacao_riocard = create_flow_run(
361+
flow_name=bilhetagem_recaptura.name,
362+
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
363+
labels=LABELS,
364+
parameters=constants.BILHETAGEM_TRANSACAO_RIOCARD_CAPTURE_PARAMS.value,
365+
upstream_tasks=[wait_recaptura_transacao_true],
366+
)
367+
368+
wait_recaptura_transacao_riocard_true = wait_for_flow_run(
369+
run_recaptura_transacao_riocard,
370+
stream_states=True,
371+
stream_logs=True,
372+
raise_final_state=True,
373+
)
374+
321375
# Recaptura Fiscalização
322376

323377
run_recaptura_fiscalizacao = create_flow_run(

pipelines/rj_smtr/constants.py

+30
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,25 @@ class constants(Enum): # pylint: disable=c0103
777777
"interval_minutes": 1,
778778
}
779779

780+
BILHETAGEM_TRANSACAO_RIOCARD_CAPTURE_PARAMS = {
781+
"table_id": "transacao_riocard",
782+
"partition_date_only": False,
783+
"extract_params": {
784+
"database": "transacao_db",
785+
"query": """
786+
SELECT
787+
*
788+
FROM
789+
transacao_riocard
790+
WHERE
791+
data_processamento >= '{start}'
792+
AND data_processamento < '{end}'
793+
""",
794+
},
795+
"primary_key": ["id"],
796+
"interval_minutes": 1,
797+
}
798+
780799
BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS = {
781800
"table_id": "fiscalizacao",
782801
"partition_date_only": False,
@@ -1210,6 +1229,17 @@ class constants(Enum): # pylint: disable=c0103
12101229
"exclude": "integracao matriz_integracao stops_gtfs2 routes_gtfs2 feed_info_gtfs2",
12111230
}
12121231

1232+
BILHETAGEM_MATERIALIZACAO_TRANSACAO_RIOCARD_PARAMS = {
1233+
"dataset_id": "dashboard_controle_vinculo_jae_riocard",
1234+
"table_id": "veiculo_indicadores_dia",
1235+
"upstream": True,
1236+
"dbt_vars": {
1237+
"run_date": {},
1238+
"version": {},
1239+
},
1240+
"exclude": "+gps_sppo +sppo_licenciamento +gps_validador",
1241+
}
1242+
12131243
BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = {
12141244
"dataset_id": BILHETAGEM_DATASET_ID,
12151245
"table_id": "ordem_pagamento_dia",

pipelines/rj_smtr/tasks.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def create_dbt_run_vars(
224224
day_datetime=timestamp,
225225
)
226226

227-
final_vars.append([d.copy() for d in date_var])
227+
final_vars += [d.copy() for d in date_var]
228228

229229
log(f"run_date created: {date_var}")
230230

0 commit comments

Comments
 (0)