From a318e4f1dd1b64b9c4eb22344511c7a56e1dee66 Mon Sep 17 00:00:00 2001
From: Laura Amaral
Date: Thu, 11 Jan 2024 18:52:32 -0300
Subject: [PATCH] include prefect flows

---
 pipelines/datasets/br_bd_metadados/flows.py   |  84 +++++++++++-
 .../datasets/br_bd_metadados/schedules.py     |  21 ++-
 pipelines/datasets/br_bd_metadados/tasks.py   | 121 +++++++++++++-----
 3 files changed, 189 insertions(+), 37 deletions(-)

diff --git a/pipelines/datasets/br_bd_metadados/flows.py b/pipelines/datasets/br_bd_metadados/flows.py
index f9f4ee84e..d34df9548 100755
--- a/pipelines/datasets/br_bd_metadados/flows.py
+++ b/pipelines/datasets/br_bd_metadados/flows.py
@@ -12,10 +12,12 @@
 from pipelines.constants import constants
 from pipelines.datasets.br_bd_metadados.schedules import (
-    every_day_prefect,
+    every_day_prefect_flow_runs,
+    every_day_prefect_flows,
 )
 from pipelines.datasets.br_bd_metadados.tasks import (
-    crawler,
+    crawler_flow_runs,
+    crawler_flows,
 )
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.decorators import Flow
@@ -31,7 +33,7 @@
     code_owners=[
         "lauris",
     ],
-) as bd_prefect:
+) as bd_prefect_flow_runs:
     # Parameters
     dataset_id = Parameter("dataset_id", default="br_bd_metadados", required=True)
     table_id = Parameter("table_id", default="prefect_flow_runs", required=True)
@@ -49,7 +51,7 @@
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
     )
 
-    filepath = crawler()
+    filepath = crawler_flow_runs()
 
     # pylint: disable=C0103
     wait_upload_table = create_table_and_upload_to_gcs(
@@ -89,6 +91,74 @@
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
     )
 
-bd_prefect.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
-bd_prefect.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
-bd_prefect.schedule = every_day_prefect
+bd_prefect_flow_runs.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+bd_prefect_flow_runs.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+bd_prefect_flow_runs.schedule = every_day_prefect_flow_runs
+
+
+with Flow(
+    name="br_bd_metadados.prefect_flows",
+    code_owners=[
+        "lauris",
+    ],
+) as bd_prefect_flows:
+    # Parameters
+    dataset_id = Parameter("dataset_id", default="br_bd_metadados", required=True)
+    table_id = Parameter("table_id", default="prefect_flows", required=True)
+
+    materialization_mode = Parameter(
+        "materialization_mode", default="dev", required=False
+    )
+
+    materialize_after_dump = Parameter(
+        "materialize_after_dump", default=True, required=False
+    )
+    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+
+    rename_flow_run = rename_current_flow_run_dataset_table(
+        prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
+    )
+
+    filepath = crawler_flows()
+
+    # pylint: disable=C0103
+    wait_upload_table = create_table_and_upload_to_gcs(
+        data_path=filepath,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        dump_mode="append",
+        wait=filepath,
+    )
+
+    with case(materialize_after_dump, True):
+        # Trigger DBT flow run
+        current_flow_labels = get_current_flow_labels()
+        materialization_flow = create_flow_run(
+            flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+            project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+            parameters={
+                "dataset_id": dataset_id,
+                "table_id": table_id,
+                "mode": materialization_mode,
+                "dbt_alias": dbt_alias,
+            },
+            labels=current_flow_labels,
+            run_name=f"Materialize {dataset_id}.{table_id}",
+        )
+
+        wait_for_materialization = wait_for_flow_run(
+            materialization_flow,
+            stream_states=True,
+            stream_logs=True,
+            raise_final_state=True,
+        )
+        wait_for_materialization.max_retries = (
+            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+        )
+        wait_for_materialization.retry_delay = timedelta(
+            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+        )
+
+bd_prefect_flows.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+bd_prefect_flows.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+bd_prefect_flows.schedule = every_day_prefect_flows
diff --git a/pipelines/datasets/br_bd_metadados/schedules.py b/pipelines/datasets/br_bd_metadados/schedules.py
index c932fe69e..ecceb05af 100755
--- a/pipelines/datasets/br_bd_metadados/schedules.py
+++ b/pipelines/datasets/br_bd_metadados/schedules.py
@@ -10,7 +10,7 @@
 from pipelines.constants import constants
 
