include prefect flows
laura-l-amaral committed Jan 11, 2024
1 parent 4294f57 commit a318e4f
Showing 3 changed files with 189 additions and 37 deletions.
84 changes: 77 additions & 7 deletions pipelines/datasets/br_bd_metadados/flows.py
@@ -12,10 +12,12 @@

from pipelines.constants import constants
from pipelines.datasets.br_bd_metadados.schedules import (
every_day_prefect,
every_day_prefect_flow_runs,
every_day_prefect_flows,
)
from pipelines.datasets.br_bd_metadados.tasks import (
crawler,
crawler_flow_runs,
crawler_flows,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
@@ -31,7 +33,7 @@
code_owners=[
"lauris",
],
) as bd_prefect:
) as bd_prefect_flow_runs:
# Parameters
dataset_id = Parameter("dataset_id", default="br_bd_metadados", required=True)
table_id = Parameter("table_id", default="prefect_flow_runs", required=True)
@@ -49,7 +51,7 @@
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)

filepath = crawler()
filepath = crawler_flow_runs()

# pylint: disable=C0103
wait_upload_table = create_table_and_upload_to_gcs(
@@ -89,6 +91,74 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

bd_prefect.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
bd_prefect.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
bd_prefect.schedule = every_day_prefect
bd_prefect_flow_runs.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
bd_prefect_flow_runs.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
bd_prefect_flow_runs.schedule = every_day_prefect_flow_runs


with Flow(
name="br_bd_metadados.prefect_flows",
code_owners=[
"lauris",
],
) as bd_prefect_flows:
# Parameters
dataset_id = Parameter("dataset_id", default="br_bd_metadados", required=True)
table_id = Parameter("table_id", default="prefect_flows", required=True)

materialization_mode = Parameter(
"materialization_mode", default="dev", required=False
)

materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)

filepath = crawler_flows()

# pylint: disable=C0103
wait_upload_table = create_table_and_upload_to_gcs(
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=filepath,
)

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

bd_prefect_flows.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
bd_prefect_flows.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
bd_prefect_flows.schedule = every_day_prefect_flows
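
The two flows above share everything except the flow name, the table_id default, the crawler task, and the schedule attached at the end. As a design note, a hedged sketch of how that shared body could be produced by a small factory — a hypothetical helper, not part of this commit, assuming the same imports flows.py already uses:

    # Hypothetical refactor sketch, not part of this commit: parameterize the
    # parts that differ between bd_prefect_flow_runs and bd_prefect_flows.
    def build_metadata_flow(name, table_id_default, crawler_task):
        with Flow(name=name, code_owners=["lauris"]) as flow:
            dataset_id = Parameter("dataset_id", default="br_bd_metadados", required=True)
            table_id = Parameter("table_id", default=table_id_default, required=True)
            materialization_mode = Parameter("materialization_mode", default="dev", required=False)
            materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
            dbt_alias = Parameter("dbt_alias", default=True, required=False)

            rename_current_flow_run_dataset_table(
                prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
            )

            filepath = crawler_task()

            wait_upload_table = create_table_and_upload_to_gcs(
                data_path=filepath,
                dataset_id=dataset_id,
                table_id=table_id,
                dump_mode="append",
                wait=filepath,
            )

            with case(materialize_after_dump, True):
                current_flow_labels = get_current_flow_labels()
                materialization_flow = create_flow_run(
                    flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
                    project_name=constants.PREFECT_DEFAULT_PROJECT.value,
                    parameters={
                        "dataset_id": dataset_id,
                        "table_id": table_id,
                        "mode": materialization_mode,
                        "dbt_alias": dbt_alias,
                    },
                    labels=current_flow_labels,
                    run_name=f"Materialize {dataset_id}.{table_id}",
                )
                wait_for_materialization = wait_for_flow_run(
                    materialization_flow,
                    stream_states=True,
                    stream_logs=True,
                    raise_final_state=True,
                )
                wait_for_materialization.max_retries = (
                    dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
                )
                wait_for_materialization.retry_delay = timedelta(
                    seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
                )

        flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
        flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
        return flow

    # Usage would then mirror the two flows defined above, e.g.:
    # bd_prefect_flow_runs = build_metadata_flow(
    #     "br_bd_metadados.prefect_flow_runs", "prefect_flow_runs", crawler_flow_runs
    # )
    # bd_prefect_flow_runs.schedule = every_day_prefect_flow_runs
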
21 changes: 20 additions & 1 deletion pipelines/datasets/br_bd_metadados/schedules.py
@@ -10,7 +10,7 @@

from pipelines.constants import constants

every_day_prefect = Schedule(
every_day_prefect_flow_runs = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
@@ -28,3 +28,22 @@
),
],
)

every_day_prefect_flows = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2022, 9, 20, 10, 00),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_bd_metadados",
"table_id": "prefect_flows",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
},
),
],
)
121 changes: 92 additions & 29 deletions pipelines/datasets/br_bd_metadados/tasks.py
@@ -5,6 +5,7 @@
from datetime import datetime, timedelta
import os
import pandas as pd
from pipelines.utils.utils import log

from prefect import task
from prefect import Client
@@ -16,37 +17,37 @@
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def crawler():
def crawler_flow_runs():
client = Client()

body = """{
flow_run(where: {_and: [{state: {_nin: ["Scheduled", "Cancelled"]}}]}) {
id
name
start_time
end_time
labels
flow {
archived
name
project{
name
}
}
parameters
state
state_message
task_runs {
state
task {
name
}
}
logs(where: {level: {_eq: "ERROR"}}) {
message
}
}
}"""
body = """{flow_run(where: {_and: [{state: {_nin: ["Scheduled", "Cancelled"]}}]}) {
id
name
start_time
end_time
labels
flow {
flow_group_id
archived
name
project{
name
}
}
parameters
state
state_message
task_runs {
state
task {
name
}
}
logs(where: {level: {_eq: "ERROR"}}) {
message
}
}
}"""

r = client.graphql(
query= body
@@ -84,3 +85,65 @@ def crawler():
selected_df.to_csv(full_filepath, index=False)

return full_filepath

@task
def crawler_flows():
client = Client()

body = """{
flow(where: {archived: {_eq: false}, is_schedule_active: {_eq: true}}) {
name
version
created
schedule
flow_group_id
project{
name
}
flow_group {
flows_aggregate {
aggregate {
min {
created
}
}
}
}
}
}
"""

    r = client.graphql(query=body)

    df = pd.json_normalize(r, record_path=['data', 'flow'], sep='_')

    df_schedule_clocks = pd.json_normalize(
        df['schedule_clocks'].str[0], max_level=0, sep='_'
    ).add_prefix('schedule_')
    df.drop(columns=['schedule_clocks'], inplace=True)
    df_schedule_clocks.drop(columns=['schedule___version__'], inplace=True)

    df_parameters = pd.json_normalize(
        df_schedule_clocks['schedule_parameter_defaults']
    ).add_prefix('schedule_parameters_')
    standard_params = [
        'schedule_parameters_table_id',
        'schedule_parameters_dbt_alias',
        'schedule_parameters_dataset_id',
        'schedule_parameters_update_metadata',
        'schedule_parameters_materialization_mode',
        'schedule_parameters_materialize_after_dump',
    ]

    df_final = pd.concat(
        [df, df_schedule_clocks, df_parameters[standard_params]], axis=1
    )


table_id = 'prefect_flows'

# Define the folder path for storing the file
folder = f"tmp/{table_id}/"
# Create the folder if it doesn't exist
os.system(f"mkdir -p {folder}")
# Define the full file path for the CSV file
full_filepath = f"{folder}/{table_id}.csv"
# Save the DataFrame as a CSV file
df_final.to_csv(full_filepath, index=False)

return full_filepath
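
For reference, a minimal self-contained sketch of the clock-flattening steps used in crawler_flows above, run against one synthetic flow record instead of the GraphQL response (the keys and values below are illustrative assumptions; the real response is nested under r['data']['flow'] and flattened with record_path=['data', 'flow']):

    import pandas as pd

    # One synthetic record shaped like an entry of the GraphQL `flow` result
    # (field names assumed for illustration only).
    flows = [
        {
            'name': 'br_bd_metadados.prefect_flows',
            'version': 3,
            'created': '2024-01-10T12:00:00',
            'flow_group_id': 'abc-123',
            'project': {'name': 'main'},
            'schedule': {
                'type': 'Schedule',
                'clocks': [
                    {
                        '__version__': '1.4.0',
                        'interval': 86400000000,
                        'start_date': '2022-09-20T10:00:00',
                        'parameter_defaults': {
                            'dataset_id': 'br_bd_metadados',
                            'table_id': 'prefect_flows',
                            'update_metadata': False,
                            'materialization_mode': 'prod',
                            'materialize_after_dump': True,
                            'dbt_alias': True,
                        },
                    }
                ],
            },
        }
    ]

    # Nested dicts become flat columns such as project_name and schedule_clocks.
    df = pd.json_normalize(flows, sep='_')

    # Keep only the first clock of each schedule and expand its top-level keys.
    df_clocks = pd.json_normalize(
        df['schedule_clocks'].str[0], max_level=0, sep='_'
    ).add_prefix('schedule_')

    # Expand the clock's parameter defaults into schedule_parameters_* columns.
    df_params = pd.json_normalize(
        df_clocks['schedule_parameter_defaults']
    ).add_prefix('schedule_parameters_')

    print(pd.concat([df, df_clocks, df_params], axis=1).columns.tolist())
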
