Skip to content
This repository has been archived by the owner on May 1, 2024. It is now read-only.

Commit

Permalink
Kill ScalableS3Client because we have no more use for it.
Browse files Browse the repository at this point in the history
DE-1374 (PART 2)
  • Loading branch information
pwnage101 committed Apr 22, 2019
1 parent e39420e commit 00e65aa
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 26 deletions.
7 changes: 4 additions & 3 deletions edx/analytics/tasks/common/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import luigi.contrib.hdfs.format
import luigi.task
from luigi.date_interval import Custom
from luigi.contrib.s3 import S3Client

from edx.analytics.tasks.util import eventlog
from edx.analytics.tasks.util.s3_util import ScalableS3Client, generate_s3_sources, get_s3_bucket_key_names
from edx.analytics.tasks.util.s3_util import generate_s3_sources, get_s3_bucket_key_names
from edx.analytics.tasks.util.url import ExternalURL, UncheckedExternalURL, get_target_from_url, url_path_join

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,7 +58,7 @@ def generate_file_list(self):
if src.startswith('s3'):
# connect lazily as needed:
if self.s3_conn is None:
self.s3_conn = ScalableS3Client().s3
self.s3_conn = S3Client().s3
for _bucket, _root, path in generate_s3_sources(self.s3_conn, src, self.include, self.include_zero_length):
source = url_path_join(src, path)
yield ExternalURL(source)
Expand Down Expand Up @@ -186,7 +187,7 @@ def _get_requirements(self):

def _get_s3_urls(self, source):
"""Recursively list all files inside the source URL directory."""
s3_conn = ScalableS3Client().s3
s3_conn = S3Client().s3
bucket_name, root = get_s3_bucket_key_names(source)
bucket = s3_conn.get_bucket(bucket_name)
for key_metadata in bucket.list(root):
Expand Down
6 changes: 3 additions & 3 deletions edx/analytics/tasks/tests/acceptance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import boto
import pandas
from pandas.util.testing import assert_frame_equal, assert_series_equal
from luigi.contrib.s3 import S3Client

from edx.analytics.tasks.common.pathutil import PathSetTask
from edx.analytics.tasks.tests.acceptance.services import db, elasticsearch_service, fs, hive, task, vertica
from edx.analytics.tasks.util.s3_util import ScalableS3Client
from edx.analytics.tasks.util.url import get_target_from_url, url_path_join

log = logging.getLogger(__name__)
Expand All @@ -24,7 +24,7 @@ def when_s3_available(function):
s3_available = getattr(when_s3_available, 's3_available', None)
if s3_available is None:
try:
connection = ScalableS3Client().s3
connection = S3Client().s3
# ^ The above line will not error out if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# are set, so it can't be used to check if we have a valid connection to S3. Instead:
connection.get_all_buckets()
Expand Down Expand Up @@ -142,7 +142,7 @@ class AcceptanceTestCase(unittest.TestCase):

def setUp(self):
try:
self.s3_client = ScalableS3Client()
self.s3_client = S3Client()
except Exception:
self.s3_client = None

Expand Down
5 changes: 3 additions & 2 deletions edx/analytics/tasks/tests/acceptance/test_database_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

import gnupg

from luigi.contrib.s3 import S3Client

from edx.analytics.tasks.tests.acceptance import AcceptanceTestCase, when_exporter_available
from edx.analytics.tasks.tests.acceptance.services import shell
from edx.analytics.tasks.util.opaque_key_util import get_filename_safe_course_id, get_org_id_for_course
from edx.analytics.tasks.util.s3_util import ScalableS3Client
from edx.analytics.tasks.util.url import url_path_join

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -198,7 +199,7 @@ def validate_exporter_output(self, org_id, exported_filename):
"""
today = datetime.datetime.utcnow().strftime('%Y-%m-%d')
bucket = ScalableS3Client().s3.get_bucket(self.config.get('exporter_output_bucket'))
bucket = S3Client().s3.get_bucket(self.config.get('exporter_output_bucket'))
export_id = '{org}-{date}'.format(org=org_id, date=today)
filename = export_id + '.zip'
key = bucket.lookup(self.output_prefix + filename)
Expand Down
17 changes: 1 addition & 16 deletions edx/analytics/tasks/util/s3_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,6 @@ def func(name):
return (n for n in names if func(n))


# TODO: Once we upgrade to boto3 (luigi>=2.7.6), delete this class! In boto3/luigi>=2.7.6, we must
# NOT pass `host` to the s3 client or else it will throw a KeyError. boto3 will already default to
# s3.amazonaws.com.
class ScalableS3Client(S3Client):
"""
S3 client that adds support for defaulting host name to s3.amazonaws.com.
"""

def __init__(self, **kwargs):
if 'host' not in kwargs:
kwargs['host'] = self._get_s3_config('host') or 's3.amazonaws.com'

super(ScalableS3Client, self).__init__(**kwargs)


class S3HdfsTarget(HdfsTarget):
"""HDFS target that supports writing and reading files directly in S3."""

Expand All @@ -157,7 +142,7 @@ def open(self, mode='r'):
else:
safe_path = self.path.replace('s3n://', 's3://')
if not hasattr(self, 's3_client'):
self.s3_client = ScalableS3Client()
self.s3_client = S3Client()
return AtomicS3File(safe_path, self.s3_client, policy=DEFAULT_KEY_ACCESS_POLICY)


Expand Down
3 changes: 1 addition & 2 deletions edx/analytics/tasks/util/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from luigi.contrib.hdfs.target import HdfsTarget
from luigi.contrib.s3 import S3Target

from edx.analytics.tasks.util.s3_util import DEFAULT_KEY_ACCESS_POLICY, S3HdfsTarget, ScalableS3Client
from edx.analytics.tasks.util.s3_util import DEFAULT_KEY_ACCESS_POLICY, S3HdfsTarget

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -220,7 +220,6 @@ def get_target_class_from_url(url, marker=False):
# everything else off the url and pass that in to the target.
url = parsed_url.path
if issubclass(target_class, S3Target):
kwargs['client'] = ScalableS3Client()
kwargs['policy'] = DEFAULT_KEY_ACCESS_POLICY
if issubclass(target_class, GCSTarget):
raise NotImplementedError(
Expand Down

0 comments on commit 00e65aa

Please sign in to comment.