-every_day_prefect = Schedule(
+every_day_prefect_flow_runs = Schedule(
     clocks=[
         IntervalClock(
             interval=timedelta(days=1),
@@ -28,3 +28,22 @@
         ),
     ],
 )
+
+every_day_prefect_flows = Schedule(
+    clocks=[
+        IntervalClock(
+            interval=timedelta(days=1),
+            start_date=datetime(2022, 9, 20, 10, 00),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_bd_metadados",
+                "table_id": "prefect_flows",
+                "materialization_mode": "prod",
+                "materialize_after_dump": True,
+                "dbt_alias": True,
+            },
+        ),
+    ],
+)
diff --git a/pipelines/datasets/br_bd_metadados/tasks.py b/pipelines/datasets/br_bd_metadados/tasks.py
index 9ba771367..8614a4833 100755
--- a/pipelines/datasets/br_bd_metadados/tasks.py
+++ b/pipelines/datasets/br_bd_metadados/tasks.py
@@ -5,6 +5,7 @@
 from datetime import datetime, timedelta
 import os
 import pandas as pd
+from pipelines.utils.utils import log
 from prefect import task
 from prefect import Client
@@ -16,37 +17,37 @@
     max_retries=constants.TASK_MAX_RETRIES.value,
     retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
 )
-def crawler():
+def crawler_flow_runs():
     client = Client()
 
-    body = """{
-        flow_run(where: {_and: [{state: {_nin: ["Scheduled", "Cancelled"]}}]}) {
-          id
-          name
-          start_time
-          end_time
-          labels
-          flow {
-            archived
-            name
-            project{
-              name
-            }
-          }
-          parameters
-          state
-          state_message
-          task_runs {
-            state
-            task {
-              name
-            }
-          }
-          logs(where: {level: {_eq: "ERROR"}}) {
-            message
-          }
-        }
-    }"""
+    body = """{flow_run(where: {_and: [{state: {_nin: ["Scheduled", "Cancelled"]}}]}) {
+          id
+          name
+          start_time
+          end_time
+          labels
+          flow {
+            flow_group_id
+            archived
+            name
+            project{
+              name
+            }
+          }
+          parameters
+          state
+          state_message
+          task_runs {
+            state
+            task {
+              name
+            }
+          }
+          logs(where: {level: {_eq: "ERROR"}}) {
+            message
+          }
+        }
+    }"""
     r = client.graphql(
         query= body
@@ -84,3 +85,65 @@ def crawler():
     selected_df.to_csv(full_filepath, index=False)
 
     return full_filepath
+
+@task
+def crawler_flows():
+    client = Client()
+
+    body = """{
+        flow(where: {archived: {_eq: false}, is_schedule_active: {_eq: true}}) {
+          name
+          version
+          created
+          schedule
+          flow_group_id
+          project{
+            name
+          }
+          flow_group {
+            flows_aggregate {
+              aggregate {
+                min {
+                  created
+                }
+              }
+            }
+          }
+        }
+      }
+    """
+
+    r = client.graphql(
+        query= body
+    )
+
+    df = pd.json_normalize(r,record_path=['data','flow'],sep='_')
+
+    df_schedule_clocks = pd.json_normalize(df['schedule_clocks'].str[0],max_level=0, sep='_').add_prefix('schedule_')
+    df.drop(columns=['schedule_clocks',], inplace=True)
+    df_schedule_clocks.drop(columns=['schedule___version__'], inplace=True)
+
+    df_parameters = pd.json_normalize(df_schedule_clocks['schedule_parameter_defaults']).add_prefix('schedule_parameters_')
+    standard_params = ['schedule_parameters_table_id',
+                       'schedule_parameters_dbt_alias',
+                       'schedule_parameters_dataset_id',
+                       'schedule_parameters_update_metadata',
+                       'schedule_parameters_materialization_mode',
+                       'schedule_parameters_materialize_after_dump']
+
+    df_final = pd.concat([df,df_schedule_clocks,df_parameters[standard_params]],axis = 1)
+
+
+    table_id = 'prefect_flows'
+
+    # Define the folder path for storing the file
+    folder = f"tmp/{table_id}/"
+    # Create the folder if it doesn't exist
+    os.system(f"mkdir -p {folder}")
+    # Define the full file path for the CSV file
+    full_filepath = f"{folder}/{table_id}.csv"
+    # Save the DataFrame as a CSV file
+    df_final.to_csv(full_filepath, index=False)
+
+    return full_filepath