# function_CPCV.py
# Forked from AI4Finance-Foundation/FinRL_Crypto.
import itertools as itt
import numbers
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from abc import abstractmethod
from typing import Iterable, Tuple, List
cmap_data = plt.cm.Paired
cmap_cv = plt.cm.coolwarm
class BaseTimeSeriesCrossValidator:
"""
Abstract class for time series cross-validation.
    Time series cross-validation requires that each sample has a prediction time pred_time, at which the features are
    used to predict the response, and an evaluation time eval_time, at which the response is known and the error can
    be computed. Importantly, this means that, unlike in standard sklearn cross-validation, the samples X, the
    response y, pred_times and eval_times must all be pandas DataFrames/Series sharing the same index. It is also
    assumed that the samples are time-ordered with respect to the prediction time (i.e. pred_times is non-decreasing).
Parameters
----------
n_splits : int, default=10
Number of folds. Must be at least 2.
"""
def __init__(self, n_splits=10):
if not isinstance(n_splits, numbers.Integral):
raise ValueError(f"The number of folds must be of Integral type. {n_splits} of type {type(n_splits)}"
f" was passed.")
n_splits = int(n_splits)
if n_splits <= 1:
raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting n_splits = 2 "
f"or more, got n_splits = {n_splits}.")
self.n_splits = n_splits
self.pred_times = None
self.eval_times = None
self.indices = None
@abstractmethod
def split(self, X: pd.DataFrame, y: pd.Series = None,
pred_times: pd.Series = None, eval_times: pd.Series = None):
if not isinstance(X, pd.DataFrame) and not isinstance(X, pd.Series):
raise ValueError('X should be a pandas DataFrame/Series.')
if not isinstance(y, pd.Series) and y is not None:
raise ValueError('y should be a pandas Series.')
if not isinstance(pred_times, pd.Series):
raise ValueError('pred_times should be a pandas Series.')
if not isinstance(eval_times, pd.Series):
raise ValueError('eval_times should be a pandas Series.')
if y is not None and (X.index == y.index).sum() != len(y):
raise ValueError('X and y must have the same index')
if (X.index == pred_times.index).sum() != len(pred_times):
raise ValueError('X and pred_times must have the same index')
if (X.index == eval_times.index).sum() != len(eval_times):
raise ValueError('X and eval_times must have the same index')
# if not pred_times.equals(pred_times.sort_values()):
# raise ValueError('pred_times should be sorted')
# if not eval_times.equals(eval_times.sort_values()):
# raise ValueError('eval_times should be sorted')
self.pred_times = pred_times
self.eval_times = eval_times
self.indices = np.arange(X.shape[0])
class CombPurgedKFoldCV(BaseTimeSeriesCrossValidator):
"""
Purged and embargoed combinatorial cross-validation
As described in Advances in financial machine learning, Marcos Lopez de Prado, 2018.
    The samples are decomposed into n_splits folds containing equal numbers of samples, without shuffling. In each
    cross-validation round, n_test_splits folds are used as the test set, while the other folds are used as the train
    set. There are as many rounds as there are ways to choose n_test_splits folds among the n_splits folds.
    Each sample should be tagged with a prediction time pred_time and an evaluation time eval_time. The split is such
    that the intervals [pred_times, eval_times] associated with samples in the train and test set do not overlap. (The
    overlapping samples are dropped.) In addition, an "embargo" period is defined, giving the minimal time between an
    evaluation time in the test set and a prediction time in the training set. This is to avoid contamination of the
    test set by the train set in the presence of temporal correlation.
Parameters
----------
n_splits : int, default=10
Number of folds. Must be at least 2.
n_test_splits : int, default=2
Number of folds used in the test set. Must be at least 1.
embargo_td : pd.Timedelta, default=0
Embargo period (see explanations above).
"""
def __init__(self, n_splits=10, n_test_splits=2, embargo_td=pd.Timedelta(minutes=0)):
super().__init__(n_splits)
if not isinstance(n_test_splits, numbers.Integral):
raise ValueError(f"The number of test folds must be of Integral type. {n_test_splits} of type "
f"{type(n_test_splits)} was passed.")
n_test_splits = int(n_test_splits)
if n_test_splits <= 0 or n_test_splits > self.n_splits - 1:
raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting "
f"n_test_splits between 1 and n_splits - 1, got n_test_splits = {n_test_splits}.")
self.n_test_splits = n_test_splits
if not isinstance(embargo_td, pd.Timedelta):
raise ValueError(f"The embargo time should be of type Pandas Timedelta. {embargo_td} of type "
f"{type(embargo_td)} was passed.")
if embargo_td < pd.Timedelta(minutes=0):
raise ValueError(f"The embargo time should be positive, got embargo = {embargo_td}.")
self.embargo_td = embargo_td
def split(self, X: pd.DataFrame, y: pd.Series = None,
pred_times: pd.Series = None, eval_times: pd.Series = None) -> Iterable[Tuple[np.ndarray, np.ndarray]]:
"""
Yield the indices of the train and test sets.
Although the samples are passed in the form of a pandas dataframe, the indices returned are position indices,
not labels.
Parameters
----------
X : pd.DataFrame, shape (n_samples, n_features), required
Samples. Only used to extract n_samples.
y : pd.Series, not used, inherited from _BaseKFold
pred_times : pd.Series, shape (n_samples,), required
Times at which predictions are made. pred_times.index has to coincide with X.index.
eval_times : pd.Series, shape (n_samples,), required
Times at which the response becomes available and the error can be computed. eval_times.index has to
coincide with X.index.
        Returns
-------
train_indices: np.ndarray
A numpy array containing all the indices in the train set.
test_indices : np.ndarray
A numpy array containing all the indices in the test set.
"""
super().split(X, y, pred_times, eval_times)
        # Fold boundaries, as (start, end) position indices
fold_bounds = [(fold[0], fold[-1] + 1) for fold in np.array_split(self.indices, self.n_splits)]
# List of all combinations of n_test_splits folds selected to become test sets
selected_fold_bounds = list(itt.combinations(fold_bounds, self.n_test_splits))
# In order for the first round to have its whole test set at the end of the dataset
selected_fold_bounds.reverse()
for fold_bound_list in selected_fold_bounds:
# Computes the bounds of the test set, and the corresponding indices
test_fold_bounds, test_indices = self.compute_test_set(fold_bound_list)
# Computes the train set indices
train_indices = self.compute_train_set(test_fold_bounds, test_indices)
yield train_indices, test_indices
def compute_train_set(self, test_fold_bounds: List[Tuple[int, int]], test_indices: np.ndarray) -> np.ndarray:
"""
Compute the position indices of samples in the train set.
Parameters
----------
test_fold_bounds : List of tuples of position indices
Each tuple records the bounds of a block of indices in the test set.
test_indices : np.ndarray
A numpy array containing all the indices in the test set.
Returns
-------
train_indices: np.ndarray
A numpy array containing all the indices in the train set.
"""
# As a first approximation, the train set is the complement of the test set
train_indices = np.setdiff1d(self.indices, test_indices)
# But we now have to purge and embargo
for test_fold_start, test_fold_end in test_fold_bounds:
# Purge
train_indices = purge(self, train_indices, test_fold_start, test_fold_end)
# Embargo
train_indices = embargo(self, train_indices, test_indices, test_fold_end)
return train_indices
def compute_test_set(self, fold_bound_list: List[Tuple[int, int]]) -> Tuple[List[Tuple[int, int]], np.ndarray]:
"""
Compute the indices of the samples in the test set.
        Parameters
----------
fold_bound_list: List of tuples of position indices
Each tuple records the bounds of the folds belonging to the test set.
Returns
-------
test_fold_bounds: List of tuples of position indices
            Like fold_bound_list, but with the neighboring folds in the test set merged.
test_indices: np.ndarray
A numpy array containing the test indices.
"""
test_indices = np.empty(0)
test_fold_bounds = []
for fold_start, fold_end in fold_bound_list:
# Records the boundaries of the current test split
if not test_fold_bounds or fold_start != test_fold_bounds[-1][-1]:
test_fold_bounds.append((fold_start, fold_end))
# If the current test split is contiguous to the previous one, simply updates the endpoint
elif fold_start == test_fold_bounds[-1][-1]:
test_fold_bounds[-1] = (test_fold_bounds[-1][0], fold_end)
test_indices = np.union1d(test_indices, self.indices[fold_start:fold_end]).astype(int)
return test_fold_bounds, test_indices
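
# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal end-to-end call of CombPurgedKFoldCV on toy minute bars. The index,
# the single feature column and the 1-bar evaluation horizon below are
# assumptions made for this example only.
def _example_cpcv_usage():
    idx = pd.date_range("2021-01-01", periods=120, freq="min")
    X = pd.DataFrame({"feature": np.arange(120)}, index=idx)
    pred_times = pd.Series(idx, index=idx)                            # predict at the bar's open
    eval_times = pd.Series(idx + pd.Timedelta(minutes=1), index=idx)  # label known one bar later
    cv = CombPurgedKFoldCV(n_splits=6, n_test_splits=2, embargo_td=pd.Timedelta(minutes=2))
    # C(6, 2) = 15 rounds; the yielded arrays are position indices into X
    for train_idx, test_idx in cv.split(X, pred_times=pred_times, eval_times=eval_times):
        print(len(train_idx), len(test_idx))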
def compute_fold_bounds(cv: BaseTimeSeriesCrossValidator, split_by_time: bool) -> List[int]:
"""
Compute a list containing the fold (left) boundaries.
Parameters
----------
cv: BaseTimeSeriesCrossValidator
Cross-validation object for which the bounds need to be computed.
split_by_time: bool
If False, the folds contain an (approximately) equal number of samples. If True, the folds span identical
time intervals.
"""
if split_by_time:
full_time_span = cv.pred_times.max() - cv.pred_times.min()
fold_time_span = full_time_span / cv.n_splits
fold_bounds_times = [cv.pred_times.iloc[0] + fold_time_span * n for n in range(cv.n_splits)]
return cv.pred_times.searchsorted(fold_bounds_times)
else:
return [fold[0] for fold in np.array_split(cv.indices, cv.n_splits)]
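
# Sketch (illustrative): for evenly spaced pred_times the two modes of
# compute_fold_bounds agree; they differ only when samples are unevenly spaced
# in time. Setting the attributes directly (instead of going through split())
# is an assumption made for this example only.
def _example_fold_bounds():
    idx = pd.date_range("2021-01-01", periods=100, freq="h")
    cv = CombPurgedKFoldCV(n_splits=4)
    cv.pred_times = pd.Series(idx, index=idx)
    cv.indices = np.arange(100)
    print(compute_fold_bounds(cv, split_by_time=False))  # [0, 25, 50, 75]
    print(compute_fold_bounds(cv, split_by_time=True))   # [0, 25, 50, 75] as well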
def embargo(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray,
test_indices: np.ndarray, test_fold_end: int) -> np.ndarray:
"""
Apply the embargo procedure to part of the train set.
    This amounts to dropping the train set samples whose prediction time occurs within cv.embargo_td of the test
set sample evaluation times. This method applies the embargo only to the part of the training set immediately
following the end of the test set determined by test_fold_end.
Parameters
    ----------
    cv: Cross-validation class
        Needs to have the attributes cv.pred_times, cv.eval_times, cv.embargo_td and cv.indices.
train_indices: np.ndarray
A numpy array containing all the indices of the samples currently included in the train set.
test_indices : np.ndarray
A numpy array containing all the indices of the samples in the test set.
test_fold_end : int
Index corresponding to the end of a test set block.
Returns
-------
train_indices: np.ndarray
The same array, with the indices subject to embargo removed.
"""
if not hasattr(cv, 'embargo_td'):
raise ValueError("The passed cross-validation object should have a member cv.embargo_td defining the embargo"
"time.")
last_test_eval_time = cv.eval_times.iloc[test_indices[test_indices <= test_fold_end]].max()
min_train_index = len(cv.pred_times[cv.pred_times <= last_test_eval_time + cv.embargo_td])
if min_train_index < cv.indices.shape[0]:
allowed_indices = np.concatenate((cv.indices[:test_fold_end], cv.indices[min_train_index:]))
train_indices = np.intersect1d(train_indices, allowed_indices)
return train_indices
def purge(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray,
test_fold_start: int, test_fold_end: int) -> np.ndarray:
"""data_ohlcv
Purge part of the train set.
Given a left boundary index test_fold_start of the test set, this method removes from the train set all the
samples whose evaluation time is posterior to the prediction time of the first test sample after the boundary.
Parameters
    ----------
cv: Cross-validation class
Needs to have the attributes cv.pred_times, cv.eval_times and cv.indices.
train_indices: np.ndarray
A numpy array containing all the indices of the samples currently included in the train set.
test_fold_start : int
Index corresponding to the start of a test set block.
test_fold_end : int
Index corresponding to the end of the same test set block.
Returns
-------
train_indices: np.ndarray
A numpy array containing the train indices purged at test_fold_start.
"""
time_test_fold_start = cv.pred_times.iloc[test_fold_start]
# The train indices before the start of the test fold, purged.
train_indices_1 = np.intersect1d(train_indices, cv.indices[cv.eval_times < time_test_fold_start])
# The train indices after the end of the test fold.
train_indices_2 = np.intersect1d(train_indices, cv.indices[test_fold_end:])
return np.concatenate((train_indices_1, train_indices_2))
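
# Numeric sketch (illustrative) of purge + embargo working together: with a
# 1-minute evaluation horizon and a 3-minute embargo, a test block spanning
# positions [40, 60) also knocks out train position 39 (purged: its evaluation
# time falls inside the test block) and positions 60-63 (embargoed). All
# values below are assumptions made for this example only.
def _example_purge_embargo():
    idx = pd.date_range("2021-01-01", periods=100, freq="min")
    cv = CombPurgedKFoldCV(n_splits=5, embargo_td=pd.Timedelta(minutes=3))
    cv.pred_times = pd.Series(idx, index=idx)
    cv.eval_times = pd.Series(idx + pd.Timedelta(minutes=1), index=idx)
    cv.indices = np.arange(100)
    test_indices = np.arange(40, 60)
    train = np.setdiff1d(cv.indices, test_indices)
    train = purge(cv, train, test_fold_start=40, test_fold_end=60)
    train = embargo(cv, train, test_indices, test_fold_end=60)
    print(np.setdiff1d(cv.indices, train))  # positions 39..63 are excluded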
def back_test_paths_generator(X, y, cv, t_span, n, k, prediction_times, evaluation_times, verbose=True):
# split data into N groups, with N << T
# this will assign each index position to a group position
group_num = np.arange(t_span) // (t_span // n)
group_num[group_num == n] = n - 1
# generate the combinations
test_groups = np.array(list(itt.combinations(np.arange(n), k))).reshape(-1, k)
C_nk = len(test_groups)
n_paths = C_nk * k // n
if verbose:
print('n_sim:', C_nk)
print('n_paths:', n_paths)
    # is_test is a T x C(n, k) array where each column is a logical array
    # indicating which observations are in the test set
is_test_group = np.full((n, C_nk), fill_value=False)
is_test = np.full((t_span, C_nk), fill_value=False)
# assign test folds for each of the C(n, k) simulations
    # the loop index is named sim rather than k to avoid shadowing the parameter k;
    # unpacking the pair below assumes k == 2 test groups per simulation
    for sim, pair in enumerate(test_groups):
        i, j = pair
        is_test_group[[i, j], sim] = True
        # mark every observation in either selected group as test
        mask = (group_num == i) | (group_num == j)
        is_test[mask, sim] = True
# for each path, connect the folds from different simulations to form a backtest path
# the fold coordinates are: the fold number, and the simulation index e.g. simulation 0, fold 0 etc
path_folds = np.full((n, n_paths), fill_value=np.nan)
for i in range(n_paths):
for j in range(n):
s_idx = is_test_group[j, :].argmax().astype(int)
path_folds[j, i] = s_idx
is_test_group[j, s_idx] = False
    # note: cv.split(...) returns a generator; creating it here without iterating
    # it does not run any validation or splitting
    cv.split(X, y, pred_times=prediction_times, eval_times=evaluation_times)
# finally, for each path we indicate which simulation we're building the path from and the time indices
paths = np.full((t_span, n_paths), fill_value=np.nan)
for p in range(n_paths):
for i in range(n):
mask = (group_num == i)
paths[mask, p] = int(path_folds[i, p])
return is_test, paths, path_folds
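
# Sketch (illustrative) of the CPCV bookkeeping above: n = 6 groups with k = 2
# test groups per simulation gives C(6, 2) = 15 simulations and 15 * 2 / 6 = 5
# backtest paths, each path stitched together from 6 test folds. The toy data
# below is an assumption made for this example only.
def _example_backtest_paths():
    idx = pd.date_range("2021-01-01", periods=120, freq="min")
    X = pd.DataFrame({"feature": np.arange(120)}, index=idx)
    y = pd.Series(np.zeros(120), index=idx)
    pred_times = pd.Series(idx, index=idx)
    eval_times = pd.Series(idx + pd.Timedelta(minutes=1), index=idx)
    cv = CombPurgedKFoldCV(n_splits=6, n_test_splits=2)
    is_test, paths, path_folds = back_test_paths_generator(
        X, y, cv, t_span=len(X), n=6, k=2,
        prediction_times=pred_times, evaluation_times=eval_times)
    print(is_test.shape, paths.shape, path_folds.shape)  # (120, 15) (120, 5) (6, 5)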
def plot_cv_indices(cv, X, y, group, ax, n_paths, k, paths, prediction_times, evaluation_times, lw=5):
"""Create a sample plot for indices of a cross-validation object."""
# generate the combinations
N = n_paths + 1
test_groups = np.array(list(itt.combinations(np.arange(N), k))).reshape(-1, k)
n_splits = len(test_groups)
# Generate the training/testing visualizations for each CV split
for ii, (tr, tt) in enumerate(cv.split(X, y, pred_times=prediction_times, eval_times=evaluation_times)):
# Fill in indices with the training/test groups
indices = np.array([np.nan] * len(X))
indices[tt] = 1
indices[tr] = 0
indices[np.isnan(indices)] = 2
# Visualize the train_results
ax.scatter(
[ii + 0.5] * len(indices),
range(len(indices)),
c=[indices],
marker="_",
lw=lw,
cmap=cmap_cv,
vmin=-0.2,
vmax=1.2
)
# Plot the data classes and groups at the end
# ax.scatter(
# [ii + 1.5] * len(X),
# range(len(X)),
# c=y,
# marker="_",
# lw=lw,
# cmap=cmap_data
# )
# ax.scatter(
# [ii + 2.5] * len(X),
# range(len(X)),
# c=group,
# marker="_",
# lw=lw,
# cmap=cmap_data
# )
# Formatting
    xticklabels = [str(x) for x in range(n_splits, 0, -1)]
ax.yaxis.grid(True)
ax.set(
xticks=np.arange(n_splits) + 0.45,
xticklabels=xticklabels,
#ylabel="Time",
#xlabel="CV Iteration",
xlim=[n_splits, -0.2],
#ylim=[0, X.shape[0]],
)
#ax.set_title("{}".format(type(cv).__name__), fontsize=5)
#ax.xaxis.tick_top()
return ax
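
# End-to-end sketch (illustrative): draws the usual CPCV train/test diagram for
# a toy setup. The figure size and toy data are assumptions made for this
# example only; run the file directly to see the plot.
if __name__ == "__main__":
    idx = pd.date_range("2021-01-01", periods=120, freq="min")
    X = pd.DataFrame({"feature": np.arange(120)}, index=idx)
    y = pd.Series(np.zeros(120), index=idx)
    pred_times = pd.Series(idx, index=idx)
    eval_times = pd.Series(idx + pd.Timedelta(minutes=1), index=idx)
    cv = CombPurgedKFoldCV(n_splits=6, n_test_splits=2, embargo_td=pd.Timedelta(minutes=2))
    _, paths, _ = back_test_paths_generator(
        X, y, cv, t_span=len(X), n=6, k=2,
        prediction_times=pred_times, evaluation_times=eval_times)
    fig, ax = plt.subplots(figsize=(10, 6))
    plot_cv_indices(cv, X, y, group=None, ax=ax, n_paths=5, k=2, paths=paths,
                    prediction_times=pred_times, evaluation_times=eval_times)
    plt.show()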