forked from spark-examples/pyspark-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path pyspark-rdd-actions.py
91 lines (72 loc) · 2.22 KB
/
pyspark-rdd-actions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# -*- coding: utf-8 -*-
"""
PySpark RDD action examples.

Creates a SparkSession and two small RDDs that the remaining statements
run actions against.

Created on Sun Jun 14 10:20:19 2020
"""
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# Pair RDD of (letter, number) tuples; ("B", 30) appears twice on purpose,
# so there are 6 elements but only 5 distinct ones.
data = [
    ("Z", 1),
    ("A", 20),
    ("B", 30),
    ("C", 40),
    ("B", 30),
    ("B", 60),
]
inputRDD = spark.sparkContext.parallelize(data)

# Plain RDD of integers: 7 elements, 5 distinct values, summing to 20.
listRdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 3, 2])
#aggregate
# aggregate(zeroValue, seqOp, combOp): seqOp folds each element into a
# per-partition accumulator that starts at zeroValue, then combOp merges the
# per-partition accumulators. Both functions simply add here, giving the sum.
seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
agg = listRdd.aggregate(0, seqOp, combOp)
print(agg)  # output 20
#aggregate 2
# The accumulator type may differ from the element type: here it is a
# (running_sum, element_count) pair.
seqOp2 = (lambda x, y: (x[0] + y, x[1] + 1))
combOp2 = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
agg2 = listRdd.aggregate((0, 0), seqOp2, combOp2)
print(agg2)  # output (20, 7)
# treeAggregate: same contract as aggregate, but partition results are merged
# in a multi-level tree rather than all at once.
agg2 = listRdd.treeAggregate(0, seqOp, combOp)
print(agg2)  # output 20
#fold
from operator import add
foldRes = listRdd.fold(0, add)
print(foldRes)  # output 20
#reduce
redRes = listRdd.reduce(add)
print(redRes)  # output 20
#treeReduce: like reduce, but partial results are combined in a multi-level tree.
# operator.add is reused directly; rebinding `add` to an equivalent lambda
# would shadow the import for the rest of the module.
redRes = listRdd.treeReduce(add)
print(redRes)  # output 20
#Collect
# collect: return every element of the RDD to the driver as a Python list.
# NOTE: this rebinds `data`, which previously held the pairs used for inputRDD.
data = listRdd.collect()
print(data)
#Output: [1, 2, 3, 4, 5, 3, 2]
#count, countApprox, countApproxDistinct
print("Count : "+str(listRdd.count()))
#Output: Count : 7
# countApprox(timeout) waits at most `timeout` milliseconds. In PySpark it
# returns a plain int, not Scala's PartialResult with confidence bounds.
print("countApprox : "+str(listRdd.countApprox(1200)))
#Output: countApprox : 7
# countApproxDistinct is a HyperLogLog estimate; on data this small it
# normally matches the exact distinct count.
print("countApproxDistinct : "+str(listRdd.countApproxDistinct()))
#Output: countApproxDistinct : 5
# inputRDD has 6 tuples, but ("B", 30) is repeated, so 5 are distinct.
print("countApproxDistinct : "+str(inputRDD.countApproxDistinct()))
#Output: countApproxDistinct : 5
#countByValue: map each distinct element to its number of occurrences.
# (countByValueApprox exists in the Scala RDD API but not in PySpark.)
print("countByValue : "+str(listRdd.countByValue()))
#Output (key order may vary): countByValue : defaultdict(<class 'int'>, {1: 1, 2: 2, 3: 2, 4: 1, 5: 1})
#first: the first element of the RDD.
print("first : "+str(listRdd.first()))
#Output: first : 1
print("first : "+str(inputRDD.first()))
#Output: first : ('Z', 1)
#top(n): the n largest elements, in descending order. Tuples compare
# element by element, so inputRDD is ranked by its letter first.
print("top : "+str(listRdd.top(2)))
#Output: top : [5, 4]
print("top : "+str(inputRDD.top(2)))
#Output: top : [('Z', 1), ('C', 40)]
#min
print("min : "+str(listRdd.min()))
#Output: min : 1
print("min : "+str(inputRDD.min()))
#Output: min : ('A', 20)
#max
print("max : "+str(listRdd.max()))
#Output: max : 5
print("max : "+str(inputRDD.max()))
#Output: max : ('Z', 1)
#take, takeOrdered, takeSample
# take(n): the first n elements, in partition order.
print("take : "+str(listRdd.take(2)))
#Output: take : [1, 2]
# takeOrdered(n): the n smallest elements, in ascending order.
print("takeOrdered : "+ str(listRdd.takeOrdered(2)))
#Output: takeOrdered : [1, 2]
# takeSample(withReplacement, num, seed=None): num randomly chosen elements.
# withReplacement and num are required, so calling takeSample() with no
# arguments raises TypeError. A fixed seed makes the sample reproducible.
print("takeSample : "+str(listRdd.takeSample(False, 2, seed=1)))