-
Notifications
You must be signed in to change notification settings - Fork 93
/
serial_prophet_forecast.py
159 lines (139 loc) · 6.23 KB
/
serial_prophet_forecast.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""Transformer that uses FB Prophet for time series prediction.
Please see the parallel implementation for more information"""
import importlib
from h2oaicore.transformer_utils import CustomTimeSeriesTransformer
import datatable as dt
import numpy as np
import pandas as pd
import gc
import os
class suppress_stdout_stderr(object):
    """Context manager that silences file descriptors 1 and 2 (stdout/stderr).

    The redirection is done at the OS file-descriptor level with ``os.dup2``,
    so anything written to those descriptors while the ``with`` block runs is
    sent to ``os.devnull``.

    Descriptors are acquired in ``__enter__`` and released in ``__exit__``.
    Creating an instance therefore holds no OS resources until it is entered,
    and the same instance can be used for several consecutive ``with`` blocks.
    Nested use of a single instance is not supported.
    """

    def __init__(self):
        # Populated on entry; kept as attributes so existing code that
        # inspects them keeps working.
        self.null_fds = []
        self.save_fds = []

    def __enter__(self):
        # Two handles on the null device, one per redirected stream.
        self.null_fds = [os.open(os.devnull, os.O_RDWR) for _ in range(2)]
        # Duplicates of the real stdout/stderr so they can be restored later.
        self.save_fds = [os.dup(1), os.dup(2)]
        os.dup2(self.null_fds[0], 1)
        os.dup2(self.null_fds[1], 2)

    def __exit__(self, *_):
        try:
            # Point descriptors 1 and 2 back at the original streams.
            os.dup2(self.save_fds[0], 1)
            os.dup2(self.save_fds[1], 2)
        finally:
            # Always release the helper descriptors, even if restoring failed.
            for fd in self.null_fds + self.save_fds:
                os.close(fd)
            self.null_fds = []
            self.save_fds = []
class MySerialProphetTransformer(CustomTimeSeriesTransformer):
    """Time-series transformer that fits one Prophet model per group, serially.

    The input columns are the time group columns (``self.tgc``). Rows are
    grouped by every time group column except the time column itself, and a
    separate Prophet model is fitted on each group's ``(ds, y)`` pairs.
    Groups with fewer than 20 training rows, and groups not seen during
    ``fit``, are predicted with the mean of the training target instead.
    """
    _binary = False
    _multiclass = False
    _unsupervised = False  # uses target
    _uses_target = True  # uses target
    # some package dependencies are best sequential to overcome known issues
    froms3 = True
    if froms3:
        _root_path = "https://s3.amazonaws.com/artifacts.h2o.ai/deps/dai/recipes"
        _suffix = "-cp311-cp311-linux_x86_64.whl"
        _modules_needed_by_name = [
            '%s/setuptools_git-1.2%s' % (_root_path, _suffix),
            '%s/LunarCalendar-0.0.9%s' % (_root_path, _suffix),
            '%s/ephem-4.1.5%s' % (_root_path, _suffix),
            '%s/cmdstanpy-1.2.2%s' % (_root_path, _suffix),
            '%s/pystan-3.9.1%s' % (_root_path, _suffix),
            '%s/httpstan-4.12.0%s' % (_root_path, _suffix),
            '%s/prophet-1.1.5%s' % (_root_path, _suffix),
        ]
    else:
        _modules_needed_by_name = ['holidays==0.47', 'convertdate', 'lunarcalendar', 'pystan==3.9.1',
                                   'prophet==1.1.5']
    # _modules_needed_by_name = ['fbprophet']
    _included_model_classes = None  # ["gblinear"] for strong trends - can extrapolate
    _testing_can_skip_failure = False  # ensure tested as if shouldn't fail

    @staticmethod
    def is_enabled():
        """Return True: the recipe is always reported as enabled."""
        return True

    @staticmethod
    def can_use(accuracy, interpretability, **kwargs):
        """Return False unconditionally, whatever the settings passed in.

        NOTE(review): presumably this keeps the serial variant from being
        selected automatically - confirm against the framework's contract.
        """
        return False

    @staticmethod
    def get_default_properties():
        """Return the column requirements: exactly one ``time_column`` input."""
        return dict(col_type="time_column", min_cols=1, max_cols=1, relative_importance=1)

    @staticmethod
    def acceptance_test_timeout():
        """Return the acceptance-test timeout (20.0).

        NOTE(review): the unit is defined by the framework - confirm.
        """
        return 20.0

    def _prophet_groups(self, frame):
        """Split ``frame`` by every time group column except the time column.

        Returns a pandas ``GroupBy`` when there is at least one such column,
        otherwise a one-element list holding the whole frame under the key
        ``[None]``. Both support ``len()`` and iterate as ``(key, sub_frame)``.
        """
        tgc_wo_time = list(np.setdiff1d(self.tgc, self.time_column))
        if tgc_wo_time:
            return frame.groupby(tgc_wo_time)
        return [([None], frame)]

    @staticmethod
    def _prophet_group_hash(key):
        """Build the string under which a group's model is stored.

        Only ``list`` keys are joined element-wise; any other key (including
        a tuple produced by pandas) is stringified as a whole. ``fit`` and
        ``transform`` share this helper so that the keys always line up.
        """
        key = key if isinstance(key, list) else [key]
        return '_'.join(map(str, key))

    @staticmethod
    def _prophet_is_progress_step(group_idx, nb_groups):
        """Tell whether a progress line is due for this zero-based group index.

        A line is due every ``nb_groups // 20`` groups, or for every group
        when there are fewer than 20 of them.
        """
        return (group_idx + 1) % max(1, nb_groups // 20) == 0

    def fit(self, X: dt.Frame, y: np.array = None):
        """Fit one Prophet model per group and remember the target mean.

        :param X: frame holding the time group columns listed in ``self.tgc``
        :param y: target values, one per row of ``X``
        :return: ``self``; ``self.models`` maps each group hash to its fitted
            model, or to ``None`` for groups with fewer than 20 rows, and
            ``self.nan_value`` holds ``np.mean(y)``
        """
        # Imported by name at call time rather than at module import.
        prophet_module = importlib.import_module('prophet')
        Prophet = getattr(prophet_module, "Prophet")
        # from fbprophet import Prophet

        self.models = {}
        frame = X[:, self.tgc].to_pandas()
        frame.rename(columns={self.time_column: "ds"}, inplace=True)
        frame['y'] = np.array(y)
        # Fallback prediction for small or unseen groups.
        self.nan_value = np.mean(y)

        groups = self._prophet_groups(frame)
        nb_groups = len(groups)
        for i_grp, (key, grp_frame) in enumerate(groups):
            if self._prophet_is_progress_step(i_grp, nb_groups):
                print("FB Prophet - ", 100 * (i_grp + 1) // nb_groups, "% of Groups Fitted")

            grp_hash = self._prophet_group_hash(key)
            if grp_frame.shape[0] < 20:
                # Too few rows to fit: transform() falls back to nan_value.
                print("prophet - small data work-around for group: %s" % grp_hash)
                model = None
            else:
                model = Prophet(yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=True)
                # Silence the output written to stdout/stderr during fitting.
                with suppress_stdout_stderr():
                    model.fit(grp_frame[['ds', 'y']])
                gc.collect()
            self.models[grp_hash] = model
        return self

    def transform(self, X: dt.Frame):
        """Predict ``yhat`` for every row of ``X`` with the per-group models.

        :param X: frame holding the time group columns listed in ``self.tgc``
        :return: pandas DataFrame with a single ``yhat`` column, sorted by the
            row index of the input so that predictions line up with ``X``
        """
        frame = X[:, self.tgc].to_pandas()
        frame.rename(columns={self.time_column: "ds"}, inplace=True)

        groups = self._prophet_groups(frame)
        nb_groups = len(groups)
        preds = []
        for i_grp, (key, grp_frame) in enumerate(groups):
            if self._prophet_is_progress_step(i_grp, nb_groups):
                print("FB Prophet - ", 100 * (i_grp + 1) // nb_groups, "% of Groups Transformed")

            grp_hash = self._prophet_group_hash(key)
            # None both for unseen groups and for groups too small to fit.
            model = self.models.get(grp_hash)
            if model is not None:
                # Prophet returns the predictions ordered by time, so record
                # the time order of this group's rows and use it to map each
                # prediction back to the input row it belongs to.
                order = np.asarray(np.argsort(pd.to_datetime(grp_frame["ds"])))
                grp_pred = model.predict(grp_frame)
                grp_pred.index = grp_frame.index[order]
            else:
                # Constant fallback: every row gets the training target mean.
                grp_pred = pd.DataFrame(np.full((grp_frame.shape[0], 1), self.nan_value),
                                        columns=['yhat'], index=grp_frame.index)
            preds.append(grp_pred[['yhat']])

        # Restore the input row order across all groups.
        return pd.concat(tuple(preds), axis=0).sort_index()

    def fit_transform(self, X: dt.Frame, y: np.array = None):
        """Fit on ``(X, y)`` and return the predictions for the same ``X``."""
        return self.fit(X, y).transform(X)