diff --git a/pipelines/constants.py b/pipelines/constants.py index a63e7c28..c906db6c 100644 --- a/pipelines/constants.py +++ b/pipelines/constants.py @@ -290,6 +290,10 @@ class constants(Enum): # pylint: disable=c0103 VIAGEM_ZIRIX_RAW_DATASET_ID = "br_rj_riodejaneiro_viagem_zirix" ZIRIX_BASE_URL = "https://integration.systemsatx.com.br/Globalbus/SMTR" + CONECTA_API_SECRET_PATH = "conecta_api" + VIAGEM_CONECTA_RAW_DATASET_ID = "br_rj_riodejaneiro_viagem_conecta" + CONECTA_BASE_URL = "https://ccomobility.com.br/webservices/binder/wsconecta" + CONTROLE_FINANCEIRO_DATASET_ID = "controle_financeiro" ###################################### diff --git a/pipelines/flows.py b/pipelines/flows.py index 1a915ff2..586ec07a 100644 --- a/pipelines/flows.py +++ b/pipelines/flows.py @@ -15,6 +15,7 @@ from pipelines.migration.br_rj_riodejaneiro_recursos.flows import * # noqa from pipelines.migration.br_rj_riodejaneiro_stpl_gps.flows import * # noqa from pipelines.migration.br_rj_riodejaneiro_stu.flows import * # noqa +from pipelines.migration.br_rj_riodejaneiro_viagem_conecta.flows import * # noqa from pipelines.migration.br_rj_riodejaneiro_viagem_zirix.flows import * # noqa from pipelines.migration.controle_financeiro.flows import * # noqa from pipelines.migration.projeto_subsidio_sppo.flows import * # noqa diff --git a/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/CHANGELOG.md new file mode 100644 index 00000000..1599d087 --- /dev/null +++ b/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - viagem_conecta + +## [1.0.0] - 2024-08-07 + +### Adicionado + +- Cria flow `viagem_conecta_materializacao` para materialização dos dados de viagens de ônibus enviados pela API da Conecta (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/133) \ No newline at end of file diff --git a/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/__init__.py b/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/constants.py b/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/constants.py new file mode 100644 index 00000000..b1d4f285 --- /dev/null +++ b/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/constants.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +""" +Constant values for rj_smtr br_rj_riodejaneiro_viagem_conecta +""" + +from enum import Enum + +from pipelines.constants import constants as smtr_constants + + +class constants(Enum): # pylint: disable=c0103 + """ + Constant values for rj_smtr br_rj_riodejaneiro_viagem_conecta + """ + + VIAGEM_CAPTURE_PARAMETERS = { + "dataset_id": smtr_constants.VIAGEM_CONECTA_RAW_DATASET_ID.value, + "table_id": "viagem_informada_conecta", + "partition_date_only": True, + "extract_params": {"delay_days": 2}, + "primary_key": ["id_viagem"], + "interval_minutes": 1440, + "source_type": "api-json", + } + + VIAGEM_MATERIALIZACAO_PARAMS = { + "dataset_id": smtr_constants.VIAGEM_CONECTA_RAW_DATASET_ID.value, + "table_id": "viagem_informada_conecta", + "upstream": True, + "dbt_vars": { + "run_date": {}, + "version": {}, + }, + "source_dataset_ids": [smtr_constants.VIAGEM_CONECTA_RAW_DATASET_ID.value], + "source_table_ids": [VIAGEM_CAPTURE_PARAMETERS["table_id"]], + "capture_intervals_minutes": [VIAGEM_CAPTURE_PARAMETERS["interval_minutes"]], + } diff --git a/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/flows.py b/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/flows.py new file mode 100644 index 00000000..fcd2d88a --- /dev/null +++ b/pipelines/migration/br_rj_riodejaneiro_viagem_conecta/flows.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +""" +Flows for br_rj_riodejaneiro_viagem_conecta + +DBT: 2024-08-16 +""" +from copy import deepcopy + +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from prefeitura_rio.pipelines_utils.state_handlers import ( + handler_initialize_sentry, + handler_inject_bd_credentials, +) + +from pipelines.constants import constants as smtr_constants +from pipelines.migration.br_rj_riodejaneiro_viagem_conecta.constants import constants +from pipelines.migration.flows import default_capture_flow, default_materialization_flow +from pipelines.migration.utils import set_default_parameters +from pipelines.schedules import ( + every_day_hour_eleven, + every_day_hour_ten_minute_five, + every_day_hour_ten_thirty, +) + +# Flows # + +viagens_conecta_captura = deepcopy(default_capture_flow) +viagens_conecta_captura.name = "SMTR: Viagens Ônibus Conecta - Captura" +viagens_conecta_captura.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value) +viagens_conecta_captura.run_config = KubernetesRun( + image=smtr_constants.DOCKER_IMAGE.value, + labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value], +) +viagens_conecta_captura.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry] + +viagens_conecta_captura = set_default_parameters( + flow=viagens_conecta_captura, + default_parameters=constants.VIAGEM_CAPTURE_PARAMETERS.value, +) + +viagens_conecta_captura.schedule = every_day_hour_ten_minute_five + + +viagens_conecta_recaptura = deepcopy(default_capture_flow) +viagens_conecta_recaptura.name = "SMTR: Viagens Ônibus Conecta - Recaptura" +viagens_conecta_recaptura.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value) +viagens_conecta_recaptura.run_config = KubernetesRun( + image=smtr_constants.DOCKER_IMAGE.value, + labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value], +) +viagens_conecta_recaptura.state_handlers = [ + handler_inject_bd_credentials, + handler_initialize_sentry, +] + +viagens_conecta_recaptura = set_default_parameters( + flow=viagens_conecta_recaptura, + default_parameters=constants.VIAGEM_CAPTURE_PARAMETERS.value | {"recapture": True}, +) + +viagens_conecta_recaptura.schedule = every_day_hour_ten_thirty + + +viagem_conecta_materializacao = deepcopy(default_materialization_flow) +viagem_conecta_materializacao.name = "Viagem Conecta - Materialização" +viagem_conecta_materializacao.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value) +viagem_conecta_materializacao.run_config = KubernetesRun( + image=smtr_constants.DOCKER_IMAGE.value, + labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value], +) + +viagem_conecta_materializacao.state_handlers = [ + handler_inject_bd_credentials, + handler_initialize_sentry, +] + +viagem_conecta_materializacao = set_default_parameters( + flow=viagem_conecta_materializacao, + default_parameters=constants.VIAGEM_MATERIALIZACAO_PARAMS.value, +) + +viagem_conecta_materializacao.schedule = every_day_hour_eleven diff --git a/pipelines/migration/tasks.py b/pipelines/migration/tasks.py index 4868d341..ae504c51 100644 --- a/pipelines/migration/tasks.py +++ b/pipelines/migration/tasks.py @@ -828,6 +828,20 @@ def create_request_params( data_final: {request_params['data_final']}""" ) + elif dataset_id == constants.VIAGEM_CONECTA_RAW_DATASET_ID.value: + request_url = f"{constants.CONECTA_BASE_URL.value}/envioViagensConsolidadasSMTR" + delay_days = extract_params["delay_days"] + token = get_secret(constants.CONECTA_API_SECRET_PATH.value) + token_key = list(token)[0] + request_params = { + "data": (timestamp - timedelta(days=delay_days)).strftime("%Y-%m-%d"), + token_key: get_secret(constants.CONECTA_API_SECRET_PATH.value)[token_key], + } + log( + f"""Params: + data: {request_params['data']}""" + ) + elif dataset_id == constants.CONTROLE_FINANCEIRO_DATASET_ID.value: request_url = extract_params["base_url"] + extract_params["sheet_id"] diff --git a/pipelines/schedules.py b/pipelines/schedules.py index e8921874..4d5711c2 100644 --- a/pipelines/schedules.py +++ b/pipelines/schedules.py @@ -204,3 +204,51 @@ def generate_interval_schedule( ) ] ) + +every_day_hour_ten = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(days=1), + start_date=datetime(2024, 8, 1, 10, 0, tzinfo=timezone(constants.TIMEZONE.value)), + labels=[ + emd_constants.RJ_SMTR_AGENT_LABEL.value, + ], + ), + ] +) + +every_day_hour_ten_minute_five = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(days=1), + start_date=datetime(2024, 8, 1, 10, 5, tzinfo=timezone(constants.TIMEZONE.value)), + labels=[ + emd_constants.RJ_SMTR_AGENT_LABEL.value, + ], + ), + ] +) + +every_day_hour_ten_thirty = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(days=1), + start_date=datetime(2024, 8, 1, 10, 30, tzinfo=timezone(constants.TIMEZONE.value)), + labels=[ + emd_constants.RJ_SMTR_AGENT_LABEL.value, + ], + ), + ] +) + +every_day_hour_eleven = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(days=1), + start_date=datetime(2024, 8, 1, 11, 0, tzinfo=timezone(constants.TIMEZONE.value)), + labels=[ + emd_constants.RJ_SMTR_AGENT_LABEL.value, + ], + ), + ] +) diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml index da65dc96..ac77c284 100644 --- a/queries/dbt_project.yml +++ b/queries/dbt_project.yml @@ -295,3 +295,10 @@ models: catalogo: +materialized: view +schema: catalogo + br_rj_riodejaneiro_viagem_conecta: + +materialized: incremental + +incremental_strategy: insert_overwrite + +schema: br_rj_riodejaneiro_viagem_conecta + staging: + +materialized: view + +schema: br_rj_riodejaneiro_viagem_conecta_staging \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_viagem_conecta/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_viagem_conecta/CHANGELOG.md new file mode 100644 index 00000000..60d758c0 --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_viagem_conecta/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog - viagem_conecta + +## [1.0.0] - 2024-08-07 + +### Adicionado + +- Cria modelos `staging_viagem_informada_conecta.sql` e `viagem_informada_conecta.sql` para tratamento dos dados de viagens de ônibus enviados pela API da Conecta (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/133) diff --git a/queries/models/br_rj_riodejaneiro_viagem_conecta/schema.yml b/queries/models/br_rj_riodejaneiro_viagem_conecta/schema.yml new file mode 100644 index 00000000..7878a1c9 --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_viagem_conecta/schema.yml @@ -0,0 +1,34 @@ +version: 2 + +models: + - name: viagem_informada_conecta + description: "Detalhes das viagens informadas pelas operadoras de ônibus" + columns: + - name: data + description: "Data da viagem (partição)" + - name: datetime_partida + description: "Data e hora da partida da viagem em GMT-3" + - name: datetime_chegada + description: "Data e hora da chegada da viagem em GMT-3" + - name: datetime_processamento + description: "Data e hora do processamento da viagem em GMT-3" + - name: datetime_captura + description: "Data e hora da captura da integração em GMT-3" + - name: id_veiculo + description: "Código identificador do veículo (número de ordem)" + - name: trip_id + description: "Identificador de uma viagem da tabela trips do GTFS" + - name: route_id + description: "Identificador de uma rota da tabela routes do GTFS" + - name: shape_id + description: "Identificador de um shape da tabela shapes do GTFS" + - name: servico + description: "Nome curto da linha operada pelo veículo com variação de serviço (ex: 010, 011SN, ...)" + - name: sentido + description: "Ida ou Volta" + - name: id_viagem + description: "Identificador de um shape da tabela shapes do GTFS" + - name: versao + description: "Código de controle de versão do dado (SHA Github)" + - name: datetime_ultima_atualizacao + description: "Última atualização (GMT-3)." \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_viagem_conecta/staging/staging_viagem_informada_conecta.sql b/queries/models/br_rj_riodejaneiro_viagem_conecta/staging/staging_viagem_informada_conecta.sql new file mode 100644 index 00000000..e5dfa6b7 --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_viagem_conecta/staging/staging_viagem_informada_conecta.sql @@ -0,0 +1,58 @@ +{{ + config( + alias="viagem_informada_conecta" + ) +}} + +SELECT + id_viagem, + data, + DATETIME(PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S%Ez', timestamp_captura), "America/Sao_Paulo") AS timestamp_captura, + DATE( + PARSE_DATETIME( + '%Y-%m-%d', + SUBSTRING( + SAFE_CAST(JSON_VALUE(content, '$.data_viagem') AS STRING), + 0, + 10 + ) + ) + ) AS data_viagem, + DATETIME( + PARSE_TIMESTAMP( + '%Y-%m-%dT%H:%M:%S', + REPLACE( + SAFE_CAST(JSON_VALUE(content, '$.datetime_chegada') AS STRING), + "Z", + "" + ) + ) + ) AS datetime_chegada, + DATETIME( + PARSE_TIMESTAMP( + '%Y-%m-%dT%H:%M:%S', + REPLACE( + SAFE_CAST(JSON_VALUE(content, '$.datetime_partida') AS STRING), + "Z", + "" + ) + ) + ) AS datetime_partida, + DATETIME( + PARSE_TIMESTAMP( + '%Y-%m-%dT%H:%M:%E*S', + REPLACE( + SAFE_CAST(JSON_VALUE(content, '$.datetime_processamento') AS STRING), + "Z", + "" + ) + ) + ) AS datetime_processamento, + SAFE_CAST(JSON_VALUE(content, '$.id_veiculo') AS STRING) AS id_veiculo, + SAFE_CAST(JSON_VALUE(content, '$.route_id') AS STRING) AS route_id, + SAFE_CAST(JSON_VALUE(content, '$.sentido') AS STRING) AS sentido, + SAFE_CAST(JSON_VALUE(content, '$.servico') AS STRING) AS servico, + SAFE_CAST(JSON_VALUE(content, '$.shape_id') AS STRING) AS shape_id, + SAFE_CAST(JSON_VALUE(content, '$.trip_id') AS STRING) AS trip_id +FROM + {{ source("br_rj_riodejaneiro_viagem_conecta_staging", "viagem_informada_conecta") }} \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_viagem_conecta/viagem_informada_conecta.sql b/queries/models/br_rj_riodejaneiro_viagem_conecta/viagem_informada_conecta.sql new file mode 100644 index 00000000..c37cc2ac --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_viagem_conecta/viagem_informada_conecta.sql @@ -0,0 +1,96 @@ +{{ + config( + materialized="incremental", + partition_by={ + "field":"data", + "data_type":"date", + "granularity": "day" + }, + incremental_strategy="insert_overwrite" + ) +}} + +{% set incremental_filter %} + DATE(data) = DATE("{{ var("run_date") }}") +{% endset %} + +{% set staging_viagem_informada = ref('staging_viagem_informada_conecta') %} +{% if execute %} + {% if is_incremental() %} + {% set partitions_query %} + SELECT DISTINCT + CONCAT("'", DATE(data_viagem), "'") AS data_viagem + FROM + {{ staging_viagem_informada }} + WHERE + {{ incremental_filter }} + {% endset %} + + {% set partitions = run_query(partitions_query).columns[0].values() %} + {% endif %} +{% endif %} + +WITH staging_data AS ( + SELECT + data_viagem AS data, + datetime_partida, + datetime_chegada, + datetime_processamento, + timestamp_captura AS datetime_captura, + id_veiculo, + trip_id, + route_id, + shape_id, + servico, + CASE + WHEN sentido = 'IDA' THEN 'Ida' + WHEN sentido = 'VOLTA' THEN 'Volta' + ELSE sentido + END AS sentido, + id_viagem + FROM + {{ staging_viagem_informada }} + {% if is_incremental() %} + WHERE + {{ incremental_filter }} + {% endif %} +), +complete_partitions AS ( + SELECT + *, + 0 AS priority + FROM + staging_data + + {% if is_incremental() and partitions|length > 0 %} + UNION ALL + + SELECT + * EXCEPT(versao, datetime_ultima_atualizacao), + 1 AS priority + FROM + {{ this }} + WHERE + data IN ({{ partitions|join(', ') }}) + {% endif %} +), +deduplicado AS ( + SELECT + * EXCEPT(rn, priority) + FROM + ( + SELECT + *, + ROW_NUMBER() OVER(PARTITION BY id_viagem ORDER BY datetime_captura DESC, priority) AS rn + FROM + complete_partitions + ) + WHERE + rn = 1 +) +SELECT + *, + '{{ var("version") }}' AS versao, + CURRENT_DATETIME("America/Sao_Paulo") as datetime_ultima_atualizacao +FROM + deduplicado \ No newline at end of file diff --git a/queries/models/sources.yml b/queries/models/sources.yml index 198b660a..479fc29c 100644 --- a/queries/models/sources.yml +++ b/queries/models/sources.yml @@ -154,3 +154,9 @@ sources: tables: - name: viagem_informada + + - name: br_rj_riodejaneiro_viagem_conecta_staging + database: rj-smtr-staging + + tables: + - name: viagem_informada_conecta \ No newline at end of file