Skip to content
This repository has been archived by the owner on May 1, 2024. It is now read-only.

Commit

Permalink
Spark SQL join performs better than broadcast join
Browse files Browse the repository at this point in the history
  • Loading branch information
rao-abdul-mannan committed Feb 28, 2018
1 parent 1fbaca1 commit 7d6b0c0
Showing 1 changed file with 28 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,33 +124,34 @@ def spark_job(self, *args):
sep='\t',
schema=self._get_user_activity_table_schema()
)
# self._sql_context.registerDataFrameAsTable(auth_user_df, 'auth_user')
# self._sql_context.registerDataFrameAsTable(user_activity_df, 'user_activity')
# query = """
# SELECT
# au.id,
# ua.course_id,
# ua.date,
# ua.category,
# ua.count
# FROM auth_user au
# JOIN user_activity ua
# ON au.username = ua.username
# """
# result = self._sql_context.sql(query)
# result.coalesce(2).write.csv(self.output().path, mode='overwrite', sep='\t')
from pyspark.sql.functions import broadcast
auth_df = broadcast(auth_user_df)
user_activity_df.join(
auth_df,
auth_df.username == user_activity_df.username
).select(
auth_df['id'],
user_activity_df['course_id'],
user_activity_df['date'],
user_activity_df['category'],
user_activity_df['count'],
).coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t')
self._sql_context.registerDataFrameAsTable(auth_user_df, 'auth_user')
self._sql_context.registerDataFrameAsTable(user_activity_df, 'user_activity')
query = """
SELECT
au.id,
ua.course_id,
ua.date,
ua.category,
ua.count
FROM auth_user au
JOIN user_activity ua
ON au.username = ua.username
"""
result = self._sql_context.sql(query)
result.coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t')
# Alternative: the same join expressed as a broadcast join (currently disabled)
# from pyspark.sql.functions import broadcast
# auth_df = broadcast(auth_user_df)
# user_activity_df.join(
# auth_df,
# auth_df.username == user_activity_df.username
# ).select(
# auth_df['id'],
# user_activity_df['course_id'],
# user_activity_df['date'],
# user_activity_df['category'],
# user_activity_df['count'],
# ).coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t')

def output(self):
return get_target_from_url(
Expand Down

0 comments on commit 7d6b0c0

Please sign in to comment.