This repository has been archived by the owner on May 1, 2024. It is now read-only.

Tasks for loading GA data into Snowflake (PART 2) #722

Open
wants to merge 2 commits into base: master
318 changes: 318 additions & 0 deletions edx/analytics/tasks/common/bigquery_extract.py
@@ -0,0 +1,318 @@
"""
Collection of abstract tasks for extracting tables from BigQuery.

BigQuery data can ONLY be dumped into Google Cloud Storage (GCS), not S3. Therefore, this module
also provides CopyGCSToS3Task for copying the dumped data onward to S3.
"""
import errno
import json
import logging
import subprocess
import time

import luigi

from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
from edx.analytics.tasks.util.s3_util import canonicalize_s3_url
from edx.analytics.tasks.util.url import ExternalURL, GCSMarkerTarget, get_target_from_url, url_path_join

log = logging.getLogger(__name__)

try:
    from google.cloud import bigquery
    from google.oauth2 import service_account
    bigquery_available = True  # pylint: disable=invalid-name
except ImportError:
    log.warn('Unable to import Bigquery libraries')
    # On hadoop slave nodes we don't have bigquery libraries installed,
    # so just fail noisily if we attempt to use these libraries there.
    bigquery_available = False  # pylint: disable=invalid-name


class BigQueryExtractDownstreamMixin(OverwriteOutputMixin):
    """
    Common luigi parameters for all BigQuery extract tasks and wrapper tasks.
    """
    credentials = luigi.Parameter()
    project = luigi.Parameter()
    dataset = luigi.Parameter()


class BigQueryExtractTask(BigQueryExtractDownstreamMixin, luigi.Task):
    """
    Abstract Task to extract one BigQuery table into Google Cloud Storage (GCS).

    This task outputs a GCSMarkerTarget which represents the destination prefix for the dumped table
    which gets broken up (automatically) into several 1GB JSON gzip'd files.
    """

    output_target = None
    required_tasks = None

    # For example, output_url might be:
    #
    #   gs://bucket/table
    #
    # in which case the following output files could be generated:
    #
    #   gs://bucket/table/_SUCCESS
    #   gs://bucket/table/_metadata
    #   gs://bucket/table/0000001.json.gz
    #   gs://bucket/table/0000002.json.gz
    #   gs://bucket/table/0000003.json.gz
    output_url = luigi.Parameter(
        description='The GCS URL prefix of the output files, NOT including the filename pattern or trailing slash.'
    )

    def requires(self):  # pylint: disable=missing-docstring
        if self.required_tasks is None:
            self.required_tasks = {
                'credentials': ExternalURL(url=self.credentials),
            }
        return self.required_tasks

    def output(self):  # pylint: disable=missing-docstring
        if self.output_target is None:
            self.output_target = GCSMarkerTarget(
                self.output_url,
                credentials_file=self.input()['credentials'],
            )
        return self.output_target

    @property
    def table(self):
        """
        Name (str) of the table in BQ to extract.

        If this is a sharded table, do not include the date suffix, e.g. return
        "ga_sessions_" instead of "ga_sessions_20190408".
        """
        raise NotImplementedError

    @property
    def output_compression_type(self):
        """
        The type of compression to use for the output files.

        Return None for no compression.

        All available options are listed here:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.compression
        """
        return 'GZIP'

    @property
    def output_format(self):
        """
        The file format to use for the output files.

        All available options are listed here:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.destinationFormat
        """
        return bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON

    def _get_table_identifier(self):
        """
        Construct a table identifier for the table to be extracted.

        Override this method to include partition or shard suffixes. By default, this simply
        returns the passed-in table without any suffixes.

        Returns:
            str: The table to extract.
        """
        return self.table

    def _get_destination_url_pattern(self):
        """
        Generate the BQ-compatible GCS destination URL pattern including a single asterisk to
        specify where the table dumps fan-out.

        For example, if self.output_url is:

            gs://bucket/table

        then this function might return:

            gs://bucket/table/*.json.gz

        Returns:
            str: GCS URL pattern in BQ extract_table format.
        """
        if self.output_compression_type == 'GZIP':
            filename_extension = 'json.gz'
        else:
            filename_extension = 'json'

        # This will cause the leaf files themselves to just be named like "0000001.json.gz".
        filename_pattern = '*.{}'.format(filename_extension)

        return url_path_join(
            self.output_url,
            filename_pattern,
        )

    def _get_job_id(self):
        """
        Returns:
            str: Unique identifier to assign to the BQ job.
        """
        return 'extract_{table}_{timestamp}'.format(table=self._get_table_identifier(), timestamp=int(time.time()))

    def _make_bq_client(self):
        """
        Construct a BigQuery client using the credentials file input target.
        """
        with self.input()['credentials'].open('r') as credentials_file:
            json_creds = json.load(credentials_file)
            self.project_id = json_creds['project_id']
            credentials = service_account.Credentials.from_service_account_info(json_creds)
            return bigquery.Client(credentials=credentials, project=self.project_id)

    def run(self):  # pylint: disable=missing-docstring
        self.check_bigquery_availability()

        client = self._make_bq_client()
        dataset_ref = client.dataset(self.dataset, project=self.project)
        table_name = self._get_table_identifier()
        table_ref = dataset_ref.table(table_name)

        job_config = bigquery.job.ExtractJobConfig()
        job_config.destination_format = self.output_format
        if self.output_compression_type:
            job_config.compression = self.output_compression_type
        job = client.extract_table(
            table_ref,
            self._get_destination_url_pattern(),
            job_config=job_config,
            location='US',
            job_id=self._get_job_id(),
        )
        log.info("Starting BigQuery Extract job.")
        job.result()  # Waits for job to complete.

        self.output().touch_marker()

    def check_bigquery_availability(self):
        """
        Call to ensure fast failure if this machine doesn't have the Bigquery libraries available.
        """
        if not bigquery_available:
            raise ImportError('Bigquery library not available.')


class BigQueryExtractShardedTask(BigQueryExtractTask):  # pylint: disable=abstract-method
    """
    Abstract class for extracting BigQuery tables to GCS, specifically for tables sharded by date.

    Be sure to include a trailing underscore when overriding the self.table property, if the table
    name needs one.
    """
    date = luigi.DateParameter()

    def _get_date_suffix(self):
        """
        Generate a date sharding suffix in BQ's typical format (YYYYMMDD).

        Returns:
            str: BQ date suffix.
        """
        iso_date_string = self.date.isoformat()
        return iso_date_string.replace('-', '')

    def _get_table_identifier(self):
        """
        Override super's method to add the shard date suffix to the table identifier.
        """
        return '{}{}'.format(self.table, self._get_date_suffix())


class CopyGCSToS3Task(OverwriteOutputMixin, luigi.Task):
    """
    Abstract Task to copy data from Google Cloud Storage (GCS) to Amazon S3.

    Input to this task is a GCSMarkerTarget, and output is an S3MarkerTarget.
    For example, if input and output URLs were as follows:

        input = gs://bucket/foo
        output = s3+https://bucket/bar

    and the contents of the input were:

        gs://bucket/foo/_SUCCESS
        gs://bucket/foo/_metadata
        gs://bucket/foo/0000001.json.gz
        gs://bucket/foo/0000002.json.gz
        gs://bucket/foo/0000003.json.gz

    then, the following output files would be generated:

        s3://bucket/bar/_SUCCESS
        s3://bucket/bar/_metadata
        s3://bucket/bar/0000001.json.gz
        s3://bucket/bar/0000002.json.gz
        s3://bucket/bar/0000003.json.gz
    """

    output_url = luigi.Parameter(
        description='The S3 URL prefix of the output files, NOT including the filename pattern or extension.'
    )

    output_target = None
    required_tasks = None

    def requires(self):  # pylint: disable=missing-docstring
        if self.required_tasks is None:
            self.required_tasks = {
                'source': self.insert_source_task,
            }
        return self.required_tasks

    def output(self):  # pylint: disable=missing-docstring
        if self.output_target is None:
            self.output_target = get_target_from_url(self.output_url, marker=True)
        return self.output_target

    @property
    def insert_source_task(self):
        """
        Override this to define the source task that outputs a GCSMarkerTarget to copy from.
        """
        raise NotImplementedError

    def _copy_data_from_gcs_to_s3(self, source_path, destination_path):
        """
        Recursively copy a "directory" from GCS to S3.
        """
        # Exclude any luigi marker files which should not be copied. The pattern is a Python
        # regular expression.
        exclusion_pattern = r'.*_SUCCESS$|.*_metadata$'
        command = [
            'gsutil', '-m', 'rsync', '-x', exclusion_pattern,
            source_path,
            canonicalize_s3_url(destination_path),
        ]
        log.info(
            'Invoking the following command to copy from GCS to S3: %s',
            ' '.join(command),
        )
        try:
            return_code = subprocess.call(command)
        except OSError as err:
            if err.errno == errno.ENOENT:
                log.error('Check that gsutil is installed.')
            raise
        if return_code != 0:
            raise RuntimeError('Error {code} while syncing {source} to {destination}'.format(
                code=return_code,
                source=source_path,
                destination=destination_path,
            ))

    def run(self):  # pylint: disable=missing-docstring
        log.debug("Starting copy from GCS to S3.")
        self._copy_data_from_gcs_to_s3(
            self.input()['source'].path,
            self.output_url,
        )
        self.output().touch_marker()
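
To show how the abstract tasks above might be wired together, here is a minimal sketch of a concrete GA-sessions pipeline. It is illustrative only and not part of this diff: the class names, parameter values, and the gcs_output_root parameter are hypothetical, while BigQueryExtractShardedTask, CopyGCSToS3Task, and url_path_join come from the code above.

# Hypothetical sketch (not part of this diff): chain a daily BigQuery extract with a
# GCS-to-S3 copy by overriding insert_source_task.
import luigi

from edx.analytics.tasks.common.bigquery_extract import BigQueryExtractShardedTask, CopyGCSToS3Task
from edx.analytics.tasks.util.url import url_path_join


class ExampleGASessionsExtractTask(BigQueryExtractShardedTask):
    """Dump one day of the sharded ga_sessions_YYYYMMDD table to GCS as gzipped JSON."""

    @property
    def table(self):
        # Trailing underscore is required; the date shard suffix is appended automatically.
        return 'ga_sessions_'


class ExampleCopyGASessionsToS3Task(CopyGCSToS3Task):
    """Copy the dumped files from GCS into S3 for downstream loading into Snowflake."""

    date = luigi.DateParameter()
    credentials = luigi.Parameter()
    project = luigi.Parameter()
    dataset = luigi.Parameter()
    gcs_output_root = luigi.Parameter()

    @property
    def insert_source_task(self):
        return ExampleGASessionsExtractTask(
            credentials=self.credentials,
            project=self.project,
            dataset=self.dataset,
            date=self.date,
            output_url=url_path_join(self.gcs_output_root, self.date.isoformat()),
        )

Scheduling ExampleCopyGASessionsToS3Task with an S3 output_url would first run the BigQuery extract into GCS (writing the GCS marker), then invoke the gsutil rsync copy into S3 and write the S3 marker.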
7 changes: 4 additions & 3 deletions edx/analytics/tasks/common/pathutil.py
@@ -17,9 +17,10 @@
 import luigi.contrib.hdfs.format
 import luigi.task
 from luigi.date_interval import Custom
+from luigi.contrib.s3 import S3Client
 
 from edx.analytics.tasks.util import eventlog
-from edx.analytics.tasks.util.s3_util import ScalableS3Client, generate_s3_sources, get_s3_bucket_key_names
+from edx.analytics.tasks.util.s3_util import generate_s3_sources, get_s3_bucket_key_names
 from edx.analytics.tasks.util.url import ExternalURL, UncheckedExternalURL, get_target_from_url, url_path_join
 
 log = logging.getLogger(__name__)
@@ -57,7 +58,7 @@ def generate_file_list(self):
             if src.startswith('s3'):
                 # connect lazily as needed:
                 if self.s3_conn is None:
-                    self.s3_conn = ScalableS3Client().s3
+                    self.s3_conn = S3Client().s3
                 for _bucket, _root, path in generate_s3_sources(self.s3_conn, src, self.include, self.include_zero_length):
                     source = url_path_join(src, path)
                     yield ExternalURL(source)
@@ -186,7 +187,7 @@ def _get_requirements(self):
 
     def _get_s3_urls(self, source):
         """Recursively list all files inside the source URL directory."""
-        s3_conn = ScalableS3Client().s3
+        s3_conn = S3Client().s3
        bucket_name, root = get_s3_bucket_key_names(source)
         bucket = s3_conn.get_bucket(bucket_name)
         for key_metadata in bucket.list(root):
6 changes: 3 additions & 3 deletions edx/analytics/tasks/tests/acceptance/__init__.py
@@ -9,10 +9,10 @@
 import boto
 import pandas
 from pandas.util.testing import assert_frame_equal, assert_series_equal
+from luigi.contrib.s3 import S3Client
 
 from edx.analytics.tasks.common.pathutil import PathSetTask
 from edx.analytics.tasks.tests.acceptance.services import db, elasticsearch_service, fs, hive, task, vertica
-from edx.analytics.tasks.util.s3_util import ScalableS3Client
 from edx.analytics.tasks.util.url import get_target_from_url, url_path_join
 
 log = logging.getLogger(__name__)
@@ -24,7 +24,7 @@ def when_s3_available(function):
     s3_available = getattr(when_s3_available, 's3_available', None)
     if s3_available is None:
         try:
-            connection = ScalableS3Client().s3
+            connection = S3Client().s3
             # ^ The above line will not error out if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
             # are set, so it can't be used to check if we have a valid connection to S3. Instead:
             connection.get_all_buckets()
@@ -142,7 +142,7 @@ class AcceptanceTestCase(unittest.TestCase):
 
     def setUp(self):
         try:
-            self.s3_client = ScalableS3Client()
+            self.s3_client = S3Client()
         except Exception:
             self.s3_client = None
 
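
For context on the acceptance-test changes, here is a minimal usage sketch of the when_s3_available decorator and the s3_client created in AcceptanceTestCase.setUp(). The test class, bucket, and key names are hypothetical, and the exists() call assumes luigi's S3Client filesystem interface; only AcceptanceTestCase and when_s3_available come from the file changed above.

# Hypothetical sketch: how an acceptance test might use the pieces touched above.
from edx.analytics.tasks.tests.acceptance import AcceptanceTestCase, when_s3_available


class ExampleGAExtractAcceptanceTest(AcceptanceTestCase):

    @when_s3_available
    def test_ga_dump_lands_in_s3(self):
        # self.s3_client is the luigi S3Client set up in AcceptanceTestCase.setUp();
        # the bucket and prefix below are made up for illustration.
        self.assertTrue(self.s3_client.exists('s3://example-test-bucket/ga/_SUCCESS'))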