forked from andkret/Cookbook
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added the code from podcast andkret#102 And the Movies.txt file I got from : https://perso.telecom-paristech.fr/eagan/class/igr204/datasets
- Loading branch information
andkret
committed
Jul 17, 2019
1 parent
6d79824
commit 05da0a0
Showing
9 changed files
with
1,721 additions
and
316 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
//Read in the textfile | ||
val input = sc.textFile("/notebook/Movies.txt") | ||
|
||
case class MovieLine(Line: String) | ||
|
||
val movieline = input.map(line => MovieLine(line)) | ||
|
||
movieline.toDF().registerTempTable("MovieLine") | ||
|
||
|
||
// Lets map the date and the genre | ||
case class DateAndGenre(myDate: String, Genre: String) | ||
|
||
val dateandgenre = input.map(line => line.split(";")).map(s => DateAndGenre( s(0),s(3) )) | ||
|
||
dateandgenre.toDF().registerTempTable("DateAndGenre") | ||
|
||
// count how many movies per year | ||
case class MovieDate(Line: String, myCount: Int) | ||
|
||
val countdate = input.map(line => line.split(";")).map(s => (s(0),1)) | ||
countdate.toDF().registerTempTable("countdate") | ||
|
||
val reduceddate = countdate.reduceByKey((a,b) => a + b).map(s => MovieDate(s._1,s._2)) | ||
|
||
reduceddate.toDF().registerTempTable("MovieDate") | ||
|
||
//flatten every word into a new line in the RDD | ||
val flatmappedinput = input.flatMap(line => line.split(";") ) | ||
flatmappedinput.toDF().registerTempTable("flatinput") | ||
|
||
// read input directly to dataframe | ||
val inputasdf = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("/notebook/Movies.txt") | ||
inputasdf.registerTempTable("inputdf") | ||
|
||
/* //Use this to store the dataframe as parquet on the local drive | ||
val reduceddf = reduceddate.toDF() | ||
reduceddf.write.parquet("/notebook/movie.parquet") | ||
*/ | ||
|
||
//read the parquetfile | ||
val parquetFileDF = spark.read.parquet("/notebook/movie.parquet") | ||
parquetFileDF.registerTempTable("ParquetRead") | ||
|
||
|
||
//SparkSQL Queries: | ||
|
||
//Visualize the raw RDD | ||
%sql select * from MovieLine | ||
|
||
//Visualize the map reduced RDD with count of movies per year | ||
%sql select Line, myCount from MovieDate order by myCount desc | ||
|
||
//Visualize the maped RDD and count the nr. of movies per year in SparkSQL | ||
%sql select myDate, count(myDate) as counted from DateAndGenre group by myDate order by counted desc | ||
|
||
%sql select * from flatinput | ||
|
||
%sql select * from ParquetRead |
Large diffs are not rendered by default.
Oops, something went wrong.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
Binary file not shown.
Binary file not shown.
This file was deleted.
Oops, something went wrong.