diff --git a/pipelines/constants.py b/pipelines/constants.py index c26f7592..5f3f9d86 100644 --- a/pipelines/constants.py +++ b/pipelines/constants.py @@ -249,6 +249,67 @@ class constants(Enum): # pylint: disable=c0103 }, } + GTFS_TABLE_CAPTURE_PARAMS_ANTIGO = [ + { + "table_id": "shapes", + "primary_key": ["shape_id", "shape_pt_sequence"], + }, + { + "table_id": "agency", + "primary_key": ["agency_id"], + }, + { + "table_id": "calendar_dates", + "primary_key": ["service_id", "date"], + }, + { + "table_id": "calendar", + "primary_key": ["service_id"], + }, + { + "table_id": "feed_info", + "primary_key": ["feed_publisher_name"], + }, + { + "table_id": "frequencies", + "primary_key": ["trip_id", "start_time"], + }, + { + "table_id": "routes", + "primary_key": ["route_id"], + }, + { + "table_id": "stops", + "primary_key": ["stop_id"], + }, + { + "table_id": "trips", + "primary_key": ["trip_id"], + }, + { + "table_id": "fare_attributes", + "primary_key": ["fare_id"], + }, + { + "table_id": "fare_rules", + "primary_key": [], + }, + { + "table_id": "ordem_servico", + "primary_key": ["servico", "tipo_os"], + "extract_params": {"filename": "ordem_servico"}, + }, + { + "table_id": "ordem_servico_trajeto_alternativo", + "primary_key": ["servico", "tipo_os", "evento"], + "extract_params": {"filename": "ordem_servico_trajeto_alternativo"}, + }, + { + "table_id": "stop_times", + "primary_key": ["trip_id", "stop_sequence"], + }, + ] + # # SUBSÍDIO RECURSOS VIAGENS INDIVIDUAIS # SUBSIDIO_SPPO_RECURSOS_DATASET_ID = "migracao_br_rj_riodejaneiro_recurso" # SUBSIDIO_SPPO_RECURSO_API_BASE_URL = "https://api.movidesk.com/public/v1/tickets?" diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py index bdd8bc59..36de2005 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py @@ -5,9 +5,14 @@ DBT: 2024-07-15 """ +from copy import deepcopy +from datetime import timedelta + from prefect import Parameter, case, task from prefect.run_configs import KubernetesRun from prefect.storage import GCS +from prefect.tasks.control_flow import merge +from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from prefect.utilities.edges import unmapped from prefeitura_rio.pipelines_utils.custom import Flow @@ -18,6 +23,7 @@ handler_skip_if_running, ) +from pipelines.capture.templates.flows import create_default_capture_flow from pipelines.constants import constants # SMTR Imports # @@ -27,13 +33,17 @@ get_raw_drive_files, update_last_captured_os, ) +from pipelines.migration.flows import default_capture_flow, default_materialization_flow from pipelines.migration.tasks import ( create_date_hour_partition, create_local_partition_path, fetch_dataset_sha, + get_current_flow_labels, get_current_flow_mode, get_current_timestamp, + get_flow_project, get_join_dict, + get_scheduled_start_times, rename_current_flow_run_now_time, run_dbt_model, transform_raw_to_nested_structure, @@ -41,11 +51,10 @@ upload_raw_data_to_gcs, upload_staging_data_to_gcs, ) +from pipelines.migration.utils import set_default_parameters from pipelines.schedules import every_5_minutes from pipelines.tasks import get_scheduled_timestamp, parse_timestamp_to_string - -# from pipelines.capture.templates.flows import create_default_capture_flow - +from pipelines.treatment.templates.flows import create_default_materialization_flow # Imports # @@ -53,7 +62,6 @@ # EMD Imports # -# from pipelines.rj_smtr.flows import default_capture_flow, default_materialization_flow # SETUP dos Flows @@ -64,18 +72,18 @@ # ) # Captura Antiga -# gtfs_captura = deepcopy(default_capture_flow) -# gtfs_captura.name = "SMTR: GTFS - Captura (subflow)" -# gtfs_captura.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# gtfs_captura.run_config = KubernetesRun( -# image=constants.DOCKER_IMAGE.value, -# labels=[constants.RJ_SMTR_AGENT_LABEL.value], -# ) -# gtfs_captura.state_handlers = [handler_initialize_sentry, handler_inject_bd_credentials] -# gtfs_captura = set_default_parameters( -# flow=gtfs_captura, -# default_parameters=constants.GTFS_GENERAL_CAPTURE_PARAMS.value, -# ) +gtfs_captura = deepcopy(default_capture_flow) +gtfs_captura.name = "SMTR: GTFS - Captura (subflow)" +gtfs_captura.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +gtfs_captura.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_SMTR_AGENT_LABEL.value], +) +gtfs_captura.state_handlers = [handler_initialize_sentry, handler_inject_bd_credentials] +gtfs_captura = set_default_parameters( + flow=gtfs_captura, + default_parameters=constants.GTFS_GENERAL_CAPTURE_PARAMS.value, +) # Captura Antiga # gtfs_materializacao = create_default_materialization_flow( @@ -84,18 +92,18 @@ # overwrite_flow_param_values=constants.GTFS_MATERIALIZACAO_PARAMS.value, # ) -# gtfs_materializacao = deepcopy(default_materialization_flow) -# gtfs_materializacao.name = "SMTR: GTFS - Materialização (subflow)" -# gtfs_materializacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# gtfs_materializacao.run_config = KubernetesRun( -# image=constants.DOCKER_IMAGE.value, -# labels=[constants.RJ_SMTR_AGENT_LABEL.value], -# ) -# gtfs_materializacao.state_handlers = [handler_initialize_sentry, handler_inject_bd_credentials] -# gtfs_materializacao = set_default_parameters( -# flow=gtfs_materializacao, -# default_parameters=constants.GTFS_MATERIALIZACAO_PARAMS.value, -# ) +gtfs_materializacao = deepcopy(default_materialization_flow) +gtfs_materializacao.name = "SMTR: GTFS - Materialização (subflow)" +gtfs_materializacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +gtfs_materializacao.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_SMTR_AGENT_LABEL.value], +) +gtfs_materializacao.state_handlers = [handler_initialize_sentry, handler_inject_bd_credentials] +gtfs_materializacao = set_default_parameters( + flow=gtfs_materializacao, + default_parameters=constants.GTFS_MATERIALIZACAO_PARAMS.value, +) with Flow("SMTR: GTFS - Captura/Tratamento") as gtfs_captura_nova: capture = Parameter("capture", default=True) @@ -201,131 +209,89 @@ gtfs_captura_nova.schedule = every_5_minutes -# with Flow( -# "SMTR: GTFS - Captura/Tratamento", -# # code_owners=["rodrigo", "carolinagomes"], -# ) as gtfs_captura_tratamento: -# # SETUP -# data_versao_gtfs = Parameter("data_versao_gtfs", default=None) -# capture = Parameter("capture", default=True) -# materialize = Parameter("materialize", default=True) - -# timestamp = get_current_timestamp() - -# rename_flow_run = rename_current_flow_run_now_time( -# prefix=gtfs_captura_tratamento.name + " " + data_versao_gtfs + " ", -# now_time=timestamp, -# ) - -# LABELS = get_current_flow_labels() -# PROJECT = get_flow_project() - -# with case(capture, True): - -# # Captura nova -# run_captura = create_flow_run.map( -# flow_name=unmapped(gtfs_captura_nova.name), -# project_name=unmapped(PROJECT), -# parameters=None, -# labels=unmapped(LABELS), -# scheduled_start_time=get_scheduled_start_times( -# timestamp=timestamp, -# parameters=None, -# intervals={"agency": timedelta(minutes=5)}, -# ), -# ) - -# wait_captura_true = wait_for_flow_run.map( -# run_captura, -# stream_states=unmapped(True), -# stream_logs=unmapped(True), -# raise_final_state=unmapped(True), -# ) - -# # Captura antiga - -# # gtfs_capture_parameters = [ -# # {"timestamp": data_versao_gtfs, **d} for d in -# constants.GTFS_TABLE_CAPTURE_PARAMS.value -# # ] - -# # run_captura = create_flow_run.map( -# # flow_name=unmapped(gtfs_captura.name), -# # project_name=unmapped(PROJECT), -# # parameters=gtfs_capture_parameters, -# # labels=unmapped(LABELS), -# # scheduled_start_time=get_scheduled_start_times( -# # timestamp=timestamp, -# # parameters=gtfs_capture_parameters, -# # intervals={"agency": timedelta(minutes=11)}, -# # ), -# # ) - -# # wait_captura_true = wait_for_flow_run.map( -# # run_captura, -# # stream_states=unmapped(True), -# # stream_logs=unmapped(True), -# # raise_final_state=unmapped(True), -# # ) - -# with case(capture, False): -# wait_captura_false = task( -# lambda: [None], checkpoint=False, name="assign_none_to_previous_runs" -# )() - -# wait_captura = merge(wait_captura_true, wait_captura_false) - -# with case(materialize, True): -# gtfs_materializacao_parameters = { -# "dbt_vars": { -# "data_versao_gtfs": data_versao_gtfs, -# "version": {}, -# }, -# } -# gtfs_materializacao_parameters_new = { -# "dataset_id": "gtfs", -# "dbt_vars": { -# "data_versao_gtfs": data_versao_gtfs, -# "version": {}, -# }, -# } - -# run_materializacao = create_flow_run( -# flow_name=gtfs_materializacao.name, -# project_name=PROJECT, -# parameters=gtfs_materializacao_parameters, -# labels=LABELS, -# upstream_tasks=[wait_captura], -# ) - -# run_materializacao_new_dataset_id = create_flow_run( -# flow_name=gtfs_materializacao.name, -# project_name=PROJECT, -# parameters=gtfs_materializacao_parameters_new, -# labels=LABELS, -# upstream_tasks=[wait_captura], -# ) - -# wait_materializacao = wait_for_flow_run( -# run_materializacao, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) - -# wait_materializacao_new_dataset_id = wait_for_flow_run( -# run_materializacao_new_dataset_id, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) - -# gtfs_captura_tratamento.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# gtfs_captura_tratamento.run_config = KubernetesRun( -# image=constants.DOCKER_IMAGE.value, -# labels=[constants.RJ_SMTR_AGENT_LABEL.value], -# ) -# gtfs_captura_tratamento.state_handlers = [ -# handler_inject_bd_credentials, -# handler_initialize_sentry, -# ] +with Flow( + "SMTR: GTFS - Captura/Tratamento GCS", + # code_owners=["rodrigo", "carolinagomes"], +) as gtfs_captura_tratamento: + # SETUP + data_versao_gtfs = Parameter("data_versao_gtfs", default=None) + capture = Parameter("capture", default=True) + materialize = Parameter("materialize", default=True) + project_name = Parameter("project_name", default=constants.PREFECT_DEFAULT_PROJECT.value) + + timestamp = get_current_timestamp() + + rename_flow_run = rename_current_flow_run_now_time( + prefix=gtfs_captura_tratamento.name + " " + data_versao_gtfs + " ", + now_time=timestamp, + ) + + LABELS = get_current_flow_labels() + + with case(capture, True): + gtfs_capture_parameters = [ + {"timestamp": data_versao_gtfs, **d} + for d in constants.GTFS_TABLE_CAPTURE_PARAMS_ANTIGO.value + ] + + run_captura = create_flow_run.map( + flow_name=unmapped(gtfs_captura.name), + project_name=unmapped(project_name), + parameters=gtfs_capture_parameters, + labels=unmapped(LABELS), + scheduled_start_time=get_scheduled_start_times( + timestamp=timestamp, + parameters=gtfs_capture_parameters, + intervals={"agency": timedelta(minutes=11)}, + ), + run_config=unmapped(gtfs_captura_tratamento.run_config), + ) + + wait_captura_true = wait_for_flow_run.map( + run_captura, + stream_states=unmapped(True), + stream_logs=unmapped(True), + raise_final_state=unmapped(True), + ) + + with case(capture, False): + wait_captura_false = task( + lambda: [None], checkpoint=False, name="assign_none_to_previous_runs" + )() + + wait_captura = merge(wait_captura_true, wait_captura_false) + + with case(materialize, True): + dbt_vars = { + "dbt_vars": { + "data_versao_gtfs": data_versao_gtfs, + "version": {}, + }, + } + + gtfs_materializacao_parameters = dbt_vars + + run_materializacao = create_flow_run( + flow_name=gtfs_materializacao.name, + project_name=project_name, + parameters=gtfs_materializacao_parameters, + labels=LABELS, + upstream_tasks=[wait_captura], + ) + + wait_materializacao = wait_for_flow_run( + run_materializacao, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + +gtfs_captura_tratamento.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +gtfs_captura_tratamento.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_SMTR_AGENT_LABEL.value], +) +gtfs_captura_tratamento.state_handlers = [ + handler_inject_bd_credentials, + handler_initialize_sentry, +]