q3.py
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, floor, udf
# Adapted from:
# https://vanishingcodes.wordpress.com/2016/06/09/pyspark-tutorial-building-a-random-forest-binary-classifier-on-unbalanced-dataset/
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

def q():
    conf = SparkConf().set("spark.driver.maxResultSize", "20g")
    conf.set("spark.sql.crossJoin.enabled", "true")
    sc = SparkContext(conf=conf)
    sqlCtx = SQLContext(sc)

    # 2014 data. tf appears to be an hour index counted from the start of the
    # year, so day-of-week and hour-of-day fall out of integer arithmetic.
    df = sqlCtx.read.format('com.databricks.spark.csv') \
        .option('header', 'true').option('inferSchema', 'true').load('yg2014.csv')
    df = df.withColumn("day", floor(col("tf") / 24) % 7)
    df = df.withColumn("hour", col("tf") % 24)

    # US holidays in 2014, as day-of-year values (1 = Jan 1, 20 = MLK Day, ...).
    holidays_2014 = [1, 20, 48, 106, 131, 146, 167, 185, 244, 286, 315, 331, 332, 359, 360]
    is_holiday_2014 = udf(lambda x: 1 if x in holidays_2014 else 0, IntegerType())
    # Day of the year is floor(tf / 24) + 1, assuming tf starts at 0 on Jan 1;
    # comparing tf % 365 (an hour value) against day-of-year holidays would
    # never match correctly.
    df = df.withColumn("is_holiday", is_holiday_2014(floor(col('tf') / 24) + 1))
    df.show(30)
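    # Sanity check on the derived columns, e.g. tf = 50 (value illustrative):
    # day = floor(50/24) % 7 = 2, hour = 50 % 24 = 2, day-of-year = floor(50/24) + 1 = 3.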

    # 2015 data, same derived features with that year's holiday day-of-year
    # list. Distinct names avoid the 2014 UDF closing over a reassigned list.
    df2 = sqlCtx.read.format('com.databricks.spark.csv') \
        .option('header', 'true').option('inferSchema', 'true').load('yg2015.csv')
    df2 = df2.withColumn("day", floor(col("tf") / 24) % 7)
    df2 = df2.withColumn("hour", col("tf") % 24)
    holidays_2015 = [1, 19, 47, 106, 130, 145, 172, 184, 250, 285, 315, 330, 331, 359]
    is_holiday_2015 = udf(lambda x: 1 if x in holidays_2015 else 0, IntegerType())
    df2 = df2.withColumn("is_holiday", is_holiday_2015(floor(col('tf') / 24) + 1))

    # Random forest regression: predict pickup counts (p_cnt) from time,
    # location, and dropoff-count features.
    cols_now = ['day',
                'hour',
                'is_holiday',
                'lat',
                'lon',
                'd_cnt']
    assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
    # p_cnt is a numeric count, so cast it straight to double for the
    # regressor's label; indexing it as a categorical string would assign
    # frequency-ranked values instead of the actual counts, and would index
    # train and test inconsistently.
    trainingData = assembler_features.transform(df).withColumn('label', col('p_cnt').cast('double'))
    testData = assembler_features.transform(df2).withColumn('label', col('p_cnt').cast('double'))
    # Set maxCategories so features with > 4 distinct values are treated as continuous.
    featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures",
                                   maxCategories=4).fit(trainingData)
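    # With maxCategories=4, only is_holiday (two distinct values, 0/1) gets
    # indexed as categorical here; day (7 values) and hour (24) exceed the
    # threshold, and lat, lon, and d_cnt almost certainly do as well, so they
    # all stay continuous.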
    # Train a RandomForest model.
    rf = RandomForestRegressor(featuresCol="indexedFeatures", numTrees=200)
    # Chain indexer and forest in a Pipeline.
    pipeline = Pipeline(stages=[featureIndexer, rf])
    # Train model. This also runs the indexer.
    model = pipeline.fit(trainingData)
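    # Optionally persist the fitted pipeline for reuse (the path below is
    # illustrative, not from the original script; requires Spark 2.0+):
    # model.write().overwrite().save('q3_rf_model')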
    # Make predictions on the held-out 2015 data.
    predictions = model.transform(testData)
    # Select example rows to display.
    predictions.select("prediction", "label", "features").show(5)
    # Select (prediction, true label) and compute test error.
    evaluator = RegressionEvaluator(
        labelCol="label", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
    evaluator = RegressionEvaluator(
        labelCol="label", predictionCol="prediction", metricName="r2")
    r2 = evaluator.evaluate(predictions)
    print("R2 on test data = %g" % r2)
    rfModel = model.stages[1]
    print(rfModel)  # summary only
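    # The per-feature split importances line up with cols_now order; an
    # optional peek (Spark 2.0+):
    # print(rfModel.featureImportances)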
    return


if __name__ == "__main__":
    q()
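
# A typical invocation (package version illustrative; on Spark 2.x the CSV
# reader is built in and --packages can be dropped):
#   spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 q3.py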