diff --git a/.isort.cfg b/.isort.cfg index 241a19da9e..bdac169f2c 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -1,3 +1,4 @@ [settings] line_length=120 multi_line_output=5 +known_future_library=future diff --git a/.travis.yml b/.travis.yml index b22537b18f..bc0ad497bd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,33 @@ env: - secure: NLqmm18NpV3JRwD4CaugXm5cMWgxjdOA88xRFocmmVrduv0QT9JxBZFGebLYmFQOoKNJ23hz6g3EHe1aWfhLYnr1iUYerrwIriSI1wzuqbXJBRN6gO2n3YW+IfG83OLMZkOIMswT8MEdT3JPWVJL3bsocjHp8bYhRCt1KTCMJjY= - secure: aG8l39jaLFWXB5CEOOAR9mJTT3GnqxCl/oFM/7NvTZCBoSWIPIztpFhSAkRE9xSIiKUKXakZcL5H349NLC28jdlHPVsNAaKKt2YNhB6MjmePihp3RPwZGn8c/SjslwY7DPVUKMdWsI7AVNJBH8ab30OPxKwXFAMOiJJza206CYQ= +# TODO: re-introduce the coverage test. +matrix: + # Mark travis build as finished before jobs under allow_failures complete. + fast_finish: true + + include: + # Standard unit tests. + - name: "Python 2.7 Unit Tests" + env: TEST_SUITE=test-docker + + # Python 3 whitelisted and full unit test jobs. Once python 3 support is + # complete, delete the whitelist job and remove the full job from + # allow_failures. + - name: "Python 3.x Whitelisted Unit Tests" + env: TEST_SUITE=test-docker-py3-whitelist + - name: "Python 3.x FULL Unit Tests" + env: TEST_SUITE=test-docker-py3 + + - name: "Quality Tests" + env: TEST_SUITE=quality-docker + + # Names of jobs (defined above) that cannot fail the travis build even if + # they fail. + allow_failures: + - name: "Python 3.x FULL Unit Tests" + - name: "Quality Tests" # This is here because isort is a hot mess right now. + # Do NOT install Python requirements. # Doing so is a waste of time since they won't be used. install: true @@ -37,10 +64,7 @@ before_install: # Ensure we have a place to store coverage output - mkdir -p coverage -script: - - make test-docker - - make quality-docker - - make coverage-docker +script: make $TEST_SUITE after_success: - pip install --upgrade codecov diff --git a/Makefile b/Makefile index 46488d5bf1..9ac03d1b6d 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ uninstall: pip install -r requirements/pip.txt - while pip uninstall -y edx.analytics.tasks; do true; done + pip uninstall -y edx.analytics.tasks python setup.py clean install: requirements uninstall @@ -28,7 +28,7 @@ docker-shell: system-requirements: ifeq (,$(wildcard /usr/bin/yum)) # This is not great, we can't use these libraries on slave nodes using this method. - sudo apt-get install -y -q libmysqlclient-dev libpq-dev python-dev libffi-dev libssl-dev libxml2-dev libxslt1-dev + sudo apt-get install -y -q libmysqlclient-dev libpq-dev python-dev python3-dev libffi-dev libssl-dev libxml2-dev libxslt1-dev else sudo yum install -y -q postgresql-devel libffi-devel endif @@ -56,20 +56,48 @@ upgrade: ## update the requirements/*.txt files with the latest packages satisfy CUSTOM_COMPILE_COMMAND="make upgrade" pip-compile --upgrade -o requirements/docs.txt requirements/docs.in CUSTOM_COMPILE_COMMAND="make upgrade" pip-compile --upgrade -o requirements/test.txt requirements/test.in -test-docker-local: - docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest make develop-local test-local - +# Entry point for running python 2 unit tests in CI. 
test-docker: - docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest make reset-virtualenv test-requirements develop-local test-local + docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest make system-requirements reset-virtualenv test-requirements develop-local test-local +# Entry point for running python 3 unit tests in CI. test-docker-py3: - docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest make reset-virtualenv-py3 test-requirements develop-local test-local + docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest make system-requirements reset-virtualenv-py3 test-requirements develop-local test-local + +# Entry point for running python 3 unit tests in CI. Only invokes a subset +# (whitelist) of unit tests which are known to pass under python 3. +test-docker-py3-whitelist: + docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest make system-requirements reset-virtualenv-py3 test-requirements develop-local test-local-py3-whitelist test-local: # TODO: when we have better coverage, modify this to actually fail when coverage is too low. rm -rf .coverage LUIGI_CONFIG_PATH='config/test.cfg' python -m coverage run --rcfile=./.coveragerc -m nose --with-xunit --xunit-file=unittests.xml -A 'not acceptance' +# Special test-local target specifically for running a whitelist of tests which +# are known to pass under python 3 +test-local-py3-whitelist: + # TODO: when we have better coverage, modify this to actually fail when coverage is too low.
+ rm -rf .coverage + LUIGI_CONFIG_PATH='config/test.cfg' python -m coverage run --rcfile=./.coveragerc -m nose --with-xunit --xunit-file=unittests.xml -A 'not acceptance' \ + edx.analytics.tasks.enterprise.tests \ + edx.analytics.tasks.insights.tests.test_database_imports \ + edx.analytics.tasks.insights.tests.test_grades \ + edx.analytics.tasks.monitor.tests.test_overall_events \ + edx.analytics.tasks.tests \ + edx.analytics.tasks.util.tests.helpers \ + edx.analytics.tasks.util.tests.opaque_key_mixins \ + edx.analytics.tasks.util.tests.test_decorators \ + edx.analytics.tasks.util.tests.test_geolocation \ + edx.analytics.tasks.util.tests.test_hive \ + edx.analytics.tasks.util.tests.test_retry \ + edx.analytics.tasks.util.tests.test_s3_util \ + edx.analytics.tasks.util.tests.test_url \ + edx.analytics.tasks.warehouse.financial.tests \ + edx.analytics.tasks.warehouse.tests.test_internal_reporting_active_users \ + edx.analytics.tasks.warehouse.tests.test_internal_reporting_database \ + edx.analytics.tasks.warehouse.tests.test_run_vertica_sql_scripts + test: test-requirements develop test-local test-acceptance: test-requirements @@ -98,7 +126,7 @@ quality-docker-local: docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest make develop-local quality-local quality-docker: - docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest make reset-virtualenv test-requirements develop-local quality-local + docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest make system-requirements reset-virtualenv test-requirements develop-local quality-local coverage-docker: docker run --rm -u root -v `(pwd)`:/edx/app/analytics_pipeline/analytics_pipeline -it edxops/analytics_pipeline:latest coverage xml diff --git a/edx/analytics/tasks/common/bigquery_load.py b/edx/analytics/tasks/common/bigquery_load.py index 27a9f4f762..83334b166e 100644 --- a/edx/analytics/tasks/common/bigquery_load.py +++ b/edx/analytics/tasks/common/bigquery_load.py @@ -1,10 +1,12 @@ +from __future__ import absolute_import + import json import logging import os import subprocess import tempfile import time -import urlparse +from six.moves.urllib.parse import urlparse import luigi @@ -216,7 +218,7 @@ def field_delimiter(self): @property def null_marker(self): - return '\N' + return r'\N' @property def quote_character(self): @@ -262,7 +264,7 @@ def init_copy(self, client): self.output().clear_marker_table() def _get_destination_from_source(self, source_path): - parsed_url = urlparse.urlparse(source_path) + parsed_url = urlparse(source_path) destination_path = url_path_join('gs://{}'.format(parsed_url.netloc), parsed_url.path) return destination_path diff --git a/edx/analytics/tasks/common/mapreduce.py b/edx/analytics/tasks/common/mapreduce.py index 071ee8eeea..19682280f1 100644 --- a/edx/analytics/tasks/common/mapreduce.py +++ b/edx/analytics/tasks/common/mapreduce.py @@ -7,7 +7,7 @@ import logging import logging.config import os -import StringIO +from io import StringIO from hashlib import md5 import luigi @@ -183,7 +183,7 @@ class EmulatedMapReduceJobRunner(luigi.contrib.hadoop.JobRunner): """ def group(self, input): - output = StringIO.StringIO() + output = StringIO() lines = [] for i, line in enumerate(input): parts = line.rstrip('\n').split('\t') @@ -197,7 +197,7 @@ def group(self, input): def run_job(self, job): job.init_hadoop() job.init_mapper() - 
map_output = StringIO.StringIO() + map_output = StringIO() input_targets = luigi.task.flatten(job.input_hadoop()) for input_target in input_targets: # if file is a directory, then assume that it's Hadoop output, @@ -232,7 +232,7 @@ def run_job(self, job): try: reduce_output = job.output().open('w') except Exception: - reduce_output = StringIO.StringIO() + reduce_output = StringIO() try: job._run_reducer(reduce_input, reduce_output) diff --git a/edx/analytics/tasks/common/mysql_load.py b/edx/analytics/tasks/common/mysql_load.py index 39fb14440c..f719f5321d 100644 --- a/edx/analytics/tasks/common/mysql_load.py +++ b/edx/analytics/tasks/common/mysql_load.py @@ -422,7 +422,7 @@ def coerce_for_mysql_connect(input): return input # Hive indicates a null value with the string "\N" # We represent an infinite value with the string "inf", MySQL has no such representation so we use NULL - if input in ('None', '\\N', 'inf', '-inf'): + if input in ('None', r'\N', 'inf', '-inf'): return None if isinstance(input, str): return input.decode('utf-8') diff --git a/edx/analytics/tasks/common/pathutil.py b/edx/analytics/tasks/common/pathutil.py index ee6cacd6eb..a87585c63b 100644 --- a/edx/analytics/tasks/common/pathutil.py +++ b/edx/analytics/tasks/common/pathutil.py @@ -268,12 +268,12 @@ def get_event_and_date_string(self, line): """Default mapper implementation, that always outputs the log line, but with a configurable key.""" event = eventlog.parse_json_event(line) if event is None: - self.incr_counter('Event', 'Discard Unparseable Event', 1) + self.incr_counter(u'Event', u'Discard Unparseable Event', 1) return None event_time = self.get_event_time(event) if not event_time: - self.incr_counter('Event', 'Discard Missing Time Field', 1) + self.incr_counter(u'Event', u'Discard Missing Time Field', 1) return None # Don't use strptime to parse the date, it is extremely slow @@ -283,7 +283,7 @@ def get_event_and_date_string(self, line): date_string = event_time.split("T")[0] if date_string < self.lower_bound_date_string or date_string >= self.upper_bound_date_string: - # Slow: self.incr_counter('Event', 'Discard Outside Date Interval', 1) + # Slow: self.incr_counter(u'Event', u'Discard Outside Date Interval', 1) return None return event, date_string @@ -307,5 +307,5 @@ def get_map_input_file(self): return os.environ['map_input_file'] except KeyError: log.warn('mapreduce_map_input_file not defined in os.environ, unable to determine input file path') - self.incr_counter('Event', 'Missing map_input_file', 1) + self.incr_counter(u'Event', u'Missing map_input_file', 1) return '' diff --git a/edx/analytics/tasks/common/sqoop.py b/edx/analytics/tasks/common/sqoop.py index 44a854bc40..2091a1cc0e 100644 --- a/edx/analytics/tasks/common/sqoop.py +++ b/edx/analytics/tasks/common/sqoop.py @@ -1,6 +1,8 @@ """ Gather data using Sqoop table dumps run on RDBMS databases. """ +from __future__ import absolute_import + import datetime import json import logging @@ -296,7 +298,12 @@ def run_job(self, job): metadata['end_time'] = datetime.datetime.utcnow().isoformat() try: with job.metadata_output().open('w') as metadata_file: - json.dump(metadata, metadata_file) + # Under python 2, json.dumps() will return ascii-only bytes, so .encode('utf-8') + # is a no-op. Under python 3, json.dumps() will return ascii-only unicode, so + # .encode('utf-8') will return bytes, thus normalizing the output to bytes + # across all python versions. 
+ metadata_file.write(json.dumps(metadata).encode('utf-8')) + metadata_file.flush() except Exception: log.exception("Unable to dump metadata information.") pass diff --git a/edx/analytics/tasks/common/tests/test_sqoop.py b/edx/analytics/tasks/common/tests/test_sqoop.py index 1c8d50128d..27de9daa1d 100644 --- a/edx/analytics/tasks/common/tests/test_sqoop.py +++ b/edx/analytics/tasks/common/tests/test_sqoop.py @@ -197,12 +197,12 @@ def test_connect_with_columns(self): self.assertEquals(arglist[-3], 'column1,column2') def test_connect_with_null_string(self): - self.create_and_run_mysql_task(null_string='\\\\N') + self.create_and_run_mysql_task(null_string=r'\\N') arglist = self.get_call_args_after_run() self.assertEquals(arglist[-6], '--null-string') - self.assertEquals(arglist[-5], '\\\\N') + self.assertEquals(arglist[-5], r'\\N') self.assertEquals(arglist[-4], '--null-non-string') - self.assertEquals(arglist[-3], '\\\\N') + self.assertEquals(arglist[-3], r'\\N') def test_connect_with_fields_terminations(self): self.create_and_run_mysql_task(fields_terminated_by='\x01') diff --git a/edx/analytics/tasks/common/vertica_load.py b/edx/analytics/tasks/common/vertica_load.py index 4626b81b4d..c2086d30b0 100644 --- a/edx/analytics/tasks/common/vertica_load.py +++ b/edx/analytics/tasks/common/vertica_load.py @@ -1,6 +1,7 @@ """ Support for loading data into an HP Vertica database. """ +from __future__ import absolute_import import logging import traceback @@ -12,6 +13,7 @@ from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin from edx.analytics.tasks.util.url import ExternalURL from edx.analytics.tasks.util.vertica_target import CredentialFileVerticaTarget +import six log = logging.getLogger(__name__) @@ -416,7 +418,7 @@ def copy_delimiter(self): @property def copy_null_sequence(self): """The null sequence in the data to be copied. Default is Hive NULL (\\N)""" - return "'\\N'" + return r"'\N'" @property def copy_enclosed_by(self): @@ -437,7 +439,7 @@ def copy_escape_spec(self): def copy_data_table_from_target(self, cursor): """Performs the copy query from the insert source.""" - if isinstance(self.columns[0], basestring): + if isinstance(self.columns[0], six.string_types): column_names = ','.join([name for name in self.columns]) elif len(self.columns[0]) == 2: column_names = ','.join([name for name, _type in self.columns]) diff --git a/edx/analytics/tasks/export/data_obfuscation.py b/edx/analytics/tasks/export/data_obfuscation.py index 6d9261c44a..bfc0414e80 100644 --- a/edx/analytics/tasks/export/data_obfuscation.py +++ b/edx/analytics/tasks/export/data_obfuscation.py @@ -8,11 +8,11 @@ import tempfile import xml.etree.ElementTree -import cjson import luigi import yaml import edx.analytics.tasks.util.opaque_key_util as opaque_key_util +from edx.analytics.tasks.util.fast_json import FastJson from edx.analytics.tasks.common.pathutil import PathSetTask from edx.analytics.tasks.util.file_util import copy_file_to_file, read_config_file from edx.analytics.tasks.util.obfuscate_util import ( @@ -194,7 +194,7 @@ def filter_row(self, row): if state_str == 'NULL': updated_state_dict = {} else: - state_dict = cjson.decode(state_str, all_unicode=True) + state_dict = FastJson.loads(state_str) # Traverse the dictionary, looking for entries that need to be scrubbed. 
updated_state_dict = self.obfuscator.obfuscate_structure(state_dict, u"state", user_info) except Exception: # pylint: disable=broad-except @@ -204,7 +204,7 @@ def filter_row(self, row): if updated_state_dict is not None: # Can't reset values, so update original fields. - updated_state = cjson.encode(updated_state_dict).replace('\\', '\\\\') + updated_state = FastJson.dumps(updated_state_dict).replace('\\', '\\\\') row[4] = updated_state if self.obfuscator.is_logging_enabled(): log.info(u"Obfuscated state for user_id '%s' module_id '%s'", user_id, row[2]) diff --git a/edx/analytics/tasks/export/events_obfuscation.py b/edx/analytics/tasks/export/events_obfuscation.py index 1eb1000ff4..edb0632fa0 100644 --- a/edx/analytics/tasks/export/events_obfuscation.py +++ b/edx/analytics/tasks/export/events_obfuscation.py @@ -6,10 +6,10 @@ import re from collections import defaultdict, namedtuple -import cjson import luigi.date_interval import edx.analytics.tasks.util.opaque_key_util as opaque_key_util +from edx.analytics.tasks.util.fast_json import FastJson from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin, MultiOutputMapReduceJobTask from edx.analytics.tasks.common.pathutil import PathSetTask from edx.analytics.tasks.util import eventlog @@ -328,7 +328,7 @@ def _obfuscate_event(self, event): # Re-encode payload as a json string if it originally was one. # (This test works because we throw away string values that didn't parse as JSON.) if isinstance(event.get('event'), basestring): - event['event'] = cjson.encode(event_data) + event['event'] = FastJson.dumps(event_data) else: event['event'] = event_data diff --git a/edx/analytics/tasks/export/obfuscation.py b/edx/analytics/tasks/export/obfuscation.py index 0a189caffd..8062ee1d69 100644 --- a/edx/analytics/tasks/export/obfuscation.py +++ b/edx/analytics/tasks/export/obfuscation.py @@ -1,11 +1,10 @@ """Tasks to obfuscate course data for RDX.""" - import errno import json import logging import os import tarfile -import urlparse +from six.moves.urllib.parse import urlparse import luigi @@ -126,7 +125,7 @@ def run(self): for target in path_task.output(): with target.open('r') as input_file: # Get path without urlscheme. - course_files_path = urlparse.urlparse(self.course_files_url).path + course_files_path = urlparse(self.course_files_url).path # Calculates target's relative path to course_files_path by getting the substring that # occurs after course_files_path substring in target's path. # Needed as target.path returns path with urlscheme for s3target & without for hdfstarget. diff --git a/edx/analytics/tasks/insights/calendar_task.py b/edx/analytics/tasks/insights/calendar_task.py index 0f5ce6fc01..1ac4811562 100644 --- a/edx/analytics/tasks/insights/calendar_task.py +++ b/edx/analytics/tasks/insights/calendar_task.py @@ -1,8 +1,11 @@ -"""A canonical calendar that can be joined with other tables to provide information about dates.""" - +""" +A canonical calendar that can be joined with other tables to provide information about dates. 
+""" +from __future__ import absolute_import import logging from datetime import timedelta +import six import luigi.configuration @@ -63,7 +66,7 @@ def run(self): (week.sunday() + timedelta(1)).isoformat(), iso_weekday ) - output_file.write('\t'.join([unicode(v).encode('utf8') for v in column_values]) + '\n') + output_file.write(b'\t'.join([six.text_type(v).encode('utf-8') for v in column_values]) + b'\n') class CalendarTableTask(CalendarDownstreamMixin, HiveTableTask): diff --git a/edx/analytics/tasks/insights/database_imports.py b/edx/analytics/tasks/insights/database_imports.py index fd6a7801f3..2896d674c6 100644 --- a/edx/analytics/tasks/insights/database_imports.py +++ b/edx/analytics/tasks/insights/database_imports.py @@ -1,6 +1,8 @@ """ Import data from external RDBMS databases into Hive. """ +from __future__ import absolute_import + import datetime import logging import textwrap @@ -91,7 +93,7 @@ def partition(self): def partition_location(self): """Provides location of Hive database table's partition data.""" # The actual folder name where the data is stored is expected to be in the format = - partition_name = '='.join(self.partition.items()[0]) + partition_name = '='.join(list(self.partition.items())[0]) # Make sure that input path ends with a slash, to indicate a directory. # (This is necessary for S3 paths that are output from Hadoop jobs.) return url_path_join(self.table_location, partition_name + '/') @@ -169,9 +171,8 @@ def requires(self): database=self.database, # Hive expects NULL to be represented by the string "\N" in the data. You have to pass in "\\N" to sqoop # since it uses that string directly in the generated Java code, so "\\N" actually looks like "\N" to the - # Java code. In order to get "\\N" onto the command line we have to use another set of escapes to tell the - # python code to pass through the "\" character. - null_string='\\\\N', + # Java code. In order to write "\\N" in python, we use a raw string prefix `r`. + null_string=r'\\N', # It's unclear why, but this setting prevents us from correctly substituting nulls with \N. mysql_delimiters=False, # This is a string that is interpreted as an octal number, so it is equivalent to the character Ctrl-A diff --git a/edx/analytics/tasks/insights/tests/test_answer_dist.py b/edx/analytics/tasks/insights/tests/test_answer_dist.py index b3691e233b..56088622a1 100644 --- a/edx/analytics/tasks/insights/tests/test_answer_dist.py +++ b/edx/analytics/tasks/insights/tests/test_answer_dist.py @@ -2,12 +2,13 @@ Tests for tasks that calculate answer distributions. 
""" +from __future__ import absolute_import import hashlib import json import math import os import shutil -import StringIO +from io import BytesIO import tempfile from unittest import TestCase @@ -20,6 +21,7 @@ ) from edx.analytics.tasks.util.tests.config import OPTION_REMOVED, with_luigi_config from edx.analytics.tasks.util.tests.opaque_key_mixins import InitializeLegacyKeysMixin, InitializeOpaqueKeysMixin +import six class ProblemCheckEventBaseTest(MapperTestMixin, ReducerTestMixin, TestCase): @@ -73,7 +75,7 @@ def _create_event_data_dict(self, **kwargs): @staticmethod def _update_with_kwargs(data_dict, **kwargs): """Updates a dict from kwargs only if it modifies a top-level value.""" - for key, value in kwargs.iteritems(): + for key, value in six.iteritems(kwargs): if key in data_dict: data_dict[key] = value @@ -317,7 +319,7 @@ def insert_answer_data(submission, attempt_category): submission: dictionary of all responses submitted at once for a user attempt_category: a string that is 'first' for a user's first submission and 'last' otherwise """ - for answer_id, submission_data in submission.iteritems(): + for answer_id, submission_data in six.iteritems(submission): answer_id_data = { "answer": submission_data['answer'], "problem_display_name": None, @@ -830,7 +832,7 @@ def _load_metadata(self, **kwargs): } } metadata_dict[self.answer_id].update(**kwargs) - answer_metadata = StringIO.StringIO(json.dumps(metadata_dict)) + answer_metadata = BytesIO(json.dumps(metadata_dict).encode('utf-8')) self.task.load_answer_metadata(answer_metadata) def test_non_submission_choice_with_metadata(self): @@ -944,7 +946,7 @@ def test_reduce_multiple_values(self): # To test sorting, the first sample is made to sort after the # second sample. - column_values_2 = [(k, unicode(k) + u'\u2603') for k in field_names] + column_values_2 = [(k, six.text_type(k) + u'\u2603') for k in field_names] column_values_2[3] = (column_values_2[3][0], 10) column_values_1 = list(column_values_2) column_values_1[4] = (column_values_1[4][0], u'ZZZZZZZZZZZ') @@ -958,9 +960,9 @@ def test_reduce_multiple_values(self): self.assertEquals(mock_output_file.write.mock_calls[0], call(expected_header_string)) # Confirm that the second sample appears before the first. 
- expected_row_1 = ','.join(unicode(v[1]).encode('utf8') for v in column_values_2) + '\r\n' + expected_row_1 = b','.join(six.text_type(v[1]).encode('utf8') for v in column_values_2) + b'\r\n' self.assertEquals(mock_output_file.write.mock_calls[1], call(expected_row_1)) - expected_row_2 = ','.join(unicode(v[1]).encode('utf8') for v in column_values_1) + '\r\n' + expected_row_2 = b','.join(six.text_type(v[1]).encode('utf8') for v in column_values_1) + b'\r\n' self.assertEquals(mock_output_file.write.mock_calls[2], call(expected_row_2)) def test_output_path_for_legacy_key(self): diff --git a/edx/analytics/tasks/insights/tests/test_course_blocks.py b/edx/analytics/tasks/insights/tests/test_course_blocks.py index 191172a573..a7ab936884 100644 --- a/edx/analytics/tasks/insights/tests/test_course_blocks.py +++ b/edx/analytics/tasks/insights/tests/test_course_blocks.py @@ -1,12 +1,11 @@ """Test course blocks tasks.""" - import json import logging import os import shutil import tempfile from unittest import TestCase -from urllib import urlencode +from six.moves.urllib.parse import urlencode import httpretty from ddt import data, ddt, unpack @@ -53,7 +52,7 @@ def create_input_file(self, course_ids): """Create a tab-separated file containing the given course_ids.""" with open(self.input_file, 'w') as output: for course_id in course_ids: - output.write("\t".join([course_id, 'Name', 'Org', 'Number', 'http://'] + ['\\N'] * 9)) + output.write("\t".join([course_id, 'Name', 'Org', 'Number', 'http://'] + [r'\N'] * 9)) output.write("\r\n") def cleanup(self, dirname): @@ -174,18 +173,18 @@ class CourseBlocksApiDataReducerTaskTest(CourseBlocksTestMixin, ReducerTestMixin # data tuple fields are given in this order: # (block_id,block_type,display_name,is_root,is_orphan,is_dag,parent_block_id,course_path,sort_idx) @data( - ((('abc', 'course', 'ABC', '1', '0', '0', '\\N', '', '0'),), False), - ((('abc', 'course', 'ABC', '1', '0', '0', '\\N', '', '0'),), True), - ((('abc', 'block', 'ABC', '1', '0', '0', '\\N', '', '0'), + ((('abc', 'course', 'ABC', '1', '0', '0', r'\N', '', '0'),), False), + ((('abc', 'course', 'ABC', '1', '0', '0', r'\N', '', '0'),), True), + ((('abc', 'block', 'ABC', '1', '0', '0', r'\N', '', '0'), ('def', 'block', 'DEF', '0', '0', '0', 'abc', 'ABC', '1'), ('jkl', 'block', 'JKL', '0', '0', '1', 'def', 'ABC / DEF', '2'), ('vwx', 'block', 'VWX', '0', '0', '0', 'jkl', 'ABC / DEF / JKL', '3'), ('mno', 'block', 'MNO', '0', '0', '0', 'def', 'ABC / DEF', '4'), ('pqr', 'block', 'PQR', '0', '0', '0', 'mno', 'ABC / DEF / MNO', '5'), ('stu', 'block', 'STU', '0', '0', '0', 'abc', 'ABC', '6'), - ('ghi', 'block', 'GHI', '0', '1', '0', '\\N', '(Deleted block :)', '8')), False), - ((('ghi', 'block', 'GHI', '0', '1', '0', '\\N', '(Deleted block :)', '-1'), - ('abc', 'block', 'ABC', '1', '0', '0', '\\N', '', '0'), + ('ghi', 'block', 'GHI', '0', '1', '0', r'\N', '(Deleted block :)', '8')), False), + ((('ghi', 'block', 'GHI', '0', '1', '0', r'\N', '(Deleted block :)', '-1'), + ('abc', 'block', 'ABC', '1', '0', '0', r'\N', '', '0'), ('def', 'block', 'DEF', '0', '0', '0', 'abc', 'ABC', '1'), ('jkl', 'block', 'JKL', '0', '0', '1', 'def', 'ABC / DEF', '2'), ('vwx', 'block', 'VWX', '0', '0', '0', 'jkl', 'ABC / DEF / JKL', '3'), diff --git a/edx/analytics/tasks/insights/tests/test_course_list.py b/edx/analytics/tasks/insights/tests/test_course_list.py index 549b879463..0f96056b48 100644 --- a/edx/analytics/tasks/insights/tests/test_course_list.py +++ b/edx/analytics/tasks/insights/tests/test_course_list.py @@ -1,4 +1,6 @@ 
-"""Test course list tasks.""" +""" +Test course list tasks. +""" import json import logging import os @@ -6,7 +8,7 @@ import tempfile from datetime import datetime from unittest import TestCase -from urllib import urlencode +from six.moves.urllib.parse import urlencode import httpretty from ddt import data, ddt, unpack diff --git a/edx/analytics/tasks/insights/tests/test_enrollments.py b/edx/analytics/tasks/insights/tests/test_enrollments.py index 06e1d893c0..25bea2125c 100644 --- a/edx/analytics/tasks/insights/tests/test_enrollments.py +++ b/edx/analytics/tasks/insights/tests/test_enrollments.py @@ -432,8 +432,8 @@ def test_no_events(self): def test_single_enrollment(self): inputs = [('2013-01-01T00:00:01', ACTIVATED, 'honor'), ] - expected = ((self.course_id, self.user_id, 'honor', '1', 'honor', '2013-01-01 00:00:01.000000', '\\N', '\\N', - '\\N', '2013-01-02 00:00:00.000000'),) + expected = ((self.course_id, self.user_id, 'honor', '1', 'honor', '2013-01-01 00:00:01.000000', r'\N', r'\N', + r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def create_enrollment_task(self, interval='2013-01-01'): @@ -461,7 +461,7 @@ def test_normal_multiple_event_sequence(self): ('2013-01-01T00:00:04', DEACTIVATED, 'honor'), ] expected = ((self.course_id, self.user_id, 'honor', '0', 'honor', '2013-01-01 00:00:01.000000', - '2013-01-01 00:00:04.000000', '\\N', '\\N', '2013-01-02 00:00:00.000000'),) + '2013-01-01 00:00:04.000000', r'\N', r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_ignore_conflicting_activate_after_mode_change(self): @@ -471,7 +471,7 @@ def test_ignore_conflicting_activate_after_mode_change(self): ('2013-01-01T00:00:03', ACTIVATED, 'honor'), ] expected = ((self.course_id, self.user_id, 'verified', '1', 'honor', '2013-01-01 00:00:01.000000', - '\\N', '2013-01-01 00:00:02.000000', '\\N', '2013-01-02 00:00:00.000000'),) + r'\N', '2013-01-01 00:00:02.000000', r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_ignore_redundant_unenroll_events(self): @@ -482,7 +482,7 @@ def test_ignore_redundant_unenroll_events(self): ('2013-01-01T00:00:04', ACTIVATED, 'honor'), ] expected = ((self.course_id, self.user_id, 'honor', '1', 'honor', '2013-01-01 00:00:01.000000', - '2013-01-01 00:00:02.000000', '\\N', '\\N', '2013-01-02 00:00:00.000000'),) + '2013-01-01 00:00:02.000000', r'\N', r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_events_out_of_order(self): @@ -494,7 +494,7 @@ def test_events_out_of_order(self): ('2013-01-01T00:00:02', DEACTIVATED, 'honor'), ] expected = ((self.course_id, self.user_id, 'honor', '0', 'honor', '2013-01-01 00:00:01.000000', - '2013-01-01 00:00:04.000000', '\\N', '\\N', '2013-01-02 00:00:00.000000'),) + '2013-01-01 00:00:04.000000', r'\N', r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_ignore_redundant_enroll_events(self): @@ -504,8 +504,8 @@ def test_ignore_redundant_enroll_events(self): ('2013-01-01T00:00:03', ACTIVATED, 'honor'), ('2013-01-01T00:00:04', ACTIVATED, 'honor'), ] - expected = ((self.course_id, self.user_id, 'honor', '1', 'honor', '2013-01-01 00:00:01.000000', '\\N', '\\N', - '\\N', '2013-01-02 00:00:00.000000'),) + expected = ((self.course_id, self.user_id, 'honor', '1', 'honor', '2013-01-01 00:00:01.000000', r'\N', r'\N', + r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) 
def test_ignore_mode_change_on_redundant_enroll_events(self): @@ -513,8 +513,8 @@ def test_ignore_mode_change_on_redundant_enroll_events(self): ('2013-01-01T00:00:01', ACTIVATED, 'honor'), ('2013-01-01T00:00:02', ACTIVATED, 'verified'), ] - expected = ((self.course_id, self.user_id, 'honor', '1', 'honor', '2013-01-01 00:00:01.000000', '\\N', '\\N', - '\\N', '2013-01-02 00:00:00.000000'),) + expected = ((self.course_id, self.user_id, 'honor', '1', 'honor', '2013-01-01 00:00:01.000000', r'\N', r'\N', + r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_ignore_mode_change_while_deactivated(self): @@ -524,7 +524,7 @@ def test_ignore_mode_change_while_deactivated(self): ('2013-01-01T00:00:03', MODE_CHANGED, 'verified'), ] expected = ((self.course_id, self.user_id, 'honor', '0', 'honor', '2013-01-01 00:00:01.000000', - '2013-01-01 00:00:02.000000', '\\N', '\\N', '2013-01-02 00:00:00.000000'),) + '2013-01-01 00:00:02.000000', r'\N', r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_mode_change_via_activation_events(self): @@ -534,7 +534,7 @@ def test_mode_change_via_activation_events(self): ('2013-01-01T00:00:03', ACTIVATED, 'verified'), ] expected = ((self.course_id, self.user_id, 'verified', '1', 'honor', '2013-01-01 00:00:01.000000', - '2013-01-01 00:00:02.000000', '2013-01-01 00:00:03.000000', '\\N', '2013-01-02 00:00:00.000000'),) + '2013-01-01 00:00:02.000000', '2013-01-01 00:00:03.000000', r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_ignore_different_mode_on_unenroll_event(self): @@ -543,7 +543,7 @@ def test_ignore_different_mode_on_unenroll_event(self): ('2013-01-01T00:00:02', DEACTIVATED, 'verified'), ] expected = ((self.course_id, self.user_id, 'honor', '0', 'honor', '2013-01-01 00:00:01.000000', - '2013-01-01 00:00:02.000000', '\\N', '\\N', '2013-01-02 00:00:00.000000'),) + '2013-01-01 00:00:02.000000', r'\N', r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_normal_explicit_mode_change(self): @@ -552,8 +552,8 @@ def test_normal_explicit_mode_change(self): ('2013-01-01T00:00:01', ACTIVATED, 'honor'), ('2013-01-02T00:00:02', MODE_CHANGED, 'verified') ] - expected = ((self.course_id, self.user_id, 'verified', '1', 'honor', '2013-01-01 00:00:01.000000', '\\N', - '2013-01-02 00:00:02.000000', '\\N', '2013-01-03 00:00:00.000000'),) + expected = ((self.course_id, self.user_id, 'verified', '1', 'honor', '2013-01-01 00:00:01.000000', r'\N', + '2013-01-02 00:00:02.000000', r'\N', '2013-01-03 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_multiple_mode_change_events(self): @@ -565,7 +565,7 @@ def test_multiple_mode_change_events(self): ('2013-01-02T00:00:04', MODE_CHANGED, 'audit'), ('2013-01-02T00:00:05', MODE_CHANGED, 'credit') ] - expected = ((self.course_id, self.user_id, 'credit', '1', 'honor', '2013-01-01 00:00:01.000000', '\\N', + expected = ((self.course_id, self.user_id, 'credit', '1', 'honor', '2013-01-01 00:00:01.000000', r'\N', '2013-01-02 00:00:02.000000', '2013-01-02 00:00:05.000000', '2013-01-03 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) self._check_output_complete_tuple(inputs, expected) @@ -579,7 +579,7 @@ def test_capture_first_verified_time(self): ('2013-01-01T00:00:04', ACTIVATED, 'verified') ] expected = ((self.course_id, self.user_id, 'verified', '1', 'honor', '2013-01-01 00:00:01.000000', - '2013-01-01 
00:00:03.000000', '2013-01-01 00:00:02.000000', '\\N', '2013-01-02 00:00:00.000000'),) + '2013-01-01 00:00:03.000000', '2013-01-01 00:00:02.000000', r'\N', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) def test_capture_first_credit_time(self): @@ -591,7 +591,7 @@ def test_capture_first_credit_time(self): ('2013-01-01T00:00:04', ACTIVATED, 'credit') ] expected = ((self.course_id, self.user_id, 'credit', '1', 'honor', '2013-01-01 00:00:01.000000', - '2013-01-01 00:00:03.000000', '\\N', '2013-01-01 00:00:02.000000', '2013-01-02 00:00:00.000000'),) + '2013-01-01 00:00:03.000000', r'\N', '2013-01-01 00:00:02.000000', '2013-01-02 00:00:00.000000'),) self._check_output_complete_tuple(inputs, expected) diff --git a/edx/analytics/tasks/insights/tests/test_video.py b/edx/analytics/tasks/insights/tests/test_video.py index a1e2f4d02b..80f2ff2476 100644 --- a/edx/analytics/tasks/insights/tests/test_video.py +++ b/edx/analytics/tasks/insights/tests/test_video.py @@ -397,8 +397,8 @@ def setUp(self): super(UserVideoViewingTaskReducerTest, self).setUp() self.user_id = 10 self.reduce_key = (self.user_id, self.COURSE_ID, self.VIDEO_MODULE_ID) - patcher = patch('edx.analytics.tasks.insights.video.urllib') - self.mock_urllib = patcher.start() + patcher = patch('edx.analytics.tasks.insights.video.urlopen') + self.mock_urlopen = patcher.start() self.addCleanup(patcher.stop) def test_simple_viewing(self): @@ -634,7 +634,7 @@ def prepare_youtube_api_mock_raw(self, response_string): mock_response = MagicMock(spec=file) mock_response.code = 200 mock_response.read.side_effect = [response_string, ''] - self.mock_urllib.urlopen.return_value = mock_response + self.mock_urlopen.return_value = mock_response def test_pause_after_end_of_video(self): self.prepare_youtube_api_mock('PT1M2S') diff --git a/edx/analytics/tasks/insights/video.py b/edx/analytics/tasks/insights/video.py index 2636caa11b..01bccede87 100644 --- a/edx/analytics/tasks/insights/video.py +++ b/edx/analytics/tasks/insights/video.py @@ -5,7 +5,7 @@ import math import re import textwrap -import urllib +from six.moves.urllib.request import urlopen from collections import namedtuple import ciso8601 @@ -475,7 +475,7 @@ def get_video_duration(self, youtube_id): video_url = "https://www.googleapis.com/youtube/v3/videos?id={0}&part=contentDetails&key={1}".format( youtube_id, self.api_key ) - video_file = urllib.urlopen(video_url) + video_file = urlopen(video_url) content = json.load(video_file) items = content.get('items', []) if len(items) > 0: diff --git a/edx/analytics/tasks/launchers/local.py b/edx/analytics/tasks/launchers/local.py index 90b854c69f..532a59d968 100644 --- a/edx/analytics/tasks/launchers/local.py +++ b/edx/analytics/tasks/launchers/local.py @@ -20,8 +20,8 @@ import certifi import chardet import ciso8601 -import cjson -import filechunkio +import __future__ +import future import idna import luigi import luigi.configuration @@ -34,6 +34,13 @@ import six import stevedore import urllib3 +cjson, ujson = None, None +try: + import ujson + ujson_found = True +except ImportError: + import cjson + ujson_found = False import edx.analytics.tasks @@ -93,15 +100,20 @@ def main(): # Tell luigi what dependencies to pass to the Hadoop nodes: # - edx.analytics.tasks is used to load the pipeline code, since we cannot trust all will be loaded automatically. # - boto is used for all direct interactions with s3. - # - cjson is used for all parsing event logs. - # - filechunkio is used for multipart uploads of large files to s3. 
+ # - cjson/ujson is used for all parsing event logs. # - opaque_keys is used to interpret serialized course_ids # - opaque_keys extensions: ccx_keys # - dependencies of opaque_keys: bson, stevedore, six # - requests has several dependencies: # - chardet, urllib3, certifi, idna luigi.contrib.hadoop.attach(edx.analytics.tasks) - luigi.contrib.hadoop.attach(boto, cjson, filechunkio, opaque_keys, bson, stevedore, six, ciso8601, chardet, urllib3, certifi, idna, requests, pytz) + if ujson_found: + luigi.contrib.hadoop.attach(ujson) + else: + luigi.contrib.hadoop.attach(cjson) + luigi.contrib.hadoop.attach(boto, opaque_keys, bson, stevedore, six, ciso8601, chardet, urllib3, certifi, idna, requests, pytz) + # Try to get this to work with Python3 as well: + luigi.contrib.hadoop.attach(__future__, future) if configuration.getboolean('ccx', 'enabled', default=False): import ccx_keys @@ -176,7 +188,7 @@ def output_dependency_tree(cmdline_args): """Print out a tree representation of the dependencies of the given task.""" with luigi.cmdline_parser.CmdlineParser.global_instance(cmdline_args) as command_parser: task = command_parser.get_task_obj() - print print_dependency_tree(task) + print(print_dependency_tree(task)) if __name__ == '__main__': diff --git a/edx/analytics/tasks/launchers/remote.py b/edx/analytics/tasks/launchers/remote.py index f392494c7f..6c8e8bc525 100755 --- a/edx/analytics/tasks/launchers/remote.py +++ b/edx/analytics/tasks/launchers/remote.py @@ -1,6 +1,5 @@ #!/usr/bin/env python """Execute tasks on a remote EMR cluster.""" - import argparse import json import os @@ -8,7 +7,7 @@ import sys import uuid from subprocess import PIPE, Popen -from urlparse import parse_qsl, urlparse +from six.moves.urllib.parse import urlparse, parse_qsl STATIC_FILES_PATH = os.path.join(sys.prefix, 'share', 'edx.analytics.tasks') EC2_INVENTORY_PATH = os.path.join(STATIC_FILES_PATH, 'ec2.py') @@ -119,6 +118,8 @@ def run_task_playbook(inventory, arguments, uid): if arguments.workflow_profiler: env_vars['WORKFLOW_PROFILER'] = arguments.workflow_profiler env_vars['WORKFLOW_PROFILER_PATH'] = log_dir + if arguments.python_version: + env_vars['HADOOP_PYTHON_EXECUTABLE'] = arguments.python_version env_var_string = ' '.join('{0}={1}'.format(k, v) for k, v in env_vars.iteritems()) @@ -191,7 +192,7 @@ def convert_args_to_extra_vars(arguments, uid): if arguments.virtualenv_extra_args: extra_vars['virtualenv_extra_args'] = arguments.virtualenv_extra_args if arguments.python_version: - extra_vars['python_version'] = arguments.python_version + extra_vars['virtualenv_python'] = arguments.python_version if arguments.package: extra_vars['packages'] = arguments.package diff --git a/edx/analytics/tasks/monitor/tests/test_overall_events.py b/edx/analytics/tasks/monitor/tests/test_overall_events.py index f0fde7bec3..4795c11550 100644 --- a/edx/analytics/tasks/monitor/tests/test_overall_events.py +++ b/edx/analytics/tasks/monitor/tests/test_overall_events.py @@ -2,7 +2,7 @@ import json import sys -from StringIO import StringIO +from io import StringIO from unittest import TestCase from edx.analytics.tasks.common.tests.map_reduce_mixins import MapperTestMixin, ReducerTestMixin diff --git a/edx/analytics/tasks/monitor/tests/test_total_events_report.py b/edx/analytics/tasks/monitor/tests/test_total_events_report.py index 5eb87a49c8..17510fd8ef 100644 --- a/edx/analytics/tasks/monitor/tests/test_total_events_report.py +++ b/edx/analytics/tasks/monitor/tests/test_total_events_report.py @@ -6,7 +6,7 @@ import shutil import tempfile 
import textwrap -from StringIO import StringIO +from io import BytesIO from unittest import TestCase import pandas @@ -86,7 +86,7 @@ def reformat(string): task.run() data = output_target.buffer.read() - result = pandas.read_csv(StringIO(data), + result = pandas.read_csv(BytesIO(data), na_values=['-'], index_col=False, header=None, diff --git a/edx/analytics/tasks/tests/acceptance/__init__.py b/edx/analytics/tasks/tests/acceptance/__init__.py index c1db23c20d..65862e35be 100644 --- a/edx/analytics/tasks/tests/acceptance/__init__.py +++ b/edx/analytics/tasks/tests/acceptance/__init__.py @@ -1,3 +1,6 @@ +from __future__ import print_function +from __future__ import absolute_import + import csv import hashlib import json @@ -14,6 +17,7 @@ from edx.analytics.tasks.tests.acceptance.services import db, elasticsearch_service, fs, hive, task, vertica from edx.analytics.tasks.util.s3_util import ScalableS3Client from edx.analytics.tasks.util.url import get_target_from_url, url_path_join +import six log = logging.getLogger(__name__) @@ -123,7 +127,7 @@ def as_list_param(value, escape_quotes=True): def coerce_columns_to_string(row): # Vertica response includes datatypes in some columns i-e. datetime, Decimal etc. so convert # them into string before comparison with expected output. Also a challenge with 'None' values. - return [unicode(x) for x in row] + return [six.text_type(x) for x in row] def read_csv_fixture_as_list(fixture_file_path): @@ -206,7 +210,12 @@ def setUp(self): elasticsearch_alias = 'alias_test_' + self.identifier self.warehouse_path = url_path_join(self.test_root, 'warehouse') self.edx_rest_api_cache_root = url_path_join(self.test_src, 'edx-rest-api-cache') + # Use config directly, rather than os.getenv('HADOOP_PYTHON_EXECUTABLE', '/usr/bin/python') + python_executable = self.config.get('python_version', '/usr/bin/python') task_config_override = { + 'hadoop': { + 'python-executable': python_executable, + }, 'hive': { 'database': database_name, 'warehouse_path': self.warehouse_path @@ -374,19 +383,19 @@ def assert_data_frames_equal(data, expected): assert_frame_equal(data, expected) except AssertionError: pandas.set_option('display.max_columns', None) - print '----- The report generated this data: -----' - print data - print '----- vs expected: -----' - print expected + print('----- The report generated this data: -----') + print(data) + print('----- vs expected: -----') + print(expected) if data.shape != expected.shape: - print "Data shapes differ." 
+ print("Data shapes differ.") else: for index, _series in data.iterrows(): # Try to print a more helpful/localized difference message: try: assert_series_equal(data.iloc[index, :], expected.iloc[index, :]) except AssertionError: - print "First differing row: {index}".format(index=index) + print("First differing row: {index}".format(index=index)) raise @staticmethod diff --git a/edx/analytics/tasks/tests/acceptance/services/task.py b/edx/analytics/tasks/tests/acceptance/services/task.py index 15a1abb774..361b8d93eb 100644 --- a/edx/analytics/tasks/tests/acceptance/services/task.py +++ b/edx/analytics/tasks/tests/acceptance/services/task.py @@ -1,11 +1,13 @@ +from __future__ import absolute_import -import ConfigParser +import six.moves.configparser import logging import os import sys import tempfile from edx.analytics.tasks.tests.acceptance.services import shell +import six log = logging.getLogger(__name__) @@ -31,7 +33,7 @@ def __init__(self, config, task_config_override, identifier): def launch(self, task_args, config_override=None): self.delete_existing_logs() - config_parser = ConfigParser.ConfigParser() + config_parser = six.moves.configparser.ConfigParser() config_parser.read(os.environ['LUIGI_CONFIG_PATH']) self.override_config(config_parser, self.default_config_override) if config_override: @@ -103,18 +105,18 @@ def delete_existing_logs(self): pass def override_config(self, config_parser, overrides): - for section_name, section in overrides.iteritems(): + for section_name, section in six.iteritems(overrides): if not config_parser.has_section(section_name): config_parser.add_section(section_name) - for key, value in section.iteritems(): + for key, value in six.iteritems(section): config_parser.set(section_name, key, value) def write_logs_to_standard_streams(self): if not self.log_path: return - for filename, output_file in self.logs.iteritems(): + for filename, output_file in six.iteritems(self.logs): try: with open(os.path.join(self.log_path, filename), 'r') as src_file: while True: diff --git a/edx/analytics/tasks/tests/acceptance/test_database_export.py b/edx/analytics/tasks/tests/acceptance/test_database_export.py index 489a6cb5fa..aef555456c 100644 --- a/edx/analytics/tasks/tests/acceptance/test_database_export.py +++ b/edx/analytics/tasks/tests/acceptance/test_database_export.py @@ -3,15 +3,14 @@ validate user visible outputs. """ - import datetime import logging import os import shutil +import stat import tempfile import textwrap -import urlparse - +from six.moves.urllib.parse import urlparse import gnupg from edx.analytics.tasks.tests.acceptance import AcceptanceTestCase, when_exporter_available @@ -56,7 +55,10 @@ def create_temporary_directories(self): for dir_path in [self.external_files_dir, self.working_dir, self.validation_dir, self.gpg_dir]: os.makedirs(dir_path) - os.chmod(self.gpg_dir, 0700) + os.chmod( + self.gpg_dir, + stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR, # equivalent to "0700" in Unix chmod. + ) # The exporter expects this directory to already exist. 
os.makedirs(os.path.join(self.working_dir, 'course-data')) diff --git a/edx/analytics/tasks/tests/acceptance/test_database_import.py b/edx/analytics/tasks/tests/acceptance/test_database_import.py index e9197008a0..7639e9eef6 100644 --- a/edx/analytics/tasks/tests/acceptance/test_database_import.py +++ b/edx/analytics/tasks/tests/acceptance/test_database_import.py @@ -50,7 +50,7 @@ def test_import_from_mysql(self): )) def map_null_to_hive_null(row): - return ['\\N' if x == 'NULL' else x for x in row] + return [r'\N' if x == 'NULL' else x for x in row] output_rows = [x.split('\t') for x in hive_output.splitlines() if '\t' in x] output_rows = map(map_null_to_hive_null, output_rows) @@ -59,16 +59,16 @@ def map_null_to_hive_null(row): expected_rows = [ [ - '1', '1', 'edX/Open_DemoX/edx_demo_course', '\\N', 'version-1', 'grading-policy-1', + '1', '1', 'edX/Open_DemoX/edx_demo_course', r'\N', 'version-1', 'grading-policy-1', '0.7', 'C', '2017-01-31 00:05:00', '2017-02-01 00:00:00', '2017-02-01 00:00:00', ], [ - '2', '2', 'edX/Open_DemoX/edx_demo_course', '\\N', 'version-1', 'grading-policy-1', + '2', '2', 'edX/Open_DemoX/edx_demo_course', r'\N', 'version-1', 'grading-policy-1', '0.8', 'B', '2017-01-31 00:05:00', '2017-02-01 00:00:00', '2017-02-01 00:00:00', ], [ - '3', '3', 'edX/Open_DemoX/edx_demo_course', '\\N', 'version-1', 'grading-policy-1', - '0.2', 'Fail', '\\N', '2017-02-01 00:00:00', '2017-02-01 00:00:00', + '3', '3', 'edX/Open_DemoX/edx_demo_course', r'\N', 'version-1', 'grading-policy-1', + '0.2', 'Fail', r'\N', '2017-02-01 00:00:00', '2017-02-01 00:00:00', ], ] diff --git a/edx/analytics/tasks/tests/acceptance/test_enrollment_validation.py b/edx/analytics/tasks/tests/acceptance/test_enrollment_validation.py index 4cdcfceb16..b9d04379a6 100644 --- a/edx/analytics/tasks/tests/acceptance/test_enrollment_validation.py +++ b/edx/analytics/tasks/tests/acceptance/test_enrollment_validation.py @@ -4,7 +4,7 @@ import gzip import json import logging -import StringIO +from io import StringIO from collections import defaultdict from edx.analytics.tasks.tests.acceptance import AcceptanceTestCase, as_list_param @@ -108,7 +108,7 @@ def check_synthetic_events(self, output_dir): histogram = defaultdict(int) # int() returns 0 for output in outputs: # Read S3 file into a buffer, since the S3 file doesn't support seek() and tell(). - gzip_output = StringIO.StringIO() + gzip_output = StringIO() with output.open('r') as event_file: gzip_output.write(event_file.read()) gzip_output.seek(0) diff --git a/edx/analytics/tasks/tests/acceptance/test_enrollments.py b/edx/analytics/tasks/tests/acceptance/test_enrollments.py index b053f7299f..dfc6414273 100644 --- a/edx/analytics/tasks/tests/acceptance/test_enrollments.py +++ b/edx/analytics/tasks/tests/acceptance/test_enrollments.py @@ -5,7 +5,7 @@ import datetime import logging import os -from cStringIO import StringIO +from io import StringIO import pandas from ddt import data, ddt diff --git a/edx/analytics/tasks/tests/acceptance/test_financial_reports.py b/edx/analytics/tasks/tests/acceptance/test_financial_reports.py index e0120d4cf5..8bb8dc35c3 100644 --- a/edx/analytics/tasks/tests/acceptance/test_financial_reports.py +++ b/edx/analytics/tasks/tests/acceptance/test_financial_reports.py @@ -1,12 +1,14 @@ """ End to end test of the financial reporting workflow. 
""" +from __future__ import absolute_import import logging import os import luigi import pandas +from six.moves import map from edx.analytics.tasks.tests.acceptance import ( AcceptanceTestCase, coerce_columns_to_string, read_csv_fixture_as_list, when_vertica_available @@ -71,7 +73,7 @@ def test_end_to_end(self): )) response = cursor.fetchall() - f_orderitem_transactions = pandas.DataFrame(map(coerce_columns_to_string, response), columns=columns) + f_orderitem_transactions = pandas.DataFrame(list(map(coerce_columns_to_string, response)), columns=columns) for frame in (f_orderitem_transactions, expected): frame.sort(['payment_ref_id', 'transaction_type'], inplace=True, ascending=[True, False]) diff --git a/edx/analytics/tasks/tools/obfuscate_eval.py b/edx/analytics/tasks/tools/obfuscate_eval.py index 513c3d8c19..d31a3cdb1d 100644 --- a/edx/analytics/tasks/tools/obfuscate_eval.py +++ b/edx/analytics/tasks/tools/obfuscate_eval.py @@ -14,6 +14,7 @@ read from the same directory as the database dump being analyzed. """ +from __future__ import print_function import argparse import errno @@ -24,11 +25,11 @@ import os import sys from collections import defaultdict, namedtuple -from cStringIO import StringIO +from io import StringIO -import cjson from pyinstrument import Profiler +from edx.analytics.tasks.util.fast_json import FastJson from edx.analytics.tasks.common.pathutil import PathSetTask from edx.analytics.tasks.util import eventlog from edx.analytics.tasks.util.obfuscate_util import Obfuscator, backslash_decode_value, backslash_encode_value @@ -378,12 +379,12 @@ def obfuscate_event_entry(self, line): log.info(u"Obfuscated %s event with event_type = '%s'", event_source, event_type) if event_json_decoded: - # TODO: should really use cjson, if that were originally used for decoding the json. + # TODO: should really use FastJson, if that were originally used for decoding the json. updated_event_data = json.dumps(updated_event_data) event['event'] = updated_event_data - # TODO: should really use cjson, if that were originally used for decoding the json. + # TODO: should really use FastJson, if that were originally used for decoding the json. return json.dumps(event) def obfuscate_courseware_file(self, input_filepath, output_dir): @@ -440,7 +441,7 @@ def obfuscate_courseware_entry(self, line, user_profile): # is not escaped in the same way. In particular, we will not decode and encode it. state_str = record.state.replace('\\\\', '\\') try: - state_dict = cjson.decode(state_str, all_unicode=True) + state_dict = FastJson.loads(state_str) except Exception as exc: log.exception(u"Unable to parse state as JSON for record %s: type = %s, state = %r", record.id, type(state_str), state_str) return line @@ -539,7 +540,7 @@ def obfuscate_forum_entry(self, line, user_profile): # are also different, as to when \u notation is used for a character as # opposed to a utf8 encoding of the character. 
try: - entry = cjson.decode(line, all_unicode=True) + entry = FastJson.loads(line) except ValueError as exc: log.error("Failed to parse json for line: %r", line) return "" @@ -698,7 +699,7 @@ def main(): finally: if profiler: profiler.stop() - print >>sys.stderr, profiler.output_text(unicode=True, color=True) + print(profiler.output_text(unicode=True, color=True), file=sys.stderr) if __name__ == '__main__': diff --git a/edx/analytics/tasks/tools/s3util.py b/edx/analytics/tasks/tools/s3util.py index 3fc7286818..6452bc0bde 100644 --- a/edx/analytics/tasks/tools/s3util.py +++ b/edx/analytics/tasks/tools/s3util.py @@ -1,4 +1,5 @@ """Command-line utility for using (and testing) s3 utility methods.""" +from __future__ import print_function import argparse import os @@ -14,7 +15,7 @@ def list_s3_files(source_url, patterns): for bucket, root, path in generate_s3_sources(s3_conn, source_url, patterns): source = join_as_s3_url(bucket, root, path) src_key = get_s3_key(s3_conn, source) - print "%10d %s" % (src_key.size if src_key is not None else -1, path) + print("%10d %s" % (src_key.size if src_key is not None else -1, path)) def get_s3_files(source_url, dest_root, patterns): @@ -28,7 +29,7 @@ def get_s3_files(source_url, dest_root, patterns): if src_key is not None: src_key.get_contents_to_filename(destination) else: - print "No key for source " + source + print("No key for source " + source) def main(): diff --git a/edx/analytics/tasks/util/csv_util.py b/edx/analytics/tasks/util/csv_util.py index 408082b0eb..539106eb84 100644 --- a/edx/analytics/tasks/util/csv_util.py +++ b/edx/analytics/tasks/util/csv_util.py @@ -1,9 +1,11 @@ """ Simple CSV utilities. """ +from __future__ import absolute_import import csv -from StringIO import StringIO +from io import BytesIO +import six class MySQLDumpDialect(csv.Dialect): @@ -47,21 +49,21 @@ class MySQLExportDialect(MySQLPipeDialect): 'mysqlexport': MySQLExportDialect } -for dialect_name, dialect_class in DIALECTS.iteritems(): +for dialect_name, dialect_class in six.iteritems(DIALECTS): csv.register_dialect(dialect_name, dialect_class) def parse_line(line, dialect='excel'): """Parse one line of CSV in the dialect specified.""" # csv.reader requires an iterable per row, so we wrap the line in a list - parsed = csv.reader([line], dialect=dialect).next() + parsed = next(csv.reader([line], dialect=dialect)) return parsed def to_csv_line(row, dialect='excel'): """Return a CSV line by joining the values in row in the dialect specified.""" - output = StringIO() + output = BytesIO() csv.writer(output, dialect=dialect).writerow(row) output.seek(0) diff --git a/edx/analytics/tasks/util/eventlog.py b/edx/analytics/tasks/util/eventlog.py index 90b24301b8..29b18272b6 100644 --- a/edx/analytics/tasks/util/eventlog.py +++ b/edx/analytics/tasks/util/eventlog.py @@ -4,9 +4,8 @@ import logging import re -import cjson - import edx.analytics.tasks.util.opaque_key_util as opaque_key_util +from edx.analytics.tasks.util.fast_json import FastJson log = logging.getLogger(__name__) @@ -15,14 +14,14 @@ def decode_json(line): """Wrapper to decode JSON string in an implementation-independent way.""" - # TODO: Verify correctness of cjson - return cjson.decode(line) + # TODO: Verify correctness of FastJson + return FastJson.loads(line) def encode_json(obj): """Wrapper to re-encode JSON string in an implementation-independent way.""" - # TODO: Verify correctness of cjson - return cjson.encode(obj) + # TODO: Verify correctness of FastJson + return FastJson.dumps(obj) def parse_json_event(line, 
nested=False): diff --git a/edx/analytics/tasks/util/fast_json.py b/edx/analytics/tasks/util/fast_json.py new file mode 100644 index 0000000000..7f91f4b112 --- /dev/null +++ b/edx/analytics/tasks/util/fast_json.py @@ -0,0 +1,36 @@ +""" +Provide an abstraction layer for fast json implementations across python 2 and 3. +""" +try: + import ujson + ujson_found = True +except ImportError: + import cjson + ujson_found = False + +class FastJson(object): + """ + Abstraction layer on top of cjson (python 2 only) and ujson (python 3 only). + """ + @staticmethod + def dumps(obj): + """ + Dump/encode the Python object into a JSON message. + """ + if ujson_found: + return ujson.dumps(obj) + else: + return cjson.encode(obj) + + @staticmethod + def loads(msg): + """ + Load/decode the JSON message and return a Python object. + + All strings in the decoded object will be unicode strings! This + matches the behavior of python's built-in json library. + """ + if ujson_found: + return ujson.loads(msg) + else: + return cjson.decode(msg, all_unicode=True) diff --git a/edx/analytics/tasks/util/hive.py b/edx/analytics/tasks/util/hive.py index cbe434ac7d..3c15bb6660 100644 --- a/edx/analytics/tasks/util/hive.py +++ b/edx/analytics/tasks/util/hive.py @@ -1,4 +1,5 @@ """Various helper utilities that are commonly used when working with Hive""" +from __future__ import absolute_import import logging import textwrap diff --git a/edx/analytics/tasks/util/id_codec.py b/edx/analytics/tasks/util/id_codec.py index 79ba02233c..fca48da4e6 100644 --- a/edx/analytics/tasks/util/id_codec.py +++ b/edx/analytics/tasks/util/id_codec.py @@ -1,10 +1,13 @@ """Various helper utilities to calculate reversible one-to-one mappings of sensitive ids.""" +from __future__ import absolute_import import base64 import logging import random import luigi +from six.moves import map +from six.moves import range try: import numpy as np @@ -15,13 +18,17 @@ def encode_id(scope, id_type, id_value): - """Encode a scope-type-value tuple into a single ID string.""" - return base64.b32encode('|'.join([scope, id_type, id_value])) + """ + Encode a scope-type-value tuple into a single ID string. + + All inputs must be bytestrings (or `str` in python 2). + """ + return base64.b32encode(b'|'.join([scope, id_type, id_value])) def decode_id(encoded_id): """Decode an ID string back to the original scope-type-value tuple.""" - scope, id_type, id_value = base64.b32decode(encoded_id).split('|') + scope, id_type, id_value = base64.b32decode(encoded_id).split(b'|') return scope, id_type, id_value @@ -49,7 +56,7 @@ def random_permutation_matrix(self, seed, matrix_dim): """Return a random permutation matrix of dimension matrix_dim using seed.""" rng = random.Random(seed) # Decide where each bit goes. - mapping = range(matrix_dim) + mapping = list(range(matrix_dim)) rng.shuffle(mapping) # Then make a matrix that does that. permutation = np.zeros((matrix_dim, matrix_dim), dtype=int) diff --git a/edx/analytics/tasks/util/obfuscate_util.py b/edx/analytics/tasks/util/obfuscate_util.py index 0d6e7f1942..03a369add9 100644 --- a/edx/analytics/tasks/util/obfuscate_util.py +++ b/edx/analytics/tasks/util/obfuscate_util.py @@ -434,7 +434,7 @@ def find_name_context(text, log_context=DEFAULT_LOG_CONTEXT): # Find phone numbers. 
PHONE_CONTEXT = re.compile( - r'(\bphone:|\bp:|b\c:|\bcall me\b|\(home\)|\(cell\)|my phone|phone number)', + r'(\bphone:|\bp:|\bc:|\bcall me\b|\(home\)|\(cell\)|my phone|phone number)', re.IGNORECASE, ) diff --git a/edx/analytics/tasks/util/opaque_key_util.py b/edx/analytics/tasks/util/opaque_key_util.py index 7904384017..fccc45ef1d 100644 --- a/edx/analytics/tasks/util/opaque_key_util.py +++ b/edx/analytics/tasks/util/opaque_key_util.py @@ -1,4 +1,5 @@ """Utility functions that wrap opaque_keys in useful ways.""" +from __future__ import absolute_import import logging import re @@ -6,6 +7,7 @@ from opaque_keys import InvalidKeyError from opaque_keys.edx.keys import CourseKey from opaque_keys.edx.locator import CourseLocator +import six log = logging.getLogger(__name__) @@ -84,7 +86,7 @@ def get_filename_safe_course_id(course_id, replacement_char='_'): # The safest characters are A-Z, a-z, 0-9, , and . # We represent the first four with \w. # TODO: Once we support courses with unicode characters, we will need to revisit this. - return re.sub(r'[^\w\.\-]', unicode(replacement_char), filename) + return re.sub(r'[^\w\.\-]', six.text_type(replacement_char), filename) def get_course_key_from_url(url): diff --git a/edx/analytics/tasks/util/overwrite.py b/edx/analytics/tasks/util/overwrite.py index 68a264a9a1..c336b974fb 100644 --- a/edx/analytics/tasks/util/overwrite.py +++ b/edx/analytics/tasks/util/overwrite.py @@ -1,6 +1,8 @@ """ Provide support for overwriting existing output files. """ +from __future__ import absolute_import + import logging import luigi diff --git a/edx/analytics/tasks/util/record.py b/edx/analytics/tasks/util/record.py index 7250a23864..e16b5d92d7 100644 --- a/edx/analytics/tasks/util/record.py +++ b/edx/analytics/tasks/util/record.py @@ -18,7 +18,7 @@ bigquery_available = False # pylint: disable=invalid-name -DEFAULT_NULL_VALUE = '\\N' # This is the default string used by Hive to represent a NULL value. +DEFAULT_NULL_VALUE = r'\N' # This is the default string used by Hive to represent a NULL value. 
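+# Note: r'\N' and '\\N' spell the same two-character string (a backslash followed by the letter N); the raw-string form is simply clearer to read. Illustrative check, not part of the module: r'\N' == '\\N' evaluates to True.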
log = logging.getLogger(__name__) diff --git a/edx/analytics/tasks/util/retry.py b/edx/analytics/tasks/util/retry.py index d66c8c16a4..62c21e8a0d 100644 --- a/edx/analytics/tasks/util/retry.py +++ b/edx/analytics/tasks/util/retry.py @@ -1,4 +1,5 @@ """Utility decorator for retrying functions that fail.""" +from __future__ import absolute_import import logging import time diff --git a/edx/analytics/tasks/util/s3_util.py b/edx/analytics/tasks/util/s3_util.py index 280820a269..40ffd4fce0 100644 --- a/edx/analytics/tasks/util/s3_util.py +++ b/edx/analytics/tasks/util/s3_util.py @@ -5,7 +5,7 @@ import os import time from fnmatch import fnmatch -from urlparse import urlparse, urlunparse +from six.moves.urllib.parse import urlparse, urlunparse from luigi.contrib.hdfs.format import Plain from luigi.contrib.hdfs.target import HdfsTarget diff --git a/edx/analytics/tasks/util/tests/config.py b/edx/analytics/tasks/util/tests/config.py index c7eb87d2c1..6f1885d6fa 100644 --- a/edx/analytics/tasks/util/tests/config.py +++ b/edx/analytics/tasks/util/tests/config.py @@ -1,8 +1,10 @@ """Support modifying luigi configuration settings in tests.""" +from __future__ import absolute_import from functools import wraps from luigi.configuration import LuigiConfigParser +import six def with_luigi_config(*decorator_args): @@ -65,7 +67,7 @@ def modify_config(section, option, value): new_instance.set(section, option, str(value)) # Support the single override case: @with_luigi_config('section', 'option', 'value') - if isinstance(decorator_args[0], basestring): + if isinstance(decorator_args[0], six.string_types): section, option, value = decorator_args modify_config(section, option, value) else: diff --git a/edx/analytics/tasks/util/tests/opaque_key_mixins.py b/edx/analytics/tasks/util/tests/opaque_key_mixins.py index be874e34d7..be70dafe8b 100644 --- a/edx/analytics/tasks/util/tests/opaque_key_mixins.py +++ b/edx/analytics/tasks/util/tests/opaque_key_mixins.py @@ -1,6 +1,8 @@ """Mixin classes for providing opaque or legacy key values.""" +from __future__ import absolute_import from opaque_keys.edx.locator import CourseLocator +import six class InitializeOpaqueKeysMixin(object): @@ -9,13 +11,13 @@ class InitializeOpaqueKeysMixin(object): def initialize_ids(self): """Define set of id values for use in tests.""" course_key = CourseLocator(org=u'FooX\u00e9', course='1.23x', run='2013_Spring') - self.course_id = unicode(course_key) + self.course_id = six.text_type(course_key) self.encoded_course_id = self.course_id.encode('utf8') self.org_id = course_key.org self.encoded_org_id = self.org_id.encode('utf8') block_id = "9cee77a606ea4c1aa5440e0ea5d0f618" - self.problem_id = unicode(course_key.make_usage_key("problem", block_id)) + self.problem_id = six.text_type(course_key.make_usage_key("problem", block_id)) self.encoded_problem_id = self.problem_id.encode('utf8') self.answer_id = "{block_id}_2_1".format(block_id=block_id) self.second_answer_id = "{block_id}_3_1".format(block_id=block_id) diff --git a/edx/analytics/tasks/util/tests/target.py b/edx/analytics/tasks/util/tests/target.py index 5f6dfd4ca6..e237cd7826 100644 --- a/edx/analytics/tasks/util/tests/target.py +++ b/edx/analytics/tasks/util/tests/target.py @@ -1,14 +1,18 @@ """ Emulates a luigi target, storing all data in memory. 
""" +from __future__ import absolute_import from contextlib import contextmanager -from StringIO import StringIO +from io import BytesIO +import six class FakeTarget(object): - """Fake Luigi-like target that saves data in memory, using a StringIO buffer.""" - def __init__(self, path=None, value=''): + """ + Fake Luigi-like target that saves data in memory, using a BytesIO buffer. + """ + def __init__(self, path=None, value=b''): self.value = value self.path = path @@ -18,7 +22,7 @@ def value(self): @value.setter def value(self, value): - self.buffer = StringIO(value) + self.buffer = BytesIO(value) # Rewind the buffer head so the value can be read self.buffer.seek(0) diff --git a/edx/analytics/tasks/util/tests/test_hive.py b/edx/analytics/tasks/util/tests/test_hive.py index 081f7c861a..0ac67ec2d7 100644 --- a/edx/analytics/tasks/util/tests/test_hive.py +++ b/edx/analytics/tasks/util/tests/test_hive.py @@ -1,4 +1,5 @@ """Tests for some hive related utilities""" +from __future__ import absolute_import from datetime import date from unittest import TestCase diff --git a/edx/analytics/tasks/util/tests/test_id_codec.py b/edx/analytics/tasks/util/tests/test_id_codec.py index 1fa942c5de..9155ffce95 100644 --- a/edx/analytics/tasks/util/tests/test_id_codec.py +++ b/edx/analytics/tasks/util/tests/test_id_codec.py @@ -1,15 +1,17 @@ """ Tests for encoding/decoding id values. """ +from __future__ import absolute_import + from unittest import TestCase from ddt import data, ddt import edx.analytics.tasks.util.id_codec as id_codec -SCOPE = "Arbitrary Scope" -TYPE = "Arbitrary Type" -VALUE = "Arbitrary Value" +SCOPE = b"Arbitrary Scope" +TYPE = b"Arbitrary Type" +VALUE = b"Arbitrary Value" @ddt @@ -17,15 +19,15 @@ class EncodeDecodeIdTest(TestCase): """Test that encoding works in round-trip.""" @data( - '', + b'', + b'test', u'\ufffd'.encode('utf8'), u'\u00e9'.encode('utf8'), - u'test', ) def test_round_trip(self, suffix): - encoded_id = id_codec.encode_id(SCOPE + suffix, TYPE + suffix, VALUE + suffix) - decoded = id_codec.decode_id(encoded_id) - self.assertEquals((SCOPE + suffix, TYPE + suffix, VALUE + suffix), decoded) + input_id = (SCOPE + suffix, TYPE + suffix, VALUE + suffix) + decoded_id = id_codec.decode_id(id_codec.encode_id(*input_id)) + self.assertEquals(input_id, decoded_id) class PermutationGeneratorTest(TestCase): diff --git a/edx/analytics/tasks/util/tests/test_opaque_key_util.py b/edx/analytics/tasks/util/tests/test_opaque_key_util.py index 2bc25fa5f5..862b69c834 100644 --- a/edx/analytics/tasks/util/tests/test_opaque_key_util.py +++ b/edx/analytics/tasks/util/tests/test_opaque_key_util.py @@ -1,6 +1,8 @@ """ Tests for utilities that parse event logs. 
""" +from __future__ import absolute_import + from unittest import TestCase from ccx_keys.locator import CCXLocator @@ -8,14 +10,15 @@ from opaque_keys.edx.locator import CourseLocator import edx.analytics.tasks.util.opaque_key_util as opaque_key_util +import six -VALID_COURSE_ID = unicode(CourseLocator(org='org', course='course_id', run='course_run')) +VALID_COURSE_ID = six.text_type(CourseLocator(org='org', course='course_id', run='course_run')) VALID_LEGACY_COURSE_ID = "org/course_id/course_run" INVALID_LEGACY_COURSE_ID = "org:course_id:course_run" INVALID_NONASCII_LEGACY_COURSE_ID = u"org/course\ufffd_id/course_run" VALID_NONASCII_LEGACY_COURSE_ID = u"org/cours\u00e9_id/course_run" -VALID_CCX_COURSE_ID = unicode(CCXLocator(org='org', course='course_id', run='course_run', ccx='13')) -COURSE_ID_WITH_COLONS = unicode(CourseLocator(org='org', course='course:id', run='course:run')) +VALID_CCX_COURSE_ID = six.text_type(CCXLocator(org='org', course='course_id', run='course_run', ccx='13')) +COURSE_ID_WITH_COLONS = six.text_type(CourseLocator(org='org', course='course:id', run='course:run')) @ddt @@ -97,7 +100,7 @@ def test_get_filename_with_default_separator(self, course_id, expected_filename, def test_get_course_key_from_url(self, course_id): url = u"https://courses.edx.org/courses/{course_id}/stuff".format(course_id=course_id) course_key = opaque_key_util.get_course_key_from_url(url) - self.assertEquals(unicode(course_key), course_id) + self.assertEquals(six.text_type(course_key), course_id) @data( INVALID_LEGACY_COURSE_ID, diff --git a/edx/analytics/tasks/util/tests/test_record.py b/edx/analytics/tasks/util/tests/test_record.py index 1daaff573a..cbf89408d1 100644 --- a/edx/analytics/tasks/util/tests/test_record.py +++ b/edx/analytics/tasks/util/tests/test_record.py @@ -134,7 +134,7 @@ def test_to_string_tuple_nulls(self): test_record = SampleStruct(None, 0, None) self.assertEqual( test_record.to_string_tuple(), - ('\\N', '0', '\\N') + (r'\N', '0', r'\N') ) def test_to_string_tuple_custom_nulls(self): @@ -152,7 +152,7 @@ def test_from_string_tuple(self): self.assertEqual(test_record.date, datetime.date(2015, 11, 1)) def test_from_string_tuple_nulls(self): - string_tuple = ('\\N', '0', '2015-11-01') + string_tuple = (r'\N', '0', '2015-11-01') test_record = SampleStruct.from_string_tuple(string_tuple) self.assertEqual(test_record.name, None) self.assertEqual(test_record.index, 0) diff --git a/edx/analytics/tasks/util/tests/test_retry.py b/edx/analytics/tasks/util/tests/test_retry.py index df28799dbf..198d320718 100644 --- a/edx/analytics/tasks/util/tests/test_retry.py +++ b/edx/analytics/tasks/util/tests/test_retry.py @@ -1,5 +1,8 @@ """Test the retry decorator""" +from __future__ import absolute_import + +import six from datetime import datetime, timedelta from unittest import TestCase @@ -73,7 +76,7 @@ def some_func(): some_func() self.assertEqual(self.func_call_counter, 4) - self.assertItemsEqual(self.mock_sleep.mock_calls, [call(0.5), call(1), call(2)]) + six.assertCountEqual(self, self.mock_sleep.mock_calls, [call(0.5), call(1), call(2)]) def test_different_base_delay(self): @@ -85,7 +88,7 @@ def some_func(): raise Exception('error') some_func() - self.assertItemsEqual(self.mock_sleep.mock_calls, [call(1), call(2), call(4), call(8)]) + six.assertCountEqual(self, self.mock_sleep.mock_calls, [call(1), call(2), call(4), call(8)]) def test_fatal_exception(self): diff --git a/edx/analytics/tasks/util/tests/test_s3_util.py b/edx/analytics/tasks/util/tests/test_s3_util.py index 
68e4c39120..c6504ca656 100644 --- a/edx/analytics/tasks/util/tests/test_s3_util.py +++ b/edx/analytics/tasks/util/tests/test_s3_util.py @@ -1,10 +1,10 @@ """ Tests for S3-related utility functionality. """ -from __future__ import print_function +from __future__ import absolute_import, print_function +import six from unittest import TestCase - from ddt import data, ddt, unpack from mock import MagicMock @@ -33,7 +33,7 @@ def _make_s3_generator(self, bucket_name, root, path_info, patterns): s3_bucket = MagicMock() s3_conn.get_bucket = MagicMock(return_value=s3_bucket) target_list = [self._make_key("{root}/{path}".format(root=root, path=path), size) - for path, size in path_info.iteritems()] + for path, size in six.iteritems(path_info)] s3_bucket.list = MagicMock(return_value=target_list) print([(k.key, k.size) for k in target_list]) diff --git a/edx/analytics/tasks/util/tests/test_url.py b/edx/analytics/tasks/util/tests/test_url.py index e036dcd17b..eca1298e87 100644 --- a/edx/analytics/tasks/util/tests/test_url.py +++ b/edx/analytics/tasks/util/tests/test_url.py @@ -1,4 +1,5 @@ """Tests for URL-related functionality.""" +from __future__ import absolute_import from unittest import TestCase diff --git a/edx/analytics/tasks/util/url.py b/edx/analytics/tasks/util/url.py index 1d70af0391..94618337c4 100644 --- a/edx/analytics/tasks/util/url.py +++ b/edx/analytics/tasks/util/url.py @@ -13,7 +13,8 @@ import logging import os import time -import urlparse +from six.moves.urllib.parse import urlparse, urlunparse +import six import luigi import luigi.configuration @@ -124,7 +125,7 @@ def open(self, mode='r'): def get_target_class_from_url(url, marker=False): """Returns a luigi target class based on the url scheme""" - parsed_url = urlparse.urlparse(url) + parsed_url = urlparse(url) if marker: target_class = URL_SCHEME_TO_MARKER_TARGET_CLASS.get(parsed_url.scheme, DEFAULT_MARKER_TARGET_CLASS) @@ -174,6 +175,13 @@ def url_path_join(url, *extra_path): Returns: The URL with the path component joined with `extra_path` argument. """ - (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url) + (scheme, netloc, path, params, query, fragment) = urlparse(url) joined_path = os.path.join(path, *extra_path) - return urlparse.urlunparse((scheme, netloc, joined_path, params, query, fragment)) + return urlunparse(( + six.text_type(scheme), + six.text_type(netloc), + six.text_type(joined_path), + six.text_type(params), + six.text_type(query), + six.text_type(fragment), + )) diff --git a/edx/analytics/tasks/util/vertica_target.py b/edx/analytics/tasks/util/vertica_target.py index 6d72978403..743273f9c6 100644 --- a/edx/analytics/tasks/util/vertica_target.py +++ b/edx/analytics/tasks/util/vertica_target.py @@ -146,7 +146,8 @@ def create_marker_table(self): """.format(marker_schema=self.marker_schema, marker_table=self.marker_table) ) except vertica_python.errors.QueryError as err: - if 'Sqlstate: 42710' in err.args[0]: # This Sqlstate will appear if the marker table already exists. + # Sqlstate 42710 will appear if the marker table already exists. 
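+ # The SQLSTATE is embedded somewhere in the error text and its exact formatting can vary, so check for the 'Sqlstate:' marker and the '42710' code separately rather than for a single exact substring.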
+ if 'Sqlstate:' in err.args[0] and '42710' in err.args[0]: pass else: raise diff --git a/edx/analytics/tasks/warehouse/financial/affiliate_window.py b/edx/analytics/tasks/warehouse/financial/affiliate_window.py index 1af075c8ae..5bd6526f2a 100644 --- a/edx/analytics/tasks/warehouse/financial/affiliate_window.py +++ b/edx/analytics/tasks/warehouse/financial/affiliate_window.py @@ -1,6 +1,8 @@ """ Tasks to support pulling Affiliate Window reports from their REST API to the data warehouse. """ +from __future__ import absolute_import, print_function + import csv import datetime import json @@ -251,7 +253,7 @@ def run(self): json.dumps(row) ] - result = [col if col is not None else '\N' for col in result] + result = [col if col is not None else r'\N' for col in result] writer.writerow(result) def output(self): diff --git a/edx/analytics/tasks/warehouse/financial/cybersource.py b/edx/analytics/tasks/warehouse/financial/cybersource.py index 80d9eb5363..ab7cf0637b 100644 --- a/edx/analytics/tasks/warehouse/financial/cybersource.py +++ b/edx/analytics/tasks/warehouse/financial/cybersource.py @@ -1,10 +1,14 @@ """Collect information about payments from third-party sources for financial reporting.""" +from __future__ import absolute_import +import codecs import csv import datetime import logging import os +from CyberSource import ReportDownloadsApi +from CyberSource.rest import ApiException import luigi import requests from luigi import date_interval @@ -37,9 +41,8 @@ def __init__(self, *args, **kwargs): config = get_config() section_name = 'cybersource:' + self.merchant_id - self.host = config.get(section_name, 'host') - self.username = config.get(section_name, 'username') - self.password = config.get(section_name, 'password') + self.keyid = config.get(section_name, 'keyid') + self.secretkey = config.get(section_name, 'secretkey') self.interval_start = luigi.DateParameter().parse(config.get(section_name, 'interval_start')) self.merchant_close_date = None @@ -70,26 +73,45 @@ class DailyPullFromCybersourceTask(PullFromCybersourceTaskMixin, luigi.Task): # This is the table that we had been using for gathering and # storing historical Cybersource data. It adds one additional # column over the 'PaymentBatchDetailReport' format. - REPORT_NAME = 'PaymentSubmissionDetailReport' + REPORT_NAME = 'PaymentSubmissionDetailReport_Daily_Classic' REPORT_FORMAT = 'csv' + # Specify the production/live environment for report downloads. + ENVIRONMENT = 'CyberSource.Environment.PRODUCTION' def requires(self): pass def run(self): self.remove_output_on_overwrite() - auth = (self.username, self.password) - response = requests.get(self.query_url, auth=auth) - if response.status_code != requests.codes.ok: # pylint: disable=no-member - msg = "Encountered status {} on request to Cybersource for {}".format(response.status_code, self.run_date) - raise Exception(msg) + merchant_config = { + 'authentication_type': 'http_signature', + 'merchantid': self.merchant_id, + 'run_environment': self.ENVIRONMENT, + 'merchant_keyid': self.keyid, + 'merchant_secretkey': self.secretkey, + 'enable_log': False, + } + report_download_obj = ReportDownloadsApi(merchant_config) + + # With the REST API, to get the same report we got with the servlets method, we need to specify `date + 1`.
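+ # For example, the report covering a run_date of 2019-07-01 is requested with a batch_date of 2019-07-02 (the dates here are purely illustrative).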
+ batch_date = self.run_date + datetime.timedelta(days=1) + try: + return_data, status, body = report_download_obj.download_report( + batch_date.isoformat(), + self.REPORT_NAME, + organization_id=self.merchant_id + ) + except ApiException as e: + log.error("Exception while downloading report for date(%s): %s", batch_date.isoformat(), e) + raise - # if there are no transactions in response, there will be no merchant id. - if self.merchant_id not in response.content and not self.is_empty_transaction_allowed: - raise Exception('No transactions to process.') + # Cybersource REST API returns status as either 200(OK), 400(Invalid request) or 404(No reports found). + if status != requests.codes.ok: + msg = "Encountered status {} on request to Cybersource for {}".format(status, batch_date) + raise Exception(msg) with self.output().open('w') as output_file: - output_file.write(response.content) + output_file.write(body.encode('utf-8')) def output(self): """Output is in the form {output_root}/cybersource/{CCYY-mm}/cybersource_{merchant}_{CCYYmmdd}.csv""" @@ -103,22 +125,11 @@ def output(self): url_with_filename = url_path_join(self.output_root, "cybersource", month_year_string, filename) return get_target_from_url(url_with_filename) - @property - def query_url(self): - """Generate the url to download a report from a Cybersource account.""" - slashified_date = self.run_date.strftime('%Y/%m/%d') # pylint: disable=no-member - url = 'https://{host}/DownloadReport/{date}/{merchant_id}/{report_name}.{report_format}'.format( - host=self.host, - date=slashified_date, - merchant_id=self.merchant_id, - report_name=self.REPORT_NAME, - report_format=self.REPORT_FORMAT - ) - return url - TRANSACTION_TYPE_MAP = { 'ics_bill': 'sale', + 'ics_auth,ics_bill': 'sale', + 'ics_bill, ics_auth': 'sale', 'ics_credit': 'refund' } @@ -156,16 +167,25 @@ def run(self): # Skip the first line, which provides information about the source # of the file. The second line should define the column headings. _download_header = input_file.readline() - reader = csv.DictReader(input_file, delimiter=',') + reader = csv.DictReader(codecs.iterdecode(input_file, 'utf-8'), delimiter=',') with self.output().open('w') as output_file: for row in reader: # Output most of the fields from the original source. # The values not included are: # batch_id: CyberSource batch in which the transaction was sent. # payment_processor: code for organization that processes the payment. + + # Cybersource servlets reports used to return a negative amount in case of a refund. + # However, the REST API does not, so we manually prepend a minus(-). + transaction_type = self.get_transaction_type(row['ics_applications']) + if transaction_type == 'refund' and float(row['amount']) > 0: + row_amount = '-' + row['amount'] + else: + row_amount = row['amount'] + result = [ - # Date - row['batch_date'], + # Date(Using the REST API Cybersource returns the timestamp). + row['batch_date'].split('T')[0], # Name of system. 'cybersource', # CyberSource merchant ID used for the transaction. @@ -176,19 +196,19 @@ def run(self): row['merchant_ref_number'], # ISO currency code used for the transaction. 
row['currency'], - row['amount'], + row_amount, # Transaction fee - '\\N', - TRANSACTION_TYPE_MAP[row['transaction_type']], + r'\N', + transaction_type, # We currently only process credit card transactions with Cybersource 'credit_card', # Type of credit card used - row['payment_method'].lower().replace(' ', '_'), + row['payment_type'].lower().replace(' ', '_'), # Identifier for the transaction. row['request_id'], ] - output_file.write('\t'.join(result)) - output_file.write('\n') + output_file.write(b'\t'.join(field.encode('utf-8') for field in result)) + output_file.write(b'\n') def output(self): """ @@ -202,6 +222,12 @@ def output(self): url_with_filename = url_path_join(self.output_root, "payments", partition_path_spec, filename) return get_target_from_url(url_with_filename) + def get_transaction_type(self, ics_applications): + if 'ics_bill' in ics_applications: + return 'sale' + elif 'ics_credit' in ics_applications: + return 'refund' + class IntervalPullFromCybersourceTask(PullFromCybersourceTaskMixin, WarehouseMixin, luigi.WrapperTask): """Determines a set of dates to pull, and requires them.""" @@ -229,10 +255,12 @@ def __init__(self, *args, **kwargs): path_targets = PathSetTask([path], include=[file_pattern], include_zero_length=True).output() paths = list(set([os.path.dirname(target.path) for target in path_targets])) dates = [path.rsplit('/', 2)[-1] for path in paths] - latest_date = sorted(dates)[-1] - - latest_completion_date = datetime.datetime.strptime(latest_date, "dt=%Y-%m-%d").date() - run_date = latest_completion_date + datetime.timedelta(days=1) + if dates: + latest_date = sorted(dates)[-1] + latest_completion_date = datetime.datetime.strptime(latest_date, "dt=%Y-%m-%d").date() + run_date = latest_completion_date + datetime.timedelta(days=1) + else: + run_date = self.interval_start # Limit intervals to merchant account close date(if any). if self.merchant_close_date: diff --git a/edx/analytics/tasks/warehouse/financial/ed_services_financial_report.py b/edx/analytics/tasks/warehouse/financial/ed_services_financial_report.py index a14086e4fb..2e66ff74df 100644 --- a/edx/analytics/tasks/warehouse/financial/ed_services_financial_report.py +++ b/edx/analytics/tasks/warehouse/financial/ed_services_financial_report.py @@ -1,4 +1,6 @@ """Generates a financial report to be delivered to our good friends in Ed Services.""" +from __future__ import absolute_import + import luigi from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin diff --git a/edx/analytics/tasks/warehouse/financial/fees.py b/edx/analytics/tasks/warehouse/financial/fees.py index ab2530828d..61ea719f1d 100644 --- a/edx/analytics/tasks/warehouse/financial/fees.py +++ b/edx/analytics/tasks/warehouse/financial/fees.py @@ -1,8 +1,9 @@ """ Tasks associated with pulling and storing financial fees related data. 
""" -import logging +from __future__ import absolute_import +import logging import luigi from edx.analytics.tasks.common.vertica_load import VerticaCopyTask diff --git a/edx/analytics/tasks/warehouse/financial/finance_reports.py b/edx/analytics/tasks/warehouse/financial/finance_reports.py index ef8bb5b21e..6af1bb4a59 100644 --- a/edx/analytics/tasks/warehouse/financial/finance_reports.py +++ b/edx/analytics/tasks/warehouse/financial/finance_reports.py @@ -1,4 +1,6 @@ """Provide entry-point for generating finance reports.""" +from __future__ import absolute_import + import luigi from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin diff --git a/edx/analytics/tasks/warehouse/financial/orders_import.py b/edx/analytics/tasks/warehouse/financial/orders_import.py index 2f8ba06a19..6ad2504419 100644 --- a/edx/analytics/tasks/warehouse/financial/orders_import.py +++ b/edx/analytics/tasks/warehouse/financial/orders_import.py @@ -1,4 +1,5 @@ """Import Orders: Shopping Cart Tables from the LMS, Orders from Otto.""" +from __future__ import absolute_import import luigi @@ -298,7 +299,7 @@ def insert_query(self): -- the complete line item quantity and amount IF(oi.status = 'refunded', oi.qty * oi.unit_cost, NULL) AS refunded_amount, IF(oi.status = 'refunded', oi.qty, NULL) AS refunded_quantity, - oi.order_id AS payment_ref_id, + CAST(oi.order_id AS STRING) AS payment_ref_id, -- The partner short code is extracted from the course ID during order reconciliation. '' AS partner_short_code, @@ -612,7 +613,7 @@ def insert_query(self): -- the complete line item quantity and amount IF(oi.status = 'refunded', oi.qty * oi.unit_cost, NULL) AS refunded_amount, IF(oi.status = 'refunded', oi.qty, NULL) AS refunded_quantity, - oi.order_id AS payment_ref_id, + CAST(oi.order_id AS STRING) AS payment_ref_id, -- The partner short code is extracted from the course ID during order reconciliation. 
'' AS partner_short_code, diff --git a/edx/analytics/tasks/warehouse/financial/payment.py b/edx/analytics/tasks/warehouse/financial/payment.py index 5f98c2a436..19f43debce 100644 --- a/edx/analytics/tasks/warehouse/financial/payment.py +++ b/edx/analytics/tasks/warehouse/financial/payment.py @@ -1,3 +1,4 @@ +from __future__ import absolute_import import luigi diff --git a/edx/analytics/tasks/warehouse/financial/paypal.py b/edx/analytics/tasks/warehouse/financial/paypal.py index 3d54fdf536..b6fe96be57 100644 --- a/edx/analytics/tasks/warehouse/financial/paypal.py +++ b/edx/analytics/tasks/warehouse/financial/paypal.py @@ -5,6 +5,7 @@ https://developer.paypal.com/docs/classic/payflow/reporting/ """ +from __future__ import absolute_import import datetime import logging @@ -12,7 +13,7 @@ import time import xml.etree.cElementTree as ET from collections import OrderedDict, namedtuple -from cStringIO import StringIO +from io import BytesIO from decimal import Decimal import luigi @@ -24,6 +25,8 @@ from edx.analytics.tasks.util.hive import WarehouseMixin from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin from edx.analytics.tasks.util.url import ExternalURL, get_target_from_url, url_path_join +import six +from six.moves import range log = logging.getLogger(__name__) @@ -180,7 +183,10 @@ class PaypalMalformedResponseError(PaypalError): def __init__(self, message, root_node=None): with_tree = message if root_node: - with_tree = message + ':' + ET.tostring(root_node, encoding='UTF-8', method='xml') + with_tree = u'{}:{}'.format( + message, + ET.tostring(root_node, encoding='UTF-8', method='xml').decode('utf-8'), + ) super(PaypalMalformedResponseError, self).__init__(with_tree) @@ -216,11 +222,11 @@ def create_request_document(self): self.append_request_node(root_node) # NOTE: we have to use this API to get the XML declaration, it is suboptimal that we have to construct a - # StringIO buffer to write to. + # BytesIO buffer to write to. tree = ET.ElementTree(root_node) - string_buffer = StringIO() - tree.write(string_buffer, encoding='UTF-8', xml_declaration=True) - return string_buffer.getvalue() + bytes_buffer = BytesIO() + tree.write(bytes_buffer, encoding='UTF-8', xml_declaration=True) + return bytes_buffer.getvalue() def append_authentication_node(self, root_node): """Inject the authentication elements into the request.""" @@ -228,7 +234,7 @@ def append_authentication_node(self, root_node): for attribute in ('user', 'vendor', 'partner', 'password'): child_node = ET.SubElement(auth_node, attribute) - child_node.text = unicode(getattr(self, attribute)) + child_node.text = six.text_type(getattr(self, attribute)) def append_request_node(self, root_node): """Inject the request-specific elements into the request.""" @@ -339,17 +345,17 @@ def append_request_node(self, root_node): # WARNING: the paypal XML parser is position sensitive. Do NOT change the ordering of the fields in the request. 
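+ # For reference, the sub-tree appended below looks roughly like the following (element names are taken from the code; values are omitted, and one <reportParam> element is emitted per report_params entry):
+ #   <runReportRequest>
+ #     <reportName>...</reportName>
+ #     <reportParam><paramName>...</paramName><paramValue>...</paramValue></reportParam>
+ #     <pageSize>...</pageSize>
+ #   </runReportRequest>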
request_node = ET.SubElement(root_node, 'runReportRequest') name_node = ET.SubElement(request_node, 'reportName') - name_node.text = unicode(self.report_name) + name_node.text = six.text_type(self.report_name) - for param_name, param_value in self.report_params.iteritems(): + for param_name, param_value in six.iteritems(self.report_params): param_node = ET.SubElement(request_node, 'reportParam') param_name_node = ET.SubElement(param_node, 'paramName') - param_name_node.text = unicode(param_name) + param_name_node.text = six.text_type(param_name) param_value_node = ET.SubElement(param_node, 'paramValue') - param_value_node.text = unicode(param_value) + param_value_node.text = six.text_type(param_value) page_size_node = ET.SubElement(request_node, 'pageSize') - page_size_node.text = unicode(self.page_size) + page_size_node.text = six.text_type(self.page_size) ColumnMetadata = namedtuple('ColumnMetadata', ('name', 'data_type')) # pylint: disable=invalid-name @@ -411,7 +417,7 @@ def __init__(self, report_id): def append_request_node(self, root_node): request_node = ET.SubElement(root_node, 'getMetaDataRequest') report_id_node = ET.SubElement(request_node, 'reportId') - report_id_node.text = unicode(self.report_id) + report_id_node.text = six.text_type(self.report_id) class PaypalReportDataResponse(PaypalApiResponse): @@ -465,9 +471,9 @@ def __init__(self, report_id, page_num=1): def append_request_node(self, root_node): request_node = ET.SubElement(root_node, 'getDataRequest') report_id_node = ET.SubElement(request_node, 'reportId') - report_id_node.text = unicode(self.report_id) + report_id_node.text = six.text_type(self.report_id) page_num_node = ET.SubElement(request_node, 'pageNum') - page_num_node.text = unicode(self.page_num) + page_num_node.text = six.text_type(self.page_num) class PaypalReportResultsRequest(PaypalApiRequest): @@ -487,7 +493,7 @@ def __init__(self, report_id): def append_request_node(self, root_node): request_node = ET.SubElement(root_node, 'getResultsRequest') report_id_node = ET.SubElement(request_node, 'reportId') - report_id_node.text = unicode(self.report_id) + report_id_node.text = six.text_type(self.report_id) BaseSettlementReportRecord = namedtuple('SettlementReportRecord', [ # pylint: disable=invalid-name @@ -655,7 +661,9 @@ def write_transaction_record(self, row, output_tsv_file): # identifier for the transaction payment_record.paypal_transaction_id, ] - output_tsv_file.write('\t'.join(record) + '\n') + # output_tsv_file.write(b'\t'.join(field.encode('utf-8') for field in record) + b'\n') + # Apparently the write wants str, not bytes. 
+ output_tsv_file.write('\t'.join(field for field in record) + '\n') def output(self): # NOTE: both the cybersource and paypal tasks write to the payments folder @@ -708,10 +716,12 @@ def __init__(self, *args, **kwargs): path_targets = PathSetTask([path], include=['*paypal.tsv']).output() paths = list(set([os.path.dirname(target.path) for target in path_targets])) dates = [path.rsplit('/', 2)[-1] for path in paths] - latest_date = sorted(dates)[-1] - - latest_completion_date = datetime.datetime.strptime(latest_date, "dt=%Y-%m-%d").date() - run_date = latest_completion_date + datetime.timedelta(days=1) + if dates: + latest_date = sorted(dates)[-1] + latest_completion_date = datetime.datetime.strptime(latest_date, "dt=%Y-%m-%d").date() + run_date = latest_completion_date + datetime.timedelta(days=1) + else: + run_date = self.interval_start self.selection_interval = date_interval.Custom(self.interval_start, run_date) self.run_interval = date_interval.Custom(run_date, self.interval_end) diff --git a/edx/analytics/tasks/warehouse/financial/reconcile.py b/edx/analytics/tasks/warehouse/financial/reconcile.py index a62bf603e9..12afbced8c 100644 --- a/edx/analytics/tasks/warehouse/financial/reconcile.py +++ b/edx/analytics/tasks/warehouse/financial/reconcile.py @@ -1,4 +1,5 @@ """Perform reconciliation of transaction history against order history""" +from __future__ import absolute_import import csv import json @@ -153,7 +154,8 @@ def __init__(self, *args, **kwargs): def requires(self): yield ( OrderTableTask( - import_date=self.import_date + import_date=self.import_date, + verbose=True, ), PaymentTask( import_date=self.import_date, @@ -169,7 +171,7 @@ def mapper(self, line): key_index = ORDERITEM_FIELDS.index('payment_ref_id') key = fields[key_index] - # Convert Hive null values ('\\N') in fields like 'product_detail': + # Convert Hive null values ("\N") in fields like 'product_detail': defaults = ( ('product_detail', ''), ('refunded_amount', '0.0'), @@ -184,7 +186,7 @@ def mapper(self, line): ) for field_name, default_value in defaults: index = ORDERITEM_FIELD_INDICES[field_name] - if fields[index] == '\\N': + if fields[index] == r'\N': fields[index] = default_value elif len(fields) == len(TRANSACTION_FIELDS): @@ -192,7 +194,7 @@ def mapper(self, line): record_type = TransactionRecord.__name__ key = fields[3] # payment_ref_id # Convert nulls in 'transaction_fee'. 
- if fields[6] == '\\N': + if fields[6] == r'\N': fields[6] = None # Edx-only: if the transaction was within a time period when @@ -677,12 +679,20 @@ def format_transaction_table_output(self, audit_code, transaction, orderitem, tr orderitem.partner_short_code if orderitem else self.default_partner_short_code, orderitem.payment_ref_id if orderitem else transaction.payment_ref_id, orderitem.order_id if orderitem else None, - encode_id(orderitem.order_processor, "order_id", orderitem.order_id) if orderitem else None, + encode_id( + orderitem.order_processor.encode('utf-8'), + b"order_id", + orderitem.order_id.encode('utf-8') + ) if orderitem else None, orderitem.date_placed if orderitem else None, # transaction information transaction.date if transaction else None, transaction.transaction_id if transaction else None, - encode_id(transaction.payment_gateway_id, "transaction_id", transaction.transaction_id) if transaction else None, + encode_id( + transaction.payment_gateway_id.encode('utf-8'), + b"transaction_id", + transaction.transaction_id.encode('utf-8') + ) if transaction else None, transaction.payment_gateway_id if transaction else None, transaction.payment_gateway_account_id if transaction else None, transaction.transaction_type if transaction else None, @@ -695,7 +705,11 @@ def format_transaction_table_output(self, audit_code, transaction, orderitem, tr str(transaction_fee_per_item) if transaction_fee_per_item is not None else None, # orderitem information orderitem.line_item_id if orderitem else None, - encode_id(orderitem.order_processor, "line_item_id", orderitem.line_item_id) if orderitem else None, + encode_id( + orderitem.order_processor.encode('utf-8'), + b"line_item_id", + orderitem.line_item_id.encode('utf-8') + ) if orderitem else None, orderitem.line_item_product_id if orderitem else None, orderitem.line_item_price if orderitem else None, orderitem.line_item_unit_price if orderitem else None, @@ -771,13 +785,13 @@ class OrderTransactionRecord(OrderTransactionRecordBase): def to_tsv(self): """Serializes the record to a TSV-formatted string.""" - return '\t'.join([str(v) if v is not None else "\\N" for v in self]) + return '\t'.join([str(v) if v is not None else r'\N' for v in self]) @staticmethod def from_job_output(tsv_str): """Constructor that reads format generated by to_tsv().""" record = tsv_str.split('\t') - nulled_record = [v if v != "\\N" else None for v in record] + nulled_record = [v if v != r'\N' else None for v in record] return OrderTransactionRecord(*nulled_record) @@ -891,7 +905,7 @@ def run(self): # first load all records in memory so that we can sort them for record_str in self.input().open('r'): - record = OrderTransactionRecord.from_job_output(record_str) + record = OrderTransactionRecord.from_job_output(record_str.decode('utf8')) if record.transaction_date is not None and record.transaction_date != '': # pylint: disable=no-member all_records.append(record) @@ -1121,13 +1135,13 @@ class FullOrderTransactionRecord(FullOrderTransactionRecordBase): def to_tsv(self): """Serializes the record to a TSV-formatted string.""" - return '\t'.join([str(v) if v is not None else "\\N" for v in self]) + return '\t'.join([str(v) if v is not None else r'\N' for v in self]) @staticmethod def from_job_output(tsv_str): """Constructor that reads format generated by to_tsv().""" record = tsv_str.split('\t') - nulled_record = [v if v != "\\N" else None for v in record] + nulled_record = [v if v != r'\N' else None for v in record] return FullOrderTransactionRecord(*nulled_record) 
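A minimal standalone sketch of the null-marker round trip implemented by to_tsv() and from_job_output() above (the values are illustrative, and note that restored fields come back as strings):

    row = ('abc', None, 1)
    tsv = '\t'.join(str(v) if v is not None else r'\N' for v in row)
    restored = [v if v != r'\N' else None for v in tsv.split('\t')]
    # restored == ['abc', None, '1']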
@@ -1140,14 +1154,16 @@ class ReconcileFullOrdersAndTransactionsTask(ReconcileOrdersAndTransactionsTask) def requires(self): yield ( FullOttoOrderTableTask( - import_date=self.import_date + import_date=self.import_date, + verbose=True, ), FullShoppingcartOrderTableTask( - import_date=self.import_date + import_date=self.import_date, + verbose=True, ), PaymentTask( import_date=self.import_date, - is_empty_transaction_allowed=self.is_empty_transaction_allowed + is_empty_transaction_allowed=self.is_empty_transaction_allowed, ) ) @@ -1159,7 +1175,7 @@ def mapper(self, line): key_index = FULLORDERITEM_FIELDS.index('payment_ref_id') key = fields[key_index] - # Convert Hive null values ('\\N') in fields like 'product_detail': + # Convert Hive null values ("\N") in fields like 'product_detail': defaults = ( ('product_detail', ''), ('refunded_amount', '0.0'), @@ -1172,7 +1188,7 @@ def mapper(self, line): ) for field_name, default_value in defaults: index = FULLORDERITEM_FIELD_INDICES[field_name] - if fields[index] == '\\N': + if fields[index] == r'\N': fields[index] = default_value elif len(fields) == len(TRANSACTION_FIELDS): @@ -1180,7 +1196,7 @@ def mapper(self, line): record_type = TransactionRecord.__name__ key = fields[3] # payment_ref_id # Convert nulls in 'transaction_fee'. - if fields[6] == '\\N': + if fields[6] == r'\N': fields[6] = None # Edx-only: if the transaction was within a time period when @@ -1238,12 +1254,20 @@ def format_transaction_table_output(self, audit_code, transaction, orderitem, tr orderitem.partner_short_code if orderitem else self.default_partner_short_code, orderitem.payment_ref_id if orderitem else transaction.payment_ref_id, orderitem.order_id if orderitem else None, - encode_id(orderitem.order_processor, "order_id", orderitem.order_id) if orderitem else None, + encode_id( + orderitem.order_processor.encode('utf-8'), + b"order_id", + orderitem.order_id.encode('utf-8'), + ) if orderitem else None, orderitem.date_placed if orderitem else None, # transaction information transaction.date if transaction else None, transaction.transaction_id if transaction else None, - encode_id(transaction.payment_gateway_id, "transaction_id", transaction.transaction_id) if transaction else None, + encode_id( + transaction.payment_gateway_id.encode('utf-8'), + b"transaction_id", + transaction.transaction_id.encode('utf-8'), + ) if transaction else None, transaction.payment_gateway_id if transaction else None, transaction.payment_gateway_account_id if transaction else None, transaction.transaction_type if transaction else None, @@ -1256,7 +1280,11 @@ def format_transaction_table_output(self, audit_code, transaction, orderitem, tr str(transaction_fee_per_item) if transaction_fee_per_item is not None else None, # orderitem information orderitem.line_item_id if orderitem else None, - encode_id(orderitem.order_processor, "line_item_id", orderitem.line_item_id) if orderitem else None, + encode_id( + orderitem.order_processor.encode('utf-8'), + b"line_item_id", + orderitem.line_item_id.encode('utf-8'), + ) if orderitem else None, orderitem.line_item_product_id if orderitem else None, orderitem.line_item_price if orderitem else None, orderitem.line_item_unit_price if orderitem else None, @@ -1417,6 +1445,7 @@ def insert_source_task(self): return ( FullOttoOrderTableTask( import_date=self.import_date, + verbose=True, # DO NOT PASS OVERWRITE FURTHER. We mean for overwrite here # to just apply to the writing to Vertica, not to anything further upstream. 
# overwrite=self.overwrite, @@ -1439,6 +1468,7 @@ def insert_source_task(self): return ( FullShoppingcartOrderTableTask( import_date=self.import_date, + verbose=True, # DO NOT PASS OVERWRITE FURTHER. We mean for overwrite here # to just apply to the writing to Vertica, not to anything further upstream. # overwrite=self.overwrite, diff --git a/edx/analytics/tasks/warehouse/financial/tests/test_paypal.py b/edx/analytics/tasks/warehouse/financial/tests/test_paypal.py index faed863124..d65244fe2f 100644 --- a/edx/analytics/tasks/warehouse/financial/tests/test_paypal.py +++ b/edx/analytics/tasks/warehouse/financial/tests/test_paypal.py @@ -1,7 +1,8 @@ +from __future__ import absolute_import import xml.etree.cElementTree as ET from collections import OrderedDict -from cStringIO import StringIO +from io import BytesIO from unittest import TestCase import httpretty @@ -16,6 +17,8 @@ PaypalReportMetadataRequest, PaypalReportRequest, PaypalReportResultsRequest, PaypalTimeoutError, PaypalTransactionsByDayTask, SettlementReportRecord ) +import six +from six.moves import zip TEST_URL = 'http://test.api/endpoint' @@ -30,9 +33,9 @@ def setUp(self): def on_post_return_xml(self): element_tree = ET.ElementTree(self.response_xml_root) - string_buffer = StringIO() - element_tree.write(string_buffer, encoding='UTF-8', xml_declaration=True) - response_xml_root_string = string_buffer.getvalue() + bytes_buffer = BytesIO() + element_tree.write(bytes_buffer, encoding='UTF-8', xml_declaration=True) + response_xml_root_string = bytes_buffer.getvalue() httpretty.register_uri(httpretty.POST, TEST_URL, response_xml_root_string) def remove_xml_node(self, path): @@ -42,7 +45,7 @@ def remove_xml_node(self, path): def set_xml_node_text(self, path, value): element = self.response_xml_root.findall(path)[0] - element.text = unicode(value) + element.text = six.text_type(value) def parse_request_xml(self): http_request = httpretty.last_request() @@ -755,9 +758,20 @@ def test_normal_run(self): self.task.run() - expected_record = ['2015-08-28', 'paypal', 'testing', 'EDX-123456', 'USD', '50.00', '1.40', 'sale', - 'instant_transfer', 'paypal', '1FW12345678901234'] - self.assertEquals(self.output_target.value.strip(), '\t'.join(expected_record)) + expected_record = [ + b'2015-08-28', + b'paypal', + b'testing', + b'EDX-123456', + b'USD', + b'50.00', + b'1.40', + b'sale', + b'instant_transfer', + b'paypal', + b'1FW12345678901234', + ] + self.assertEquals(self.output_target.value.strip(), b'\t'.join(expected_record)) @data( (4, 'Report has failed'), @@ -864,9 +878,20 @@ def test_delayed_report(self, mock_time): call(5) ]) - expected_record = ['2015-08-28', 'paypal', 'testing', 'EDX-123456', 'USD', '50.00', '1.40', 'sale', - 'instant_transfer', 'paypal', '1FW12345678901234'] - self.assertEquals(self.output_target.value.strip(), '\t'.join(expected_record)) + expected_record = [ + b'2015-08-28', + b'paypal', + b'testing', + b'EDX-123456', + b'USD', + b'50.00', + b'1.40', + b'sale', + b'instant_transfer', + b'paypal', + b'1FW12345678901234', + ] + self.assertEquals(self.output_target.value.strip(), b'\t'.join(expected_record)) @with_luigi_config('paypal', 'timeout', '1') @patch('edx.analytics.tasks.warehouse.financial.paypal.time') diff --git a/edx/analytics/tasks/warehouse/financial/tests/test_reconcile.py b/edx/analytics/tasks/warehouse/financial/tests/test_reconcile.py index e2f4f07faf..356c473527 100644 --- a/edx/analytics/tasks/warehouse/financial/tests/test_reconcile.py +++ 
b/edx/analytics/tasks/warehouse/financial/tests/test_reconcile.py @@ -1,4 +1,6 @@ """Tests for Order-transaction reconciliation and reporting.""" +from __future__ import absolute_import + import uuid from unittest import TestCase @@ -10,11 +12,13 @@ LOW_ORDER_ID_SHOPPINGCART_ORDERS, BaseOrderItemRecord, BaseTransactionRecord, OrderItemRecord, OrderTransactionRecord, ReconcileOrdersAndTransactionsTask, TransactionRecord ) +import six +from six.moves import zip TEST_DATE = '2015-06-01' TEST_LATER_DATE = '2015-06-10' DEFAULT_REF_ID = "EDX-12345" -HIVE_NULL = '\\N' +HIVE_NULL = r'\N' FIRST_ORDER_ITEM = '2345678' SECOND_ORDER_ITEM = '2345679' FIRST_TRANSACTION = '123423453456' @@ -206,7 +210,7 @@ def record_sort_key(record): # so that column names can be used instead of numbers. output_dict = record._asdict() # pylint: disable=no-member,protected-access expected_columns.update(**extra_values) - for column_num, expected_value in expected_columns.iteritems(): + for column_num, expected_value in six.iteritems(expected_columns): self.assertEquals(output_dict[column_num], expected_value) def test_no_transaction(self): diff --git a/edx/analytics/tasks/warehouse/lms_courseware_link_clicked.py b/edx/analytics/tasks/warehouse/lms_courseware_link_clicked.py index c945f51558..3cc52ed9af 100644 --- a/edx/analytics/tasks/warehouse/lms_courseware_link_clicked.py +++ b/edx/analytics/tasks/warehouse/lms_courseware_link_clicked.py @@ -2,7 +2,7 @@ Tasks for collecting link click data per course, per day, and uploading that data to Vertica. """ import logging -from urlparse import urlparse +from six.moves.urllib.parse import urlparse import luigi.task diff --git a/edx/analytics/tasks/warehouse/tests/test_course_subjects.py b/edx/analytics/tasks/warehouse/tests/test_course_subjects.py index 591b2ed516..af16b70247 100644 --- a/edx/analytics/tasks/warehouse/tests/test_course_subjects.py +++ b/edx/analytics/tasks/warehouse/tests/test_course_subjects.py @@ -110,8 +110,8 @@ def test_course_no_subjects(self): expected = { 'course_id': 'foo', 'date': '2015-06-25', - 'subject_uri': '\N', # pylint: disable-msg=anomalous-unicode-escape-in-string - 'subject_title': '\N', # pylint: disable-msg=anomalous-unicode-escape-in-string + 'subject_uri': r'\N', # pylint: disable-msg=anomalous-unicode-escape-in-string + 'subject_title': r'\N', # pylint: disable-msg=anomalous-unicode-escape-in-string 'subject_language': 'en', } self.assertTrue(self.check_subject_entry(data, 0, expected)) @@ -250,7 +250,7 @@ def test_catalog_missing_keys(self): 'course_id': 'bar', 'date': '2015-06-25', 'subject_uri': '/course/subject/testing', - 'subject_title': '\N', # pylint: disable-msg=anomalous-unicode-escape-in-string + 'subject_title': r'\N', # pylint: disable-msg=anomalous-unicode-escape-in-string 'subject_language': 'en' } # We expect only one row, a row for the course with a course_id. diff --git a/requirements/base.in b/requirements/base.in index d542595403..48a45f7625 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -3,13 +3,11 @@ # Workaround for https://github.com/ansible/ansible/issues/8875 --no-binary ansible -ansible==1.4.5 # GPL v3 License +ansible<2.9.0 # GPL v3 License boto==2.48.0 # MIT ecdsa==0.13 # MIT Jinja2 # BSD pycrypto==2.6.1 # public domain wheel==0.30.0 - -# Add this here, not because it's needed here, but because -# 1.11.0 will collide with a dependency in default.in. 
-six==1.10.0 +future # MIT +six # MIT diff --git a/requirements/base.txt b/requirements/base.txt index 10249852c5..237fb7291e 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -6,23 +6,17 @@ # --no-binary ansible -ansible==1.4.5 +ansible==2.8.3 asn1crypto==0.24.0 # via cryptography -bcrypt==3.1.6 # via paramiko boto==2.48.0 -cffi==1.12.3 # via bcrypt, cryptography, pynacl -cryptography==2.6.1 # via paramiko +cffi==1.12.3 # via cryptography +cryptography==2.7 # via ansible ecdsa==0.13 -enum34==1.1.6 # via cryptography -httplib2==0.12.3 # via ansible -ipaddress==1.0.22 # via cryptography +future==0.17.1 jinja2==2.10.1 markupsafe==1.1.1 # via jinja2 -paramiko==2.4.2 # via ansible -pyasn1==0.4.5 # via paramiko pycparser==2.19 # via cffi pycrypto==2.6.1 -pynacl==1.3.0 # via paramiko -pyyaml==5.1 # via ansible -six==1.10.0 +pyyaml==5.1.1 # via ansible +six==1.12.0 wheel==0.30.0 diff --git a/requirements/default.in b/requirements/default.in index 9660a50b47..a07eaa39d1 100644 --- a/requirements/default.in +++ b/requirements/default.in @@ -4,7 +4,7 @@ -r base.txt argparse==1.2.1 # Python Software Foundation License -boto3==1.4.8 # Apache 2.0 +boto3 # Apache 2.0 ciso8601==1.0.3 # MIT edx-ccx-keys==0.2.1 # AGPL edx-opaque-keys==0.4 # AGPL @@ -18,24 +18,27 @@ html5lib==1.0b3 # MIT isoweek==1.3.3 # BSD numpy==1.11.3 # BSD paypalrestsdk==1.9.0 # Paypal SDK License -psycopg2==2.6.2 # LGPL +psycopg2 # LGPL pygeoip==0.3.2 # LGPL -python-cjson==1.1.0 # LGPL +python-cjson; python_version <= "2.7" # LGPL +ujson; python_version > "2.7" # BSD python-dateutil==2.6.1 # BSD # There seems to be an issue(see: https://pagure.io/python-daemon/issue/18) with dependency installation on the # latest version of python-daemon(2.2.0), so we pin it to an earlier version. 
-python-daemon==2.1.2 +python-daemon python-gnupg==0.3.9 # BSD pytz==2017.3 # ZPL requests==2.18.4 # Apache 2.0 -six==1.10.0 # MIT +six # MIT stevedore==1.19.1 # Apache 2.0 ua-parser==0.3.6 # Apache urllib3==1.22 # MIT user-agents==0.3.2 # MIT vertica-python==0.6.11 # MIT yarn-api-client==0.2.3 # BSD -snowflake-connector-python==1.7.9 +snowflake-connector-python +futures; python_version == "2.7" +cybersource-rest-client-python==0.0.8 -e git+https://github.com/edx/luigi.git@eb45bcc52243de11b2b16a81229ac584fe1e601b#egg=luigi # Apache License 2.0 diff --git a/requirements/default.txt b/requirements/default.txt index 2d0904fe77..2b7bfb5b2d 100644 --- a/requirements/default.txt +++ b/requirements/default.txt @@ -8,35 +8,42 @@ -e git+https://github.com/edx/luigi.git@eb45bcc52243de11b2b16a81229ac584fe1e601b#egg=luigi -e git+https://github.com/edx/pyinstrument.git@a35ff76df4c3d5ff9a2876d859303e33d895e78f#egg=pyinstrument -ansible==1.4.5 +ansible==2.8.3 argparse==1.2.1 asn1crypto==0.24.0 -azure-common==1.1.21 # via azure-storage-blob, azure-storage-common, snowflake-connector-python -azure-nspkg==3.0.2 # via azure-common, azure-storage-nspkg +attrs==19.1.0 # via jsonschema +azure-common==1.1.23 # via azure-storage-blob, azure-storage-common, snowflake-connector-python azure-storage-blob==2.0.1 # via snowflake-connector-python azure-storage-common==2.0.0 # via azure-storage-blob azure-storage-nspkg==3.1.0 # via azure-storage-common backports-abc==0.5 # via tornado -bcrypt==3.1.6 -boto3==1.4.8 +bcrypt==3.1.7 # via cybersource-rest-client-python, paramiko +boto3==1.9.198 boto==2.48.0 -botocore==1.8.50 # via boto3, s3transfer, snowflake-connector-python +botocore==1.12.198 # via boto3, s3transfer, snowflake-connector-python cachetools==3.1.1 # via google-auth -certifi==2019.3.9 # via requests, snowflake-connector-python, tornado +certifi==2019.6.16 # via cybersource-rest-client-python, requests, snowflake-connector-python cffi==1.12.3 -chardet==3.0.4 # via requests +chardet==3.0.4 # via cybersource-rest-client-python, requests ciso8601==1.0.3 -cryptography==2.6.1 +configparser==3.7.4 # via cybersource-rest-client-python +coverage==4.5.3 # via cybersource-rest-client-python +crypto==1.4.1 # via cybersource-rest-client-python +cryptography==2.7 +cybersource-rest-client-python==0.0.8 +datetime==4.3 # via cybersource-rest-client-python distlib==0.2.2 docutils==0.14 # via botocore, python-daemon ecdsa==0.13 edx-ccx-keys==0.2.1 edx-opaque-keys==0.4 elasticsearch==1.7.0 -enum34==1.1.6 +enum34==1.1.6 # via cybersource-rest-client-python +extras==1.0.0 # via cybersource-rest-client-python, python-subunit, testtools filechunkio==1.8 -future==0.17.1 # via snowflake-connector-python, vertica-python -futures==3.2.0 # via azure-storage-blob, google-cloud-core, s3transfer +fixtures==3.0.0 # via cybersource-rest-client-python, testtools +funcsigs==1.0.2 # via cybersource-rest-client-python +future==0.17.1 google-api-python-client==1.7.7 google-auth-httplib2==0.0.3 # via google-api-python-client google-auth==1.6.3 # via google-api-python-client, google-auth-httplib2, google-cloud-bigquery, google-cloud-core @@ -47,49 +54,71 @@ googleapis-common-protos==1.6.0 # via google-cloud-core graphitesend==0.10.0 gspread==3.1.0 html5lib==1.0b3 -httplib2==0.12.3 -idna==2.6 # via requests, snowflake-connector-python -ijson==2.3 # via snowflake-connector-python -ipaddress==1.0.22 +httplib2==0.13.1 # via google-api-python-client, google-auth-httplib2 +idna==2.6 # via cybersource-rest-client-python, requests, 
snowflake-connector-python +ijson==2.4 # via snowflake-connector-python +ipaddress==1.0.22 # via cybersource-rest-client-python isoweek==1.3.3 jinja2==2.10.1 jmespath==0.9.4 # via boto3, botocore +jsonschema==3.0.1 # via cybersource-rest-client-python +linecache2==1.0.0 # via cybersource-rest-client-python, traceback2 lockfile==0.12.2 # via python-daemon +logger==1.4 # via cybersource-rest-client-python markupsafe==1.1.1 +naked==0.1.31 # via crypto, cybersource-rest-client-python +nose==1.3.7 # via cybersource-rest-client-python numpy==1.11.3 -paramiko==2.4.2 +paramiko==2.6.0 # via cybersource-rest-client-python paypalrestsdk==1.9.0 -pbr==5.2.1 # via stevedore -protobuf==3.8.0 # via google-cloud-core, googleapis-common-protos -psycopg2==2.6.2 -pyasn1-modules==0.2.5 # via google-auth, snowflake-connector-python -pyasn1==0.4.5 +pbr==5.4.1 # via cybersource-rest-client-python, fixtures, stevedore, testtools +protobuf==3.9.0 # via google-cloud-core, googleapis-common-protos +psycopg2==2.8.3 +pyasn1-modules==0.2.5 # via google-auth +pyasn1==0.4.5 # via cybersource-rest-client-python, pyasn1-modules, rsa, x509 pycparser==2.19 pycrypto==2.6.1 -pycryptodomex==3.8.2 # via snowflake-connector-python +pycryptodome==3.8.2 # via cybersource-rest-client-python +pycryptodomex==3.8.2 # via cybersource-rest-client-python, snowflake-connector-python pygeoip==0.3.2 -pyjwt==1.7.1 # via snowflake-connector-python +pyjwt==1.7.1 # via cybersource-rest-client-python, snowflake-connector-python pymongo==3.8.0 # via edx-opaque-keys -pynacl==1.3.0 -pyopenssl==19.0.0 # via paypalrestsdk, snowflake-connector-python -python-cjson==1.1.0 -python-daemon==2.1.2 +pynacl==1.3.0 # via cybersource-rest-client-python, paramiko +pyopenssl==19.0.0 # via cybersource-rest-client-python, paypalrestsdk, snowflake-connector-python +pypi==2.1 # via cybersource-rest-client-python +pyrsistent==0.15.4 # via jsonschema +python-daemon==2.2.3 python-dateutil==2.6.1 python-gnupg==0.3.9 +python-mimeparse==1.6.0 # via cybersource-rest-client-python, testtools +python-subunit==1.3.0 # via cybersource-rest-client-python +python-toolbox==1.0.7 # via cybersource-rest-client-python pytz==2017.3 -pyyaml==5.1 +pyyaml==5.1.1 requests==2.18.4 -rsa==4.0 # via google-auth -s3transfer==0.1.13 # via boto3 +rsa==4.0 # via cybersource-rest-client-python, google-auth +s3transfer==0.2.1 # via boto3 +shellescape==3.4.1 # via crypto, cybersource-rest-client-python singledispatch==3.4.0.3 # via tornado -six==1.10.0 -snowflake-connector-python==1.7.9 +six==1.12.0 +snowflake-connector-python==1.8.6 stevedore==1.19.1 +testtools==2.3.0 # via cybersource-rest-client-python, fixtures, python-subunit tornado==4.5.3 +traceback2==1.4.0 # via cybersource-rest-client-python, testtools, unittest2 +typing==3.7.4 # via cybersource-rest-client-python ua-parser==0.3.6 +ujson==1.35 ; python_version > "2.7" +unittest2==1.1.0 # via testtools uritemplate==3.0.0 # via google-api-python-client urllib3==1.22 user-agents==0.3.2 vertica-python==0.6.11 wheel==0.30.0 -yarn-api-client==0.2.3 \ No newline at end of file +x509==0.1 # via cybersource-rest-client-python +yarn-api-client==0.2.3 +zope.interface==4.6.0 # via cybersource-rest-client-python, datetime + +# The following packages are considered to be unsafe in a requirements file: +# pip==19.2.1 # via cybersource-rest-client-python +# setuptools==41.0.1 # via cybersource-rest-client-python, google-cloud-core, jsonschema, protobuf, python-daemon, python-toolbox, zope.interface diff --git a/requirements/docs.txt b/requirements/docs.txt 
index baca202202..55c117facb 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -9,26 +9,25 @@ -e git+https://github.com/edx/luigi.git@eb45bcc52243de11b2b16a81229ac584fe1e601b#egg=luigi -e git+https://github.com/edx/pyinstrument.git@a35ff76df4c3d5ff9a2876d859303e33d895e78f#egg=pyinstrument alabaster==0.7.12 # via sphinx -ansible==1.4.5 +ansible==2.8.3 argparse==1.2.1 asn1crypto==0.24.0 -azure-common==1.1.18 +azure-common==1.1.23 azure-nspkg==3.0.2 -azure-storage-blob==1.5.0 -azure-storage-common==1.4.0 +azure-storage-blob==2.0.1 +azure-storage-common==2.0.0 azure-storage-nspkg==3.1.0 -babel==2.6.0 # via sphinx +babel==2.7.0 # via sphinx backports-abc==0.5 -bcrypt==3.1.6 -boto3==1.4.8 +boto3==1.9.198 boto==2.48.0 -botocore==1.8.50 -cachetools==3.1.0 -certifi==2019.3.9 -cffi==1.12.2 +botocore==1.12.198 +cachetools==3.1.1 +certifi==2019.6.16 +cffi==1.12.3 chardet==3.0.4 ciso8601==1.0.3 -cryptography==2.6.1 +cryptography==2.7 distlib==0.2.2 docutils==0.14 ecdsa==0.13 @@ -38,56 +37,55 @@ elasticsearch==1.7.0 enum34==1.1.6 filechunkio==1.8 future==0.17.1 -futures==3.2.0 +futures==3.3.0 ; python_version == "2.7" google-api-python-client==1.7.7 google-auth-httplib2==0.0.3 google-auth==1.6.3 google-cloud-bigquery==0.27.0 google-cloud-core==0.27.1 google-resumable-media==0.3.2 -googleapis-common-protos==1.5.9 +googleapis-common-protos==1.6.0 graphitesend==0.10.0 +gspread==3.1.0 html5lib==1.0b3 -httplib2==0.12.1 +httplib2==0.13.1 idna==2.6 -ijson==2.3 +ijson==2.4 imagesize==1.1.0 # via sphinx ipaddress==1.0.22 isoweek==1.3.3 -jinja2==2.8.1 +jinja2==2.10.1 jmespath==0.9.4 lockfile==0.12.2 markupsafe==1.1.1 numpy==1.11.3 -paramiko==2.4.2 paypalrestsdk==1.9.0 -pbr==5.1.3 -protobuf==3.7.1 -psycopg2==2.6.2 -pyasn1-modules==0.2.4 +pbr==5.4.1 +protobuf==3.9.0 +psycopg2==2.8.3 +pyasn1-modules==0.2.5 pyasn1==0.4.5 pycparser==2.19 pycrypto==2.6.1 -pycryptodomex==3.8.0 +pycryptodomex==3.8.2 pygeoip==0.3.2 -pygments==2.3.1 # via sphinx +pygments==2.4.2 # via sphinx pyjwt==1.7.1 -pymongo==3.7.2 -pynacl==1.3.0 +pymongo==3.8.0 pyopenssl==19.0.0 -python-cjson==1.1.0 -python-daemon==2.1.2 +python-cjson==1.2.1 ; python_version <= "2.7" +python-daemon==2.2.3 python-dateutil==2.6.1 python-gnupg==0.3.9 pytz==2017.3 -pyyaml==5.1 +pyyaml==5.1.1 requests==2.18.4 rsa==4.0 -s3transfer==0.1.13 +s3transfer==0.2.1 singledispatch==3.4.0.3 -six==1.10.0 -snowballstemmer==1.2.1 # via sphinx -snowflake-connector-python==1.7.9 +six==1.12.0 +snowballstemmer==1.9.0 # via sphinx +snowflake-connector-python==1.8.6 sphinx==1.5.1 stevedore==1.19.1 tornado==4.5.3 @@ -97,4 +95,7 @@ urllib3==1.22 user-agents==0.3.2 vertica-python==0.6.11 wheel==0.30.0 -yarn-api-client==0.2.3 \ No newline at end of file +yarn-api-client==0.2.3 + +# The following packages are considered to be unsafe in a requirements file: +# setuptools==41.0.1 # via google-cloud-core, protobuf, python-daemon diff --git a/requirements/extra.txt b/requirements/extra.txt index 43f6804127..2ee387709f 100644 --- a/requirements/extra.txt +++ b/requirements/extra.txt @@ -1,2 +1,10 @@ # Generic URLs are not yet supported by pip-compile. http://cdn.mysql.com/Downloads/Connector-Python/mysql-connector-python-1.2.2.zip # GPL v2 with FOSS License Exception + +# When pip-compile is run under python 3, it omits all packages with a python 2 +# condition. Re-add them here, pre-pinned. +python-cjson==1.1.0 ; python_version <= "2.7" + +# When pip-compile is run under python 2, it omits all packages with a python 3 +# condition. Re-add them here, pre-pinned. 
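+# (The text after ';' is a PEP 508 environment marker; pip evaluates it at install time, so each pin below is only installed on the matching interpreter version.)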
+ujson==1.35 ; python_version > "2.7"
diff --git a/requirements/pip-tools.txt b/requirements/pip-tools.txt
index 547630ffa4..fb165b7848 100644
--- a/requirements/pip-tools.txt
+++ b/requirements/pip-tools.txt
@@ -5,5 +5,5 @@
 #    make upgrade
 #
 click==7.0 # via pip-tools
-pip-tools==3.7.0
+pip-tools==4.0.0
 six==1.10.0
diff --git a/requirements/pip.txt b/requirements/pip.txt
index 42d92b4bbb..893fe72454 100644
--- a/requirements/pip.txt
+++ b/requirements/pip.txt
@@ -1,2 +1,2 @@
-pip==9.0.1
+pip==19.1.1
 setuptools==36.4.0
diff --git a/requirements/test.in b/requirements/test.in
index ff6b7b828b..b0c3fe8547 100644
--- a/requirements/test.in
+++ b/requirements/test.in
@@ -23,7 +23,7 @@ pandas==0.13.0

 # Stuff for quality
-isort==4.2.15
+isort
 pycodestyle==2.3.1
 pylint==1.6.4
diff --git a/requirements/test.txt b/requirements/test.txt
index 269583e50b..a454103c90 100644
--- a/requirements/test.txt
+++ b/requirements/test.txt
@@ -8,31 +8,30 @@
 -e git+https://github.com/edx/luigi.git@eb45bcc52243de11b2b16a81229ac584fe1e601b#egg=luigi
 -e git+https://github.com/edx/pyinstrument.git@a35ff76df4c3d5ff9a2876d859303e33d895e78f#egg=pyinstrument
-ansible==1.4.5
+ansible==2.8.3
 argparse==1.2.1
 asn1crypto==0.24.0
 astroid==1.4.9 # via pylint
-azure-common==1.1.18
+azure-common==1.1.23
 azure-nspkg==3.0.2
-azure-storage-blob==1.5.0
-azure-storage-common==1.4.0
+azure-storage-blob==2.0.1
+azure-storage-common==2.0.0
 azure-storage-nspkg==3.1.0
 backports-abc==0.5
-backports.functools-lru-cache==1.5 # via pylint
-bcrypt==3.1.6
-boto3==1.4.8
+backports.functools-lru-cache==1.5 # via isort, pylint
+boto3==1.9.198
 boto==2.48.0
-botocore==1.8.50
-cachetools==3.1.0
-certifi==2019.3.9
-cffi==1.12.2
+botocore==1.12.198
+cachetools==3.1.1
+certifi==2019.6.16
+cffi==1.12.3
 chardet==3.0.4
 ciso8601==1.0.3
 configparser==3.7.4 # via pylint
 coverage==4.3.1
-cryptography==2.6.1
+cryptography==2.7
 ddt==1.1.1
-diff-cover==1.0.7
+diff-cover==2.3.0
 distlib==0.2.2
 docutils==0.14
 ecdsa==0.13
@@ -44,28 +43,29 @@ filechunkio==1.8
 freezegun==0.3.9
 funcsigs==1.0.2 # via mock
 future==0.17.1
-futures==3.2.0
+futures==3.3.0 ; python_version == "2.7"
 google-api-python-client==1.7.7
 google-auth-httplib2==0.0.3
 google-auth==1.6.3
 google-cloud-bigquery==0.27.0
 google-cloud-core==0.27.1
 google-resumable-media==0.3.2
-googleapis-common-protos==1.5.9
+googleapis-common-protos==1.6.0
 graphitesend==0.10.0
+gspread==3.1.0
 html5lib==1.0b3
-httplib2==0.12.1
+httplib2==0.13.1
 httpretty==0.8.14
 idna==2.6
-ijson==2.3
+ijson==2.4
 inflect==2.1.0 # via jinja2-pluralize
 ipaddress==1.0.22
-isort==4.2.15
+isort==4.3.21
 isoweek==1.3.3
 jinja2-pluralize==0.3.0 # via diff-cover
-jinja2==2.8.1
+jinja2==2.10.1
 jmespath==0.9.4
-lazy-object-proxy==1.3.1 # via astroid
+lazy-object-proxy==1.4.1 # via astroid
 lockfile==0.12.2
 markupsafe==1.1.1
 mccabe==0.6.1 # via pylint
@@ -74,36 +74,34 @@ nose-ignore-docstring==0.2
 nose==1.3.7
 numpy==1.11.3
 pandas==0.13.0
-paramiko==2.4.2
 paypalrestsdk==1.9.0
-pbr==5.1.3
-protobuf==3.7.1
-psycopg2==2.6.2
-pyasn1-modules==0.2.4
+pbr==5.4.1
+protobuf==3.9.0
+psycopg2==2.8.3
+pyasn1-modules==0.2.5
 pyasn1==0.4.5
 pycodestyle==2.3.1
 pycparser==2.19
 pycrypto==2.6.1
-pycryptodomex==3.8.0
+pycryptodomex==3.8.2
 pygeoip==0.3.2
-pygments==2.3.1 # via diff-cover
+pygments==2.4.2 # via diff-cover
 pyjwt==1.7.1
 pylint==1.6.4
-pymongo==3.7.2
-pynacl==1.3.0
+pymongo==3.8.0
 pyopenssl==19.0.0
-python-cjson==1.1.0
-python-daemon==2.1.2
+python-cjson==1.2.1 ; python_version <= "2.7"
+python-daemon==2.2.3
 python-dateutil==2.6.1
 python-gnupg==0.3.9
 pytz==2017.3
-pyyaml==5.1
+pyyaml==5.1.1
 requests==2.18.4
 rsa==4.0
-s3transfer==0.1.13
+s3transfer==0.2.1
 singledispatch==3.4.0.3
-six==1.10.0
-snowflake-connector-python==1.7.9
+six==1.12.0
+snowflake-connector-python==1.8.6
 stevedore==1.19.1
 tornado==4.5.3
 ua-parser==0.3.6
@@ -112,5 +110,8 @@ urllib3==1.22
 user-agents==0.3.2
 vertica-python==0.6.11
 wheel==0.30.0
-wrapt==1.11.1 # via astroid
-yarn-api-client==0.2.3
\ No newline at end of file
+wrapt==1.11.2 # via astroid
+yarn-api-client==0.2.3
+
+# The following packages are considered to be unsafe in a requirements file:
+# setuptools==41.0.1 # via google-cloud-core, protobuf, python-daemon
diff --git a/share/task.yml b/share/task.yml
index 748ef890f1..705b44d1ac 100644
--- a/share/task.yml
+++ b/share/task.yml
@@ -3,7 +3,7 @@
 - name: Configure luigi
   hosts: "{{ name }}"
   gather_facts: True
-  sudo: True
+  become: True
   vars:
     write_luigi_config: "yes"
     common_debian_variants:
@@ -27,7 +27,7 @@
   - working_dir: "{{ root_data_dir }}/{{ uuid }}"
   - log_dir: "{{ root_log_dir }}/{{ uuid}}"
   - working_venv_dir: "{{ working_dir }}/venv"
-  - virtualenv_python: "/usr/bin/python2.7"
+  - virtualenv_python: "/usr/bin/python"
   - virtualenv_extra_args: ''
   - git_servers:
     # Analytics repositories are currently hosted on github.
@@ -60,93 +60,100 @@
     register: home_output

   - name: set the home variable
-    set_fact: home="{{ home_output.stdout }}"
+    set_fact: "home={{ home_output.stdout }}"

   - name: known_hosts file exists
     command: touch {{ home }}/.ssh/known_hosts creates={{ home }}/.ssh/known_hosts

   - name: git server in known_hosts file
-    lineinfile: >
-      dest={{ home }}/.ssh/known_hosts
-      regexp=^{{item.hostname}}
-      line="{{ item.hostname }} {{ item.public_key }}"
-    with_items: git_servers
+    lineinfile:
+      dest: "{{ home }}/.ssh/known_hosts"
+      regexp: "^{{item.hostname}}"
+      line: "{{ item.hostname }} {{ item.public_key }}"
+    loop: "{{ git_servers }}"

   - name: root directories created
     file: path={{ item }} state=directory owner=root group=root
-    sudo: True
+    become: True
     with_items:
       - "{{ root_data_dir }}"
       - "{{ root_log_dir }}"

   - name: working directories created
     file: path={{ item }} state=directory mode=777 owner={{ ansible_ssh_user }} group={{ ansible_ssh_user }}
-    sudo: True
+    become: True
     with_items:
       - "{{ working_dir }}"
       - "{{ working_venv_dir }}"

   - name: log directory created
     file: path={{ item }} state=directory mode=777 owner={{ ansible_ssh_user }} group={{ ansible_ssh_user }}
-    sudo: True
+    become: True
     with_items:
       - "{{ log_dir }}"

   - name: make sure git is available on the Debian server
     command: apt-get install -q -y git
-    sudo: True
+    become: True
     when: ansible_distribution in common_debian_variants

   - name: make sure git is available on the RHEL server
     yum: pkg=git state=present
-    sudo: True
+    become: True
     when: ansible_distribution in common_redhat_variants

   - name: repositories checked out
-    git: >
-      repo={{ item.url }}
-      dest={{ working_dir }}/{{ item.dir_name }}
-      version=master
-    with_items: repos
+    git:
+      repo: "{{ item.url }}"
+      dest: "{{ working_dir }}/{{ item.dir_name }}"
+      version: master
+      force: yes
+    loop: "{{ repos }}"

   - name: branches fetched
-    command: git fetch --all chdir={{ working_dir }}/{{ item.dir_name }}
-    with_items: repos
+    command: "git fetch --all chdir={{ working_dir }}/{{ item.dir_name }}"
+    loop: "{{ repos }}"

   - name: origin/HEAD updated
-    command: git remote set-head origin --auto chdir={{ working_dir }}/{{ item.dir_name }}
-    with_items: repos
+    command: "git remote set-head origin --auto chdir={{ working_dir }}/{{ item.dir_name }}"
+    loop: "{{ repos }}"
"{{ repos }}" - name: branches checked out - command: git checkout {{ item.branch }} chdir={{ working_dir }}/{{ item.dir_name }} - with_items: repos + command: "git checkout {{ item.branch }} chdir={{ working_dir }}/{{ item.dir_name }}" + loop: "{{ repos }}" - name: ensure system packages are installed command: make system-requirements chdir={{ working_repo_dir }} - sudo: True + become: True - name: bootstrap pip on Debian command: apt-get install -q -y python-pip - sudo: True + become: True when: ansible_distribution in common_debian_variants - name: bootstrap pip on RHEL command: yum install -q -y python-pip - sudo: True + become: True when: ansible_distribution in common_redhat_variants - name: virtualenv installed - pip: name=virtualenv version=1.10.1 - sudo: True + pip: name=virtualenv version=16.6.1 + become: True - - name: check if virtualenv already created - stat: path={{ working_venv_dir }}/bin/activate - register: virtualenv_created + #- name: check if virtualenv already created + # stat: + # path: "{{ working_venv_dir }}/bin/activate" + # register: virtualenv_created + + # This is required for running unit tests inside docker, at least until the + # default image comes with a py3 compatible virtualenv. + - name: virtualenv deleted + shell: "rm -rf {{ working_venv_dir }}/*" - name: virtualenv created command: > - virtualenv --python={{ virtualenv_python }} {{ virtualenv_extra_args }} {{ working_venv_dir }} - when: not virtualenv_created.stat.exists + virtualenv --clear --python={{ virtualenv_python }} {{ virtualenv_extra_args }} {{ working_venv_dir }} + #when: not virtualenv_created.stat.exists - name: update pip command: > @@ -155,28 +162,32 @@ - name: virtualenv initialized on Debian shell: > . {{ working_venv_dir }}/bin/activate && make install - chdir={{ working_repo_dir }} + args: + chdir: "{{ working_repo_dir }}" when: ansible_distribution in common_debian_variants - name: virtualenv initialized on RHEL shell: > . {{ working_venv_dir }}/bin/activate && make install - chdir={{ working_repo_dir }} + args: + chdir: "{{ working_repo_dir }}" when: ansible_distribution in common_redhat_variants - name: additional packages installed on Debian shell: > . {{ working_venv_dir }}/bin/activate && pip install {{ item }} - chdir={{ working_repo_dir }} + args: + chdir: "{{ working_repo_dir }}" when: ansible_distribution in common_debian_variants - with_items: packages + loop: "{{ packages }}" - name: additional packages installed on RHEL shell: > . {{ working_venv_dir }}/bin/activate && pip install {{ item }} - chdir={{ working_repo_dir }} + args: + chdir: "{{ working_repo_dir }}" when: ansible_distribution in common_redhat_variants - with_items: packages + loop: "{{ packages }}" - name: logging configured template: src=logging.cfg.j2 dest={{ working_repo_dir }}/logging.cfg