Move to cc2, add sensors to long-running tasks
jmelot committed Mar 11, 2024
1 parent 574c6a5 commit d4041c2
Showing 5 changed files with 109 additions and 45 deletions.
107 changes: 68 additions & 39 deletions orca_data_pipeline.py
@@ -4,7 +4,7 @@
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator,
BigQueryInsertJobOperator,
@@ -14,6 +14,7 @@
ComputeEngineStopInstanceOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import (
BigQueryToBigQueryOperator,
)
@@ -37,30 +38,33 @@
This DAG retrieves data from GitHub and updates the tables in the `orca` BigQuery dataset
"""

args = get_default_args(pocs=["Jennifer"])
args["retries"] = 1
args["on_failure_callback"] = None

production_dataset = "orca"
staging_dataset = f"staging_{production_dataset}"
backup_dataset = f"{production_dataset}_backups"
tmp_dir = f"{production_dataset}/tmp"
sql_dir = f"sql/{production_dataset}"
gce_resource_id = "orca-etl"
vm_working_dir = "current_run"

default_args = get_default_args()

dag = DAG(
with DAG(
"orca_updater",
default_args=default_args,
default_args=args,
description="Updates ORCA data",
user_defined_macros={
"staging_dataset": staging_dataset,
"production_dataset": production_dataset,
},
schedule_interval="0 0 1 * *",
catchup=False,
)
) as dag:
tmp_dir = f"{production_dataset}/tmp"
sql_dir = f"sql/{production_dataset}"
gce_resource_id = "orca-etl"
ssh_command = (
f"gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "
+ '"{}"'
)

with dag:
clear_dl_dir = GCSDeleteObjectsOperator(
task_id="clear_dl_dir", bucket_name=DATA_BUCKET, prefix=tmp_dir
)
@@ -90,46 +94,67 @@
task_id="start-" + gce_resource_id,
)

# Pull the repos from BQ, along with repos specified by custom lists
retrieve_repos_sequence = [
f"mkdir {vm_working_dir}",
f"cd {vm_working_dir}",
working_dir = "current_run"
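# Copy the task scripts, input data, and Python requirements onto the VM, then install dependencies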
prep_environment_sequence = [
f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/scripts/*.sh .",
f"mkdir {working_dir}",
f"cd {working_dir}",
f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/scripts .",
f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/input_data .",
f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/requirements.txt .",
"python3 -m pip install -r requirements.txt",
"PYTHONPATH='.' python3 scripts/retrieve_repos.py --query_bq",
]
vm_script = f"rm -r {vm_working_dir};" + " && ".join(retrieve_repos_sequence)
prep_environment_script = f"rm -r {working_dir};" + " && ".join(
prep_environment_sequence
)
prep_environment = BashOperator(
task_id="prep_environment",
bash_command=ssh_command.format(prep_environment_script),
)

# Pull the repos from BQ, along with repos specified by custom lists
retrieve_repos = BashOperator(
task_id="retrieve_repos",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
bash_command=ssh_command.format(
f"bash retrieve_repos.sh {working_dir} {DATA_BUCKET} "
f"{production_dataset} &> retrieve_repos_log &"
),
)
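# retrieve_repos.sh runs in the background on the VM, so the BashOperator above returns right away;
# this deferrable sensor waits for the done file the script uploads to GCS when it finishes successfully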
wait_for_retrieve_repos = GCSObjectExistenceSensor(
task_id="wait_for_retrieve_repos",
bucket=DATA_BUCKET,
object=f"{production_dataset}/done_files/retrieve_repos",
deferrable=True,
)

# Retrieve full metadata for each repo from the GitHub API
get_full_repo_metadata_sequence = [
f"cd {vm_working_dir}",
"PYTHONPATH='.' python3 scripts/backfill_top_level_repo_data.py",
]
vm_script = " && ".join(get_full_repo_metadata_sequence)

get_full_repo_metadata = BashOperator(
task_id="get_full_repo_metadata",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
get_full_metadata = BashOperator(
task_id="get_full_metadata",
bash_command=ssh_command.format(
f"bash get_full_metadata.sh {working_dir} {DATA_BUCKET} "
f"{production_dataset} &> get_full_metadata_log &"
),
)
wait_for_get_full_metadata = GCSObjectExistenceSensor(
task_id="wait_for_get_full_metadata",
bucket=DATA_BUCKET,
object=f"{production_dataset}/done_files/get_full_metadata",
deferrable=True,
)

# Scrape GitHub for READMEs and additional metadata we aren't otherwise able to collect
scrape_gh_sequence = [
f"cd {vm_working_dir}",
"PYTHONPATH='.' python3 scripts/retrieve_repo_metadata.py curr_repos_filled.jsonl curr_repos_final.jsonl",
f"gsutil cp curr_repos_final.jsonl gs://{DATA_BUCKET}/{tmp_dir}/",
]
vm_script = " && ".join(scrape_gh_sequence)

scrape_gh = BashOperator(
task_id="scrape_gh",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
scrape_additional_metadata = BashOperator(
task_id="scrape_additional_metadata",
bash_command=ssh_command.format(
f"bash scrape_additional_metadata.sh {working_dir} {DATA_BUCKET} "
f"{production_dataset} {tmp_dir} &> scrape_additional_metadata_log &"
),
)
wait_for_scrape_additional_metadata = GCSObjectExistenceSensor(
task_id="wait_for_scrape_additional_metadata",
bucket=DATA_BUCKET,
object=f"{production_dataset}/done_files/scrape_additional_metadata",
deferrable=True,
)

gce_instance_stop = ComputeEngineStopInstanceOperator(
@@ -154,9 +179,13 @@
clear_dl_dir
>> extract_repo_mentions
>> gce_instance_start
>> prep_environment
>> retrieve_repos
>> get_full_repo_metadata
>> scrape_gh
>> wait_for_retrieve_repos
>> get_full_metadata
>> wait_for_get_full_metadata
>> scrape_additional_metadata
>> wait_for_scrape_additional_metadata
>> gce_instance_stop
>> load_data_to_bq
)
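The three long-running VM tasks above all follow the same pattern: a BashOperator launches the shell script in the background over SSH and returns immediately, and a deferrable GCSObjectExistenceSensor then waits for the done file the script uploads when it succeeds. Below is a minimal sketch of how that pairing could be factored into a helper; the make_vm_task name and signature are illustrative rather than part of this commit, and it assumes it is called inside the with DAG(...) as dag: block so the >> dependency binds to the DAG.

from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor


def make_vm_task(name, ssh_command, working_dir, bucket, dataset):
    # Launch <name>.sh in the background on the VM; the SSH command (and so the
    # BashOperator) returns immediately while the script logs to <name>_log.
    run = BashOperator(
        task_id=name,
        bash_command=ssh_command.format(
            f"bash {name}.sh {working_dir} {bucket} {dataset} &> {name}_log &"
        ),
    )
    # Defer until the script uploads its done file, which it only does on success.
    wait = GCSObjectExistenceSensor(
        task_id=f"wait_for_{name}",
        bucket=bucket,
        object=f"{dataset}/done_files/{name}",
        deferrable=True,
    )
    run >> wait
    return run, wait

Called as make_vm_task("retrieve_repos", ssh_command, working_dir, DATA_BUCKET, production_dataset), this would produce the same operator/sensor pair the DAG defines inline; scrape_additional_metadata would need one extra argument for tmp_dir.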
12 changes: 6 additions & 6 deletions push_to_airflow.sh
@@ -1,12 +1,12 @@
gsutil cp orca_data_pipeline.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
gsutil cp orca_data_pipeline.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/
gsutil -m rm gs://airflow-data-exchange/orca/schemas/*
gsutil -m cp schemas/* gs://airflow-data-exchange/orca/schemas/
gsutil -m rm gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/orca/*
gsutil -m cp schemas/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/orca/
gsutil -m rm gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/orca/*
gsutil -m cp schemas/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/orca/
gsutil rm -r gs://airflow-data-exchange/orca/code
gsutil -m cp -r scripts gs://airflow-data-exchange/orca/code/
gsutil -m cp -r input_data gs://airflow-data-exchange/orca/code/
gsutil cp requirements.txt gs://airflow-data-exchange/orca/code/
gsutil cp sequences/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sequences/orca/
gsutil -m rm gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/orca/*
gsutil -m cp sql/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/orca/
gsutil cp sequences/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sequences/orca/
gsutil -m rm gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/orca/*
gsutil -m cp sql/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/orca/
11 changes: 11 additions & 0 deletions scripts/get_full_metadata.sh
@@ -0,0 +1,11 @@
#!/bin/bash

working_dir=$1
data_bucket=$2
production_dataset=$3
done_file="get_full_metadata"

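# Clear any stale done file, run the backfill, and upload a fresh done file only if the run
# succeeds, so the DAG's GCSObjectExistenceSensor can tell when this task has finished.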
cd $working_dir
gsutil rm "gs://${data_bucket}/${production_dataset}/done_files/${done_file}"
PYTHONPATH='.' python3 scripts/backfill_top_level_repo_data.py && touch $done_file
gsutil cp $done_file "gs://${data_bucket}/${production_dataset}/done_files/"
11 changes: 11 additions & 0 deletions scripts/retrieve_repos.sh
@@ -0,0 +1,11 @@
#!/bin/bash

working_dir=$1
data_bucket=$2
production_dataset=$3
done_file="retrieve_repos"

cd $working_dir
gsutil rm "gs://${data_bucket}/${production_dataset}/done_files/${done_file}"
PYTHONPATH='.' python3 scripts/retrieve_repos.py --query_bq && touch $done_file
gsutil cp $done_file "gs://${data_bucket}/${production_dataset}/done_files/"
13 changes: 13 additions & 0 deletions scripts/scrape_additional_metadata.sh
@@ -0,0 +1,13 @@
#!/bin/bash

working_dir=$1
data_bucket=$2
production_dataset=$3
tmp_dir=$4
done_file="scrape_additional_metadata"

cd $working_dir
gsutil rm "gs://${data_bucket}/${production_dataset}/done_files/${done_file}"
PYTHONPATH='.' python3 scripts/retrieve_repo_metadata.py curr_repos_filled.jsonl curr_repos_final.jsonl
gsutil cp curr_repos_final.jsonl "gs://${data_bucket}/${tmp_dir}/" && touch $done_file
gsutil cp $done_file "gs://${data_bucket}/${production_dataset}/done_files/"
