Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cria flow de captura e tratamento de viagens da Conecta #133

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8f224d6
cria modelo viagem_informada
akaBotelho Aug 6, 2024
a14cb84
Merge branch 'main' into staging/viagem-conecta
mergify[bot] Aug 6, 2024
238eb36
cria flows
akaBotelho Aug 6, 2024
931cf66
add schedule every_day_hour_ten
akaBotelho Aug 6, 2024
66bdc4d
altera target dev
akaBotelho Aug 6, 2024
36fb848
add import flows br_rj_riodejaneiro_viagem_conecta
akaBotelho Aug 6, 2024
fb5bd1a
adiciona parametro version
akaBotelho Aug 6, 2024
91fb7fc
altera nome das variáveis
akaBotelho Aug 6, 2024
590c5c3
adiciona sufixo _conecta nos arquivos e referencias
akaBotelho Aug 6, 2024
f092e59
altera source
akaBotelho Aug 6, 2024
82e8279
altera data dbt
akaBotelho Aug 6, 2024
fb4d397
corrige source
akaBotelho Aug 6, 2024
ae1a81f
comenta flow dbt
akaBotelho Aug 6, 2024
b51bc53
altera dbt_project para dev
akaBotelho Aug 6, 2024
101522f
altera source para dev
akaBotelho Aug 6, 2024
4f2ce5f
corrige source
akaBotelho Aug 6, 2024
ef88c14
corrige staging_viagem_informada_conecta
akaBotelho Aug 6, 2024
df12153
adiciona changelogs
akaBotelho Aug 6, 2024
b8e33a8
corrige changelog
akaBotelho Aug 6, 2024
4290e07
altera para D+2
akaBotelho Aug 7, 2024
9a22ec1
altera para prod
akaBotelho Aug 7, 2024
e6cb941
Merge branch 'main' into staging/viagem-conecta
akaBotelho Aug 7, 2024
382da50
Merge branch 'main' into staging/viagem-conecta
mergify[bot] Aug 8, 2024
577e00b
altera parametros
akaBotelho Aug 8, 2024
b0e0088
captura d-2
akaBotelho Aug 8, 2024
d9f0aff
altera schedules
akaBotelho Aug 8, 2024
fc96a32
altera para dev
akaBotelho Aug 8, 2024
af459a5
Merge branch 'staging/viagem-conecta' of https://github.com/prefeitur…
akaBotelho Aug 8, 2024
b6fa8c8
cria schedules every_day_hour_ten_minute_five, every_day_hour_ten_thi…
akaBotelho Aug 9, 2024
47b22a1
altera schedules dos flows br_rj_riodejaneiro_viagem_conecta
akaBotelho Aug 9, 2024
9117088
altera incremental_filter
akaBotelho Aug 9, 2024
739691d
Merge branch 'main' into staging/viagem-conecta
mergify[bot] Aug 9, 2024
c2e0f65
Merge branch 'main' into staging/viagem-conecta
mergify[bot] Aug 14, 2024
8d29216
altera para prod
akaBotelho Aug 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ class constants(Enum): # pylint: disable=c0103
VIAGEM_ZIRIX_RAW_DATASET_ID = "br_rj_riodejaneiro_viagem_zirix"
ZIRIX_BASE_URL = "https://integration.systemsatx.com.br/Globalbus/SMTR"

CONECTA_API_SECRET_PATH = "conecta_api"
VIAGEM_CONECTA_RAW_DATASET_ID = "br_rj_riodejaneiro_viagem_conecta"
CONECTA_BASE_URL = "https://ccomobility.com.br/webservices/binder/wsconecta"

CONTROLE_FINANCEIRO_DATASET_ID = "controle_financeiro"

######################################
Expand Down
1 change: 1 addition & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pipelines.migration.br_rj_riodejaneiro_recursos.flows import * # noqa
from pipelines.migration.br_rj_riodejaneiro_stpl_gps.flows import * # noqa
from pipelines.migration.br_rj_riodejaneiro_stu.flows import * # noqa
from pipelines.migration.br_rj_riodejaneiro_viagem_conecta.flows import * # noqa
from pipelines.migration.br_rj_riodejaneiro_viagem_zirix.flows import * # noqa
from pipelines.migration.controle_financeiro.flows import * # noqa
from pipelines.migration.projeto_subsidio_sppo.flows import * # noqa
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog - viagem_conecta

