forked from spark-examples/pyspark-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpyspark-join.py
82 lines (62 loc) · 2.47 KB
/
pyspark-join.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# -*- coding: utf-8 -*-
"""
Created on Sun Jun 14 10:20:19 2020
@author: prabha
"""
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
emp = [(1,"Smith",-1,"2018","10","M",3000), \
(2,"Rose",1,"2010","20","M",4000), \
(3,"Williams",1,"2010","10","M",1000), \
(4,"Jones",2,"2005","10","F",2000), \
(5,"Brown",2,"2010","40","",-1), \
(6,"Brown",2,"2010","50","",-1) \
]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
"emp_dept_id","gender","salary"]
empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)
dept = [("Finance",10), \
("Marketing",20), \
("Sales",30), \
("IT",40) \
]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"inner") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"outer") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"full") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"fullouter") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"left") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"leftouter") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"right") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"rightouter") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"leftsemi") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"leftanti") \
.show(truncate=False)
empDF.alias("emp1").join(empDF.alias("emp2"), \
col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
.select(col("emp1.emp_id"),col("emp1.name"), \
col("emp2.emp_id").alias("superior_emp_id"), \
col("emp2.name").alias("superior_emp_name")) \
.show(truncate=False)
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
.show(truncate=False)
joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
.show(truncate=False)