code clean up
rao-abdul-mannan committed Jan 31, 2018
1 parent e99e83c commit 4ce2ed2
Showing 3 changed files with 31 additions and 26 deletions.
44 changes: 27 additions & 17 deletions edx/analytics/tasks/common/spark.py
@@ -1,11 +1,14 @@
from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, PathSelectionByDateIntervalTask
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
import json
from luigi.contrib.spark import PySparkTask
from luigi.configuration import get_config
import os
import tempfile
import zipfile

import luigi.configuration
from luigi.contrib.spark import PySparkTask

from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, PathSelectionByDateIntervalTask
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin

_file_path_to_package_meta_path = {}


@@ -119,7 +122,6 @@ def add_files_for_package(sub_package_path, root_package_path, root_package_name
if metadata_path is not None:
add_files_for_package(metadata_path, metadata_path, os.path.basename(metadata_path), package.__name__)


return archives_list


@@ -198,10 +200,11 @@ class SparkJobTask(OverwriteOutputMixin, PySparkTask):
_spark_context = None
_sql_context = None
_hive_context = None
_tmp_dir = None

driver_memory = '2g'
executor_memory = '3g'
always_log_stderr = True
always_log_stderr = False  # log stderr only when the spark job fails; set to True for verbose logging

def init_spark(self, sc):
"""
@@ -221,9 +224,12 @@ def spark_job(self):
raise NotImplementedError

def _load_internal_dependency_on_cluster(self):
"""creates a zip of package and loads it on spark worker nodes"""
# TODO: delete zipfile after loading on cluster completes
import tempfile
"""
creates a zip of package and loads it on spark worker nodes
Loading via luigi configuration does not work as it creates a tar file whereas spark does not load tar files
"""

# import packages to be loaded on cluster
import edx
import luigi
@@ -242,16 +248,14 @@ def _load_internal_dependency_on_cluster(self):
import requests

dependencies_list = []
egg_files = get_config().get('spark', 'edx_egg_files', None)
print '\n egg_files {}'.format(egg_files)
egg_files = luigi.configuration.get_config().get('spark', 'edx_egg_files', None)
if isinstance(egg_files, basestring):
dependencies_list = json.loads(egg_files)
packages = [edx, luigi, opaque_keys, stevedore, bson, ccx_keys, cjson, boto, filechunkio, ciso8601, chardet,
urllib3, certifi, idna, requests]
tmp_dir = tempfile.mkdtemp()
dependencies_list += create_packages_archive(packages, tmp_dir)
dependencies_list.append('s3://edx-analytics-scratch/egg_files/edx_opaque_keys-0.4-py2.7.egg')
print '\n dependencies_list {}'.format(dependencies_list)
self._tmp_dir = tempfile.mkdtemp()
dependencies_list += create_packages_archive(packages, self._tmp_dir)
# dependencies_list.append('s3://edx-analytics-scratch/egg_files/edx_opaque_keys-0.4-py2.7.egg')
if len(dependencies_list) > 0:
for file in dependencies_list:
self._spark_context.addPyFile(file)
@@ -260,7 +264,13 @@ def run(self):
self.remove_output_on_overwrite()
super(SparkJobTask, self).run()

def _clean(self):
"""Do any cleanup after job here"""
import shutil
shutil.rmtree(self._tmp_dir)

def main(self, sc, *args):
self.init_spark(sc)
self._load_internal_dependency_on_cluster() # load internal dependency for spark worker nodes on cluster
self.spark_job()
self._load_internal_dependency_on_cluster() # load packages on EMR cluster for spark worker nodes
self.spark_job()
self._clean()
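
For orientation (not part of this commit): after this change, main() drives a fixed lifecycle. init_spark(sc) sets up the Spark, SQL and Hive contexts, _load_internal_dependency_on_cluster() zips the internal packages into a temporary directory and ships each archive to the worker nodes with addPyFile, spark_job() does the actual work, and _clean() removes the temporary directory. A minimal, hypothetical subclass might look like the sketch below; the task name, parameters, and word-count logic are illustrative assumptions, not code from this repository.

# Hypothetical sketch, not part of this commit.
import luigi

from edx.analytics.tasks.common.spark import SparkJobTask
from edx.analytics.tasks.util.url import get_target_from_url


class WordCountSparkTask(SparkJobTask):
    """Toy task illustrating the SparkJobTask hooks."""

    input_path = luigi.Parameter()
    output_path = luigi.Parameter()

    def output(self):
        return get_target_from_url(self.output_path)

    def spark_job(self):
        # self._spark_context is populated by init_spark() before main() calls spark_job()
        counts = (
            self._spark_context.textFile(self.input_path)
            .flatMap(lambda line: line.split())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b)
        )
        counts.saveAsTextFile(self.output_path)

Because run() calls remove_output_on_overwrite() before delegating to PySparkTask.run(), such a task also inherits the OverwriteOutputMixin behaviour.
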
5 changes: 3 additions & 2 deletions edx/analytics/tasks/insights/user_activity.py
@@ -8,6 +8,7 @@
import luigi.date_interval

import edx.analytics.tasks.util.eventlog as eventlog
import edx.analytics.tasks.util.opaque_key_util as opaque_key_util
from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin, MultiOutputMapReduceJobTask
from edx.analytics.tasks.common.mysql_load import MysqlInsertTask
from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, EventLogSelectionMixin
@@ -17,9 +18,9 @@
from edx.analytics.tasks.util.decorators import workflow_entry_point
from edx.analytics.tasks.util.hive import BareHiveTableTask, HivePartitionTask, WarehouseMixin, hive_database_name
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
import edx.analytics.tasks.util.opaque_key_util as opaque_key_util
from edx.analytics.tasks.util.url import get_target_from_url, url_path_join
from edx.analytics.tasks.util.weekly_interval import WeeklyIntervalMixin

log = logging.getLogger(__name__)

logging.getLogger('boto').setLevel(logging.INFO)
@@ -385,4 +386,4 @@ def insert_source_task(self):
n_reduce_tasks=self.n_reduce_tasks,
overwrite=self.overwrite_hive,
overwrite_n_days=self.overwrite_n_days,
)
)
8 changes: 1 addition & 7 deletions edx/analytics/tasks/util/spark_util.py
@@ -57,12 +57,6 @@ def get_course_id(event_context, from_url=False):
return course_id

return ''
# spark worker nodes return unexpected results with opaque_key filtering
# if course_id:
# if opaque_key_util.is_valid_course_id(course_id):
# return course_id
# else:
# return '' # we'll filter out empty course since string is expected

# TODO : make it work with url as well
# Try to get the course_id from the URLs in `event_type` (for implicit
@@ -81,4 +75,4 @@
# if course_key:
# return unicode(course_key)
#
# return ''
# return ''
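
For reference (not part of this commit): the filtering dropped above used the project's opaque_key_util helper to validate course ids, which, per the removed comment, returned unexpected results on the spark worker nodes. A check of that kind, written directly against the opaque-keys library, looks roughly like the sketch below; the function name is illustrative.

# Hypothetical sketch, not part of this commit.
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey


def is_valid_course_id(course_id):
    """Return True if course_id parses as an opaque-keys CourseKey."""
    try:
        CourseKey.from_string(course_id)
        return True
    except InvalidKeyError:
        return False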
