From 097b82621d89fb22a5d0a2a1a3ea5bc112b0eaf6 Mon Sep 17 00:00:00 2001
From: rao-abdul-mannan
Date: Wed, 24 Jan 2018 23:42:15 +0500
Subject: [PATCH 01/14] convert TotalEventsDailyTask task to spark #486

---
 edx/analytics/tasks/common/pathutil.py        | 19 ++++++
 edx/analytics/tasks/monitor/overall_events.py | 58 ++++++++++++++++++-
 2 files changed, 76 insertions(+), 1 deletion(-)

diff --git a/edx/analytics/tasks/common/pathutil.py b/edx/analytics/tasks/common/pathutil.py
index ee6cacd6eb..ae56dc01b3 100644
--- a/edx/analytics/tasks/common/pathutil.py
+++ b/edx/analytics/tasks/common/pathutil.py
@@ -309,3 +309,22 @@ def get_map_input_file(self):
             log.warn('mapreduce_map_input_file not defined in os.environ, unable to determine input file path')
             self.incr_counter('Event', 'Missing map_input_file', 1)
             return ''
+
+
+class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin):
+    """
+    Extract events corresponding to a specified time interval.
+    """
+    path_targets = None
+
+    def __init__(self, *args, **kwargs):
+        super(EventLogSelectionDownstreamMixin, self).__init__(*args, **kwargs)
+        self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d')  # pylint: disable=no-member
+        self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d')  # pylint: disable=no-member
+        path_targets = PathSelectionByDateIntervalTask(
+            source=self.source,
+            interval=self.interval,
+            pattern=self.pattern,
+            date_pattern=self.date_pattern,
+        ).output()
+        self.path_targets = [task.path for task in path_targets]
diff --git a/edx/analytics/tasks/monitor/overall_events.py b/edx/analytics/tasks/monitor/overall_events.py
index 58f4505974..5bb5c3eb9d 100644
--- a/edx/analytics/tasks/monitor/overall_events.py
+++ b/edx/analytics/tasks/monitor/overall_events.py
@@ -5,8 +5,9 @@
 import luigi
 
 from edx.analytics.tasks.common.mapreduce import MapReduceJobTask
-from edx.analytics.tasks.common.pathutil import EventLogSelectionMixin
+from edx.analytics.tasks.common.pathutil import EventLogSelectionMixin, EventLogSelectionMixinSpark
 from edx.analytics.tasks.util.url import get_target_from_url
+from luigi.contrib.spark import PySparkTask
 
 log = logging.getLogger(__name__)
 
@@ -34,3 +35,58 @@ def reducer(self, key, values):
 
     def output(self):
         return get_target_from_url(self.output_root)
+
+
+class SparkTotalEventsDailyTask(EventLogSelectionMixinSpark, PySparkTask):
+    """Produce a dataset for total events within a given time period."""
+
+    driver_memory = '2g'
+    executor_memory = '3g'
+
+    output_root = luigi.Parameter()
+
+    def __init__(self, *args, **kwargs):
+        super(SparkTotalEventsDailyTask, self).__init__(*args, **kwargs)
+
+    # TODO: rename this method to output after testing is complete
+    def output_dir(self):
+        return get_target_from_url(self.output_root)
+
+    def main(self, sc, *args):
+        from pyspark.sql import SparkSession
+        from pyspark.sql.types import *
+        from pyspark.sql.functions import to_date, udf, struct, date_format
+        spark = SparkSession.builder.getOrCreate()
+        event_schema = StructType().add("POST", StringType(), True).add("GET", StringType(), True)
+        module_schema = StructType().add("display_name", StringType(), True) \
+            .add("original_usage_key", StringType(), True) \
+            .add("original_usage_version", StringType(), True) \
+            .add("usage_key", StringType(), True)
+        context_schema = StructType().add("command", StringType(), True) \
+            .add("course_id", StringType(), True) \
+            .add("module", module_schema) \
+            .add("org_id", StringType(), True) \
+            .add("path", StringType(), True) \
+            .add("user_id", StringType(), True)
+
+        event_log_schema = StructType() \
+            .add("username", StringType(), True) \
+            .add("event_type", StringType(), True) \
+            .add("ip", StringType(), True) \
+            .add("agent", StringType(), True) \
+            .add("host", StringType(), True) \
+            .add("referer", StringType(), True) \
+            .add("accept_language", StringType(), True) \
+            .add("event", event_schema) \
+            .add("event_source", StringType(), True) \
+            .add("context", context_schema) \
+            .add("time", StringType(), True) \
+            .add("name", StringType(), True) \
+            .add("page", StringType(), True) \
+            .add("session", StringType(), True)
+
+        df = spark.read.format('json').load(self.path_targets, schema=event_log_schema)
+        df = df.withColumn('event_date', date_format(to_date(df['time']), 'yyyy-MM-dd'))
+        df = df.filter(df['event_date'] == self.lower_bound_date_string).groupBy('event_date').count()
+        df.repartition(1).write.csv(self.output_dir().path, mode='overwrite', sep='\t')
+        # df.repartition(1).rdd.map(lambda row: '\t'.join(map(str, row))).saveAsTextFile(self.output_dir().path)

From d615749fefbd0c261fe529a310f0e1fe69ff9ca0 Mon Sep 17 00:00:00 2001
From: rao-abdul-mannan
Date: Thu, 25 Jan 2018 00:11:30 +0500
Subject: [PATCH 02/14] convert user activity task to spark

---
 edx/analytics/tasks/common/pathutil.py        |  19 -
 edx/analytics/tasks/common/spark.py           | 326 ++++++++++++++++++
 .../insights/tests/test_user_activity.py      |  45 +--
 edx/analytics/tasks/insights/user_activity.py | 232 ++++++++++++-
 edx/analytics/tasks/launchers/remote.py       |   2 +-
 edx/analytics/tasks/monitor/overall_events.py |  62 +---
 .../tasks/monitor/total_events_report.py      |  17 +-
 edx/analytics/tasks/util/constants.py         |   7 +
 edx/analytics/tasks/util/spark_util.py        |  78 +++++
 9 files changed, 678 insertions(+), 110 deletions(-)
 create mode 100644 edx/analytics/tasks/common/spark.py
 create mode 100644 edx/analytics/tasks/util/constants.py
 create mode 100644 edx/analytics/tasks/util/spark_util.py

diff --git a/edx/analytics/tasks/common/pathutil.py b/edx/analytics/tasks/common/pathutil.py
index ae56dc01b3..ee6cacd6eb 100644
--- a/edx/analytics/tasks/common/pathutil.py
+++ b/edx/analytics/tasks/common/pathutil.py
@@ -309,22 +309,3 @@ def get_map_input_file(self):
             log.warn('mapreduce_map_input_file not defined in os.environ, unable to determine input file path')
             self.incr_counter('Event', 'Missing map_input_file', 1)
             return ''
-
-
-class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin):
-    """
-    Extract events corresponding to a specified time interval.
- """ - path_targets = None - - def __init__(self, *args, **kwargs): - super(EventLogSelectionDownstreamMixin, self).__init__(*args, **kwargs) - self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d') # pylint: disable=no-member - self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d') # pylint: disable=no-member - path_targets = PathSelectionByDateIntervalTask( - source=self.source, - interval=self.interval, - pattern=self.pattern, - date_pattern=self.date_pattern, - ).output() - self.path_targets = [task.path for task in path_targets] diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py new file mode 100644 index 0000000000..fa73144e0d --- /dev/null +++ b/edx/analytics/tasks/common/spark.py @@ -0,0 +1,326 @@ +import ast +import json +import os +import tempfile +import zipfile + +import luigi.configuration +from luigi.contrib.spark import PySparkTask + +from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, PathSelectionByDateIntervalTask +from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin + +_file_path_to_package_meta_path = {} + + +def get_package_metadata_paths(): + """ + List of package metadata to be loaded on EMR cluster + """ + from distlib.database import DistributionPath + + if len(_file_path_to_package_meta_path) > 0: + return _file_path_to_package_meta_path + + dist_path = DistributionPath(include_egg=True) + for distribution in dist_path.get_distributions(): + metadata_path = distribution.path + for installed_file_path, _hash, _size in distribution.list_installed_files(): + absolute_installed_file_path = installed_file_path + if not os.path.isabs(installed_file_path): + absolute_installed_file_path = os.path.join(os.path.dirname(metadata_path), installed_file_path) + normalized_file_path = os.path.realpath(absolute_installed_file_path) + _file_path_to_package_meta_path[normalized_file_path] = metadata_path + + return _file_path_to_package_meta_path + + +def dereference(f): + if os.path.islink(f): + # by joining with the dirname we are certain to get the absolute path + return dereference(os.path.join(os.path.dirname(f), os.readlink(f))) + else: + return f + + +def create_packages_archive(packages, archive_dir_path): + """ + Create a zip archive for all the packages listed in packages and returns the list of zip file location. + """ + import zipfile + archives_list = [] + package_metadata_paths = get_package_metadata_paths() + metadata_to_add = dict() + + package_zip_path = os.path.join(archive_dir_path, 'packages.zip') + package_zip = zipfile.ZipFile(package_zip_path, "w", compression=zipfile.ZIP_DEFLATED) + archives_list.append(package_zip_path) + + def add(src, dst, package_name): + # Ensure any entry points and other egg-info metadata is also transmitted along with + # this file. If it is associated with any egg-info directories, ship them too. 
+ metadata_path = package_metadata_paths.get(os.path.realpath(src)) + if metadata_path: + metadata_to_add[package_name] = metadata_path + + package_zip.write(src, dst) + + def add_files_for_package(sub_package_path, root_package_path, root_package_name, package_name): + for root, dirs, files in os.walk(sub_package_path): + if '.svn' in dirs: + dirs.remove('.svn') + for f in files: + if not f.endswith(".pyc") and not f.startswith("."): + add(dereference(root + "/" + f), + root.replace(root_package_path, root_package_name) + "/" + f, + package_name) + + for package in packages: + # Archive each package + if not getattr(package, "__path__", None) and '.' in package.__name__: + package = __import__(package.__name__.rpartition('.')[0], None, None, 'non_empty') + + n = package.__name__.replace(".", "/") + + # Check length of path, because the attribute may exist and be an empty list. + if len(getattr(package, "__path__", [])) > 0: + # TODO: (BUG) picking only the first path does not + # properly deal with namespaced packages in different + # directories + p = package.__path__[0] + + if p.endswith('.egg') and os.path.isfile(p): + raise 'Not going to archive egg files!!!' + # Add the entire egg file + # p = p[:p.find('.egg') + 4] + # add(dereference(p), os.path.basename(p)) + + else: + # include __init__ files from parent projects + root = [] + for parent in package.__name__.split('.')[0:-1]: + root.append(parent) + module_name = '.'.join(root) + directory = '/'.join(root) + + add(dereference(__import__(module_name, None, None, 'non_empty').__path__[0] + "/__init__.py"), + directory + "/__init__.py", + package.__name__) + + add_files_for_package(p, p, n, package.__name__) + + else: + f = package.__file__ + if f.endswith("pyc"): + f = f[:-3] + "py" + if n.find(".") == -1: + add(dereference(f), os.path.basename(f), package.__name__) + else: + add(dereference(f), n + ".py", package.__name__) + + # include metadata in the same zip file + metadata_path = metadata_to_add.get(package.__name__) + if metadata_path is not None: + add_files_for_package(metadata_path, metadata_path, os.path.basename(metadata_path), package.__name__) + + return archives_list + + +class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin): + """ + Extract events corresponding to a specified time interval. 
+ """ + path_targets = None + + def __init__(self, *args, **kwargs): + """ + Call path selection task to get list of log files matching the pattern + """ + super(EventLogSelectionDownstreamMixin, self).__init__(*args, **kwargs) + self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d') # pylint: disable=no-member + self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d') # pylint: disable=no-member + + def get_log_schema(self): + """ + Get spark based schema for processing event logs + :return: Spark schema + """ + from pyspark.sql.types import StructType, StringType + event_schema = StructType().add("POST", StringType(), True).add("GET", StringType(), True) + module_schema = StructType().add("display_name", StringType(), True) \ + .add("original_usage_key", StringType(), True) \ + .add("original_usage_version", StringType(), True) \ + .add("usage_key", StringType(), True) + context_schema = StructType().add("command", StringType(), True) \ + .add("course_id", StringType(), True) \ + .add("module", module_schema) \ + .add("org_id", StringType(), True) \ + .add("path", StringType(), True) \ + .add("user_id", StringType(), True) + + event_log_schema = StructType() \ + .add("username", StringType(), True) \ + .add("event_type", StringType(), True) \ + .add("ip", StringType(), True) \ + .add("agent", StringType(), True) \ + .add("host", StringType(), True) \ + .add("referer", StringType(), True) \ + .add("accept_language", StringType(), True) \ + .add("event", event_schema) \ + .add("event_source", StringType(), True) \ + .add("context", context_schema) \ + .add("time", StringType(), True) \ + .add("name", StringType(), True) \ + .add("page", StringType(), True) \ + .add("session", StringType(), True) + + return event_log_schema + + def get_event_log_dataframe(self, spark, *args, **kwargs): + from pyspark.sql.functions import to_date, udf, struct, date_format + path_targets = PathSelectionByDateIntervalTask( + source=self.source, + interval=self.interval, + pattern=self.pattern, + date_pattern=self.date_pattern, + ).output() + self.path_targets = [task.path for task in path_targets] + dataframe = spark.read.format('json').load(self.path_targets, schema=self.get_log_schema()) + dataframe = dataframe.filter(dataframe['time'].isNotNull()) \ + .withColumn('event_date', date_format(to_date(dataframe['time']), 'yyyy-MM-dd')) + dataframe = dataframe.filter( + (dataframe['event_date'] >= self.lower_bound_date_string) & + (dataframe['event_date'] < self.upper_bound_date_string) + ) + return dataframe + + +class SparkJobTask(OverwriteOutputMixin, PySparkTask): + """ + Wrapper for spark task + """ + + _spark = None + _spark_context = None + _sql_context = None + _hive_context = None + _tmp_dir = None + + driver_memory = luigi.Parameter( + config_path={'section': 'spark', 'name': 'driver-memory'}, + description='Memory for spark driver', + significant=False, + ) + executor_memory = luigi.Parameter( + config_path={'section': 'spark', 'name': 'executor-memory'}, + description='Memory for each executor', + significant=False, + ) + executor_cores = luigi.Parameter( + config_path={'section': 'spark', 'name': 'executor-cores'}, + description='No. 
of cores for each executor', + significant=False, + ) + always_log_stderr = False # log stderr if spark fails, True for verbose log + + def init_spark(self, sc): + """ + Initialize spark, sql and hive context + :param sc: Spark context + """ + from pyspark.sql import SparkSession, SQLContext, HiveContext + self._sql_context = SQLContext(sc) + self._spark_context = sc + self._spark = SparkSession.builder.getOrCreate() + self._hive_context = HiveContext(sc) + + def spark_job(self): + """ + Spark code for the job + """ + raise NotImplementedError + + def get_config_from_args(self, key, *args, **kwargs): + """ + Returns `value` of `key` after parsing string argument + """ + default_value = kwargs.get('default_value', None) + str_arg = args[0] + config_dict = ast.literal_eval(str_arg) + value = config_dict.get(key, default_value) + return value + + def _load_internal_dependency_on_cluster(self, *args): + """ + creates a zip of package and loads it on spark worker nodes + + Loading via luigi configuration does not work as it creates a tar file whereas spark does not load tar files + """ + + # import packages to be loaded on cluster + import edx + import luigi + import opaque_keys + import stevedore + import bson + import ccx_keys + import cjson + import boto + import filechunkio + import ciso8601 + import chardet + import urllib3 + import certifi + import idna + import requests + import six + + dependencies_list = [] + # get cluster dependencies from *args + cluster_dependencies = self.get_config_from_args('cluster_dependencies', *args, default_value=None) + if cluster_dependencies is not None: + cluster_dependencies = json.loads(cluster_dependencies) + if isinstance(cluster_dependencies, list): + dependencies_list += cluster_dependencies + + packages = [edx, luigi, opaque_keys, stevedore, bson, ccx_keys, cjson, boto, filechunkio, ciso8601, chardet, + urllib3, certifi, idna, requests, six] + self._tmp_dir = tempfile.mkdtemp() + dependencies_list += create_packages_archive(packages, self._tmp_dir) + if len(dependencies_list) > 0: + for file in dependencies_list: + self._spark_context.addPyFile(file) + + def get_luigi_configuration(self): + """ + Return luigi configuration as dict for spark task + + luigi configuration cannot be retrieved directly from luigi's get_config method inside spark task + """ + + return None + + def app_options(self): + """ + List of options that needs to be passed to spark task + """ + options = {} + task_config = self.get_luigi_configuration() # load task dependencies first if any + if isinstance(task_config, dict): + options = task_config + configuration = luigi.configuration.get_config() + cluster_dependencies = configuration.get('spark', 'edx_egg_files', None) # spark worker nodes dependency + if cluster_dependencies is not None: + options['cluster_dependencies'] = cluster_dependencies + return [options] + + def _clean(self): + """Do any cleanup after job here""" + import shutil + shutil.rmtree(self._tmp_dir) + + def main(self, sc, *args): + self.init_spark(sc) # initialize spark contexts + self._load_internal_dependency_on_cluster(*args) # load packages on EMR cluster for spark worker nodes + self.spark_job(*args) # execute spark job + self._clean() # cleanup after spark job diff --git a/edx/analytics/tasks/insights/tests/test_user_activity.py b/edx/analytics/tasks/insights/tests/test_user_activity.py index f33d92582e..97e16cfaf3 100644 --- a/edx/analytics/tasks/insights/tests/test_user_activity.py +++ b/edx/analytics/tasks/insights/tests/test_user_activity.py @@ 
-12,8 +12,9 @@ from edx.analytics.tasks.common.tests.map_reduce_mixins import MapperTestMixin, ReducerTestMixin from edx.analytics.tasks.insights.user_activity import ( - ACTIVE_LABEL, PLAY_VIDEO_LABEL, POST_FORUM_LABEL, PROBLEM_LABEL, InsertToMysqlCourseActivityTask, UserActivityTask + InsertToMysqlCourseActivityTask, UserActivityTask ) +from edx.analytics.tasks.util.constants import PredicateLabels from edx.analytics.tasks.util.tests.opaque_key_mixins import InitializeLegacyKeysMixin, InitializeOpaqueKeysMixin @@ -84,21 +85,21 @@ def test_illegal_course_id(self): def test_good_dummy_event(self): line = self.create_event_log_line() event = tuple(self.task.mapper(line)) - expected = ((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, ACTIVE_LABEL)),) + expected = ((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)),) self.assertEquals(event, expected) def test_play_video_event(self): line = self.create_event_log_line(event_source='browser', event_type='play_video') event = tuple(self.task.mapper(line)) - expected = ((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PLAY_VIDEO_LABEL))) + expected = ((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.PLAY_VIDEO_LABEL))) self.assertEquals(event, expected) def test_problem_event(self): line = self.create_event_log_line(event_source='server', event_type='problem_check') event = tuple(self.task.mapper(line)) - expected = ((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PROBLEM_LABEL))) + expected = ((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.PROBLEM_LABEL))) self.assertEquals(event, expected) @data(('edx.forum.thread.created', True), ('edx.forum.response.created', True), ('edx.forum.comment.created', True), @@ -108,11 +109,11 @@ def test_post_forum_event(self, event_type, is_labeled_forum): line = self.create_event_log_line(event_source='server', event_type=event_type) event = tuple(self.task.mapper(line)) if is_labeled_forum: - expected = ((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, POST_FORUM_LABEL))) + expected = ((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.POST_FORUM_LABEL))) else: # The voted event is not a "discussion activity" and thus does not get the POST_FORUM_LABEL - expected = ((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, ACTIVE_LABEL)),) + expected = 
((self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)),) self.assertEquals(event, expected) def test_exclusion_of_events_by_source(self): @@ -147,13 +148,13 @@ def test_multiple(self): outputs.append(output) expected = ( - (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PLAY_VIDEO_LABEL)), - (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PLAY_VIDEO_LABEL)), - ('2013-12-24', (str(self.user_id), self.encoded_course_id, '2013-12-24', ACTIVE_LABEL)), - ('2013-12-24', (str(self.user_id), self.encoded_course_id, '2013-12-24', PROBLEM_LABEL)), - ('2013-12-16', (str(self.user_id), self.encoded_course_id, '2013-12-16', ACTIVE_LABEL)), + (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.PLAY_VIDEO_LABEL)), + (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (str(self.user_id), self.encoded_course_id, self.expected_date_string, PredicateLabels.PLAY_VIDEO_LABEL)), + ('2013-12-24', (str(self.user_id), self.encoded_course_id, '2013-12-24', PredicateLabels.ACTIVE_LABEL)), + ('2013-12-24', (str(self.user_id), self.encoded_course_id, '2013-12-24', PredicateLabels.PROBLEM_LABEL)), + ('2013-12-16', (str(self.user_id), self.encoded_course_id, '2013-12-16', PredicateLabels.ACTIVE_LABEL)), ) self.assertItemsEqual(outputs, expected) @@ -176,10 +177,10 @@ def setUp(self): def test_multiple(self): values = ( - (self.user_id, self.encoded_course_id, '2013-12-01', ACTIVE_LABEL), - (self.user_id, self.encoded_course_id, '2013-12-01', ACTIVE_LABEL), - (self.user_id, self.encoded_course_id, '2013-12-01', PLAY_VIDEO_LABEL), - (self.user_id, self.encoded_course_id, '2013-12-01', PLAY_VIDEO_LABEL), + (self.user_id, self.encoded_course_id, '2013-12-01', PredicateLabels.ACTIVE_LABEL), + (self.user_id, self.encoded_course_id, '2013-12-01', PredicateLabels.ACTIVE_LABEL), + (self.user_id, self.encoded_course_id, '2013-12-01', PredicateLabels.PLAY_VIDEO_LABEL), + (self.user_id, self.encoded_course_id, '2013-12-01', PredicateLabels.PLAY_VIDEO_LABEL), ) mock_output_file = Mock() @@ -187,9 +188,9 @@ def test_multiple(self): self.task.multi_output_reducer('2013-12-01', values, mock_output_file) self.assertEquals(len(mock_output_file.write.mock_calls), 4) - expected_string = '\t'.join((self.user_id, self.encoded_course_id, '2013-12-01', ACTIVE_LABEL, '2')) + expected_string = '\t'.join((self.user_id, self.encoded_course_id, '2013-12-01', PredicateLabels.ACTIVE_LABEL, '2')) self.assertIn(call(expected_string), mock_output_file.write.mock_calls) - expected_string = '\t'.join((self.user_id, self.encoded_course_id, '2013-12-01', PLAY_VIDEO_LABEL, '2')) + expected_string = '\t'.join((self.user_id, self.encoded_course_id, '2013-12-01', PredicateLabels.PLAY_VIDEO_LABEL, '2')) self.assertIn(call(expected_string), mock_output_file.write.mock_calls) diff --git a/edx/analytics/tasks/insights/user_activity.py 
b/edx/analytics/tasks/insights/user_activity.py index 507b9f3973..e15cad7212 100644 --- a/edx/analytics/tasks/insights/user_activity.py +++ b/edx/analytics/tasks/insights/user_activity.py @@ -5,13 +5,17 @@ from collections import Counter import luigi +import luigi.configuration import luigi.date_interval import edx.analytics.tasks.util.eventlog as eventlog +import edx.analytics.tasks.util.opaque_key_util as opaque_key_util from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin, MultiOutputMapReduceJobTask from edx.analytics.tasks.common.mysql_load import MysqlInsertTask from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, EventLogSelectionMixin +from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask from edx.analytics.tasks.insights.calendar_task import CalendarTableTask +from edx.analytics.tasks.util.constants import PredicateLabels from edx.analytics.tasks.util.decorators import workflow_entry_point from edx.analytics.tasks.util.hive import BareHiveTableTask, HivePartitionTask, WarehouseMixin, hive_database_name from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin @@ -20,11 +24,7 @@ log = logging.getLogger(__name__) -ACTIVE_LABEL = "ACTIVE" -PROBLEM_LABEL = "ATTEMPTED_PROBLEM" -PLAY_VIDEO_LABEL = "PLAYED_VIDEO" -POST_FORUM_LABEL = "POSTED_FORUM" - +logging.getLogger('boto').setLevel(logging.INFO) class UserActivityTask(OverwriteOutputMixin, WarehouseMixin, EventLogSelectionMixin, MultiOutputMapReduceJobTask): """ @@ -75,18 +75,18 @@ def get_predicate_labels(self, event): if event_type.startswith('edx.course.enrollment.'): return [] - labels = [ACTIVE_LABEL] + labels = [PredicateLabels.ACTIVE_LABEL] if event_source == 'server': if event_type == 'problem_check': - labels.append(PROBLEM_LABEL) + labels.append(PredicateLabels.PROBLEM_LABEL) if event_type.startswith('edx.forum.') and event_type.endswith('.created'): - labels.append(POST_FORUM_LABEL) + labels.append(PredicateLabels.POST_FORUM_LABEL) if event_source in ('browser', 'mobile'): if event_type == 'play_video': - labels.append(PLAY_VIDEO_LABEL) + labels.append(PredicateLabels.PLAY_VIDEO_LABEL) return labels @@ -146,6 +146,83 @@ def run(self): return super(UserActivityTask, self).run() +class UserActivityTaskSpark(EventLogSelectionMixinSpark, WarehouseMixin, SparkJobTask): + """ + UserActivityTask converted to spark + """ + + output_root = luigi.Parameter() + marker = luigi.Parameter( + config_path={'section': 'map-reduce', 'name': 'marker'}, + significant=False, + description='A URL location to a directory where a marker file will be written on task completion.', + ) + + def output_dir(self): + """ + Output directory for spark task + """ + return get_target_from_url(self.output_root) + + def output(self): + """ + Marker output path + + There were 2 approaches to verify output from spark multi output task: + 1) verify partitions for all dates in the interval + 2) create marker in separate dir just like hadoop multi-mapreduce task + Former approach can fail in cases, when there is no data for some dates as spark will not generate empty + partitions. + Later approach is more consistent. 
+ """ + marker_url = url_path_join(self.marker, str(hash(self))) + return get_target_from_url(marker_url, marker=True) + + def output_paths(self): + """ + Output partition paths + """ + return map( + lambda date: get_target_from_url( + url_path_join(self.output_root, 'dt={}'.format(date.isoformat())) + ), + self.interval + ) + + def on_success(self): # pragma: no cover + """Overload the success method to touch the _SUCCESS file. Any class that uses a separate Marker file from the + data file will need to override the base on_success() call to create this marker.""" + self.output().touch_marker() + + def run(self): + self.remove_output_on_overwrite() + removed_partitions = [target.remove() for target in self.output_paths() if target.exists()] + super(UserActivityTaskSpark, self).run() + + def spark_job(self, *args): + from edx.analytics.tasks.util.spark_util import get_event_predicate_labels, get_course_id + from pyspark.sql.functions import udf, struct, split, explode, lit + from pyspark.sql.types import ArrayType, StringType + df = self.get_event_log_dataframe(self._spark) + # register udfs + get_labels = udf(get_event_predicate_labels, StringType()) + get_courseid = udf(get_course_id, StringType()) + df = df.filter( + (df['event_source'] != 'task') & + ~ df['event_type'].startswith('edx.course.enrollment.') & + (df['context.user_id'] != '') + ) + # passing complete row to UDF + df = df.withColumn('all_labels', get_labels(df['event_type'], df['event_source'])) \ + .withColumn('course_id', get_courseid(df['context'])) + df = df.filter(df['course_id'] != '') # remove rows with empty course_id + df = df.withColumn('label', explode(split(df['all_labels'], ','))) + result = df.select('context.user_id', 'course_id', 'event_date', 'label') \ + .groupBy('user_id', 'course_id', 'event_date', 'label').count() + result = result.withColumn('dt', lit(result['event_date'])) # generate extra column for partitioning + result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t') + + class UserActivityDownstreamMixin(WarehouseMixin, EventLogSelectionDownstreamMixin, MapReduceJobTaskMixin): """All parameters needed to run the UserActivityTableTask task.""" @@ -203,6 +280,138 @@ def columns(self): ] +class CourseActivityPartitionTaskSpark(WeeklyIntervalMixin, UserActivityDownstreamMixin, SparkJobTask): + """ + Spark equivalent of CourseActivityPartitionTask + """ + + def run(self): + self.remove_output_on_overwrite() + super(CourseActivityPartitionTaskSpark, self).run() + + def output(self): + return get_target_from_url(self.hive_partition_path('course_activity', self.end_date.isoformat())) + + def get_luigi_configuration(self): + options = {} + config = luigi.configuration.get_config() + options['calendar_interval'] = config.get('calendar', 'interval', '') + return options + + def requires(self): + required_tasks = [ + CalendarTableTask( + warehouse_path=self.warehouse_path, + ) + ] + # bypassing UserActivityTableTask as we're not going to use hive with spark + if self.overwrite_n_days > 0: + overwrite_from_date = self.end_date - datetime.timedelta(days=self.overwrite_n_days) + overwrite_interval = luigi.date_interval.Custom(overwrite_from_date, self.end_date) + required_tasks.append( + UserActivityTaskSpark( + interval=overwrite_interval, + warehouse_path=self.warehouse_path, + output_root=self.user_activity_hive_table_path(), + overwrite=True, + ) + ) + yield required_tasks + + def user_activity_hive_table_path(self, *args): + return url_path_join( + self.warehouse_path, 
+ 'user_activity_by_user' + ) + + def calendar_hive_table_path(self, *args): + calendar_interval = self.get_config_from_args('calendar_interval', *args, default_value='') + # spark returns error when reading data from directories with different partition name at the same level + # so hardcoding it to read from date_interval partition + return url_path_join( + self.warehouse_path, + 'calendar', + 'date_interval={}'.format(calendar_interval) + ) + + def get_user_activity_table_schema(self): + from pyspark.sql.types import StructType, StringType + schema = StructType().add("user_id", StringType(), True) \ + .add("course_id", StringType(), True) \ + .add("date", StringType(), True) \ + .add("category", StringType(), True) \ + .add("count", StringType(), True) \ + .add("dt", StringType(), True) + return schema + + def get_calendar_table_schema(self): + from pyspark.sql.types import StructType, StringType + schema = StructType().add("date", StringType(), True) \ + .add("year", StringType(), True) \ + .add("month", StringType(), True) \ + .add("day", StringType(), True) \ + .add("iso_weekofyear", StringType(), True) \ + .add("iso_week_start", StringType(), True) \ + .add("iso_week_end", StringType(), True) \ + .add("iso_weekday", StringType(), True) \ + .add("date_interval", StringType(), True) + return schema + + def spark_job(self, *args): + user_activity_df = self._spark.read.csv( + self.user_activity_hive_table_path(*args), + sep='\t', + schema=self.get_user_activity_table_schema() + ) + calendar_df = self._spark.read.csv( + self.calendar_hive_table_path(*args), + sep='\t', + schema=self.get_calendar_table_schema() + ) + user_activity_df.createOrReplaceTempView('user_activity_by_user') + calendar_df.createOrReplaceTempView('calendar') + query = """ + SELECT + act.course_id as course_id, + CONCAT(cal.iso_week_start, " 00:00:00") as interval_start, + CONCAT(cal.iso_week_end, " 00:00:00") as interval_end, + act.category as label, + COUNT (DISTINCT user_id) as count + FROM user_activity_by_user act + JOIN calendar cal + ON act.date = cal.date AND act.dt >= "{interval_start}" AND act.dt < "{interval_end}" + WHERE + cal.date >= "{interval_start}" AND cal.date < "{interval_end}" + GROUP BY + act.course_id, + cal.iso_week_start, + cal.iso_week_end, + act.category + """.format( + interval_start=self.interval.date_a.isoformat(), + interval_end=self.interval.date_b.isoformat(), + ) + result = self._spark.sql(query) + result.coalesce(4).write.csv(self.output().path, mode='overwrite', sep='\t') + # with dataframe + # from pyspark.sql.functions import concat, lit, countDistinct + # user_activity_df = user_activity_df.filter( + # (user_activity_df['date'] >= self.interval.date_a.isoformat()) & + # (user_activity_df['date'] < self.interval.date_b.isoformat()) + # ) + # calendar_df = calendar_df.filter( + # (calendar_df['date'] >= self.interval.date_a.isoformat()) & + # (calendar_df['date'] < self.interval.date_b.isoformat()) + # ) + # joined_df = user_activity_df.join(calendar_df, on=(user_activity_df['date'] == calendar_df['date'])) + # raw_df = joined_df.withColumn('interval_start', concat(joined_df['iso_week_start'], lit(' 00:00:00'))) \ + # .withColumn('interval_end', concat(joined_df['iso_week_end'], lit(' 00:00:00'))) + # result = raw_df.groupBy('course_id', 'interval_start', 'interval_end', 'category').agg( + # countDistinct('username').alias('count') + # ) + + + class CourseActivityTableTask(BareHiveTableTask): @property @@ -334,7 +543,7 @@ def __init__(self, *args, **kwargs): @property def 
table(self): - return "course_activity" + return "course_activity_spark_trial" @property def columns(self): @@ -355,11 +564,10 @@ def indexes(self): @property def insert_source_task(self): - return CourseActivityPartitionTask( + return CourseActivityPartitionTaskSpark( warehouse_path=self.warehouse_path, end_date=self.end_date, weeks=self.weeks, - n_reduce_tasks=self.n_reduce_tasks, overwrite=self.overwrite_hive, overwrite_n_days=self.overwrite_n_days, ) diff --git a/edx/analytics/tasks/launchers/remote.py b/edx/analytics/tasks/launchers/remote.py index f04109452b..672b4007e6 100755 --- a/edx/analytics/tasks/launchers/remote.py +++ b/edx/analytics/tasks/launchers/remote.py @@ -119,7 +119,7 @@ def run_task_playbook(inventory, arguments, uid): env_var_string = ' '.join('{0}={1}'.format(k, v) for k, v in env_vars.iteritems()) - command = 'cd {code_dir} && . $HOME/.bashrc && {env_vars}{bg}{data_dir}/venv/bin/launch-task {task_arguments}{end_bg}'.format( + command = 'cd {code_dir} && . $HOME/.bashrc && . {data_dir}/venv/bin/activate && {env_vars}{bg}launch-task {task_arguments}{end_bg}'.format( env_vars=env_var_string + ' ' if env_var_string else '', data_dir=data_dir, code_dir=code_dir, diff --git a/edx/analytics/tasks/monitor/overall_events.py b/edx/analytics/tasks/monitor/overall_events.py index 5bb5c3eb9d..103c8a9c5c 100644 --- a/edx/analytics/tasks/monitor/overall_events.py +++ b/edx/analytics/tasks/monitor/overall_events.py @@ -5,9 +5,9 @@ import luigi from edx.analytics.tasks.common.mapreduce import MapReduceJobTask -from edx.analytics.tasks.common.pathutil import EventLogSelectionMixin, EventLogSelectionMixinSpark +from edx.analytics.tasks.common.pathutil import EventLogSelectionMixin +from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask from edx.analytics.tasks.util.url import get_target_from_url -from luigi.contrib.spark import PySparkTask log = logging.getLogger(__name__) @@ -37,56 +37,20 @@ def output(self): return get_target_from_url(self.output_root) -class SparkTotalEventsDailyTask(EventLogSelectionMixinSpark, PySparkTask): +class SparkTotalEventsDailyTask(EventLogSelectionMixinSpark, SparkJobTask): """Produce a dataset for total events within a given time period.""" - driver_memory = '2g' - executor_memory = '3g' - output_root = luigi.Parameter() - def __init__(self, *args, **kwargs): - super(SparkTotalEventsDailyTask, self).__init__(*args, **kwargs) - - # TODO: rename this method to output after testing is complete - def output_dir(self): + def output(self): return get_target_from_url(self.output_root) - def main(self, sc, *args): - from pyspark.sql import SparkSession - from pyspark.sql.types import * - from pyspark.sql.functions import to_date, udf, struct, date_format - spark = SparkSession.builder.getOrCreate() - event_schema = StructType().add("POST", StringType(), True).add("GET", StringType(), True) - module_schema = StructType().add("display_name", StringType(), True) \ - .add("original_usage_key", StringType(), True) \ - .add("original_usage_version", StringType(), True) \ - .add("usage_key", StringType(), True) - context_schema = StructType().add("command", StringType(), True) \ - .add("course_id", StringType(), True) \ - .add("module", module_schema) \ - .add("org_id", StringType(), True) \ - .add("path", StringType(), True) \ - .add("user_id", StringType(), True) - - event_log_schema = StructType() \ - .add("username", StringType(), True) \ - .add("event_type", StringType(), True) \ - .add("ip", StringType(), True) \ - .add("agent", 
StringType(), True) \ - .add("host", StringType(), True) \ - .add("referer", StringType(), True) \ - .add("accept_language", StringType(), True) \ - .add("event", event_schema) \ - .add("event_source", StringType(), True) \ - .add("context", context_schema) \ - .add("time", StringType(), True) \ - .add("name", StringType(), True) \ - .add("page", StringType(), True) \ - .add("session", StringType(), True) - - df = spark.read.format('json').load(self.path_targets, schema=event_log_schema) - df = df.withColumn('event_date', date_format(to_date(df['time']), 'yyyy-MM-dd')) - df = df.filter(df['event_date'] == self.lower_bound_date_string).groupBy('event_date').count() - df.repartition(1).write.csv(self.output_dir().path, mode='overwrite', sep='\t') - # df.repartition(1).rdd.map(lambda row: '\t'.join(map(str, row))).saveAsTextFile(self.output_dir().path) + def run(self): + self.remove_output_on_overwrite() + super(SparkTotalEventsDailyTask, self).run() + + def spark_job(self, *args): + df = self.get_event_log_dataframe(self._spark) + df = df.groupBy('event_date').count() + df.coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t') + # df.repartition(1).rdd.map(lambda row: '\t'.join(map(str, row))).saveAsTextFile(self.output_dir().path) \ No newline at end of file diff --git a/edx/analytics/tasks/monitor/total_events_report.py b/edx/analytics/tasks/monitor/total_events_report.py index d4c50fd43f..4e46875e61 100644 --- a/edx/analytics/tasks/monitor/total_events_report.py +++ b/edx/analytics/tasks/monitor/total_events_report.py @@ -7,8 +7,10 @@ from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin -from edx.analytics.tasks.monitor.overall_events import TotalEventsDailyTask +from edx.analytics.tasks.monitor.overall_events import TotalEventsDailyTask, SparkTotalEventsDailyTask from edx.analytics.tasks.util.url import ExternalURL, get_target_from_url +from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin + log = logging.getLogger(__name__) @@ -66,19 +68,20 @@ def run(self): output_file.write('\n') -class TotalEventsReportWorkflow(MapReduceJobTaskMixin, TotalEventsReport, EventLogSelectionDownstreamMixin): +class TotalEventsReportWorkflow( + MapReduceJobTaskMixin, + OverwriteOutputMixin, + TotalEventsReport, + EventLogSelectionDownstreamMixin): """ Generates report for an event count by date for all events. 
- """ def requires(self): - return TotalEventsDailyTask( - mapreduce_engine=self.mapreduce_engine, - lib_jar=self.lib_jar, - n_reduce_tasks=self.n_reduce_tasks, + return SparkTotalEventsDailyTask( source=self.source, output_root=self.counts, + overwrite=self.overwrite, pattern=self.pattern, interval=self.interval ) diff --git a/edx/analytics/tasks/util/constants.py b/edx/analytics/tasks/util/constants.py new file mode 100644 index 0000000000..1c7e8bf1b5 --- /dev/null +++ b/edx/analytics/tasks/util/constants.py @@ -0,0 +1,7 @@ +class PredicateLabels(object): + """Constants for predicate labels.""" + + ACTIVE_LABEL = "ACTIVE" + PROBLEM_LABEL = "ATTEMPTED_PROBLEM" + PLAY_VIDEO_LABEL = "PLAYED_VIDEO" + POST_FORUM_LABEL = "POSTED_FORUM" diff --git a/edx/analytics/tasks/util/spark_util.py b/edx/analytics/tasks/util/spark_util.py new file mode 100644 index 0000000000..b1f46dfb14 --- /dev/null +++ b/edx/analytics/tasks/util/spark_util.py @@ -0,0 +1,78 @@ +"""Support for spark tasks""" +import edx.analytics.tasks.util.opaque_key_util as opaque_key_util +from edx.analytics.tasks.util.constants import PredicateLabels + + +def get_event_predicate_labels(event_type, event_source): + """ + Creates labels by applying hardcoded predicates to a single event. + Don't pass whole event row to any spark UDF as it generates a different output than expected + """ + # We only want the explicit event, not the implicit form. + # return 'test' + + labels = PredicateLabels.ACTIVE_LABEL + + # task & enrollment events are filtered out by spark later as it speeds up due to less # of records + + if event_source == 'server': + if event_type == 'problem_check': + labels += ',' + PredicateLabels.PROBLEM_LABEL + + if event_type.startswith('edx.forum.') and event_type.endswith('.created'): + labels += ',' + PredicateLabels.POST_FORUM_LABEL + + if event_source in ('browser', 'mobile'): + if event_type == 'play_video': + labels += ',' + PredicateLabels.PLAY_VIDEO_LABEL + + return labels + + +def get_key_value_from_event(event, key, default_value=None): + """ + Get value from event dict by key + Pyspark does not support dict.get() method, so this approach seems reasonable + """ + try: + default_value = event[key] + except KeyError: + pass + return default_value + + +def get_course_id(event_context, from_url=False): + """ + Gets course_id from event's data. + Don't pass whole event row to any spark UDF as it generates a different output than expected + """ + if event_context == '' or event_context is None: + # Assume it's old, and not worth logging... + return '' + + # Get the course_id from the data, and validate. + course_id = opaque_key_util.normalize_course_id(get_key_value_from_event(event_context, 'course_id', '')) + if course_id: + if opaque_key_util.is_valid_course_id(course_id): + return course_id + + return '' + + # TODO : make it work with url as well + # Try to get the course_id from the URLs in `event_type` (for implicit + # server events) and `page` (for browser events). 
+ # if from_url: + # source = get_key_value_from_event(event, 'event_source') + # + # if source == 'server': + # url = get_key_value_from_event(event, 'event_type', '') + # elif source == 'browser': + # url = get_key_value_from_event(event, 'page', '') + # else: + # url = '' + # + # course_key = opaque_key_util.get_course_key_from_url(url) + # if course_key: + # return unicode(course_key) + # + # return '' From d06f87fe66066a3dab83ad017896c5ffa5b138ca Mon Sep 17 00:00:00 2001 From: rao-abdul-mannan Date: Wed, 21 Feb 2018 17:52:55 +0500 Subject: [PATCH 03/14] move spark runtime options to luigi config --- config/devstack.cfg | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/config/devstack.cfg b/config/devstack.cfg index 264373fa23..32d84ed1ee 100644 --- a/config/devstack.cfg +++ b/config/devstack.cfg @@ -131,3 +131,8 @@ api_root_url = http://localhost:8000/api/courses/v1/courses/ [course-blocks] api_root_url = http://localhost:8000/api/courses/v1/blocks/ + +[spark] +driver-memory=3g +executor-memory=3g +executor-cores=1 \ No newline at end of file From 78a842f0346db243176778522367c79d670cf192 Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Sat, 5 May 2018 14:00:59 +0500 Subject: [PATCH 04/14] replace username with user_id --- .../tests/acceptance/fixtures/input/user_activity_tracking.log | 2 +- .../tasks/warehouse/load_internal_reporting_user_activity.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/edx/analytics/tasks/tests/acceptance/fixtures/input/user_activity_tracking.log b/edx/analytics/tasks/tests/acceptance/fixtures/input/user_activity_tracking.log index a26070966f..b62c49f5c4 100644 --- a/edx/analytics/tasks/tests/acceptance/fixtures/input/user_activity_tracking.log +++ b/edx/analytics/tasks/tests/acceptance/fixtures/input/user_activity_tracking.log @@ -204,4 +204,4 @@ {"username": "dummy_username_2", "event_type": "play_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": "495604b91e7522ca25b9da1a15384aaa", "event": "{\"id\":\"0b9e39477cf34507a7a48f74be381fdd\",\"currentTime\":0,\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"user_id": 10002, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-06-19T17:47:25.605078+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"} {"username": "dummy_username_3", "event_type": "play_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": "495604b91e7522ca25b9da1a15384aaa", "event": "{\"id\":\"0b9e39477cf34507a7a48f74be381fdd\",\"currentTime\":0,\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"user_id": 10003, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-06-19T17:47:25.605078+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"} {"username": "dummy_username_4", "event_type": "seq_next", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": 
"495604b91e7522ca25b9da1a15384aaa", "event": "{\"old\":5,\"new\":6,\"id\":\"block-v1:edX+DemoX+Test_2014+type@sequential+block@19a30717eff543078a5d94ae9d6c18a5\"}", "event_source": "browser", "context": {"user_id": 10004, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-06-19T17:56:03.028696+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/interactive_demonstrations/19a30717eff543078a5d94ae9d6c18a5/"} -{"username": "dummy_username_3", "event_type": "play_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": "495604b91e7522ca25b9da1a15384aaa", "event": "{\"id\":\"0b9e39477cf34507a7a48f74be381fdd\",\"currentTime\":0,\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"user_id": 10003, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-05-25T17:47:25.605078+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"} +{"username": "dummy_username_3", "event_type": "play_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": "495604b91e7522ca25b9da1a15384aaa", "event": "{\"id\":\"0b9e39477cf34507a7a48f74be381fdd\",\"currentTime\":0,\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"user_id": 10003, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-05-25T17:47:25.605078+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"} \ No newline at end of file diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py b/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py index fe37828e89..0b8ce669cc 100644 --- a/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py +++ b/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py @@ -1,6 +1,5 @@ """ Loads the user_activity table into the warehouse through the pipeline via Hive. - On the roadmap is to write a task that runs validation queries on the aggregated Hive data pre-load. """ import logging @@ -24,7 +23,6 @@ class LoadInternalReportingUserActivityToWarehouse(WarehouseMixin, VerticaCopyTask): """ Loads the user activity table from Hive into the Vertica data warehouse. 
- """ date = luigi.DateParameter() n_reduce_tasks = luigi.Parameter( From 0b8b937d710a5375f5b08244653ee09233ff0c0a Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Tue, 22 May 2018 17:18:52 +0500 Subject: [PATCH 05/14] rebased with master before location history job conversion --- edx/analytics/tasks/insights/tests/test_user_activity.py | 4 +--- edx/analytics/tasks/insights/user_activity.py | 2 +- edx/analytics/tasks/monitor/overall_events.py | 2 +- edx/analytics/tasks/monitor/total_events_report.py | 8 ++++---- .../acceptance/fixtures/input/user_activity_tracking.log | 2 +- .../warehouse/load_internal_reporting_user_activity.py | 2 ++ 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/edx/analytics/tasks/insights/tests/test_user_activity.py b/edx/analytics/tasks/insights/tests/test_user_activity.py index 97e16cfaf3..d814902c0a 100644 --- a/edx/analytics/tasks/insights/tests/test_user_activity.py +++ b/edx/analytics/tasks/insights/tests/test_user_activity.py @@ -11,9 +11,7 @@ from mock import Mock, call from edx.analytics.tasks.common.tests.map_reduce_mixins import MapperTestMixin, ReducerTestMixin -from edx.analytics.tasks.insights.user_activity import ( - InsertToMysqlCourseActivityTask, UserActivityTask -) +from edx.analytics.tasks.insights.user_activity import InsertToMysqlCourseActivityTask, UserActivityTask from edx.analytics.tasks.util.constants import PredicateLabels from edx.analytics.tasks.util.tests.opaque_key_mixins import InitializeLegacyKeysMixin, InitializeOpaqueKeysMixin diff --git a/edx/analytics/tasks/insights/user_activity.py b/edx/analytics/tasks/insights/user_activity.py index e15cad7212..49ab94c534 100644 --- a/edx/analytics/tasks/insights/user_activity.py +++ b/edx/analytics/tasks/insights/user_activity.py @@ -26,6 +26,7 @@ logging.getLogger('boto').setLevel(logging.INFO) + class UserActivityTask(OverwriteOutputMixin, WarehouseMixin, EventLogSelectionMixin, MultiOutputMapReduceJobTask): """ Categorize activity of users. 
@@ -411,7 +412,6 @@ def spark_job(self, *args): # ) - class CourseActivityTableTask(BareHiveTableTask): @property diff --git a/edx/analytics/tasks/monitor/overall_events.py b/edx/analytics/tasks/monitor/overall_events.py index 103c8a9c5c..29f1536fab 100644 --- a/edx/analytics/tasks/monitor/overall_events.py +++ b/edx/analytics/tasks/monitor/overall_events.py @@ -53,4 +53,4 @@ def spark_job(self, *args): df = self.get_event_log_dataframe(self._spark) df = df.groupBy('event_date').count() df.coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t') - # df.repartition(1).rdd.map(lambda row: '\t'.join(map(str, row))).saveAsTextFile(self.output_dir().path) \ No newline at end of file + # df.repartition(1).rdd.map(lambda row: '\t'.join(map(str, row))).saveAsTextFile(self.output_dir().path) diff --git a/edx/analytics/tasks/monitor/total_events_report.py b/edx/analytics/tasks/monitor/total_events_report.py index 4e46875e61..117d400c1a 100644 --- a/edx/analytics/tasks/monitor/total_events_report.py +++ b/edx/analytics/tasks/monitor/total_events_report.py @@ -7,10 +7,9 @@ from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin -from edx.analytics.tasks.monitor.overall_events import TotalEventsDailyTask, SparkTotalEventsDailyTask -from edx.analytics.tasks.util.url import ExternalURL, get_target_from_url +from edx.analytics.tasks.monitor.overall_events import SparkTotalEventsDailyTask, TotalEventsDailyTask from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin - +from edx.analytics.tasks.util.url import ExternalURL, get_target_from_url log = logging.getLogger(__name__) @@ -72,7 +71,8 @@ class TotalEventsReportWorkflow( MapReduceJobTaskMixin, OverwriteOutputMixin, TotalEventsReport, - EventLogSelectionDownstreamMixin): + EventLogSelectionDownstreamMixin +): """ Generates report for an event count by date for all events. 
""" diff --git a/edx/analytics/tasks/tests/acceptance/fixtures/input/user_activity_tracking.log b/edx/analytics/tasks/tests/acceptance/fixtures/input/user_activity_tracking.log index b62c49f5c4..a26070966f 100644 --- a/edx/analytics/tasks/tests/acceptance/fixtures/input/user_activity_tracking.log +++ b/edx/analytics/tasks/tests/acceptance/fixtures/input/user_activity_tracking.log @@ -204,4 +204,4 @@ {"username": "dummy_username_2", "event_type": "play_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": "495604b91e7522ca25b9da1a15384aaa", "event": "{\"id\":\"0b9e39477cf34507a7a48f74be381fdd\",\"currentTime\":0,\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"user_id": 10002, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-06-19T17:47:25.605078+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"} {"username": "dummy_username_3", "event_type": "play_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": "495604b91e7522ca25b9da1a15384aaa", "event": "{\"id\":\"0b9e39477cf34507a7a48f74be381fdd\",\"currentTime\":0,\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"user_id": 10003, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-06-19T17:47:25.605078+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"} {"username": "dummy_username_4", "event_type": "seq_next", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": "495604b91e7522ca25b9da1a15384aaa", "event": "{\"old\":5,\"new\":6,\"id\":\"block-v1:edX+DemoX+Test_2014+type@sequential+block@19a30717eff543078a5d94ae9d6c18a5\"}", "event_source": "browser", "context": {"user_id": 10004, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-06-19T17:56:03.028696+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/interactive_demonstrations/19a30717eff543078a5d94ae9d6c18a5/"} -{"username": "dummy_username_3", "event_type": "play_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": "495604b91e7522ca25b9da1a15384aaa", "event": "{\"id\":\"0b9e39477cf34507a7a48f74be381fdd\",\"currentTime\":0,\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"user_id": 10003, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-05-25T17:47:25.605078+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"} \ No newline at end of file +{"username": "dummy_username_3", "event_type": "play_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) 
Chrome/36.0.1985.143 Safari/537.36", "host": "example.m.sandbox.edx.org", "session": "495604b91e7522ca25b9da1a15384aaa", "event": "{\"id\":\"0b9e39477cf34507a7a48f74be381fdd\",\"currentTime\":0,\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"user_id": 10003, "org_id": "edX", "course_id": "course-v1:edX+DemoX+Test_2014", "path": "/event"}, "time": "2014-05-25T17:47:25.605078+00:00", "page": "http://example.m.sandbox.edx.org/courses/course-v1:edX+DemoX+Test_2014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"} diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py b/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py index 0b8ce669cc..fe37828e89 100644 --- a/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py +++ b/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py @@ -1,5 +1,6 @@ """ Loads the user_activity table into the warehouse through the pipeline via Hive. + On the roadmap is to write a task that runs validation queries on the aggregated Hive data pre-load. """ import logging @@ -23,6 +24,7 @@ class LoadInternalReportingUserActivityToWarehouse(WarehouseMixin, VerticaCopyTask): """ Loads the user activity table from Hive into the Vertica data warehouse. + """ date = luigi.DateParameter() n_reduce_tasks = luigi.Parameter( From 01f6b2b99336393861bedc50427519009a0f65ee Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Mon, 28 May 2018 16:38:46 +0500 Subject: [PATCH 06/14] user location history spark task --- edx/analytics/tasks/common/spark.py | 13 +++ .../tasks/insights/location_per_course.py | 80 +++++++++++++++++++ edx/analytics/tasks/util/spark_util.py | 12 +++ 3 files changed, 105 insertions(+) diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py index fa73144e0d..06ba4febab 100644 --- a/edx/analytics/tasks/common/spark.py +++ b/edx/analytics/tasks/common/spark.py @@ -221,6 +221,12 @@ class SparkJobTask(OverwriteOutputMixin, PySparkTask): description='No. 
of cores for each executor', significant=False, ) + spark_conf = luigi.Parameter( + config_path={'section': 'spark', 'name': 'conf'}, + description='Spark configuration', + significant=False, + default=None + ) always_log_stderr = False # log stderr if spark fails, True for verbose log def init_spark(self, sc): @@ -234,6 +240,13 @@ def init_spark(self, sc): self._spark = SparkSession.builder.getOrCreate() self._hive_context = HiveContext(sc) + @property + def conf(self): + """ + Adds spark configuration to spark-submit task + """ + return self._dict_config(self.spark_conf) + def spark_job(self): """ Spark code for the job diff --git a/edx/analytics/tasks/insights/location_per_course.py b/edx/analytics/tasks/insights/location_per_course.py index 77d69c12f8..402413da54 100644 --- a/edx/analytics/tasks/insights/location_per_course.py +++ b/edx/analytics/tasks/insights/location_per_course.py @@ -15,6 +15,7 @@ from edx.analytics.tasks.common.pathutil import ( EventLogSelectionDownstreamMixin, EventLogSelectionMixin, PathSelectionByDateIntervalTask ) +from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask from edx.analytics.tasks.insights.database_imports import ImportStudentCourseEnrollmentTask from edx.analytics.tasks.util import eventlog from edx.analytics.tasks.util.decorators import workflow_entry_point @@ -163,6 +164,85 @@ def run(self): target.open("w").close() # touch the file +class LastDailyIpAddressOfUserTaskSpark(EventLogSelectionMixinSpark, WarehouseMixin, SparkJobTask): + """Spark alternate of LastDailyIpAddressOfUserTask""" + + output_parent_dir = 'last_ip_of_user_id' + marker = luigi.Parameter( + config_path={'section': 'map-reduce', 'name': 'marker'}, + significant=False, + description='A URL location to a directory where a marker file will be written on task completion.', + ) + + def output_dir(self): + """ + Output directory for spark task + """ + return get_target_from_url( + url_path_join( + self.warehouse_path, + self.output_parent_dir + ) + ) + + def output(self): + """ + Marker output path + """ + marker_url = url_path_join(self.marker, str(hash(self))) + return get_target_from_url(marker_url, marker=True) + + def output_paths(self): + """ + Output partition paths + """ + return map( + lambda date: get_target_from_url( + url_path_join( + self.hive_partition_path(self.output_parent_dir, date.isoformat()) + ) + ), + self.interval + ) + + def on_success(self): # pragma: no cover + self.output().touch_marker() + + def run(self): + self.remove_output_on_overwrite() + if self.output_dir().exists(): # only check partitions if parent dir exists + for target in self.output_paths(): + if target.exists(): + target.remove() + super(LastDailyIpAddressOfUserTaskSpark, self).run() + + def spark_job(self, *args): + from edx.analytics.tasks.util.spark_util import get_event_predicate_labels, get_course_id, get_event_time_string + from pyspark.sql.functions import udf + from pyspark.sql.window import Window + from pyspark.sql.types import StringType + df = self.get_event_log_dataframe(self._spark) + get_event_time = udf(get_event_time_string, StringType()) + get_courseid = udf(get_course_id, StringType()) + df = df.withColumn('course_id', get_courseid(df['context'])) \ + .withColumn('timestamp', get_event_time(df['time'])) + df.createOrReplaceTempView('location') + query = """ + SELECT + timestamp, ip, user_id, course_id, dt + FROM ( + SELECT + event_date as dt, context.user_id as user_id, course_id, timestamp, ip, + ROW_NUMBER() over ( PARTITION BY 
event_date, context.user_id, course_id ORDER BY timestamp desc) as rank + FROM location + WHERE ip <> '' AND timestamp <> '' AND context.user_id <> '' + ) user_location + WHERE rank = 1 + """ + result = self._spark.sql(query) + result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t') + + class LastCountryOfUserDownstreamMixin( WarehouseMixin, OverwriteOutputMixin, diff --git a/edx/analytics/tasks/util/spark_util.py b/edx/analytics/tasks/util/spark_util.py index b1f46dfb14..8d03647e92 100644 --- a/edx/analytics/tasks/util/spark_util.py +++ b/edx/analytics/tasks/util/spark_util.py @@ -41,6 +41,18 @@ def get_key_value_from_event(event, key, default_value=None): return default_value +def get_event_time_string(event_time): + """Returns the time of the event as an ISO8601 formatted string.""" + try: + # Get entry, and strip off time zone information. Keep microseconds, if any. + timestamp = event_time.split('+')[0] + if '.' not in timestamp: + timestamp = '{datetime}.000000'.format(datetime=timestamp) + return timestamp + except Exception: # pylint: disable=broad-except + return '' + + def get_course_id(event_context, from_url=False): """ Gets course_id from event's data. From b863f58162ce5faa0d430be3c8be6efb0bb6f6b3 Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Fri, 1 Jun 2018 04:17:18 +0500 Subject: [PATCH 07/14] using manifest file as input source to spark --- edx/analytics/tasks/common/spark.py | 141 ++++++++++++++---- .../tasks/insights/location_per_course.py | 8 +- edx/analytics/tasks/util/manifest.py | 15 +- 3 files changed, 125 insertions(+), 39 deletions(-) diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py index 06ba4febab..61a69ed8a0 100644 --- a/edx/analytics/tasks/common/spark.py +++ b/edx/analytics/tasks/common/spark.py @@ -1,17 +1,25 @@ import ast import json +import logging import os import tempfile import zipfile +import luigi import luigi.configuration from luigi.contrib.spark import PySparkTask from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, PathSelectionByDateIntervalTask +from edx.analytics.tasks.util.manifest import ( + ManifestInputTargetMixin, convert_to_manifest_input_if_necessary, remove_manifest_target_if_exists +) from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin +from edx.analytics.tasks.util.url import UncheckedExternalURL, get_target_from_url, url_path_join _file_path_to_package_meta_path = {} +log = logging.getLogger(__name__) + def get_package_metadata_paths(): """ @@ -126,17 +134,74 @@ def add_files_for_package(sub_package_path, root_package_path, root_package_name return archives_list +class SparkMixin(): + driver_memory = luigi.Parameter( + config_path={'section': 'spark', 'name': 'driver-memory'}, + description='Memory for spark driver', + significant=False, + ) + executor_memory = luigi.Parameter( + config_path={'section': 'spark', 'name': 'executor-memory'}, + description='Memory for each executor', + significant=False, + ) + executor_cores = luigi.Parameter( + config_path={'section': 'spark', 'name': 'executor-cores'}, + description='No. 
of cores for each executor', + significant=False, + ) + spark_conf = luigi.Parameter( + config_path={'section': 'spark', 'name': 'conf'}, + description='Spark configuration', + significant=False, + default=None + ) + always_log_stderr = False # log stderr if spark fails, True for verbose log + + +class PathSelectionTaskSpark(EventLogSelectionDownstreamMixin, luigi.WrapperTask): + """ + Path selection task with manifest feature for spark + """ + targets = None + manifest_id = luigi.Parameter( + description='File name for manifest' + ) + manifest_dir = luigi.Parameter( + description='Directory for manifest files' + ) + + def requires(self): + if not self.targets: + self.targets = self._get_targets() + return self.targets + + def _get_targets(self): + input = PathSelectionByDateIntervalTask( + source=self.source, + interval=self.interval, + pattern=self.pattern, + date_pattern=self.date_pattern + ).output() + targets = luigi.task.flatten( + convert_to_manifest_input_if_necessary(self.manifest_id, input, self.manifest_dir) + ) + return [UncheckedExternalURL(target.path) for target in targets] + + def output(self): + return [target.output() for target in self.requires()] + + class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin): """ Extract events corresponding to a specified time interval. """ - path_targets = None def __init__(self, *args, **kwargs): """ Call path selection task to get list of log files matching the pattern """ - super(EventLogSelectionDownstreamMixin, self).__init__(*args, **kwargs) + super(EventLogSelectionMixinSpark, self).__init__(*args, **kwargs) self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d') # pylint: disable=no-member self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d') # pylint: disable=no-member @@ -176,16 +241,32 @@ def get_log_schema(self): return event_log_schema - def get_event_log_dataframe(self, spark, *args, **kwargs): - from pyspark.sql.functions import to_date, udf, struct, date_format - path_targets = PathSelectionByDateIntervalTask( + def get_input_source(self, *args): + manifest_path = self.get_config_from_args('manifest_path', *args, default_value='') + targets = PathSelectionTaskSpark( source=self.source, interval=self.interval, pattern=self.pattern, date_pattern=self.date_pattern, + manifest_id=self.manifest_id, + manifest_dir=manifest_path, ).output() - self.path_targets = [task.path for task in path_targets] - dataframe = spark.read.format('json').load(self.path_targets, schema=self.get_log_schema()) + if len(targets) and 'manifest' in targets[0].path: + # Reading manifest as rdd with spark is alot faster as compared to hadoop. + # Currently, we're getting only 1 manifest file per request, so we will create a single rdd from it. 
+ # If there are multiple manifest files, each file can be read as rdd and then union it with other manifest rdds + source_rdd = self._spark.sparkContext.textFile(targets[0].path) + broadcast_value = self._spark.sparkContext.broadcast(source_rdd.collect()) + else: + broadcast_value = self._spark.sparkContext.broadcast([target.path for target in targets]) + return broadcast_value + + def get_event_log_dataframe(self, spark, *args, **kwargs): + from pyspark.sql.functions import to_date, udf, struct, date_format + dataframe = spark.read.format('json').load( + self.get_input_source(*args).value, + schema=self.get_log_schema() + ) dataframe = dataframe.filter(dataframe['time'].isNotNull()) \ .withColumn('event_date', date_format(to_date(dataframe['time']), 'yyyy-MM-dd')) dataframe = dataframe.filter( @@ -195,7 +276,7 @@ def get_event_log_dataframe(self, spark, *args, **kwargs): return dataframe -class SparkJobTask(OverwriteOutputMixin, PySparkTask): +class SparkJobTask(SparkMixin, OverwriteOutputMixin, EventLogSelectionDownstreamMixin, PySparkTask): """ Wrapper for spark task """ @@ -205,29 +286,7 @@ class SparkJobTask(OverwriteOutputMixin, PySparkTask): _sql_context = None _hive_context = None _tmp_dir = None - - driver_memory = luigi.Parameter( - config_path={'section': 'spark', 'name': 'driver-memory'}, - description='Memory for spark driver', - significant=False, - ) - executor_memory = luigi.Parameter( - config_path={'section': 'spark', 'name': 'executor-memory'}, - description='Memory for each executor', - significant=False, - ) - executor_cores = luigi.Parameter( - config_path={'section': 'spark', 'name': 'executor-cores'}, - description='No. of cores for each executor', - significant=False, - ) - spark_conf = luigi.Parameter( - config_path={'section': 'spark', 'name': 'conf'}, - description='Spark configuration', - significant=False, - default=None - ) - always_log_stderr = False # log stderr if spark fails, True for verbose log + log = None def init_spark(self, sc): """ @@ -247,6 +306,26 @@ def conf(self): """ return self._dict_config(self.spark_conf) + @property + def manifest_id(self): + params = { + 'source': self.source, + 'interval': self.interval, + 'pattern': self.pattern, + 'date_pattern': self.date_pattern, + 'spark': 'for_some_difference_with_hadoop_manifest' + } + return str(hash(frozenset(params.items()))).replace('-', 'n') + + def get_manifest_path(self, *args): + manifest_path = self.get_config_from_args('manifest_path', *args, default_value='') + return get_target_from_url( + url_path_join( + manifest_path, + self.manifest_id + '.manifest' + ) + ) + def spark_job(self): """ Spark code for the job diff --git a/edx/analytics/tasks/insights/location_per_course.py b/edx/analytics/tasks/insights/location_per_course.py index 402413da54..1641ecd8d4 100644 --- a/edx/analytics/tasks/insights/location_per_course.py +++ b/edx/analytics/tasks/insights/location_per_course.py @@ -216,12 +216,18 @@ def run(self): target.remove() super(LastDailyIpAddressOfUserTaskSpark, self).run() + def get_luigi_configuration(self): + options = {} + config = luigi.configuration.get_config() + options['manifest_path'] = config.get('manifest', 'path', '') + return options + def spark_job(self, *args): from edx.analytics.tasks.util.spark_util import get_event_predicate_labels, get_course_id, get_event_time_string from pyspark.sql.functions import udf from pyspark.sql.window import Window from pyspark.sql.types import StringType - df = self.get_event_log_dataframe(self._spark) + df = 
self.get_event_log_dataframe(self._spark, *args) get_event_time = udf(get_event_time_string, StringType()) get_courseid = udf(get_course_id, StringType()) df = df.withColumn('course_id', get_courseid(df['context'])) \ diff --git a/edx/analytics/tasks/util/manifest.py b/edx/analytics/tasks/util/manifest.py index f8b4600a52..8cfec843fc 100644 --- a/edx/analytics/tasks/util/manifest.py +++ b/edx/analytics/tasks/util/manifest.py @@ -12,14 +12,14 @@ log = logging.getLogger(__name__) -def convert_to_manifest_input_if_necessary(manifest_id, targets): +def convert_to_manifest_input_if_necessary(manifest_id, targets, manifest_dir=None): targets = luigi.task.flatten(targets) threshold = configuration.get_config().getint(CONFIG_SECTION, 'threshold', -1) if threshold > 0 and len(targets) >= threshold: log.debug( 'Using manifest since %d inputs are greater than or equal to the threshold %d', len(targets), threshold ) - return [create_manifest_target(manifest_id, targets)] + return [create_manifest_target(manifest_id, targets, manifest_dir)] else: log.debug( 'Directly processing files since %d inputs are less than the threshold %d', len(targets), threshold @@ -27,14 +27,15 @@ def convert_to_manifest_input_if_necessary(manifest_id, targets): return targets -def get_manifest_file_path(manifest_id): +def get_manifest_file_path(manifest_id, manifest_dir=None): # Construct the manifest file URL from the manifest_id and the configuration - base_url = configuration.get_config().get(CONFIG_SECTION, 'path') - manifest_file_path = url_path_join(base_url, manifest_id + '.manifest') + if manifest_dir is None: + manifest_dir = configuration.get_config().get(CONFIG_SECTION, 'path') + manifest_file_path = url_path_join(manifest_dir, manifest_id + '.manifest') return manifest_file_path -def create_manifest_target(manifest_id, targets): +def create_manifest_target(manifest_id, targets, manifest_dir=None): # If we are running locally, we need our manifest file to be a local file target, however, if we are running on # a real Hadoop cluster, it has to be an HDFS file so that the input format can read it. Luigi makes it a little # difficult for us to construct a target that can be one or the other of those types of targets at runtime since @@ -42,7 +43,7 @@ def create_manifest_target(manifest_id, targets): # base class at runtime based on the URL of the manifest file. # Construct the manifest file URL from the manifest_id and the configuration - manifest_file_path = get_manifest_file_path(manifest_id) + manifest_file_path = get_manifest_file_path(manifest_id, manifest_dir) # Figure out the type of target that should be used to write/read the file. manifest_file_target_class, init_args, init_kwargs = get_target_class_from_url(manifest_file_path) From 744684ec1d87296bd7ce0c5f76514cabff43aaff Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Thu, 7 Jun 2018 21:59:29 +0500 Subject: [PATCH 08/14] Enable logging in spark tasks --- edx/analytics/tasks/common/spark.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py index 61a69ed8a0..63eb311024 100644 --- a/edx/analytics/tasks/common/spark.py +++ b/edx/analytics/tasks/common/spark.py @@ -255,9 +255,11 @@ def get_input_source(self, *args): # Reading manifest as rdd with spark is alot faster as compared to hadoop. # Currently, we're getting only 1 manifest file per request, so we will create a single rdd from it. 
# If there are multiple manifest files, each file can be read as rdd and then union it with other manifest rdds + self.log.warn("PYSPARK LOGGER: Reading manifest file :: {} ".format(targets[0].path)) source_rdd = self._spark.sparkContext.textFile(targets[0].path) broadcast_value = self._spark.sparkContext.broadcast(source_rdd.collect()) else: + self.log.warn("PYSPARK LOGGER: Reading normal targets") broadcast_value = self._spark.sparkContext.broadcast([target.path for target in targets]) return broadcast_value @@ -298,6 +300,8 @@ def init_spark(self, sc): self._spark_context = sc self._spark = SparkSession.builder.getOrCreate() self._hive_context = HiveContext(sc) + log4jLogger = sc._jvm.org.apache.log4j # using spark logger + self.log = log4jLogger.LogManager.getLogger(__name__) @property def conf(self): From e204e498d948fa22b5dcc58571620f2a41e4a9e6 Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Fri, 8 Jun 2018 15:33:30 +0500 Subject: [PATCH 09/14] Process event logs directly --- edx/analytics/tasks/common/spark.py | 66 ++++++++----------- .../tasks/insights/location_per_course.py | 4 +- 2 files changed, 28 insertions(+), 42 deletions(-) diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py index 63eb311024..df645eb8dd 100644 --- a/edx/analytics/tasks/common/spark.py +++ b/edx/analytics/tasks/common/spark.py @@ -197,6 +197,11 @@ class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin): Extract events corresponding to a specified time interval. """ + direct_eventlogs_processing = luigi.BoolParameter( + description='Whether or not to process event log source directly with spark', + default=False + ) + def __init__(self, *args, **kwargs): """ Call path selection task to get list of log files matching the pattern @@ -242,33 +247,34 @@ def get_log_schema(self): return event_log_schema def get_input_source(self, *args): - manifest_path = self.get_config_from_args('manifest_path', *args, default_value='') - targets = PathSelectionTaskSpark( + return PathSelectionByDateIntervalTask( source=self.source, interval=self.interval, pattern=self.pattern, - date_pattern=self.date_pattern, - manifest_id=self.manifest_id, - manifest_dir=manifest_path, + date_pattern=self.date_pattern ).output() - if len(targets) and 'manifest' in targets[0].path: - # Reading manifest as rdd with spark is alot faster as compared to hadoop. - # Currently, we're getting only 1 manifest file per request, so we will create a single rdd from it. 
- # If there are multiple manifest files, each file can be read as rdd and then union it with other manifest rdds - self.log.warn("PYSPARK LOGGER: Reading manifest file :: {} ".format(targets[0].path)) - source_rdd = self._spark.sparkContext.textFile(targets[0].path) - broadcast_value = self._spark.sparkContext.broadcast(source_rdd.collect()) - else: - self.log.warn("PYSPARK LOGGER: Reading normal targets") - broadcast_value = self._spark.sparkContext.broadcast([target.path for target in targets]) - return broadcast_value def get_event_log_dataframe(self, spark, *args, **kwargs): from pyspark.sql.functions import to_date, udf, struct, date_format - dataframe = spark.read.format('json').load( - self.get_input_source(*args).value, - schema=self.get_log_schema() - ) + schema = self.get_log_schema() + if self.direct_eventlogs_processing: + self.log.warn("\nPYSPARK => Processing event log source directly\n") + event_log_source = self.get_config_from_args('event_log_source', *args, default_value=None) + if event_log_source is not None: + event_log_source = json.loads(event_log_source) + self.log.warn("\nPYSPARK => Event log source : {}\n".format(event_log_source)) + dataframe = spark.read.format('json').load(event_log_source[0], schema=self.get_log_schema()) + source_list_count = len(event_log_source) + if source_list_count > 1: + for k in range(1, source_list_count): + dataframe = dataframe.union( + spark.read.format('json').load(event_log_source[k], schema=self.get_log_schema()) + ) + else: + self.log.warn("\nPYSPARK => Processing path selection output\n") + input_source = self.get_input_source(*args) + path_targets = [target.path for target in input_source] + dataframe = spark.read.format('json').load(path_targets, schema=self.get_log_schema()) dataframe = dataframe.filter(dataframe['time'].isNotNull()) \ .withColumn('event_date', date_format(to_date(dataframe['time']), 'yyyy-MM-dd')) dataframe = dataframe.filter( @@ -310,26 +316,6 @@ def conf(self): """ return self._dict_config(self.spark_conf) - @property - def manifest_id(self): - params = { - 'source': self.source, - 'interval': self.interval, - 'pattern': self.pattern, - 'date_pattern': self.date_pattern, - 'spark': 'for_some_difference_with_hadoop_manifest' - } - return str(hash(frozenset(params.items()))).replace('-', 'n') - - def get_manifest_path(self, *args): - manifest_path = self.get_config_from_args('manifest_path', *args, default_value='') - return get_target_from_url( - url_path_join( - manifest_path, - self.manifest_id + '.manifest' - ) - ) - def spark_job(self): """ Spark code for the job diff --git a/edx/analytics/tasks/insights/location_per_course.py b/edx/analytics/tasks/insights/location_per_course.py index 1641ecd8d4..2ad1241335 100644 --- a/edx/analytics/tasks/insights/location_per_course.py +++ b/edx/analytics/tasks/insights/location_per_course.py @@ -219,7 +219,7 @@ def run(self): def get_luigi_configuration(self): options = {} config = luigi.configuration.get_config() - options['manifest_path'] = config.get('manifest', 'path', '') + options['event_log_source'] = config.get('event-logs', 'source', '') return options def spark_job(self, *args): @@ -246,7 +246,7 @@ def spark_job(self, *args): WHERE rank = 1 """ result = self._spark.sql(query) - result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t') + result.coalesce(2).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t') class LastCountryOfUserDownstreamMixin( From 2b7b94be6032e3ba07b5c57a50e638c8e2c58980 
Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Sun, 10 Jun 2018 15:51:11 +0500 Subject: [PATCH 10/14] using wildcards with dataframe source --- edx/analytics/tasks/common/spark.py | 14 +++----------- .../tasks/insights/location_per_course.py | 8 +------- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py index df645eb8dd..640c1b6aff 100644 --- a/edx/analytics/tasks/common/spark.py +++ b/edx/analytics/tasks/common/spark.py @@ -259,17 +259,9 @@ def get_event_log_dataframe(self, spark, *args, **kwargs): schema = self.get_log_schema() if self.direct_eventlogs_processing: self.log.warn("\nPYSPARK => Processing event log source directly\n") - event_log_source = self.get_config_from_args('event_log_source', *args, default_value=None) - if event_log_source is not None: - event_log_source = json.loads(event_log_source) - self.log.warn("\nPYSPARK => Event log source : {}\n".format(event_log_source)) - dataframe = spark.read.format('json').load(event_log_source[0], schema=self.get_log_schema()) - source_list_count = len(event_log_source) - if source_list_count > 1: - for k in range(1, source_list_count): - dataframe = dataframe.union( - spark.read.format('json').load(event_log_source[k], schema=self.get_log_schema()) - ) + source = [src.encode('utf-8') for src in self.source] + self.log.warn("\nPYSPARK => Event log source : {}\n".format(source)) + dataframe = spark.read.format('json').load(source, schema=self.get_log_schema()) else: self.log.warn("\nPYSPARK => Processing path selection output\n") input_source = self.get_input_source(*args) diff --git a/edx/analytics/tasks/insights/location_per_course.py b/edx/analytics/tasks/insights/location_per_course.py index 2ad1241335..5f767256cf 100644 --- a/edx/analytics/tasks/insights/location_per_course.py +++ b/edx/analytics/tasks/insights/location_per_course.py @@ -216,12 +216,6 @@ def run(self): target.remove() super(LastDailyIpAddressOfUserTaskSpark, self).run() - def get_luigi_configuration(self): - options = {} - config = luigi.configuration.get_config() - options['event_log_source'] = config.get('event-logs', 'source', '') - return options - def spark_job(self, *args): from edx.analytics.tasks.util.spark_util import get_event_predicate_labels, get_course_id, get_event_time_string from pyspark.sql.functions import udf @@ -246,7 +240,7 @@ def spark_job(self, *args): WHERE rank = 1 """ result = self._spark.sql(query) - result.coalesce(2).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t') + result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t') class LastCountryOfUserDownstreamMixin( From 64308befa0672d75709c2a4b40ffb3c9ff4d83fc Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Tue, 12 Jun 2018 23:19:37 +0500 Subject: [PATCH 11/14] with rdd --- edx/analytics/tasks/common/spark.py | 40 ++++++++++++++ .../tasks/insights/location_per_course.py | 16 +++--- edx/analytics/tasks/util/spark_util.py | 54 +++++++++++++++++++ 3 files changed, 101 insertions(+), 9 deletions(-) diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py index 640c1b6aff..fb3bc17310 100644 --- a/edx/analytics/tasks/common/spark.py +++ b/edx/analytics/tasks/common/spark.py @@ -14,6 +14,7 @@ ManifestInputTargetMixin, convert_to_manifest_input_if_necessary, remove_manifest_target_if_exists ) from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin +from edx.analytics.tasks.util.spark_util import 
load_and_filter from edx.analytics.tasks.util.url import UncheckedExternalURL, get_target_from_url, url_path_join _file_path_to_package_meta_path = {} @@ -201,6 +202,14 @@ class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin): description='Whether or not to process event log source directly with spark', default=False ) + cache_rdd = luigi.BoolParameter( + description="Whether to cache rdd or not", + default=False + ) + rdd_checkpoint_directory = luigi.Parameter( + description="Path to directory where rdd can be checkpointed", + default=None + ) def __init__(self, *args, **kwargs): """ @@ -275,6 +284,37 @@ def get_event_log_dataframe(self, spark, *args, **kwargs): ) return dataframe + def get_user_location_schema(self): + from pyspark.sql.types import StructType, StringType, IntegerType + schema = StructType().add("user_id", IntegerType(), True) \ + .add("course_id", StringType(), True) \ + .add("ip", StringType(), True) \ + .add("timestamp", StringType(), True) \ + .add("event_date", StringType(), True) + + def get_dataframe(self, spark, *args, **kwargs): + from pyspark.sql.functions import to_date, udf, struct, date_format + input_source = self.get_input_source(*args) + user_location_schema = self.get_user_location_schema() + master_rdd = spark.sparkContext.union( + # filter out unwanted data as much as possible within each rdd before union + map( + lambda target: load_and_filter(spark, target.path, self.lower_bound_date_string, + self.upper_bound_date_string), + input_source + ) + ) + if self.rdd_checkpoint_directory: + # set checkpoint location before checkpointing + spark.sparkContext.setCheckpointDir(self.rdd_checkpoint_directory) + master_rdd.localCheckpoint() + if self.cache_rdd: + master_rdd.cache() + dataframe = spark.createDataFrame(master_rdd, schema=user_location_schema) + if 'user_id' not in dataframe.columns: # rename columns if they weren't named properly by createDataFrame + dataframe = dataframe.toDF('user_id', 'course_id', 'ip', 'timestamp', 'event_date') + return dataframe + class SparkJobTask(SparkMixin, OverwriteOutputMixin, EventLogSelectionDownstreamMixin, PySparkTask): """ diff --git a/edx/analytics/tasks/insights/location_per_course.py b/edx/analytics/tasks/insights/location_per_course.py index 5f767256cf..98cd76c563 100644 --- a/edx/analytics/tasks/insights/location_per_course.py +++ b/edx/analytics/tasks/insights/location_per_course.py @@ -217,25 +217,23 @@ def run(self): super(LastDailyIpAddressOfUserTaskSpark, self).run() def spark_job(self, *args): - from edx.analytics.tasks.util.spark_util import get_event_predicate_labels, get_course_id, get_event_time_string + from edx.analytics.tasks.util.spark_util import validate_course_id from pyspark.sql.functions import udf from pyspark.sql.window import Window from pyspark.sql.types import StringType - df = self.get_event_log_dataframe(self._spark, *args) - get_event_time = udf(get_event_time_string, StringType()) - get_courseid = udf(get_course_id, StringType()) - df = df.withColumn('course_id', get_courseid(df['context'])) \ - .withColumn('timestamp', get_event_time(df['time'])) + df = self.get_dataframe(self._spark, *args) + validate_courseid = udf(validate_course_id, StringType()) + df = df.withColumn('course_id', validate_courseid(df['course_id'])) df.createOrReplaceTempView('location') query = """ SELECT timestamp, ip, user_id, course_id, dt FROM ( SELECT - event_date as dt, context.user_id as user_id, course_id, timestamp, ip, - ROW_NUMBER() over ( PARTITION BY event_date, context.user_id, 
course_id ORDER BY timestamp desc) as rank + event_date as dt, user_id, course_id, timestamp, ip, + ROW_NUMBER() over ( PARTITION BY event_date, user_id, course_id ORDER BY timestamp desc) as rank FROM location - WHERE ip <> '' AND timestamp <> '' AND context.user_id <> '' + WHERE ip <> '' ) user_location WHERE rank = 1 """ diff --git a/edx/analytics/tasks/util/spark_util.py b/edx/analytics/tasks/util/spark_util.py index 8d03647e92..74fbeb9337 100644 --- a/edx/analytics/tasks/util/spark_util.py +++ b/edx/analytics/tasks/util/spark_util.py @@ -1,7 +1,12 @@ """Support for spark tasks""" +import json +import re + import edx.analytics.tasks.util.opaque_key_util as opaque_key_util from edx.analytics.tasks.util.constants import PredicateLabels +PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$') + def get_event_predicate_labels(event_type, event_source): """ @@ -53,6 +58,55 @@ def get_event_time_string(event_time): return '' +def filter_event_logs(row, lower_bound_date_string, upper_bound_date_string): + if row is None: + return () + context = row.get('context', '') + raw_time = row.get('time', '') + if not context or not raw_time: + return () + course_id = context.get('course_id', '').encode('utf-8') + user_id = context.get('user_id', None) + time = get_event_time_string(raw_time).encode('utf-8') + ip = row.get('ip', '').encode('utf-8') + if not user_id or not time: + return () + date_string = raw_time.split("T")[0].encode('utf-8') + if date_string < lower_bound_date_string or date_string >= upper_bound_date_string: + return () # discard events outside the date interval + return (user_id, course_id, ip, time, date_string) + + +def parse_json_event(line, nested=False): + """ + Parse a tracking log input line as JSON to create a dict representation. + """ + try: + parsed = json.loads(line) + except Exception: + if not nested: + json_match = PATTERN_JSON.match(line) + if json_match: + return parse_json_event(json_match.group(1), nested=True) + return None + return parsed + + +def load_and_filter(spark_session, file, lower_bound_date_string, upper_bound_date_string): + return spark_session.sparkContext.textFile(file) \ + .map(parse_json_event) \ + .map(lambda row: filter_event_logs(row, lower_bound_date_string, upper_bound_date_string)) \ + .filter(bool) + + +def validate_course_id(course_id): + course_id = opaque_key_util.normalize_course_id(course_id) + if course_id: + if opaque_key_util.is_valid_course_id(course_id): + return course_id + return '' + + def get_course_id(event_context, from_url=False): """ Gets course_id from event's data. 
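[Editor's note] For readers following the Spark conversion, the following is a minimal, self-contained sketch (not part of the patch series) of the ROW_NUMBER() deduplication that LastDailyIpAddressOfUserTaskSpark performs above: keep the most recent IP per (event_date, user_id, course_id). The column names mirror the patch; the sample rows, app name and local master are illustrative assumptions only.

    # Minimal sketch, not part of the patch series: reproduces the ROW_NUMBER()
    # deduplication run by LastDailyIpAddressOfUserTaskSpark, using an in-memory
    # DataFrame in place of parsed tracking logs.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master('local[2]').appName('last_ip_sketch').getOrCreate()

    rows = [
        # (user_id, course_id, ip, timestamp, event_date)
        (10003, 'course-v1:edX+DemoX+Test_2014', '127.0.0.1', '2014-06-19T17:47:25.605078', '2014-06-19'),
        (10003, 'course-v1:edX+DemoX+Test_2014', '10.0.0.5', '2014-06-19T18:02:01.000000', '2014-06-19'),
        (10004, 'course-v1:edX+DemoX+Test_2014', '127.0.0.1', '2014-06-19T17:56:03.028696', '2014-06-19'),
    ]
    df = spark.createDataFrame(rows, ['user_id', 'course_id', 'ip', 'timestamp', 'event_date'])
    df.createOrReplaceTempView('location')

    # Same shape of query as the patch: keep only the most recent event per
    # (event_date, user_id, course_id) partition, i.e. one IP per user per course per day.
    last_ip = spark.sql("""
        SELECT timestamp, ip, user_id, course_id, dt
        FROM (
            SELECT
                event_date AS dt, user_id, course_id, timestamp, ip,
                ROW_NUMBER() OVER (PARTITION BY event_date, user_id, course_id
                                   ORDER BY timestamp DESC) AS rank
            FROM location
            WHERE ip <> ''
        ) user_location
        WHERE rank = 1
    """)
    last_ip.show(truncate=False)  # user 10003 keeps only the 18:02 row

Ordering by the ISO-8601 timestamp string is enough here because lexicographic and chronological order coincide for that format, which is the same property the patch's query relies on.
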
From 1a42858fccc422e3e019387adff1f6674f58a136 Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Wed, 13 Jun 2018 16:36:36 +0500 Subject: [PATCH 12/14] enable verbose logging --- edx/analytics/tasks/common/spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py index fb3bc17310..23f1c0d1f4 100644 --- a/edx/analytics/tasks/common/spark.py +++ b/edx/analytics/tasks/common/spark.py @@ -157,7 +157,7 @@ class SparkMixin(): significant=False, default=None ) - always_log_stderr = False # log stderr if spark fails, True for verbose log + always_log_stderr = True # log stderr if spark fails, True for verbose log class PathSelectionTaskSpark(EventLogSelectionDownstreamMixin, luigi.WrapperTask): From 5f6726dca4933f8a5bce1341a94e27a4e98678b0 Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Thu, 14 Jun 2018 15:23:38 +0500 Subject: [PATCH 13/14] address PR review changes --- .../tasks/insights/location_per_course.py | 11 ++++++----- edx/analytics/tasks/insights/user_activity.py | 15 +++++++++++---- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/edx/analytics/tasks/insights/location_per_course.py b/edx/analytics/tasks/insights/location_per_course.py index 98cd76c563..21ef480958 100644 --- a/edx/analytics/tasks/insights/location_per_course.py +++ b/edx/analytics/tasks/insights/location_per_course.py @@ -167,7 +167,7 @@ def run(self): class LastDailyIpAddressOfUserTaskSpark(EventLogSelectionMixinSpark, WarehouseMixin, SparkJobTask): """Spark alternate of LastDailyIpAddressOfUserTask""" - output_parent_dir = 'last_ip_of_user_id' + output_parent_dirname = 'last_ip_of_user_id' marker = luigi.Parameter( config_path={'section': 'map-reduce', 'name': 'marker'}, significant=False, @@ -181,7 +181,7 @@ def output_dir(self): return get_target_from_url( url_path_join( self.warehouse_path, - self.output_parent_dir + self.output_parent_dirname ) ) @@ -199,7 +199,7 @@ def output_paths(self): return map( lambda date: get_target_from_url( url_path_join( - self.hive_partition_path(self.output_parent_dir, date.isoformat()) + self.hive_partition_path(self.output_parent_dirname, date.isoformat()) ) ), self.interval @@ -230,14 +230,15 @@ def spark_job(self, *args): timestamp, ip, user_id, course_id, dt FROM ( SELECT - event_date as dt, user_id, course_id, timestamp, ip, - ROW_NUMBER() over ( PARTITION BY event_date, user_id, course_id ORDER BY timestamp desc) as rank + event_date AS dt, user_id, course_id, timestamp, ip, + ROW_NUMBER() OVER ( PARTITION BY event_date, user_id, course_id ORDER BY timestamp DESC ) AS rank FROM location WHERE ip <> '' ) user_location WHERE rank = 1 """ result = self._spark.sql(query) + # write 4 tsv files in each partitioned directory result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t') diff --git a/edx/analytics/tasks/insights/user_activity.py b/edx/analytics/tasks/insights/user_activity.py index 49ab94c534..da642dc968 100644 --- a/edx/analytics/tasks/insights/user_activity.py +++ b/edx/analytics/tasks/insights/user_activity.py @@ -191,13 +191,18 @@ def output_paths(self): ) def on_success(self): # pragma: no cover - """Overload the success method to touch the _SUCCESS file. Any class that uses a separate Marker file from the - data file will need to override the base on_success() call to create this marker.""" + """ + Overload the success method to touch the _SUCCESS file. 
Any class that uses a separate Marker file from + the data file will need to override the base on_success() call to create this marker. + """ self.output().touch_marker() def run(self): self.remove_output_on_overwrite() - removed_partitions = [target.remove() for target in self.output_paths() if target.exists()] + if self.output_dir().exists(): # only check partitions if parent dir exists + for target in self.output_paths(): + if target.exists(): + target.remove() super(UserActivityTaskSpark, self).run() def spark_job(self, *args): @@ -221,6 +226,9 @@ def spark_job(self, *args): result = df.select('context.user_id', 'course_id', 'event_date', 'label') \ .groupBy('user_id', 'course_id', 'event_date', 'label').count() result = result.withColumn('dt', lit(result['event_date'])) # generate extra column for partitioning + # Using overwrite mode deletes all the partitions from the output_dir() so we use append mode but + # we explicitly delete those partitions in the run() method ( to avoid duplicates ) + # before they are written by job. result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t') @@ -413,7 +421,6 @@ def spark_job(self, *args): class CourseActivityTableTask(BareHiveTableTask): - @property def table(self): return 'course_activity' From e5d3e87a24c57149a9384611230c347a62e99755 Mon Sep 17 00:00:00 2001 From: Abdul Mannan Date: Wed, 20 Jun 2018 23:00:28 +0500 Subject: [PATCH 14/14] Parallelize file loading to reduce in-activity delays --- edx/analytics/tasks/common/spark.py | 42 +++++++++++++++++++++++++- edx/analytics/tasks/util/spark_util.py | 15 +++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py index 23f1c0d1f4..425c0c0fae 100644 --- a/edx/analytics/tasks/common/spark.py +++ b/edx/analytics/tasks/common/spark.py @@ -14,7 +14,8 @@ ManifestInputTargetMixin, convert_to_manifest_input_if_necessary, remove_manifest_target_if_exists ) from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin -from edx.analytics.tasks.util.spark_util import load_and_filter +from edx.analytics.tasks.util.spark_util import load_and_filter, load_and_filter_rdd, parse_json_event, \ + filter_event_logs from edx.analytics.tasks.util.url import UncheckedExternalURL, get_target_from_url, url_path_join _file_path_to_package_meta_path = {} @@ -292,6 +293,29 @@ def get_user_location_schema(self): .add("timestamp", StringType(), True) \ .add("event_date", StringType(), True) + def build_dataframe_from_rdds(self, spark, *args, **kwargs): + from pyspark.sql.functions import to_date, udf, struct, date_format + input_source = self.get_input_source(*args) + user_location_schema = self.get_user_location_schema() + master_rdd = spark.sparkContext.union( + # filter out unwanted data as much as possible within each rdd before union + map( + lambda target: load_and_filter(spark, target.path, self.lower_bound_date_string, + self.upper_bound_date_string), + input_source + ) + ) + if self.rdd_checkpoint_directory: + # set checkpoint location before checkpointing + spark.sparkContext.setCheckpointDir(self.rdd_checkpoint_directory) + master_rdd.localCheckpoint() + if self.cache_rdd: + master_rdd.cache() + dataframe = spark.createDataFrame(master_rdd, schema=user_location_schema) + if 'user_id' not in dataframe.columns: # rename columns if they weren't named properly by createDataFrame + dataframe = dataframe.toDF('user_id', 'course_id', 'ip', 'timestamp', 'event_date') + return dataframe + 
def get_dataframe(self, spark, *args, **kwargs): from pyspark.sql.functions import to_date, udf, struct, date_format input_source = self.get_input_source(*args) @@ -315,6 +339,22 @@ def get_dataframe(self, spark, *args, **kwargs): dataframe = dataframe.toDF('user_id', 'course_id', 'ip', 'timestamp', 'event_date') return dataframe + def get_dataframe_from_distributed_loaded_files(self, spark, *args, **kwargs): + """ + For parallelizing s3 files and loading them on each executore + """ + input_source = self.get_input_source(*args) + user_location_schema = self.get_user_location_schema() + files_rdd = spark.sparkContext.parallelize([target.path for target in input_source]) + master_rdd = files_rdd.flatMap(load_and_filter_rdd) \ + .map(parse_json_event) \ + .map(lambda row: filter_event_logs(row, self.lower_bound_date_string, self.upper_bound_date_string)) \ + .filter(bool) + dataframe = spark.createDataFrame(master_rdd, schema=user_location_schema) + if 'user_id' not in dataframe.columns: # rename columns if they weren't named properly by createDataFrame + dataframe = dataframe.toDF('user_id', 'course_id', 'ip', 'timestamp', 'event_date') + return dataframe + class SparkJobTask(SparkMixin, OverwriteOutputMixin, EventLogSelectionDownstreamMixin, PySparkTask): """ diff --git a/edx/analytics/tasks/util/spark_util.py b/edx/analytics/tasks/util/spark_util.py index 74fbeb9337..1b91bfba30 100644 --- a/edx/analytics/tasks/util/spark_util.py +++ b/edx/analytics/tasks/util/spark_util.py @@ -92,6 +92,21 @@ def parse_json_event(line, nested=False): return parsed +def load_and_filter_rdd(path): + from edx.analytics.tasks.util.s3_util import ScalableS3Client + from pickle import Pickler + import gzip + try: + from cStringIO import StringIO + except ImportError: + from StringIO import StringIO + s3_conn = ScalableS3Client() + raw_data = StringIO(s3_conn.get_as_string(path)) + gzipfile = gzip.GzipFile(fileobj=raw_data) # this can be improved + content_string = gzipfile.read().encode('utf-8') + return [line for line in content_string.split("\n") if line] + + def load_and_filter(spark_session, file, lower_bound_date_string, upper_bound_date_string): return spark_session.sparkContext.textFile(file) \ .map(parse_json_event) \
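
[Editor's note] As a footnote to the final patch ("Parallelize file loading"), here is a hedged, standalone sketch of the pattern it introduces: parallelize the list of input paths so that each executor opens, decompresses and parses its own files, rather than funnelling that I/O through the driver. The local gzipped paths below are hypothetical; the patch's load_and_filter_rdd reads the same kind of lines from S3 via ScalableS3Client.

    # Hedged sketch of the PATCH 14 pattern, not a drop-in for the task:
    # distribute the file list itself, then read, decompress and parse on the
    # executors. Local gzip files stand in for the S3 objects the patch uses.
    import gzip
    import json

    from pyspark.sql import SparkSession


    def read_lines(path):
        """Yield non-empty lines from one gzipped tracking log file."""
        with gzip.open(path, 'rb') as log_file:
            for line in log_file:
                line = line.strip()
                if line:
                    yield line


    def parse_event(line):
        """Parse a tracking log line as JSON; return None if it is not valid JSON."""
        try:
            return json.loads(line)
        except ValueError:
            return None


    spark = SparkSession.builder.master('local[2]').getOrCreate()
    paths = ['/tmp/tracking.log-20140619.gz']  # hypothetical input files

    events = (
        spark.sparkContext
        .parallelize(paths, len(paths))  # one partition per file
        .flatMap(read_lines)             # executors read and decompress their own files
        .map(parse_event)
        .filter(bool)                    # drop lines that failed to parse
    )
    print(events.count())

Giving parallelize one slice per file keeps each executor working on whole files; if file sizes are very uneven, a larger slice count or a repartition after parsing spreads the load more evenly.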