Merge pull request #275 from georgetown-cset/269-automate-data
Automate data updates
jmelot authored Sep 15, 2023
2 parents 404b42a + 9f7f618 commit c150c60
Showing 24 changed files with 321 additions and 46 deletions.
4 changes: 4 additions & 0 deletions .sqlfluff
@@ -6,3 +6,7 @@ exclude_rules = L014,L018,L027,L032,L034,L042,L044
[sqlfluff:rules]
tab_space_size = 2
max_line_length = 120

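# Default values for the Jinja variables referenced in the templated SQL under sql/,
# so sqlfluff can render {{ staging_dataset }} and {{ production_dataset }} when linting.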
[sqlfluff:templater:jinja:context]
staging_dataset = staging_orca
production_dataset = orca
251 changes: 251 additions & 0 deletions orca_data_pipeline.py
@@ -0,0 +1,251 @@
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCheckOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.operators.compute import (
    ComputeEngineStartInstanceOperator,
    ComputeEngineStopInstanceOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import (
    BigQueryToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from dataloader.airflow_utils.defaults import (
    DAGS_DIR,
    DATA_BUCKET,
    GCP_ZONE,
    PROJECT_ID,
    get_default_args,
    get_post_success,
)
from dataloader.scripts.populate_documentation import update_table_descriptions

"""
This DAG retrieves data from GitHub and updates the tables in the `orca` BigQuery dataset
"""


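# BigQuery dataset names, GCS/SQL paths, and GCE resource names used throughout the DAG.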
production_dataset = "orca"
staging_dataset = f"staging_{production_dataset}"
backup_dataset = f"{production_dataset}_backups"
tmp_dir = f"{production_dataset}/tmp"
sql_dir = f"sql/{production_dataset}"
gce_resource_id = "orca-etl"
vm_working_dir = "current_run"

default_args = get_default_args()

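# The DAG runs monthly ("0 0 1 * *" is 00:00 on the first of each month); catchup=False skips backfills of missed runs.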
dag = DAG(
    "orca_updater",
    default_args=default_args,
    description="Updates ORCA data",
    user_defined_macros={
        "staging_dataset": staging_dataset,
        "production_dataset": production_dataset,
    },
    schedule_interval="0 0 1 * *",
    catchup=False,
)

with dag:
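    # Clear out the temporary GCS prefix left over from the previous run.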
    clear_dl_dir = GCSDeleteObjectsOperator(
        task_id="clear_dl_dir", bucket_name=DATA_BUCKET, prefix=tmp_dir
    )

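    # Extract GitHub repo mentions from papers by rendering sql/orca/repos_in_papers.sql
    # (via Airflow's Jinja templating) into the staging dataset.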
    extract_repo_mentions = BigQueryInsertJobOperator(
        task_id="extract_repo_mentions",
        configuration={
            "query": {
                "query": "{% include '" + f"{sql_dir}/repos_in_papers.sql" + "' %}",
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": PROJECT_ID,
                    "datasetId": staging_dataset,
                    "tableId": "repos_in_papers",
                },
                "allowLargeResults": True,
                "createDisposition": "CREATE_IF_NEEDED",
                "writeDisposition": "WRITE_TRUNCATE",
            }
        },
    )

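    # Start the orca-etl GCE instance that the retrieval and scraping scripts run on.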
    gce_instance_start = ComputeEngineStartInstanceOperator(
        project_id=PROJECT_ID,
        zone=GCP_ZONE,
        resource_id=gce_resource_id,
        task_id="start-" + gce_resource_id,
    )

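    # The next three tasks run over SSH on that instance. Each command sequence is
    # joined with "&&" so any failure aborts the rest of the run; the leading "rm -r"
    # below is separated with ";", presumably so a missing directory on a fresh
    # instance does not stop the chain.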
    # Pull the repos from BQ, along with repos specified by custom lists
    retrieve_repos_sequence = [
        f"mkdir {vm_working_dir}",
        f"cd {vm_working_dir}",
        f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/scripts .",
        f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/input_data .",
        f"gsutil cp -r gs://{DATA_BUCKET}/{production_dataset}/code/requirements.txt .",
        "python3 -m pip install -r requirements.txt",
        "PYTHONPATH='.' python3 scripts/retrieve_repos.py --query_bq",
    ]
    vm_script = f"rm -r {vm_working_dir};" + " && ".join(retrieve_repos_sequence)

    retrieve_repos = BashOperator(
        task_id="retrieve_repos",
        bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
    )

    # Retrieve full metadata for each repo from the GitHub API
    get_full_repo_metadata_sequence = [
        f"cd {vm_working_dir}",
        "PYTHONPATH='.' python3 scripts/backfill_top_level_repo_data.py",
    ]
    vm_script = " && ".join(get_full_repo_metadata_sequence)

    get_full_repo_metadata = BashOperator(
        task_id="get_full_repo_metadata",
        bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
    )

    # Scrape GitHub for READMEs and additional metadata we aren't otherwise able to collect
    scrape_gh_sequence = [
        f"cd {vm_working_dir}",
        "PYTHONPATH='.' python3 scripts/retrieve_repo_metadata.py curr_repos_filled.jsonl curr_repos_final.jsonl",
        f"gsutil cp curr_repos_final.jsonl gs://{DATA_BUCKET}/{tmp_dir}/",
    ]
    vm_script = " && ".join(scrape_gh_sequence)

    scrape_gh = BashOperator(
        task_id="scrape_gh",
        bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{vm_script}"',
    )

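    # Shut the instance back down once scraping is complete.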
    gce_instance_stop = ComputeEngineStopInstanceOperator(
        project_id=PROJECT_ID,
        zone=GCP_ZONE,
        resource_id=gce_resource_id,
        task_id="stop-" + gce_resource_id,
    )

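    # Load the scraped repo metadata from GCS into staging_orca.repos_with_full_meta_raw
    # using the checked-in JSON schema.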
    load_data_to_bq = GCSToBigQueryOperator(
        task_id="load_data_to_bq",
        bucket=DATA_BUCKET,
        source_objects=[f"{tmp_dir}/curr_repos_final.jsonl"],
        schema_object=f"{production_dataset}/schemas/repos_with_full_meta_raw.json",
        destination_project_dataset_table=f"{staging_dataset}.repos_with_full_meta_raw",
        source_format="NEWLINE_DELIMITED_JSON",
        create_disposition="CREATE_IF_NEEDED",
        write_disposition="WRITE_TRUNCATE",
    )

    (
        clear_dl_dir
        >> extract_repo_mentions
        >> gce_instance_start
        >> retrieve_repos
        >> get_full_repo_metadata
        >> scrape_gh
        >> gce_instance_stop
        >> load_data_to_bq
    )

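    # Each line of sequences/orca/downstream_order.txt names a derived table; its
    # sql/orca/<table>.sql query is run into the staging dataset, and the tasks are
    # chained in file order so earlier tables are built before those that depend on them.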
    curr = load_data_to_bq
    downstream_seq_file = (
        f"{DAGS_DIR}/sequences/{production_dataset}/downstream_order.txt"
    )
    for line in open(downstream_seq_file):
        table_name = line.strip()
        if not table_name:
            continue
        last = BigQueryInsertJobOperator(
            task_id=f"create_{table_name}",
            configuration={
                "query": {
                    "query": "{% include '" + f"{sql_dir}/{table_name}.sql" + "' %}",
                    "useLegacySql": False,
                    "destinationTable": {
                        "projectId": PROJECT_ID,
                        "datasetId": staging_dataset,
                        "tableId": table_name,
                    },
                    "allowLargeResults": True,
                    "createDisposition": "CREATE_IF_NEEDED",
                    "writeDisposition": "WRITE_TRUNCATE",
                }
            },
        )
        curr >> last
        curr = last

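    # Data quality checks on the staging tables; nothing is copied to production
    # until all of them pass.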
    wait_for_checks = DummyOperator(task_id="wait_for_checks")

    checks = [
        BigQueryCheckOperator(
            task_id="check_pk_distinct_website_stats",
            sql=f"select count(0) = count(distinct(id)) from {staging_dataset}.website_stats",
            use_legacy_sql=False,
        ),
        BigQueryCheckOperator(
            task_id="check_distinct_repos_with_full_meta",
            sql=f"select count(0) = count(distinct(concat(matched_owner, '/', matched_name))) from {staging_dataset}.repos_with_full_meta",
            use_legacy_sql=False,
        ),
        BigQueryCheckOperator(
            task_id="check_no_readme_404s",
            sql=f"select count(id) = 0 from {staging_dataset}.repos_with_full_meta where readme_text='404: Not Found'",
            use_legacy_sql=False,
        ),
    ]

    last >> checks >> wait_for_checks

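    # Success notification, the date suffix used for backup tables, and table
    # descriptions loaded from schemas/orca/table_info.json.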
    msg_success = get_post_success("ORCA data updated!", dag)
    curr_time = datetime.strftime(datetime.now(), "%Y_%m_%d")
    with open(f"{DAGS_DIR}/schemas/{production_dataset}/table_info.json") as f:
        table_desc = json.loads(f.read())

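    # For each delivered table: copy staging to production, refresh table/column
    # documentation (skipped for website_stats), and snapshot into orca_backups.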
    for table in ["website_stats", "repos_with_full_meta"]:
        copy_to_prod = BigQueryToBigQueryOperator(
            task_id=f"copy_{table}_to_prod",
            source_project_dataset_tables=[f"{staging_dataset}.{table}"],
            destination_project_dataset_table=f"{production_dataset}.{table}",
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_TRUNCATE",
        )

        pop_descriptions = (
            PythonOperator(
                task_id=f"populate_column_documentation_for_{table}",
                op_kwargs={
                    "input_schema": f"{DAGS_DIR}/schemas/{production_dataset}/{table}.json",
                    "table_name": f"{production_dataset}.{table}",
                    "table_description": table_desc[table],
                },
                python_callable=update_table_descriptions,
            )
            if table != "website_stats"
            else None
        )

        take_snapshot = BigQueryToBigQueryOperator(
            task_id=f"snapshot_{table}",
            source_project_dataset_tables=[f"{staging_dataset}.{table}"],
            destination_project_dataset_table=f"{backup_dataset}.{table}_{curr_time}",
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_TRUNCATE",
        )
        wait_for_checks >> copy_to_prod
        if table != "website_stats":
            copy_to_prod >> pop_descriptions >> take_snapshot
        else:
            copy_to_prod >> take_snapshot
        take_snapshot >> msg_success
17 changes: 12 additions & 5 deletions push_to_airflow.sh
@@ -1,5 +1,12 @@
-gsutil cp ml_repo_dag.py gs://us-east1-dev-888d59ac-bucket/dags/
-gsutil cp scripts/* gs://us-east1-dev-888d59ac-bucket/dags/github_repo_monitor_scripts/
-gsutil cp data/manually_collected_links.jsonl gs://airflow-data-exchange/github_repo_monitor/
-gsutil cp schemas/* gs://airflow-data-exchange/github_repo_monitor/schemas/
-gsutil cp schemas/* gs://us-east1-dev-888d59ac-bucket/dags/schemas/github_repo_monitor/
+gsutil cp orca_data_pipeline.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
+gsutil -m rm gs://airflow-data-exchange/orca/schemas/*
+gsutil -m cp schemas/* gs://airflow-data-exchange/orca/schemas/
+gsutil -m rm gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/orca/*
+gsutil -m cp schemas/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/orca/
+gsutil rm -r gs://airflow-data-exchange/orca/code
+gsutil -m cp -r scripts gs://airflow-data-exchange/orca/code/
+gsutil -m cp -r input_data gs://airflow-data-exchange/orca/code/
+gsutil cp requirements.txt gs://airflow-data-exchange/orca/code/
+gsutil cp sequences/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sequences/orca/
+gsutil -m rm gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/orca/*
+gsutil -m cp sql/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/orca/
1 change: 1 addition & 0 deletions requirements.txt
@@ -9,3 +9,4 @@ functions-framework
pycountry
tqdm
coverage
google-cloud-secret-manager
4 changes: 4 additions & 0 deletions schemas/table_info.json
@@ -0,0 +1,4 @@
{
"website_stats": "Metadata aggregated for ORCA",
"repos_with_full_meta": "Full metadata retrieved from scrapers and Github API calls"
}
9 changes: 9 additions & 0 deletions scripts/github_config.py
@@ -1,5 +1,7 @@
import os

from google.cloud import secretmanager

RATE_LIMIT_INTERVAL = 60 * 60 / 5000 + 0.2


@@ -9,8 +11,15 @@ def mk_auth() -> tuple:
    tuple if set, otherwise complains
    :return: Tuple of values of (GITHUB_ACCESS_TOKEN, GITHUB_USER)
    """
    client = secretmanager.SecretManagerServiceClient()
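    # Prefer the GITHUB_ACCESS_TOKEN environment variable; if it is unset, fall back
    # to the github_api_key secret in Secret Manager (presumably so the Airflow/ETL
    # environments do not need the variable configured).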
    gh_tok = os.environ.get("GITHUB_ACCESS_TOKEN")
    if not gh_tok:
        secret_name = "projects/gcp-cset-projects/secrets/github_api_key/versions/1"
        secret = client.access_secret_version(request={"name": secret_name})
        gh_tok = secret.payload.data.decode("UTF-8")
    assert gh_tok, "Please set the GITHUB_ACCESS_TOKEN environment variable"
    username = os.environ.get("GITHUB_USER")
    if gh_tok and not username:
        username = "jmelot"
    assert username, "Please set the GITHUB_USER environment variable"
    return username, gh_tok
3 changes: 1 addition & 2 deletions sequences/downstream_order.txt
@@ -3,12 +3,11 @@ pypi_repo_metadata
pypi_over_time
curated_repos
repos_with_full_meta
repo_owners
events
push_event_commits
pull_requests_opened
star_events
issue_events
top_level1_fields
contributor_affiliations
top_cited_repo_citers
website_stats
2 changes: 1 addition & 1 deletion sql/annotation_data.sql
@@ -1,5 +1,5 @@
-- prepares user country data for student annotators
-with distinct_locations as (select distinct trim(lower(location)) as location from github_metrics.repo_owners)
+with distinct_locations as (select distinct trim(lower(location)) as location from {{ production_dataset }}.repo_owners)

select

6 changes: 3 additions & 3 deletions sql/contributor_affiliations.sql
@@ -10,10 +10,10 @@ with aggregated_li as (
distinct repo_id
) as contributed_repos,
array_agg(distinct user_li_url) as linkedin_urls
-from staging_github_metrics.push_event_commits
+from {{ staging_dataset }}.push_event_commits
left join gcp_cset_revelio.user
on lower(contributor_name) = lower(concat(firstname, " ", lastname))
-left join staging_github_metrics.repos_with_full_meta
+left join {{ staging_dataset }}.repos_with_full_meta
on repo_id = id
where user_li_url is not null
group by contributor
@@ -36,7 +36,7 @@ contributor_mapping as (
from
aggregated_li
left join
-github_metrics.li_annotation_data_20230104
+{{ production_dataset }}.li_annotation_data_20230104
using (contributor)
where contributor not in
(select contributor from aggregated_li where array_length(linkedin_urls) = 1)
6 changes: 3 additions & 3 deletions sql/events.sql
@@ -6,17 +6,17 @@ relevant_repos AS (
FROM (
SELECT CONCAT(matched_owner, "/", matched_name) AS repo_name
FROM
-staging_github_metrics.repos_with_full_meta_for_app
+{{ staging_dataset }}.repos_with_full_meta
UNION DISTINCT
SELECT CONCAT(current_owner, "/", current_name) AS repo_name
FROM
-staging_github_metrics.repos_with_full_meta_for_app
+{{ staging_dataset }}.repos_with_full_meta
WHERE
current_name IS NOT NULL
UNION DISTINCT
SELECT repo AS repo_name
FROM
-staging_github_metrics.repos_in_papers) ),
+{{ staging_dataset }}.repos_in_papers) ),

curr_data AS (
SELECT
2 changes: 1 addition & 1 deletion sql/issue_events.sql
@@ -7,6 +7,6 @@ SELECT DISTINCT
TRIM(JSON_EXTRACT(payload,
"$.action"), '"') AS action
FROM
staging_github_metrics.events
{{ staging_dataset }}.events
WHERE
type = "IssuesEvent"
[Diff truncated: the remaining 13 changed files are not shown]
