diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py b/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py index bfa2dce584..e5a20d854b 100644 --- a/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py +++ b/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py @@ -124,33 +124,34 @@ def spark_job(self, *args): sep='\t', schema=self._get_user_activity_table_schema() ) - # self._sql_context.registerDataFrameAsTable(auth_user_df, 'auth_user') - # self._sql_context.registerDataFrameAsTable(user_activity_df, 'user_activity') - # query = """ - # SELECT - # au.id, - # ua.course_id, - # ua.date, - # ua.category, - # ua.count - # FROM auth_user au - # JOIN user_activity ua - # ON au.username = ua.username - # """ - # result = self._sql_context.sql(query) - # result.coalesce(2).write.csv(self.output().path, mode='overwrite', sep='\t') - from pyspark.sql.functions import broadcast - auth_df = broadcast(auth_user_df) - user_activity_df.join( - auth_df, - auth_df.username == user_activity_df.username - ).select( - auth_df['id'], - user_activity_df['course_id'], - user_activity_df['date'], - user_activity_df['category'], - user_activity_df['count'], - ).coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t') + self._sql_context.registerDataFrameAsTable(auth_user_df, 'auth_user') + self._sql_context.registerDataFrameAsTable(user_activity_df, 'user_activity') + query = """ + SELECT + au.id, + ua.course_id, + ua.date, + ua.category, + ua.count + FROM auth_user au + JOIN user_activity ua + ON au.username = ua.username + """ + result = self._sql_context.sql(query) + result.coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t') + # using broadcast join + # from pyspark.sql.functions import broadcast + # auth_df = broadcast(auth_user_df) + # user_activity_df.join( + # auth_df, + # auth_df.username == user_activity_df.username + # ).select( + # auth_df['id'], + # user_activity_df['course_id'], + # user_activity_df['date'], + # user_activity_df['category'], + # user_activity_df['count'], + # ).coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t') def output(self): return get_target_from_url(