WIP-Spark Tasks #476
base: master
Conversation
Codecov Report
@@            Coverage Diff            @@
##           master     #476      +/-  ##
==========================================
- Coverage   77.68%    76.79%    -0.9%
==========================================
  Files         193       196       +3
  Lines       21344     22004     +660
==========================================
+ Hits        16582     16898     +316
- Misses       4762      5106     +344
Continue to review full report at Codecov.
Force-pushed from 4ce2ed2 to b89c71d
Force-pushed from 9dba8a9 to f65be75
Force-pushed from cc759f3 to 7f1b57d
Force-pushed from dfab4d1 to 7d6b0c0
Force-pushed from 9def6c5 to f12ec82
Force-pushed from 894f1b5 to fb527a4
Force-pushed from 07343a0 to ae0277f
Force-pushed from 2ab865d to 7c1d21f
Force-pushed from fe100b0 to 5fff726
Force-pushed from 6309079 to 2f1ec9c
Force-pushed from e75c441 to 744684e
Force-pushed from c40aded to e204e49
Force-pushed from 50be7dc to 86fd011
Good progress! Sorry to raise questions about marker files, but it's a challenge in general in any schema to figure out what has been done and what needs to be done.
FROM (
    SELECT
        event_date as dt, user_id, course_id, timestamp, ip,
        ROW_NUMBER() over ( PARTITION BY event_date, user_id, course_id ORDER BY timestamp desc) as rank
nit: for consistency, I would uppercase all keywords, including OVER and AS (here and the line above).
    WHERE rank = 1
"""
result = self._spark.sql(query)
result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
It might be good to add a comment here, that this will produce four TSV files in the dt= subdirectory under the output_dir().
But a question: is there a reason why the mode is 'append'?
And a second question: I've noticed that write() generally creates a _SUCCESS file separate from the actual output files but in the same directory. Will this use of partitionBy() do the same, and if so, where would it write this file? In each partition, or in the output_dir()?
It will write to the output_dir() (not the individual partition).
With overwrite mode it won't keep other partitions, i.e. partitions that aren't in this interval would be deleted from the output_dir() path. So I'm using append mode and explicitly removing the partitions that will be written by the job, to avoid duplicates.
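To make this concrete, here is a rough sketch of the query-and-write step with keywords uppercased per the earlier nit and with comments answering the questions above. The source view name `events` and the outer SELECT are assumptions filled in around the snippet under review, not the actual code.

```python
# Sketch only: 'events' and the surrounding query shape are assumed.
query = """
    SELECT dt, user_id, course_id, timestamp, ip
    FROM (
        SELECT
            event_date AS dt, user_id, course_id, timestamp, ip,
            ROW_NUMBER() OVER (PARTITION BY event_date, user_id, course_id ORDER BY timestamp DESC) AS rank
        FROM events
    ) deduped
    WHERE rank = 1
"""
result = self._spark.sql(query)

# coalesce(4) means each dt=<date> subdirectory under output_dir() ends up with at
# most four TSV part files. The _SUCCESS marker is written once, directly in
# output_dir(), not inside each partition. Append mode preserves partitions that
# are outside this interval; run() removes the partitions this job is about to
# rewrite so that appending does not duplicate their rows.
result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
```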
class LastDailyIpAddressOfUserTaskSpark(EventLogSelectionMixinSpark, WarehouseMixin, SparkJobTask):
    """Spark alternate of LastDailyIpAddressOfUserTask"""

    output_parent_dir = 'last_ip_of_user_id'
nit: how about "output_parent_dirname"? When I first saw this below, I was unclear whether it was a name or a path.
""" | ||
Output directory for spark task | ||
""" | ||
return get_target_from_url( |
Is there a reason this is a target instead of just a URL? Is that the easiest way to get .exists() functionality?
Yes, it is for .exists() functionality.
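For illustration, a minimal sketch of an output_dir() along those lines, assuming the get_target_from_url and url_path_join helpers from edx.analytics.tasks.util.url and a warehouse_path attribute supplied by WarehouseMixin; the exact path layout here is an assumption.

```python
from edx.analytics.tasks.util.url import get_target_from_url, url_path_join

def output_dir(self):
    """
    Output directory for spark task.

    Returned as a target rather than a plain URL so that callers can use
    .exists() and .remove() on it, as discussed above.
    """
    # Sketch only: the exact path components are assumed.
    return get_target_from_url(
        url_path_join(self.warehouse_path, self.output_parent_dir)
    )
```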
for target in self.output_paths():
    if target.exists():
        target.remove()
super(LastDailyIpAddressOfUserTaskSpark, self).run()
In the original, each date in the interval is checked to see if it produces actual data. If the data is sparse enough, then not all date partitions will be created. This was done because the downstream class needed to have all dates exist, even if empty, so as to know what dates had been processed. This was also dealt with using the downstream_input_tasks() to do this check in the LastCountryOfUser task.
But maybe this can be addressed when the follow-on (i.e. non-historic) workflow is written.
My approach is to handle it with the non-historic task, and if that isn't possible then I'll make adjustments here.
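If the non-historic task does end up needing every date partition to exist, one possible approach is to materialize empty partitions after the write. A hypothetical sketch follows; the placeholder file name, helpers, and the assumption that self.interval is a luigi date interval are all illustrative, not the actual implementation.

```python
from edx.analytics.tasks.util.url import get_target_from_url, url_path_join

# Ensure a dt= partition exists for every date in the interval, even when Spark
# wrote no output for that date, so downstream tasks can tell the date was processed.
for date in self.interval.dates():
    placeholder = get_target_from_url(
        url_path_join(self.output_dir().path, 'dt=' + date.isoformat(), '_EMPTY')
    )
    if not placeholder.exists():
        # Touch a zero-length file; this materializes the otherwise-empty partition directory.
        with placeholder.open('w'):
            pass
```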
2) create marker in separate dir just like hadoop multi-mapreduce task
The former approach can fail when there is no data for some dates, as spark will not generate empty
partitions.
The latter approach is more consistent.
I see. So this is how you're doing user location. You chose approach 2 over approach 1. That may be fine, though I'm wary about using marker files where we can avoid it, because they are magical and hard to track down. It is really hard to know why a job is not producing output when a marker file exists somewhere with an opaque hash for a name. Also, because the hash comes from hashing the task, which effectively hashes the (significant) parameters, one will get a different hash for each interval. So if I run different but overlapping intervals, it will do an overwrite, but it doesn't actually check that the intervals that have been run are contiguous, or overlapping, or whatever. They only check that the same exact interval has or hasn't been run before.
That's why the location workflow verified partitions for all dates in the interval, and why the run() method created empty partitions for dates that had no data after the multi-mapreduce was done. (That's not just an issue with spark but an issue with multi-mapreduce in general, whether spark or map-reduce.)
But perhaps there are other reasons to go with the marker over the partition, like performance. I'm just wary about whether they're worth the complexity for the user.
Spark doesn't create empty partitions when writing output with partitionBy() if there is no data for a date. In such cases, checking for partition existence will fail, so I opted for the marker approach.
I'll echo Brian's thoughts on using an obscurely named marker file to indicate whether a job has succeeded in the past. We've obviously done it before, and in this specific example we've done it. I'd like to challenge engineers to see if we can come up with a better method, even if all we find out is that our current implementation is the cleanest option. In this case I'm fine with continuing our past behavior.
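To make the trade-off concrete, a rough illustration of the marker approach being discussed; the class and method names here are illustrative, not the actual implementation. The marker name is derived from the task's significant parameters, which is why two overlapping-but-different intervals produce different markers and neither run detects the other.

```python
import hashlib

import luigi

from edx.analytics.tasks.util.url import get_target_from_url, url_path_join


class MarkerFileMixin(object):
    """Record completion with a file in a separate marker directory, keyed by a task hash."""

    marker = luigi.Parameter(description='URL directory where marker files are written.')

    def marker_target(self):
        # The opaque hash: it changes whenever any significant parameter (e.g. the
        # interval) changes, so only an exact re-run of the same interval is detected.
        task_hash = hashlib.sha1(self.task_id.encode('utf-8')).hexdigest()
        return get_target_from_url(url_path_join(self.marker, task_hash))

    def complete(self):
        # Complete if (and only if) this exact parameter combination has run before.
        return self.marker_target().exists()
```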
def on_success(self):  # pragma: no cover
    """Overload the success method to touch the _SUCCESS file. Any class that uses a separate Marker file from the
    data file will need to override the base on_success() call to create this marker."""
nit: make docstring multiline....
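e.g., the same text split across lines, with the body elided:

```python
def on_success(self):  # pragma: no cover
    """
    Overload the success method to touch the _SUCCESS file.

    Any class that uses a separate Marker file from the data file will need to
    override the base on_success() call to create this marker.
    """
    # ... existing body unchanged ...
```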
if self.output_dir().exists():  # only check partitions if parent dir exists
    for target in self.output_paths():
        if target.exists():
            target.remove()
It might be good to put in a comment here, because I'm not remembering if we did this because we were writing in append mode, or because of issues if we reduced the number of partitions we coalesced down to on output, or because Spark really doesn't want the files around when it's planning on writing them out. Do you remember?
This is done to avoid duplicates when writing in append mode.
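Something along these lines would capture that reasoning in the code; the comment wording is just a suggestion.

```python
def run(self):
    # The writer uses mode='append' so partitions outside this interval are
    # preserved; remove any partitions this job is about to rewrite, otherwise
    # appending would duplicate their rows.
    if self.output_dir().exists():  # only check partitions if parent dir exists
        for target in self.output_paths():
            if target.exists():
                target.remove()
    super(LastDailyIpAddressOfUserTaskSpark, self).run()
```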
Force-pushed from 86fd011 to ceda01d
Force-pushed from ceda01d to 64308be
I left a few comments as I read the code. If I remember correctly, the changes in the spark base PR were getting merged and these were being left unmerged. I could be wrong. The comments I wrote are all forward-looking to future PRs. If we aren't merging this guy, no additional work is necessary.
def user_activity_hive_table_path(self, *args):
I think as we convert jobs we'll not want to reference the word "hive". It definitely makes sense now when you are interleaving Spark components. But in the future we'll have no hive components and names like this will be confusing. Again though, no change in this PR, just for future reference.
Right, I'll keep it in mind.
def get_event_time_string(event_time):
    """Returns the time of the event as an ISO8601 formatted string."""
    try:
        # Get entry, and strip off time zone information.  Keep microseconds, if any.
Dah, time string processing is a hobgoblin. I suspect all of our event times are converted to UTC. We may want to confirm that and not drop the timezone field.
It is exactly the same as the one we use for hadoop.
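For reference, the shared helper looks roughly like this; it is reconstructed from the comment above and the exact code in the repo may differ. Note that splitting on '+' only strips a '+HH:MM' suffix; a 'Z' or negative offset would pass through unchanged, which relates to the UTC question above.

```python
def get_event_time_string(event_time):
    """Returns the time of the event as an ISO8601 formatted string."""
    try:
        # Get entry, and strip off time zone information.  Keep microseconds, if any.
        timestamp = event_time.split('+')[0]
        if '.' not in timestamp:
            timestamp = '{datetime}.000000'.format(datetime=timestamp)
        return timestamp
    except Exception:  # pylint: disable=broad-except
        return None
```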
This PR includes the following tasks:
TotalEventsDailyTask
UserActivityTask
CourseActivityPartitionTask
InternalReportingUserActivityPartitionTask
Note: not to be merged into the master branch yet.