## [1.0.0] - 2024-08-07

### Adicionado

- Cria flows `viagens_conecta_captura` e `viagens_conecta_recaptura` para captura e recaptura, e flow `viagem_conecta_materializacao` para materialização, dos dados de viagens de ônibus enviados pela API da Conecta (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/133)
Empty file.
37 changes: 37 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_viagem_conecta/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
"""
Constant values for rj_smtr br_rj_riodejaneiro_viagem_conecta
"""

from enum import Enum

from pipelines.constants import constants as smtr_constants


class constants(Enum):  # pylint: disable=c0103
    """
    Parameter sets consumed by the br_rj_riodejaneiro_viagem_conecta flows.

    Each member's ``.value`` is a plain dict that is handed to
    ``set_default_parameters`` when the corresponding flow is built.
    """

    # Defaults for the capture flow of the `viagem_informada_conecta` table.
    # `delay_days` is read by the request-building task to shift the requested
    # date back from the run timestamp; 1440 minutes is one full day.
    VIAGEM_CAPTURE_PARAMETERS = dict(
        dataset_id=smtr_constants.VIAGEM_CONECTA_RAW_DATASET_ID.value,
        table_id="viagem_informada_conecta",
        partition_date_only=True,
        extract_params=dict(delay_days=2),
        primary_key=["id_viagem"],
        interval_minutes=1440,
        source_type="api-json",
    )

    # Defaults for the materialization flow. The source table id and capture
    # interval are taken from the capture parameters above so the two stay in
    # sync. Inside the class body the name still refers to the raw dict.
    VIAGEM_MATERIALIZACAO_PARAMS = dict(
        dataset_id=smtr_constants.VIAGEM_CONECTA_RAW_DATASET_ID.value,
        table_id="viagem_informada_conecta",
        upstream=True,
        dbt_vars=dict(run_date={}, version={}),
        source_dataset_ids=[smtr_constants.VIAGEM_CONECTA_RAW_DATASET_ID.value],
        source_table_ids=[VIAGEM_CAPTURE_PARAMETERS["table_id"]],
        capture_intervals_minutes=[VIAGEM_CAPTURE_PARAMETERS["interval_minutes"]],
    )
83 changes: 83 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_viagem_conecta/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
"""
Flows for br_rj_riodejaneiro_viagem_conecta

DBT: 2024-08-09
"""
from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.constants import constants as smtr_constants
from pipelines.migration.br_rj_riodejaneiro_viagem_conecta.constants import constants
from pipelines.migration.flows import default_capture_flow, default_materialization_flow
from pipelines.migration.utils import set_default_parameters
from pipelines.schedules import (
every_day_hour_eleven,
every_day_hour_ten_minute_five,
every_day_hour_ten_thirty,
)

# Flows #


def _build_flow(base_flow, name, default_parameters, schedule):
    """
    Clone a default flow and configure it for the Conecta trips pipeline.

    Args:
        base_flow: Template flow to copy (it is deep-copied, never mutated).
        name: Display name assigned to the new flow.
        default_parameters: Dict passed to ``set_default_parameters``.
        schedule: Schedule attached to the new flow.

    Returns:
        The configured flow, as returned by ``set_default_parameters``.
    """
    flow = deepcopy(base_flow)
    flow.name = name
    flow.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
    flow.run_config = KubernetesRun(
        image=smtr_constants.DOCKER_IMAGE.value,
        labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value],
    )
    flow.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry]
    flow = set_default_parameters(flow=flow, default_parameters=default_parameters)
    flow.schedule = schedule
    return flow


# Daily capture of the trips reported through the Conecta API.
viagens_conecta_captura = _build_flow(
    base_flow=default_capture_flow,
    name="SMTR: Viagens Ônibus Conecta - Captura",
    default_parameters=constants.VIAGEM_CAPTURE_PARAMETERS.value,
    schedule=every_day_hour_ten_minute_five,
)

