#!/usr/bin/env python3

# Copyright © Los Alamos National Security, LLC, and others.

'''Parse and process CDC web log files in the same manner as the Wikimedia
access logs. The CDC logs are provided daily starting 2013-01-01.'''

import csv
from glob import iglob
from itertools import chain
import datetime
import os
import hashlib

import quacpath
import timeseries
import u
import testable
import time_
import db

# c is config (set at the bottom: u.configure())
c = u.c
# l is the logging object (set at the bottom: u.logging_init())
l = u.l

ap = u.ArgumentParser(description=__doc__)
gr = ap.default_group
gr.add_argument('outfile',
                metavar='OUTFILE',
                help='time series dataset to create or update')
gr.add_argument('uncompressed_folder',
                help='location of the folder containing the uncompressed CSV files')
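
# Typical invocation (paths are illustrative, not from the repository):
#
#   cdc-tsupdate /path/to/dataset_dir /path/to/uncompressed_csv_folder
#
# where the first argument is the time series dataset directory to create or
# update and the second is the folder of uncompressed CDC CSV extracts.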


def get_sha256(path):
    '''Get the SHA-256 hash of a file. This method can handle files of arbitrary
    size. Idea comes from http://stackoverflow.com/a/3431838/1269634.'''
    sha256 = hashlib.sha256()
    with open(path, 'rb') as f:
        # reading 1 MiB at a time should provide sufficient performance for our
        # dataset of small files
        for chunk in iter(lambda: f.read(1048576), b''):
            sha256.update(chunk)
    return sha256.hexdigest()
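
# A minimal usage sketch of get_sha256() on its own (the path is hypothetical):
#
#   digest = get_sha256('/tmp/example.csv')  # 64-character hex digest string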


def main():
    '''Read all new time series and write them to the dataset.'''
    previously_processed_files = None
    try:
        # if the database of previously processed files doesn't exist, create it
        previously_processed_files = db.SQLite(os.path.join(args.outfile,
                                                            'processed_files.db'),
                                               True)
        if not previously_processed_files.exists('sqlite_master',
                                                 "type='table' AND "
                                                 "name='processed_files'"):
            previously_processed_files.sql('''CREATE TABLE processed_files (
                                                 path TEXT NOT NULL PRIMARY KEY,
                                                 sha256 TEXT NOT NULL
                                              );''')
        time_series, processed_files = read_time_series(previously_processed_files)
        write_time_series(time_series, processed_files, previously_processed_files)
    finally:
        # guard against the case where opening the database itself failed
        if previously_processed_files is not None:
            previously_processed_files.close()


def read_time_series(previously_processed_files):
    '''Read all unprocessed files and build a time series for each region's pages.
    The result will be a dict mapping date to region to page/count time
    series.'''
    # the CSV files are inconsistently named
    csv_paths = [
        iglob(os.path.join(args.uncompressed_folder, 'Flu Pages by Region*.csv')),
        iglob(os.path.join(args.uncompressed_folder, 'Pages by Region*.csv')),
        iglob(os.path.join(args.uncompressed_folder,
                           'Pages Data Extract for CDC Internet*.csv')),
    ]
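    # Note: iglob() returns lazy iterators; chain.from_iterable() below walks
    # all three patterns without building an intermediate list.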
    processed_files = dict()  # map file path to hash
    time_series = dict()      # map date to region to page/count time series
    for path in chain.from_iterable(csv_paths):
        sha256 = get_sha256(path)
        if previously_processed_files.exists('processed_files',
                                             "path='{}'".format(path)):
            previous_sha256 = previously_processed_files.get_one(
                "SELECT sha256 FROM processed_files WHERE path='{}';"
                .format(path))[0]
            if sha256 == previous_sha256:
                l.info('skipping {}'.format(path))
                continue
            else:
                # Note that if the hash changes, we're going to output this as a
                # warning and will process the file as normal, potentially
                # overwriting existing data. The assumption here is that a hash
                # change is intentional (e.g., a data correction).
                l.warning('hash changed for {}'.format(path))
        l.info('processing {}'.format(path))
        with open(path) as csvfile:
            # skip header
            for line in csvfile:
                # the first line always starts with the byte order mark (BOM);
                # this is "\xef\xbb\xbf" or character number 65279
                if not (line.startswith('#') or ord(line[0]) == 65279):
                    break
            reader = csv.DictReader(csvfile)
            for row in reader:
                date = time_.utcify(datetime.datetime.strptime(row['Date'],
                                                               '%b %d, %Y'))
                page = row['Pages'].replace('\n', '').strip()
                region = row['Regions']
                # there's no consistent name for the "count" row...ugh
                if 'All CDC Visits' in row:
                    count = int(row['All CDC Visits'])
                elif 'Visits' in row:
                    count = int(row['Visits'])
                elif 'Page Views' in row:
                    count = int(row['Page Views'])
                elif 'Page Views (Report-Specific)' in row:
                    count = int(row['Page Views (Report-Specific)'])
                else:
                    raise ValueError('Count key missing from '
                                     'row {} in file [{}].'.format(row, path))
                #l.info('{} --- {} --- {} --- {}'.format(date, page, region, count))
                if date not in time_series:
                    time_series[date] = dict()
                if region not in time_series[date]:
                    time_series[date][region] = dict()
                # Some of the files contain overlapping data. That is, for some
                # reason, for a specific date/region/page context, there are
                # multiple counts. Anecdotally, the first count often looks
                # "real" (e.g., 164), while the second count is generally
                # relatively small (e.g., 1 or 2). Based on this, we accept the
                # first count we encounter and throw away any follow-up counts.
                if page in time_series[date][region] and \
                   time_series[date][region][page] != count:
                    #l.warning('differing count: {} --- {} --- {} --- {} --- {}'
                    #          .format(date,
                    #                  page,
                    #                  region,
                    #                  count,
                    #                  time_series[date][region][page]))
                    continue
                time_series[date][region][page] = count
        processed_files[path] = sha256
    if not time_series:
        l.info('no new data files')
        return (None, None)
    # ensure we aren't missing any dates
    sorted_dates = sorted(time_series.keys())
    current_date = sorted_dates[0]
    last_date = sorted_dates[-1]
    while current_date < last_date:
        if current_date not in time_series.keys():
            l.warning('missing date: {}'.format(current_date))
        current_date += datetime.timedelta(days=1)
    return (time_series, processed_files)
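

# The time_series structure handed to write_time_series() below is a nested
# dict: date -> region -> page -> count. For example (values illustrative):
#
#   {datetime(2013, 1, 1, tzinfo=utc): {'Region 1': {'/flu/index.htm': 164}}}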
def write_time_series(time_series, processed_files, previously_processed_files):
    '''Write out the time series to the databases. Right now, we're using a
    hashmod of 4. This results in ~100k rows per table for a full year's worth
    of data, which should provide for excellent performance.'''
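    # Storage layout, as implemented below (a summary, not a separate spec):
    # one time series DB per calendar year; each page gets a fragment keyed
    # '<region>+<page>' holding its daily counts, and each region also gets a
    # running total fragment keyed by the region name alone.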
    if not time_series:
        l.info('no time series data to write')
        return
    ds = timeseries.Dataset(args.outfile, 'D', 4, writeable=True)
    for date, regions in sorted(time_series.items()):
        num_days = time_.days_in_year(date)
        day_offset = time_.day_year_offset(date)
        l.info('{} - day {} of {} days in the year'.format(date,
                                                           (day_offset + 1),
                                                           num_days))
        # one DB per year
        year = datetime.datetime(date.year, 1, 1)
        date_ds = ds.open_timestamp(year, num_days)
        date_ds.begin()
        for region, pages in sorted(regions.items()):
            pages = regions[region]
            for page, count in sorted(pages.items()):
                #l.info('{} - {} - {} - {}'.format(date, region, page, count))
                # save the page fragment
                page_f = date_ds.fetch_or_create('{}+{}'.format(region, page))
                page_f.data[day_offset] = count
                page_f.save()
                # update the region's fragment
                region_f = date_ds.fetch_or_create(region)
                region_f.data[day_offset] += count
                region_f.save()
        date_ds.commit()
    #ds.dump()
    # update the DB of processed file names and their hashes
    for path, sha256 in sorted(processed_files.items()):
        if previously_processed_files.exists('processed_files',
                                             "path='{}'".format(path)):
            previously_processed_files.sql("UPDATE processed_files "
                                           "SET sha256='{}' "
                                           "WHERE path='{}';".format(sha256, path))
        else:
            previously_processed_files.sql("INSERT INTO processed_files "
                                           "VALUES ('{}', '{}');".format(path,
                                                                         sha256))
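

# QUAC-style script footer: parse the command line, load the configuration,
# and set up logging, then run main() when the script is executed directly.
# The testable.Unittests_Only_Exception path appears to let the module be
# imported for unit testing without running main(); see the QUAC 'testable'
# module for details.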
try:
    args = u.parse_args(ap)
    u.configure(args.config)
    u.logging_init('cdclogs')
    if __name__ == '__main__':
        main()
except testable.Unittests_Only_Exception:
    testable.register()