-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatastore.py
287 lines (257 loc) · 9.08 KB
/
datastore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module that handles loading data from disk and supplying it to
requestors. It manages a dictionary of supplied data so that it
won't require reloading. Returns a copy of the data to remove
inter-routine problems. Data is safe to manipulate.
Common useage:
import datastore as ds
dst = ds.dst
dt = dst.best()
"""
# %% Setup
import pandas as pd
import numpy as np
import requests
import yaml
from collections import namedtuple
from io import StringIO
import datetime as dt
#%precision 2
# Short aliases for the column names used throughout the module.
yn, mn, dn, en, nn, sn = ['Year', 'Month', 'Data',
                          'Error', 'Normalized', 'Smooth']
emn, epn = ['Err-', 'Err+']  # difference from mean/median
sln, sdn = ['slope', 'Deviation']
intn = 'Integral'
# Edit `base` if a different location is desired for data.
base = ''
pre = base + 'Data/'  # directory prefix where data files are read and written
def make_spec(spec: dict):
    """
    Convert a dictionary into a namedtuple so its entries can be read
    with dot notation.

    Parameters
    ----------
    spec: dict Dictionary to be turned into a namedtuple

    Returns
    -------
    namedtuple with one field per key of ``spec``
    """
    fields = list(spec)
    spec_type = namedtuple('sType', fields)
    return spec_type(**spec)
# %% Functions
def update_modern(f: str):
    """
    Download modern values and save as tab-deliminated file

    Fetches the data set named ``f`` from the URL given in its spec,
    reshapes/cleans it, writes it to ``pre + spec['save_as']`` as a
    tab-delimited file, and returns the resulting dataframe.
    """
    spec = dst.specs[f]  # spec dict loaded from specs.yaml
    fmt = spec['format']  # keyword arguments passed to pd.read_csv
    headers = {'User-agent': 'Custom User Agent'}
    name = spec['name']
    url = spec['url']
    table = spec.get('table', False)  # True: one row per year, month columns
    origin = spec.get('origin', None)  # used for julian dates
    if table:
        # Year column followed by the 12 month columns
        fmt['names'] = [yn] + list(range(1, 13))
        fmt['usecols'] = list(range(13))  # year plus months
    if 'start_year' in spec.keys():
        # File has unwanted info at end of file that could mess
        # up the loading process. Calculate the number of lines
        # that are needed and load just those.
        start_year = spec['start_year']
        start_month = spec.get('start_month', 1)
        now = dt.date.today()
        skip = fmt.get('skiprows', 0)
        if table:
            # One line per year
            lines = now.year - start_year + 1 - skip
        else:
            # One line per month: full years, the partial first year,
            # and the months elapsed in the current year
            lines = (now.year - start_year - 1) * 12
            lines += 13 - start_month
            lines += now.month - skip  # skipped lines not included
        fmt['nrows'] = lines
    response = requests.get(url, headers=headers)
    df = pd.read_csv(StringIO(response.text), **fmt)
    if 'nrows' in spec.keys():  # remove spurious lines
        # NOTE(review): 'nrows' was stored in fmt above, not in spec, so
        # this branch only runs when specs.yaml itself contains 'nrows' —
        # confirm that is intended.
        mx = df.index.argmax()
        df = df.iloc[:mx+1]
    if 'Err5' in df.columns:
        # Convert 5%/95% bounds into symmetric half-width error columns
        df[emn] = (df[dn] - df['Err5'])/2
        df[epn] = (df['Err95'] - df[dn])/2
        df.drop(columns=['Err5', 'Err95'], inplace=True)
    elif en in df.columns:
        # Single error column: split evenly into minus/plus halves
        df[emn] = df[en]/2
        df[epn] = df[emn]
        df.drop(columns=[en], inplace=True)
    if origin:
        # Index holds day counts from `origin` (julian-style dates)
        df.index = pd.to_datetime(df.index, origin=origin, unit='D')
    if table:
        # Turn a table with month columns to a long list
        df[yn] = df.index
        df = df.melt(id_vars=[yn], value_name=dn, var_name='Month')
        df['Day'] = 1  # monthly data pinned to the first of the month
        df.index = pd.to_datetime(df[['Year', 'Month', 'Day']])
        df.index.name = 'Date'
        df.drop(columns=['Year', 'Month', 'Day'], inplace=True)
        df.sort_index(inplace=True)
    df.dropna(inplace=True)
    df.to_csv(pre+spec['save_as'], sep='\t',
              float_format='%.4f', date_format='%Y-%m-%d')
    df.label = name  # human-readable label attached as an attribute
    return df
def load_modern(f: str, annual=True):
    """ Return files that have been processed as
    tab-delimited files

    f: str, name of data set
    annual: bool, return annual data, default True

    Temperature sets are re-baselined to the 1961-90 mean and offset to
    the pre-industrial era. Data with a datetime index is aggregated to
    annual or monthly means depending on ``annual``. The spec namedtuple
    is attached to the returned dataframe as ``.spec``.
    """
    spec = make_spec(dst.specs[f])
    fname = spec.save_as
    df = pd.read_csv(pre+fname, sep='\t', index_col=0, parse_dates=[0])
    if f in dst.specs['temperature']:
        # assume temperatures have a datetime index
        # normalize to Hadcrut data for 1961-90
        df['Raw'] = df[dn].to_numpy()  # force a copy
        df[dn] -= df.loc[(df.index.year>=1961)&(df.index.year<=1990), dn].mean()
        df[dn] += dst.specs['pie_offset']  # pre-industrial era
    if hasattr(df.index, 'month'):
        # Datetime-indexed data: count samples per year to detect the
        # native frequency (monthly vs daily vs annual-only).
        counts = df[df.columns[0]].groupby(df.index.year).count()
        if annual:
            low_yrs = counts.loc[counts < 12].index.values
            if len(low_yrs) < 3:
                # if there are more than 2 low_counts, it means that
                # only annual data is given.
                for y in low_yrs:
                    # drop partial years so annual means are unbiased
                    i = df.loc[df.index.year==y].index
                    df.drop(index=i, inplace=True)
            df = df.groupby(df.index.year).mean()
        else:  # return monthly data
            high_yrs = counts.loc[counts > 24].index.values
            if len(high_yrs) > 2 :  # it is daily data
                # collapse each month's days onto the first of the month
                df['m'] = df.index.strftime('%Y-%m-01')
                df = df.groupby('m').mean()
                df.index = pd.to_datetime(df.index)
    df.spec = ''  # avoid warning for setting columns
    df.spec = spec
    return df
def update_list():
    """Update modern temperature data files.

    Downloads and saves each of the listed temperature data sets.
    """
    for name in ['best', 'hadcrut']:
        print(f'loading {name}...')
        update_modern(name)
def combine_co2(df, start=1760):
    """
    Add in years from the Antarctic Composite to get a longer CO2
    sequence.

    Parameters
    ----------
    df : pandas Series or DataFrame of modern CO2 values indexed by year.
         A Series is given the fixed Mauna Loa error half-widths.
    start : int, first year of the combined record (default 1760)

    Returns
    -------
    pandas DataFrame indexed by year from ``start`` through the last
    modern year, with Data/Err-/Err+ columns; pre-modern years are
    filled from a windowed linear fit of the composite record.
    """
    if isinstance(df, pd.Series):
        mf = pd.DataFrame(index=df.index)
        mf[dn] = df.values
        mf[epn] = 0.3598  # Moana Loa error values
        mf[emn] = 0.3598
    else:
        mf = df.copy()
    cf = dst.co2composite()
    win = 15  # a larger window required to ensure sufficient data in early yrs
    fy = mf.index[0]  # first year of modern data
    ly = mf.index[-1]
    cyrs = list(range(start, fy))  # annual ice core years
    myrs = mf.index  # modern years
    cols = [dn, emn, epn]
    # dtype=float: the np.float alias was removed in NumPy 1.24 and
    # raised AttributeError here; the builtin is the documented substitute.
    temp = pd.DataFrame(index=list(range(start, ly + 1)), columns=cols,
                        dtype=float)
    temp.loc[myrs, cols] = mf.loc[myrs, cols]
    mf = mf.combine_first(cf)  # add composite data, index is now float
    # Make composite data into annual values: for each pre-modern year,
    # fit a line over a +/-win window and evaluate it at that year.
    for yr in cyrs:
        xf = mf.loc[(mf.index >= (yr - win)) & (mf.index <= (yr + win))]
        for col in cols:
            slope, intercept = np.polyfit(xf.index, xf[col], 1)
            temp.loc[yr, col] = (yr * slope + intercept)
    return temp
def load_processed(f: str):
    """
    Load processed tab-delimited data

    Parameters
    ----------
    f: str Name of data

    Returns
    -------
    pandas dataframe with its spec attached as the ``spec`` attribute
    """
    if f not in dst.specs:
        raise Exception(f'{f} is unknown. Check specs.yaml')
    spec = make_spec(dst.specs[f])
    path = pre + spec.save_as
    frame = pd.read_csv(path, sep='\t', index_col=0,
                        parse_dates=[0], comment='#')
    frame.spec = ''  # avoid warning for setting columns
    frame.spec = spec
    return frame
def load_special(f: str):
    """
    Load data requiring special processing

    Parameters
    ----------
    f: str Name of data

    Returns
    -------
    pandas dataframe

    Raises
    ------
    Exception always — no special data sources are currently defined.
    """
    message = f'"{f}" not a known data source.'
    raise Exception(message)
def get_nino(ix):
    """ Return the requested nino index as an annual-mean Series.

    Parameters
    ----------
    ix : int  One of 12, 3, 34, 4, selecting the Nino region file.

    Returns
    -------
    pandas Series indexed by integer year, with the data source text
    attached as the ``src`` attribute.

    Raises
    ------
    Exception if ``ix`` is not one of the known index numbers.
    """
    ixs = [12, 3, 34, 4]
    if ix not in ixs:
        raise Exception(f'ix must be one of {ixs}')
    path = f'nino{ix}.long.anom.data.txt'
    src = (f' Nino {ix} Index:\n' +
           f' psl.noaa.gov/gcos_wgsp/Timeseries/Nino{ix}/')
    # Whitespace-delimited file. Use a raw string for the regex separator:
    # '\s' in a plain string literal is an invalid escape sequence
    # (DeprecationWarning, SyntaxWarning on Python 3.12+).
    df = pd.read_csv(pre + path, sep=r'\s+', header=None, skiprows=1,
                     index_col=0, na_values=-99.99)
    df.dropna(inplace=True)
    df.index.name = yn
    # Average the monthly columns into a single annual value
    nino = df.mean(axis=1)
    nino.index = nino.index.astype(int)
    nino.src = src
    return nino
class _DataSource:
    """ Singleton class that interfaces with file data and keeps a copy
    in memory when requested for faster loading.
    """
    # list properties to copy with data in dataframes
    _meta = ['spec']

    def __init__(self):
        # Cache of loaded dataframes keyed by data-set name
        self.frames = {}
        with open(base+'specs.yaml', 'r') as file:
            self.specs = yaml.safe_load(file)
        # Skip the first two spec entries; the remaining keys name data
        # sets. NOTE(review): relies on specs.yaml key order — presumably
        # the first two keys are configuration entries; confirm.
        keys = list(self.specs.keys())[2:]
        # add dataframes as methods to get with . notation
        # (the f=k default binds each key at definition time, avoiding
        # the late-binding closure pitfall)
        for k in keys:
            self.__dict__.update({k: lambda f=k, annual=True: self._request(f, annual)})

    def _pull(self, f, annual=False):
        """ Pull data from data file and return data frame

        Dispatches on which spec group the name belongs to: modern
        (downloadable) sets, special-cased sets, or processed files.
        """
        if f in self.specs['modern']:
            return load_modern(f, annual)
        elif f in self.specs['special']:
            return load_special(f)
        else:
            return load_processed(f)

    def _request(self, f, annual=False):
        """ Return a copy of requested data frame given string name

        Serves from the in-memory cache when the cached frequency matches
        the request; otherwise reloads from disk and recaches.
        """
        if f in self.frames:
            # A monthly/daily frame has a datetime-like index whose
            # elements carry a .month attribute; annual frames do not.
            has_month = hasattr(self.frames[f].index[0], 'month')
            if annual == (not has_month):
                return self.frames[f].copy()
        r = self._pull(f, annual)
        # tell dataframe what new properties to copy with .copy()
        r._metadata.extend(self._meta)
        r.index.name = f
        self.frames[f] = r
        return r.copy()

    def reset(self):
        """ Reset the stored dataframes.
        """
        self.frames = {}
# Module-level singleton; consumers access data through this instance
# (see the module docstring for typical usage).
dst = _DataSource()