# Same capture flow with the `recapture` parameter switched on.
viagens_conecta_recaptura = _build_flow(
    base_flow=default_capture_flow,
    name="SMTR: Viagens Ônibus Conecta - Recaptura",
    default_parameters=constants.VIAGEM_CAPTURE_PARAMETERS.value | {"recapture": True},
    schedule=every_day_hour_ten_thirty,
)

# Materialization of the captured trips, scheduled after both capture flows.
viagem_conecta_materializacao = _build_flow(
    base_flow=default_materialization_flow,
    name="Viagem Conecta - Materialização",
    default_parameters=constants.VIAGEM_MATERIALIZACAO_PARAMS.value,
    schedule=every_day_hour_eleven,
)
14 changes: 14 additions & 0 deletions pipelines/migration/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,20 @@ def create_request_params(
data_final: {request_params['data_final']}"""
)

elif dataset_id == constants.VIAGEM_CONECTA_RAW_DATASET_ID.value:
request_url = f"{constants.CONECTA_BASE_URL.value}/envioViagensConsolidadasSMTR"
delay_days = extract_params["delay_days"]
token = get_secret(constants.CONECTA_API_SECRET_PATH.value)
token_key = list(token)[0]
request_params = {
"data": (timestamp - timedelta(days=delay_days)).strftime("%Y-%m-%d"),
token_key: get_secret(constants.CONECTA_API_SECRET_PATH.value)[token_key],
}
log(
f"""Params:
data: {request_params['data']}"""
)

elif dataset_id == constants.CONTROLE_FINANCEIRO_DATASET_ID.value:
request_url = extract_params["base_url"] + extract_params["sheet_id"]

Expand Down
48 changes: 48 additions & 0 deletions pipelines/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,51 @@ def generate_interval_schedule(
)
]
)

def _every_day_at(hour, minute=0):
    """
    Build a schedule that fires once a day at ``hour:minute``.

    The clock is anchored on 2024-08-01 in the time zone configured in
    ``constants.TIMEZONE`` and carries the SMTR agent label.

    Args:
        hour: Hour of the day for the anchor datetime.
        minute: Minute of the hour for the anchor datetime (defaults to 0).

    Returns:
        A ``Schedule`` holding a single one-day ``IntervalClock``.
    """
    anchor = datetime(2024, 8, 1, hour, minute, tzinfo=timezone(constants.TIMEZONE.value))
    daily_clock = IntervalClock(
        interval=timedelta(days=1),
        start_date=anchor,
        labels=[
            emd_constants.RJ_SMTR_AGENT_LABEL.value,
        ],
    )
    return Schedule(clocks=[daily_clock])


every_day_hour_ten = _every_day_at(10)

every_day_hour_ten_minute_five = _every_day_at(10, 5)

every_day_hour_ten_thirty = _every_day_at(10, 30)

every_day_hour_eleven = _every_day_at(11)
7 changes: 7 additions & 0 deletions queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,10 @@ models:
catalogo:
+materialized: view
+schema: catalogo
br_rj_riodejaneiro_viagem_conecta:
+materialized: incremental
+incremental_strategy: insert_overwrite
+schema: br_rj_riodejaneiro_viagem_conecta
staging:
+materialized: view
+schema: br_rj_riodejaneiro_viagem_conecta_staging_dev
7 changes: 7 additions & 0 deletions queries/models/br_rj_riodejaneiro_viagem_conecta/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog - viagem_conecta

## [1.0.0] - 2024-08-07

### Adicionado

- Cria modelos `staging_viagem_informada_conecta.sql` e `viagem_informada_conecta.sql` para tratamento dos dados de viagens de ônibus enviados pela API da Conecta (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/133)
34 changes: 34 additions & 0 deletions queries/models/br_rj_riodejaneiro_viagem_conecta/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Column documentation for the viagem_informada_conecta model.
version: 2

models:
  - name: viagem_informada_conecta
    description: "Detalhes das viagens informadas pelas operadoras de ônibus"
    columns:
      - name: data
        description: "Data da viagem (partição)"
      - name: datetime_partida
        description: "Data e hora da partida da viagem em GMT-3"
      - name: datetime_chegada
        description: "Data e hora da chegada da viagem em GMT-3"
      - name: datetime_processamento
        description: "Data e hora do processamento da viagem em GMT-3"
      - name: datetime_captura
        description: "Data e hora da captura da integração em GMT-3"
      - name: id_veiculo
        description: "Código identificador do veículo (número de ordem)"
      - name: trip_id
        description: "Identificador de uma viagem da tabela trips do GTFS"
      - name: route_id
        description: "Identificador de uma rota da tabela routes do GTFS"
      - name: shape_id
        description: "Identificador de um shape da tabela shapes do GTFS"
      - name: servico
        description: "Nome curto da linha operada pelo veículo com variação de serviço (ex: 010, 011SN, ...)"
      - name: sentido
        description: "Ida ou Volta"
      # id_viagem is the capture primary key; the previous text was a copy of
      # the shape_id description.
      - name: id_viagem
        description: "Identificador único da viagem"
      - name: versao
        description: "Código de controle de versão do dado (SHA Github)"
      - name: datetime_ultima_atualizacao
        description: "Última atualização (GMT-3)."
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{{
config(
alias="viagem_informada_conecta"
)
}}

-- Staging model for the trips reported through the Conecta API.
-- Reads the raw capture table and flattens the JSON stored in `content`
-- into typed columns; the relation is published under the alias
-- viagem_informada_conecta.
SELECT
id_viagem,
data,
-- Capture timestamp: parsed from a string carrying a UTC offset, then
-- converted to America/Sao_Paulo wall-clock time.
DATETIME(PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S%Ez', timestamp_captura), "America/Sao_Paulo") AS timestamp_captura,
-- Trip date: keeps only the leading date portion of `data_viagem`.
-- NOTE(review): the start position is 0; BigQuery is expected to treat it
-- as 1, so this takes the first 10 characters - confirm.
DATE(
PARSE_DATETIME(
'%Y-%m-%d',
SUBSTRING(
SAFE_CAST(JSON_VALUE(content, '$.data_viagem') AS STRING),
0,
10
)
)
) AS data_viagem,
-- Arrival: the "Z" marker is stripped before parsing.
-- NOTE(review): no time zone is passed to PARSE_TIMESTAMP or DATETIME here,
-- so the wall-clock value is presumably kept as sent by the API - confirm
-- whether the source is UTC or local time.
DATETIME(
PARSE_TIMESTAMP(
'%Y-%m-%dT%H:%M:%S',
REPLACE(
SAFE_CAST(JSON_VALUE(content, '$.datetime_chegada') AS STRING),
"Z",
""
)
)
) AS datetime_chegada,
-- Departure: same treatment as the arrival column.
DATETIME(
PARSE_TIMESTAMP(
'%Y-%m-%dT%H:%M:%S',
REPLACE(
SAFE_CAST(JSON_VALUE(content, '$.datetime_partida') AS STRING),
"Z",
""
)
)
) AS datetime_partida,
-- Processing time: same treatment, but the format accepts fractional
-- seconds.
DATETIME(
PARSE_TIMESTAMP(
'%Y-%m-%dT%H:%M:%E*S',
REPLACE(
SAFE_CAST(JSON_VALUE(content, '$.datetime_processamento') AS STRING),
"Z",
""
)
)
) AS datetime_processamento,
-- Remaining attributes are extracted from the JSON as strings.
SAFE_CAST(JSON_VALUE(content, '$.id_veiculo') AS STRING) AS id_veiculo,
SAFE_CAST(JSON_VALUE(content, '$.route_id') AS STRING) AS route_id,
SAFE_CAST(JSON_VALUE(content, '$.sentido') AS STRING) AS sentido,
SAFE_CAST(JSON_VALUE(content, '$.servico') AS STRING) AS servico,
SAFE_CAST(JSON_VALUE(content, '$.shape_id') AS STRING) AS shape_id,
SAFE_CAST(JSON_VALUE(content, '$.trip_id') AS STRING) AS trip_id
FROM
{{ source("br_rj_riodejaneiro_viagem_conecta_staging_dev", "viagem_informada_conecta") }}
Loading
Loading