forked from ConvergenceDA/visdom
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiterator.R
536 lines (507 loc) · 22.2 KB
/
iterator.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
# Copyright 2016 The Board of Trustees of the Leland Stanford Junior University.
# Direct inquiries to Sam Borgeson ([email protected])
# or professor Ram Rajagopal ([email protected])
#' @title
#' Run feature algorithms specified in the ctx environment
#'
#' @description
#' This function takes an existing \code{\link{MeterDataClass}} instance (passed as \code{meterDataClass})
#' and passes that meter data object into
#' every feature function in the list found in the ctx environment under \code{fnVector},
#' concatenating all the results into a single list with named values.
#'
#' @param meterDataClass The \code{\link{MeterDataClass}} object that contains the data to be analyzed.
#' @param ctx The ctx environment that contains the list of functions to run under the name \code{fnVector} and
#' configures feature runs and provides a place to store and pass data across feature function calls.
#' @param ... Additional arguments that will be passed into the feature functions.
#'
#' @export
iterator.callAllFromCtx = function(meterDataClass,ctx,...) {
  # The feature functions to run live on the ctx under the name `fnVector`;
  # everything else is delegated to iterator.callAll, whose result is returned.
  iterator.callAll(meterDataClass, ctx=ctx, fnList=ctx$fnVector, ...)
}
#' @title
#' Run multiple feature algorithms specified in a list of functions
#'
#' @description
#' This function passes the meterDataClass object into
#' every feature function in the list found in the parameter \code{fnList},
#' concatenating all the results into a single list with named values.
#'
#' @param meterDataClass The \code{\link{MeterDataClass}} object that contains the data to be analyzed.
#' @param ctx The ctx environment that configures feature runs and provides a place to store and pass data across feature function calls.
#' @param fnList The list of feature extraction functions to be run against the \code{\link{MeterDataClass}}.
#' @param ... Additional arguments that will be passed into the feature functions.
#'
#' @export
iterator.callAll = function(meterDataClass, ctx, fnList, ...) {
  # Accumulates the return values of every feature function into one list.
  # Each function is called as fn(meterDataClass, ctx, ...) and its result is
  # appended with c(), so names returned by different functions should be unique.
  out = list()
  # seq_along() instead of 1:length(): for an empty fnList, 1:0 would iterate
  # over c(1, 0) and fail on fnList[[1]]; seq_along() simply runs zero times.
  for (i in seq_along(fnList)) {
    fn = fnList[[i]]
    out = c(out, fn(meterDataClass, ctx, ...))
  }
  return(out)
}
# iterator.todf = function(lofl) {
# blanks = which( laply(lofl,length)<2)
# print(paste('Removing',length(blanks),'blank entries of',length(lofl)))
# bdf = as.data.frame(do.call(rbind,lofl[-blanks]))
# for (col in names(bdf)) {
# bdf[,name] = as.numeric(unlist(unlist(bdf[,name])))
# }
# return(bdf)
# }
#' @title
#' Flatten feature data returned by iterator functions into a \code{data.frame}.
#'
#' @description
#' Iterator functions like \code{iterator.iterateMeters} return lists of lists of derived features indexed by meterIds.
#' This function flattens the scalar values in these lists into data.frame, with one row per meterId and columns for every named feature found.
#'
#' @param lofl The list of lists for feature values that should be flattened into a single data frame, with one row per meterId.
#'
#' @details
#' This function ignores non-scalars, so diagnostic data and other complex data structures can be returned by feature algorithms without
#' interfering with the task of creating clean vectors of features for each meter. The columns of the returned data.frame are
#' a super set of all the features returned for every MeterDataClass with computed features. Missing features are given values of NA.
#'
#' @export
iterator.todf = function(lofl) {
  tic('list of lists to data.frame')
  print('Removing non-scalars')
  # Keep only the length-1 entries of each per-meter result so that each one
  # becomes a single-row data.frame; longer (or empty) entries are dropped.
  lofdf = plyr::llply(
    lofl,
    function(x) { data.frame(x[plyr::laply(x, length) == 1]) },
    .progress = 'text')
  # requireNamespace() checks for one package directly. The previous
  # installed.packages() call built the complete matrix of installed packages
  # on every invocation just to test membership, which is slow.
  if (requireNamespace('dplyr', quietly = TRUE)) {
    print('Running dplyr::bind_rows on the remaining data (fast)')
    fulldf = dplyr::bind_rows(lofdf)
  } else {
    print('Running plyr::rbind.fill on the remaining data (slow)')
    fulldf = plyr::rbind.fill(lofdf)
  }
  toc('list of lists to data.frame')
  return(fulldf)
}
#' @title
#' Flatten validation error data returned by iterator functions into a \code{data.frame}.
#'
#' @description
#' Iterator functions like \code{iterator.iterateMeters} return lists of lists of derived features indexed by meterIds.
#' This function flattens any validation errors in these lists into a data.frame, with one row per meterId and columns for every validation error found.
#'
#' @param lofl The list of lists for feature results that should be flattened into a single validation summary data frame, with one row per meterId.
#'
#' @details
#'
#' This function relies on the validation logic storing validation errors in a data frame under the name "ISSUES"
#' in the returned list of features for each customer.
#'
#' @export
iterator.issues.todf = function(lofl) {
  # Pull the ISSUES entry out of every per-meter result (`$` yields NULL for
  # results that have no such entry) ...
  issuesPerMeter = plyr::llply(lofl, .fun = function(meterResult) meterResult$ISSUES)
  # ... and hand the collected entries to rbind.fill as its arguments.
  do.call(what = plyr::rbind.fill, args = issuesPerMeter)
}
#' @title
#' Optimize single meter lookup in a data.frame containing many meters
#'
#' @description
#' This function does a fast search for the first and last indices for each meter's data in a data.frame
#' with many meters and caches the resulting indices. These indices can be used to very quickly access data
#' for individual meters.
#'
#' @param ctx The ctx environment that configures a feature run. This function is specifically looking
#' to build an index for \code{ctx$RAW_DATA}, if present and saves the results as \code{ctx$idxLookup}. It allows
#' for RAW_DATA to be set as a data.frame with data from a large number of meters loaded at once and in
#' advance of the feature extraction runs to reduce runtimes.
#'
#' @details
#' Standard methods of searching for all data for a given meter id would use boolean expressions like
#' \code{ctx$RAW_DATA[ctx$RAW_DATA$id == 'some_id',]}. It turns out that this is pretty inefficient
#' for large data.frames because it generates values for all rows and then does the necessary comparisons for
#' all rows. Direct numerical indexing avoids this overhead and such indices can be quickly computed using
#' \code{\link{match}} and the fact that all data for each meter must be returned from the data source in
#' contiguous rows. Finally, the constructor for a \code{MeterDataClass} checks for these \code{ctx$idxLookup} if
#' \code{ctx$RAW_DATA} is found and uses them to pull the subset of data associated with the meterId passed in
#' to that constructor.
#'
#' @export
iterator.build.idx = function(ctx) {
  # Builds a lookup of (ids, first, last) row positions, one row per distinct
  # value of ctx$RAW_DATA$id, stores it as ctx$idxLookup and returns it.
  rowIds = ctx$RAW_DATA$id
  ids = unique(rowIds)
  # match() gives the position of the first occurrence of each id
  first = match(ids, rowIds)
  if(length(first) > 1) { # if there is more than one id
    # heuristic sanity check: ids whose first rows are all close together
    # suggest the rows are not grouped by id
    if( max(diff(first)) < 10 ) {
      print('Diagnostic head from iterator.build.idx')
      print( head( ctx$RAW_DATA,5 ) )
      warning( paste('iterator.build.idx called with ctx$RAW_DATA where match() values on ids',
                     'are fewer than 10 rows apart. This can occur if a data source does not order',
                     'getAllData() return values by id, date, which is an assumption of the fast',
                     'indexing used by the fuction. See the print of the head of the meter data data.frame above.') )
    }
  }
  if(length(first) == 0) {
    # No rows (e.g. a date filter removed everything): without this guard
    # `last` would have length 1 against length-0 ids/first and data.frame()
    # would fail with "arguments imply differing number of rows".
    last = numeric(0)
  } else {
    # each id's block ends on the row before the next id's first row;
    # the final id's block ends on the last row of the data
    last = c(first[-1]-1, length(rowIds))
  }
  idxLookup = data.frame(ids,first,last)
  ctx$idxLookup = idxLookup
  return(idxLookup)
}
iterator.rbind.scalars = function(a,b) {
  # Unimplemented placeholder: neither argument is touched and every call
  # evaluates to NULL. The steps sketched by the original author were:
  #   1. trim a down to scalars (is.character, is.numeric)
  #   2. trim b down to scalars
  NULL
}
#' @title Iterate over zip codes, extracting features for meters in each
#'
#' @description
#' Function that iterates through all passed zip codes, looks up local weather data (once) and
#' looks up the list of meter ids for each and calls iterator.iterateMeters with those meter
#' ids and the pre-loaded weather data. This runs faster than calling ids individually, which
#' can load similar weather data over and over.
#'
#' @param zipList List of all zip codes to iterate over
#'
#' @param custFn The feature function(s) to call on \code{MeterDataClass} instances from within each zip code
#' If it is a list of feature functions, the results will be \code{c()}'d together, so make sure the names of their
#' return values are unique!
#'
#' @param cacheResults A boolean flag that indicates whether feature results should be cached as RData.
#' If true, the cached data will be looked up prior to feature extraction to bypass running
#' features for the given zip code while returning the cached results. This can prevent duplicate processing
#' and allow an interrupted batch process to resume processing where it left off.
#' The cache directory is `getwd()` by default, but can be overridden using `ctx$resultsCache`.
#'
#' @param ctx The ctx environment that configures the feature run.
#'
#' @param clearCachedResultsFromRAM A boolean that instructs the code to drop previous results from RAM if caching
#' is in effect. This allows for larger runs to be executed and cached without accumulating in RAM. The idea is
#' that after the run, this function will be called again, with cacheResults=TRUE and this flag set to FALSE, which
#' will simply load and concatenate results from disk to re-create the full set.
#'
#' @param as_df Boolean (defaulted to FALSE) that determines whether \code{iterator.todf} is run on
#' the results before returning them.
#'
#' @param ... Arguments to be passed into the feature function(s).
#'
#' @export
iterator.iterateZip = function(zipList, custFn, cacheResults=FALSE, ctx=new.env(), clearCachedResultsFromRAM=FALSE, as_df=FALSE, ...) {
  tic(name='zip code run')
  # ctx has to be an environment, not a list: helpers such as
  # runDateFilterIfNeeded and iterator.build.idx write their results into the
  # ctx they are handed (ctx$RAW_DATA, ctx$idxLookup) and callers read them back
  # afterwards, which only works with reference semantics.
  if(is.null(ctx)) { ctx = new.env() }
  out = list()
  n = length(zipList)
  i = 0
  # scalar condition, so && rather than the elementwise &
  dropPrevious = cacheResults && clearCachedResultsFromRAM
  for (z in zipList) {
    i = i + 1
    print( paste('Processing zip ',z,' (',i,'/',n,')',sep='') )
    # timer for this zip code; the matching toc(name='zip') is in iterator.runZip
    tic(name='zip')
    zipResults = iterator.runZip(z,custFn,cacheResults=cacheResults,ctx=ctx,...)
    if(dropPrevious) {
      # results are cached on disk, so only the latest zip is kept in RAM
      out = zipResults
    } else {
      out = c(out,zipResults)
    }
  }
  gc()
  toc( name='zip code run' )
  if(as_df) out = iterator.todf( out )
  return(out)
}
#' @title Iterate over all meters in a zip code, extracting features for each in a performance optimized manner
#'
#' @description
#' Function that looks up local weather and all meter data for the passed zip code
#' (once) and caches them and then looks up all meter ids in the passed zip code
#' calls iterator.iterateMeters with those meter ids and the pre-loaded weather and meter data.
#' This runs faster than calling ids individually, which load individual meter data and similar
#' weather data over and over.
#'
#' @param zip The zip code to use for the weather, meter ids, and meter data lookups
#'
#' @param custFn The feature function(s) to call on \code{MeterDataClass} instances from within the zip code
#' If it is a list of feature functions, the results will be \code{c()}'d together, so make sure the names of their
#' return values are unique!
#'
#' @param cacheResults A boolean flag that indicates whether results should be cached as RData.
#' If true, the cached data will be looked up prior to feature extraction to bypass running
#' features for the given zip code while returning the cached results. This can prevent duplicate processing
#' and allow an interrupted batch process to resume processing where it left off.
#' The cache directory is `getwd()` by default, but can be overridden using `ctx$resultsCache`.
#'
#' @param ctx The ctx environment that configures the feature run.
#'
#' @param as_df Boolean (defaulted to FALSE) that determines whether \code{iterator.todf} is run on
#' the results before returning them.
#'
#' @param ... Arguments to be passed into the feature function(s).
#'
#' @export
iterator.runZip = function(zip,custFn,cacheResults=FALSE,ctx=new.env(),as_df=FALSE,...) {
  featureList = NULL
  # named so that it does not shadow the weatherFeatures() function called below
  zipWeatherFeatures = NULL
  if(is.null(ctx$weatherFeatures)) { ctx$weatherFeatures = list() }

  # The cache directory defaults to getwd() when ctx$resultsCache is not set.
  # Pasting a NULL directory used to yield '/results<zip>.RData', i.e. a file
  # in the filesystem root.
  cacheDir = ctx$resultsCache
  if(is.null(cacheDir)) { cacheDir = getwd() }
  cacheFile = paste(cacheDir,'/results',zip,'.RData',sep='')

  ctx$zip = zip
  if ( ! is.null(ctx$CLIMATE_ZIP) ) { # if there is a climate zip map, use it to add the climate zone
    zone = ctx$CLIMATE_ZIP[ctx$CLIMATE_ZIP[,1] == ctx$zip,2]
    ctx$cz = paste('Z',zone,sep='')
  }

  if(cacheResults && file.exists(cacheFile)) {
    tryCatch( expr = {
        print(paste('Loading cache file',cacheFile))
        # load into a scratch environment so that objects stored in the file
        # cannot overwrite this function's local variables
        cached = new.env()
        load(file=cacheFile, envir=cached)
        featureList = cached$featureList.saved
        zipWeatherFeatures = cached$weatherFeatures.saved
        print(paste( length(featureList), 'features retrieved from cache for',zip))
      },
      error = function(e) {
        print('Load from cache failed:')
        print(e) } ) # pass. If the load fails, we will run the features.
  }

  if( ! is.null(zipWeatherFeatures) && ! is.null(featureList) ) { # if the data load from the cache was successful
    print(paste('Features and weather features for',zip,'loaded from cache',cacheFile))
    ctx$weatherFeatures[[zip]] = zipWeatherFeatures
  } else {
    # any failure while loading or processing this zip is reported and swallowed;
    # featureList keeps whatever value it had when the error occurred
    tryCatch( expr= {
        ctx$weather = getWeather(zip)
        # weather features are best effort: a failure here is reported but does
        # not prevent the meter features from being run
        tryCatch( expr = {
            zipWeatherFeatures = weatherFeatures(ctx$weather)
            ctx$weatherFeatures[[zip]] = zipWeatherFeatures },
          error = function(e) {
            print(paste(' WARNING: Could not run weather features for zipcode:',zip))
            print(e)
          })
        print('[iterator$iterateZip] weather data loaded')
        # load all raw data for the zip code and indicate that it has not yet been date filtered
        ctx$RAW_DATA = DATA_SOURCE$getAllData(ctx$zip,useCache=TRUE)
        ctx$ALREADY_DATE_FILTERED = FALSE
        runDateFilterIfNeeded(ctx)
        # it is important that this happens after any filtering occurs
        iterator.build.idx(ctx)
        zipIds = ctx$idxLookup$ids
        if('factor' %in% class(zipIds)) { zipIds = as.character(zipIds) }
        print('[iterator$iterateZip] raw zip code usage data loaded')
        featureList = iterator.iterateMeters(zipIds, custFn, ctx, ...)
        toc( name='zip',prefixStr=paste('Processed',length(zipIds),'entries in zipcode',zip) )
        if(cacheResults) {
          # the object names stored in the cache file are what the cache
          # loading code above looks up, so they must stay as they are
          featureList.saved = featureList
          weatherFeatures.saved = zipWeatherFeatures
          save(list=c('featureList.saved','weatherFeatures.saved'),file=cacheFile)
        }
      },
      error = function(e) {
        print(paste(' WARNING: Could not load or process data for zipcode:',zip))
        print(e)
      } )
  }

  if(as_df) {
    out = iterator.todf( featureList )
  } else {
    out = featureList
  }
  return(out)
}
#' @title Run features for a single meterId
#'
#' @description
#' Utility function that runs the configured set of feature functions on a single passed
#' meter. This is useful for testing and feature development, but also as the function
#' called by parallelizable methods like apply, and the *ply function of plyr to run feature
#' extraction on multiple cores of a computer.
#'
#' @param meterId The meterId to use to instantiate a \code{MeterDataClass} object, or an existing \code{MeterDataClass} instance, which is then used directly.
#'
#' @param custFn The feature extraction function(s) to run on the instance of \code{MeterDataClass}.
#' If it is a list of feature functions, the results will be \code{c()}'d together, so make sure the names of their
#' return values are unique!
#'
#' @param ctx The ctx environment for the feature extraction.
#'
#' @param ... Additional arguments to be passed to the feature extraction function.
#'
#' @export
iterator.runMeter = function(meterId, custFn, ctx, ...) {
  tic(name=' process meter')
  meterData = NULL
  # meterId may be an already constructed MeterDataClass instead of an id
  isMeterObject = 'MeterDataClass' %in% class(meterId)
  result = tryCatch(
    expr={
      if( isMeterObject ) {
        meterData = meterId
      } else {
        meterData = getMeterDataClass(meterId, ctx)
      }
      issues = validateRes(meterData,minDays=60, ctx=ctx) # validate function from classes-customer.R checks zeros, too few observations
      if(length(issues)>1) { # all issues will return with an id, so > 1 indicates a problem
        print(paste(' WARNING: Failed validation: ',paste(colnames(issues),collapse=', ')))
        result = list(ISSUES = issues)
      } else {
        # is.list() instead of class(custFn) == 'list', which yields a
        # length > 1 condition for objects with several classes
        if( is.list(custFn) ) {
          result = iterator.callAll(meterData, ctx, custFn, ...)
        } else {
          result = custFn(meterData, ctx, ...)
        }
        # in case id isn't already there, add it.
        result$id = meterData$id
        toc(name=' process meter')
      }
      result
    },
    error = function(e) {
      # when a MeterDataClass object was passed in, report its id rather than
      # trying to put the whole object into the data.frame / message
      failedId = if( isMeterObject ) meterId$id else meterId
      msg = capture.output( { print(conditionMessage(e)) } )
      failure = list(ISSUES=data.frame(id=failedId, error=msg ) )
      # only an explicit TRUE re-raises the error
      if( isTRUE(ctx$STOP_ON_ERROR) ) {
        stop(e)
      }
      # STOP_ON_ERROR unset or FALSE: report and return the failure record.
      # Previously an explicit FALSE fell through and returned NULL, losing it.
      print(paste(' WARNING: Could not compute meter data features. Skipping:',failedId))
      print(e)
      failure
    }
  )
  return(result)
}
#' @title Run features for a list of meter ids
#'
#' @description
#' Utility function that runs the configured set of feature functions on the data from a list of passed
#' meter ids. This is useful for scripting feature extraction on an arbitrary number of meters.
#'
#' @param meterList The list of meterIds to use to instantiate \code{MeterDataClass} objects
#'
#' @param custFn The feature extraction function to run on each instance of \code{MeterDataClass}.
#' Can also be a list of feature functions, whose results will be \code{c()}'d together, so make sure the names of their
#' return values are unique!
#'
#' @param ctx The ctx environment for the feature extraction.
#'
#' @param as_df Boolean (defaulted to FALSE) that determines whether \code{iterator.todf} is run on
#' the results before returning them.
#'
#' @param ... Additional arguments to be passed to the feature extraction function.
#'
#' @export
iterator.iterateMeters = function(meterList, custFn, ctx=new.env(), as_df=FALSE, ...) {
  # A list of feature functions is run through iterator.callAll; anything else
  # is called directly. is.list() replaces class(custFn) == 'list', which
  # yields a length > 1 condition for objects with several classes.
  runAsList = is.list(custFn)
  out = list()
  i = 0
  tic('iterator.iterateMeters')
  n = length(meterList)
  for (meterId in meterList) {
    i = i + 1
    # called once per meter on the overall timer
    # NOTE(review): presumably reports elapsed time so far - confirm against toc()
    toc('iterator.iterateMeters')
    print( paste(' Loading meter ',meterId, ' ', ctx$zip,' (',i,'/',n,')',sep='') )
    tic(name=' process meterData')
    meterData = NULL
    result = tryCatch(
      expr={
        meterData = getMeterDataClass(meterId, ctx)
        issues = validateRes(meterData,minDays=60, ctx=ctx) # validate function from classes-customer.R checks zeros, too few observations
        if(length(issues)>1) { # all issues will return with an id, so > 1 indicates a problem
          print(paste(' WARNING: Bad or insufficient data. Skipping:',paste(colnames(issues),collapse=', ')))
          result = list(ISSUES=issues)
        } else {
          tic('features')
          if( runAsList ) {
            newFeatures = iterator.callAll(meterData, ctx, custFn, ...)
          } else {
            newFeatures = custFn(meterData, ctx, ...)
          }
          # in case the id isn't there, add it
          newFeatures$id = meterData$id
          result = newFeatures
          toc('features')
          toc(name=' process meterData')
        }
        result
      },
      error = function(e) {
        msg = capture.output( { print(conditionMessage(e)) } )
        res = list(ISSUES=data.frame(id=meterId, error=msg ) )
        # only an explicit TRUE re-raises the error
        if( isTRUE(ctx$STOP_ON_ERROR) ) {
          traceback(2)
          stop(e)
        }
        # STOP_ON_ERROR unset or FALSE: report and keep the failure record.
        # Previously an explicit FALSE made the handler return NULL, and
        # `out[[i]] = NULL` then silently dropped the meter from the output.
        print(paste(' WARNING: Could not compute meter data features. Skipping:',meterId))
        print(e)
        res
      }
    )
    out[[i]] = result
  }
  toc('iterator.iterateMeters',prefixStr=paste(n,'meters'))
  if(as_df) out = iterator.todf( out )
  return(out)
}
# Look up the meter ids associated with a zip code.
# Thin wrapper: the lookup is delegated to DATA_SOURCE$getSPs with useCache
# switched on, and whatever that call returns is passed straight back.
getMeterIds = function(zip) {
  DATA_SOURCE$getSPs(zip, useCache=TRUE)
}
#' @title Run date filters on raw meter data
#'
#' @description Runs \code{applyDateFilters} if the appropriate values are found in the ctx and the data is not yet
#' flagged as filtered. This function retrieves data from \code{ctx$RAW_DATA} and writes its results to the same field.
#'
#' @param ctx Context object that is an R environment with named parameters
#'
#' @export
runDateFilterIfNeeded = function(ctx) {
  # Filters only apply to RAW_DATA, so there is nothing to do without it.
  if( is.null(ctx$RAW_DATA) ) { return() }

  # A boolean flag on the ctx tracks whether filtering has been applied, so
  # that it happens once and only once. A missing flag means "not yet".
  if( is.null(ctx$ALREADY_DATE_FILTERED) ) { ctx$ALREADY_DATE_FILTERED = FALSE }
  if( ctx$ALREADY_DATE_FILTERED ) { return() }

  dateFilter = ctx$dateFilter
  print('Applying date filter')
  rowsBefore = nrow(ctx$RAW_DATA)
  # per the original author's note, a NULL filter is expected to hand back the
  # data unchanged (applyDateFilters is defined elsewhere)
  ctx$RAW_DATA = applyDateFilters( ctx$RAW_DATA, dateFilter )
  print( sprintf('Filtered from %d -> %d', rowsBefore, nrow(ctx$RAW_DATA) ) )
  ctx$ALREADY_DATE_FILTERED = TRUE
  # the row index must be rebuilt after filtering, since row positions changed
  iterator.build.idx(ctx)
  return()
}
# returns a meter's populated data class, using the unique identifier meterId
# which should be returned by getMeterIds and other related functions.
# Supports optional data from the ctx for speeding up access to zip code, weather
# data, and meter data from the relevant zip code:
#   ctx$RAW_DATA  - pre-loaded rows for many meters; this meter's rows are cut out of it
#   ctx$idxLookup - (ids, first, last) row index over RAW_DATA, used when present
#   ctx$zip, ctx$weather - handed on to DATA_SOURCE$getMeterDataClass as geo / weather
# Raises an error when ctx$idxLookup exists but does not contain meterId.
getMeterDataClass = function(meterId,ctx) {
  meterData = NULL
  # if the data was already looked up in advance and passed into the context
  # use the subset belonging to the specified meter
  runDateFilterIfNeeded(ctx)
  if(!is.null(ctx$idxLookup)) {
    tic('fast id lookup')
    pos = match(meterId,ctx$idxLookup$ids)
    # an id missing from the index used to surface as the cryptic
    # "NA/NaN argument" error from NA:NA below; fail with a clear message instead
    if( length(pos) != 1 || is.na(pos) ) {
      stop(paste('meterId', paste(meterId, collapse=', '), 'not found in ctx$idxLookup'))
    }
    idxVals = ctx$idxLookup[pos,]
    # direct positional slice of this meter's block of rows
    meterData = ctx$RAW_DATA[idxVals$first:idxVals$last,]
    toc('fast id lookup')
  }
  else {
    rawData = ctx$RAW_DATA
    if (!is.null(rawData)) {
      # no index available: fall back to comparing every row's id
      tic('id subrows')
      subrows = which(rawData$id == meterId)
      meterData = rawData[subrows,]
      toc('id subrows')
    }
  }
  # meterData stays NULL when the ctx holds no pre-loaded data
  md = DATA_SOURCE$getMeterDataClass( id = meterId,
                                      geo = ctx$zip,
                                      weather = ctx$weather,
                                      data = meterData )
  return(md)
}
# Builds the weather object for a zip code: announces the load, then
# constructs a WeatherClass for `zip` with useCache on and doSG off.
getWeather = function(zip) {
  loadMsg = paste('Loading weather for',zip)
  print(loadMsg)
  zipWeather = WeatherClass(zip,useCache=TRUE,doSG=FALSE)
  return(zipWeather)
}