WIP-Spark Tasks #476
base: master
Changes from all commits
097b826
d615749
d06f87f
78a842f
0b8b937
01f6b2b
b863f58
744684e
e204e49
2b7b94b
64308be
1a42858
5f6726d
e5d3e87
@@ -15,6 +15,7 @@
from edx.analytics.tasks.common.pathutil import (
    EventLogSelectionDownstreamMixin, EventLogSelectionMixin, PathSelectionByDateIntervalTask
)
from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask
from edx.analytics.tasks.insights.database_imports import ImportStudentCourseEnrollmentTask
from edx.analytics.tasks.util import eventlog
from edx.analytics.tasks.util.decorators import workflow_entry_point

@@ -163,6 +164,84 @@ def run(self):
target.open("w").close() # touch the file | ||
|
||
|
||
class LastDailyIpAddressOfUserTaskSpark(EventLogSelectionMixinSpark, WarehouseMixin, SparkJobTask):
    """Spark alternative to LastDailyIpAddressOfUserTask."""

    output_parent_dirname = 'last_ip_of_user_id'
    marker = luigi.Parameter(
        config_path={'section': 'map-reduce', 'name': 'marker'},
        significant=False,
        description='A URL location to a directory where a marker file will be written on task completion.',
    )

    def output_dir(self):
        """
        Output directory for spark task
        """
        return get_target_from_url(
            url_path_join(
                self.warehouse_path,
                self.output_parent_dirname
            )
        )

    def output(self):
        """
        Marker output path
        """
        marker_url = url_path_join(self.marker, str(hash(self)))
        return get_target_from_url(marker_url, marker=True)

    def output_paths(self):
        """
        Output partition paths
        """
        return map(
            lambda date: get_target_from_url(
                url_path_join(
                    self.hive_partition_path(self.output_parent_dirname, date.isoformat())
                )
            ),
            self.interval
        )

    def on_success(self):  # pragma: no cover
        self.output().touch_marker()

    def run(self):
        self.remove_output_on_overwrite()
        if self.output_dir().exists():  # only check partitions if parent dir exists
            for target in self.output_paths():
                if target.exists():
                    target.remove()

[Review comment] It might be good to put in a comment here, because I'm not remembering if we did this because we were writing in append mode, or because of issues if we reduced the number of partitions we coalesced down to on output, or because Spark really doesn't want the files around when it's planning on writing them out. Do you remember?
[Reply] This is done to avoid duplicates in case of append mode.

        super(LastDailyIpAddressOfUserTaskSpark, self).run()

[Review comment] In the original, each date in the interval is checked to see if it produces actual data. If the data is sparse enough, then not all date partitions will be created. This was done because the downstream class needed to have all dates exist, even if empty, so as to know what dates had been processed. This was also dealt with using the downstream_input_tasks() to do this check in the LastCountryOfUser task. But maybe this can be addressed when the follow-on (i.e. non-historic) workflow is written.
[Reply] My approach is to handle it with the non-historic task, and if it isn't possible then I'll make adjustments here.
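
As a possible follow-up on that exchange (not part of this diff): a minimal sketch of how every date partition in the interval could be materialized even when Spark writes no rows for it, so downstream tasks can still tell which dates were processed. It assumes the same hive_partition_path, url_path_join, and get_target_from_url helpers used elsewhere in this class; the helper name itself is hypothetical.

    def _ensure_all_date_partitions(self):  # hypothetical helper, for illustration only
        """Create an empty placeholder in any date partition that Spark did not write."""
        for date in self.interval:
            partition_path = self.hive_partition_path(self.output_parent_dirname, date.isoformat())
            if not get_target_from_url(partition_path).exists():
                # Touch an empty file so the partition directory exists for downstream tasks.
                placeholder = get_target_from_url(url_path_join(partition_path, '_EMPTY'))
                placeholder.open('w').close()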

    def spark_job(self, *args):
        from edx.analytics.tasks.util.spark_util import validate_course_id
        from pyspark.sql.functions import udf
        from pyspark.sql.window import Window
        from pyspark.sql.types import StringType
        df = self.get_dataframe(self._spark, *args)
        validate_courseid = udf(validate_course_id, StringType())
        df = df.withColumn('course_id', validate_courseid(df['course_id']))
        df.createOrReplaceTempView('location')
        query = """
            SELECT
                timestamp, ip, user_id, course_id, dt
            FROM (
                SELECT
                    event_date AS dt, user_id, course_id, timestamp, ip,
                    ROW_NUMBER() OVER ( PARTITION BY event_date, user_id, course_id ORDER BY timestamp DESC ) AS rank
                FROM location
                WHERE ip <> ''
            ) user_location
            WHERE rank = 1
        """
        result = self._spark.sql(query)
        # write 4 tsv files in each partitioned directory
        result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
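
For comparison only (not part of the diff): the same keep-latest-row-per-key selection can be written with the DataFrame window API, which may be why Window is imported above even though the SQL form is used. A minimal sketch, assuming df has the same columns as the location view:

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

window = Window.partitionBy('event_date', 'user_id', 'course_id').orderBy(col('timestamp').desc())
result = (
    df.where(col('ip') != '')
      .withColumn('rank', row_number().over(window))  # rank rows within each (date, user, course) group
      .where(col('rank') == 1)                        # keep only the most recent event per group
      .select('timestamp', 'ip', 'user_id', 'course_id', col('event_date').alias('dt'))
)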


class LastCountryOfUserDownstreamMixin(
        WarehouseMixin,
        OverwriteOutputMixin,

[Review comment] Is there a reason this is a target instead of just a URL? Is that the easiest way to get .exists() functionality?
[Reply] Yes, it is for .exists() functionality.
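
To illustrate the point in that exchange: wrapping the URL in a target gives a scheme-agnostic .exists() check (local, HDFS, or S3), whereas a bare URL string carries no behaviour. A minimal sketch, assuming the same get_target_from_url and url_path_join helpers used in the task above:

# Inside the task, the output directory URL would be built as in output_dir():
output_url = url_path_join(self.warehouse_path, self.output_parent_dirname)

# As a target, existence can be checked directly, whatever the URL scheme is.
if get_target_from_url(output_url).exists():
    pass  # partitions may already exist; see run() above

# As a plain string, the caller would have to dispatch on the scheme
# (file://, hdfs://, s3://) and implement the existence check itself.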