Commit: merge

max-ostapenko authored and GCP Dataform committed Oct 8, 2024
2 parents 7614512 + 4f09997 commit d28389b
Showing 19 changed files with 580 additions and 153 deletions.
22 changes: 18 additions & 4 deletions README.md
```diff
@@ -1,6 +1,10 @@
 # HTTP Archive BigQuery pipeline with Dataform
 
-## Datasets
+This repo handles the HTTP Archive data pipeline, which takes the results of the monthly HTTP Archive run and saves them to the `httparchive` dataset in BigQuery.
+
+## Pipelines
+
+The pipelines run in the Dataform service in Google Cloud Platform (GCP) and are kicked off automatically on crawl completion and other events. The code in the `main` branch is used on each triggered pipeline run.
 
 ### Crawl results
 
@@ -67,6 +71,16 @@ Tag: `crawl_results_legacy`
 
 ### Dataform development workspace hints
 
-1. In workflow settings vars set `dev_name: dev` to process sampled data in dev workspace.
-2. Change `today` variable to a date in the past. May be helpful for testing pipelines based on `chrome-ux-report` data.
-3. `definitions/extra/test_env.sqlx` script helps to setup the tables required to run pipelines when in dev workspace. It's disabled by default.
+1. In workflow settings vars:
+   1. set `env_name: dev` to process sampled data in the dev workspace.
+   2. change the `today` variable to a month in the past. May be helpful for testing pipelines based on `chrome-ux-report` data.
+2. The `definitions/extra/test_env.sqlx` script helps to set up the tables required to run pipelines in the dev workspace. It's disabled by default.
+
+### Error Monitoring
+
+Issues within the pipeline are tracked using the following alerts:
+
+1. the event trigger processing fails - [Dataform Trigger Function Error](https://console.cloud.google.com/monitoring/alerting/policies/3950167380893746326?authuser=7&project=httparchive)
+2. a job in the workflow fails - [Dataform Workflow Invocation Failed](https://console.cloud.google.com/monitoring/alerting/policies/7137542315653007241?authuser=7&project=httparchive)
+
+Error notifications are sent to the [#10x-infra](https://httparchive.slack.com/archives/C030V4WAVL3) Slack channel.
```
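The README hints above configure dev runs via workflow settings vars (`env_name`, `today`). As a hypothetical sketch, not part of this commit (the variable names follow the README; the file name and fallback values are assumptions), such vars are typically read in Dataform JavaScript through `dataform.projectConfig.vars`:

```js
// includes/env.js - hypothetical sketch, not part of this commit.
// Dataform exposes workflow settings vars as dataform.projectConfig.vars.<name>.
const env_name = dataform.projectConfig.vars.env_name || "prod"; // "dev" switches to sampled data
const today = dataform.projectConfig.vars.today; // e.g. "2024-09-01" to replay a past month

const is_dev = env_name === "dev";

module.exports = { env_name, today, is_dev };
```

A definition can then branch on `is_dev` to read the sampled dev tables instead of the full production ones.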
67 changes: 35 additions & 32 deletions definitions/extra/test_env.js
```diff
@@ -1,45 +1,48 @@
-const past_month = constants.fn_past_month(constants.current_month);
+const date = constants.current_month;
 
-operate("test_env", {
-  hasOutput: true,
-  disabled: true // MUST be disabled in main branch
-}).queries(ctx => `
-CREATE SCHEMA IF NOT EXISTS all_dev;
+var resources_list = [
+  { datasetId: "all", tableId: "pages" },
+  { datasetId: "all", tableId: "requests" },
+  //{datasetId: "all", tableId: "parsed_css"},
+  //{datasetId: "core_web_vitals", tableId: "technologies"},
+];
 
-CREATE TABLE IF NOT EXISTS ${ctx.resolve("all", "pages")} AS
-SELECT *
-FROM httparchive.all.pages
-WHERE
-  date = '${constants.current_month}'
-  ${constants.dev_rank5000_filter};
+resources_list.forEach(resource => {
+  operate(
+    `test_table ${resource.datasetId}_${resource.tableId}`,
+    { hasOutput: true }
+  ).queries(`
+CREATE SCHEMA IF NOT EXISTS ${resource.datasetId}_dev;
-CREATE TABLE IF NOT EXISTS ${ctx.resolve("all", "requests")} AS
-SELECT *
-FROM httparchive.all.requests ${constants.dev_TABLESAMPLE}
-WHERE date = '${constants.current_month}';
+DROP TABLE IF EXISTS ${resource.datasetId}_dev.dev_${resource.tableId};
-CREATE TABLE IF NOT EXISTS ${ctx.resolve("all", "parsed_css")} AS
+CREATE TABLE IF NOT EXISTS ${resource.datasetId}_dev.dev_${resource.tableId} AS
 SELECT *
-FROM httparchive.all.parsed_css
-WHERE date = '${constants.current_month}'
-  ${constants.dev_rank5000_filter};
+FROM \`${resource.datasetId}.${resource.tableId}\` ${constants.dev_TABLESAMPLE}
+WHERE date = '${date}'
+`);
+})
 
-CREATE SCHEMA IF NOT EXISTS core_web_vitals_dev;
+operate("test_table blink_features_dev_dev_usage", {
+  hasOutput: true,
+}).queries(`
+CREATE SCHEMA IF NOT EXISTS blink_features_dev;
-CREATE TABLE IF NOT EXISTS ${ctx.resolve("core_web_vitals", "technologies")} AS
+CREATE TABLE IF NOT EXISTS blink_features_dev.dev_usage AS
 SELECT *
-FROM httparchive.core_web_vitals.technologies ${constants.dev_TABLESAMPLE}
-WHERE date = '${past_month}';
+FROM blink_features.usage ${constants.dev_TABLESAMPLE}
+WHERE yyyymmdd = '${date}';
+`)
 
+operate("test_table blink_features_dev_dev_features", {
+  hasOutput: true,
+}).queries(`
+CREATE SCHEMA IF NOT EXISTS blink_features_dev;
-CREATE TABLE IF NOT EXISTS ${ctx.resolve("blink_features", "usage")} AS
-SELECT *
-FROM httparchive.blink_features.usage ${constants.dev_TABLESAMPLE}
-WHERE yyyymmdd = '${past_month}';
+DROP TABLE IF EXISTS blink_features_dev.dev_features;
-CREATE TABLE IF NOT EXISTS ${ctx.resolve("blink_features", "features")} AS
+CREATE TABLE IF NOT EXISTS blink_features_dev.dev_features AS
 SELECT *
-FROM httparchive.blink_features.features ${constants.dev_TABLESAMPLE}
-WHERE yyyymmdd = DATE '${past_month}';
-`)
+FROM blink_features.features ${constants.dev_TABLESAMPLE}
+WHERE yyyymmdd = DATE '${date}';
+`)
```
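The script above, and the crawl definitions below, lean on helpers such as `constants.current_month`, `constants.dev_TABLESAMPLE` and `constants.dev_rank_filter` from the repo's `includes/`. A minimal sketch of what such a constants file could contain, with illustrative values only (the real definitions live in the repository and would normally be driven by the dev vars shown earlier):

```js
// includes/constants.js - illustrative sketch only; the real helpers live in the repo.
// Anything exported from includes/constants.js is available to definitions as constants.<name>.
const current_month = "2024-10-01"; // assumed: first day of the current crawl month

// First day of the month before the given one (used by the old test_env.js via fn_past_month).
function fn_past_month(month) {
  const d = new Date(month);
  d.setUTCMonth(d.getUTCMonth() - 1);
  return d.toISOString().substring(0, 10);
}

// Dev-only sampling helpers; assumed to compile to empty strings outside the dev workspace.
const dev_TABLESAMPLE = "TABLESAMPLE SYSTEM (0.001 PERCENT)";
const dev_rank_filter = "AND rank <= 5000";

module.exports = { current_month, fn_past_month, dev_TABLESAMPLE, dev_rank_filter };
```

With helpers like these, the `test_table` actions above create sampled `<dataset>_dev.dev_<table>` copies that dev-workspace pipelines can read instead of the full production tables.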
44 changes: 28 additions & 16 deletions definitions/output/all/pages.js
```diff
@@ -1,33 +1,45 @@
 publish("pages", {
   type: "incremental",
   protected: true,
   schema: "all",
   bigquery: {
     partitionBy: "date",
     clusterBy: ["client", "is_root_page", "rank"],
     requirePartitionFilter: true
   },
   tags: ["crawl_results_all"],
 }).preOps(ctx => `
 DELETE FROM ${ctx.self()}
 WHERE date = '${constants.current_month}';
 `).query(ctx => `
 SELECT *
-FROM ${ctx.ref("crawl_staging", "pages")} ${constants.dev_TABLESAMPLE}
-WHERE date = '${constants.current_month}' AND client = 'desktop' AND is_root_page = TRUE
+FROM ${ctx.ref("crawl_staging", "pages")}
+WHERE date = '${constants.current_month}'
+  AND client = 'desktop'
+  AND is_root_page = TRUE
+  ${constants.dev_rank_filter}
 `).postOps(ctx => `
 INSERT INTO ${ctx.self()}
 SELECT *
-FROM ${ctx.ref("crawl_staging", "pages")} ${constants.dev_TABLESAMPLE}
-WHERE date = '${constants.current_month}' AND client = 'desktop' AND is_root_page = FALSE;
+FROM ${ctx.ref("crawl_staging", "pages")}
+WHERE date = '${constants.current_month}'
+  AND client = 'desktop'
+  AND is_root_page = FALSE
+  ${constants.dev_rank_filter};
 INSERT INTO ${ctx.self()}
 SELECT *
 FROM ${ctx.ref("crawl_staging", "pages")} ${constants.dev_TABLESAMPLE}
-WHERE date = '${constants.current_month}' AND client = 'mobile' AND is_root_page = TRUE;
+WHERE date = '${constants.current_month}'
+  AND client = 'mobile'
+  AND is_root_page = TRUE
+  ${constants.dev_rank_filter};
 INSERT INTO ${ctx.self()}
 SELECT *
-FROM ${ctx.ref("crawl_staging", "pages")} ${constants.dev_TABLESAMPLE}
-WHERE date = '${constants.current_month}' AND client = 'mobile' AND is_root_page = FALSE
+FROM ${ctx.ref("crawl_staging", "pages")}
+WHERE date = '${constants.current_month}'
+  AND client = 'mobile'
+  AND is_root_page = FALSE
+  ${constants.dev_rank_filter};
 `)
```
30 changes: 17 additions & 13 deletions definitions/output/all/parsed_css.js
```diff
@@ -1,23 +1,27 @@
 publish("parsed_css", {
   type: "incremental",
   protected: true,
   schema: "all",
   bigquery: {
     partitionBy: "date",
     clusterBy: ["client", "is_root_page", "rank", "page"],
     requirePartitionFilter: true
   },
   tags: ["crawl_results_all"],
 }).preOps(ctx => `
 DELETE FROM ${ctx.self()}
 WHERE date = '${constants.current_month}';
 `).query(ctx => `
 SELECT *
-FROM ${ctx.ref("crawl_staging", "parsed_css")} ${constants.dev_TABLESAMPLE}
-WHERE date = '${constants.current_month}' AND client = 'desktop'
+FROM ${ctx.ref("crawl_staging", "parsed_css")}
+WHERE date = '${constants.current_month}'
+  AND client = 'desktop'
+  ${constants.dev_rank_filter}
 `).postOps(ctx => `
 INSERT INTO ${ctx.self()}
 SELECT *
-FROM ${ctx.ref("crawl_staging", "parsed_css")} ${constants.dev_TABLESAMPLE}
-WHERE date = '${constants.current_month}' AND client = 'mobile'
+FROM ${ctx.ref("crawl_staging", "parsed_css")}
+WHERE date = '${constants.current_month}'
+  AND client = 'mobile'
+  ${constants.dev_rank_filter};
 `)
```