""" Contains code to read CSV files containing taxi data """
from math import ceil
from glob import glob
import sqlite3
import argparse
import pandas as pd
import numpy as np
from tqdm import tqdm
import geojson
import bridge_info

DATE_COLUMNS = [
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime'
]

PARSER = argparse.ArgumentParser(description='Parse TaxiData CSVs to SQL')
PARSER.add_argument('--rebuild_rides_table',
                    help='Whether or not to rebuild the rides table. '
                         'Requires argument: string containing a glob '
                         'pattern matching rides CSVs to parse'
                    )
PARSER.add_argument('--rebuild_locations_table',
                    help='Whether or not to rebuild the locations table. '
                         'Requires argument: string containing path to '
                         'locations CSV'
                    )
PARSER.add_argument('--rebuild_coordinates_table',
                    help='Whether or not to rebuild the coordinates table. '
                         'Requires argument: string containing path to '
                         'geojson file containing zone info'
                    )
PARSER.add_argument('--rebuild_bridges_table',
                    action='store_true',
                    help='Whether or not to rebuild the bridges table.'
                    )
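
# Example invocations (illustrative; the data paths below are hypothetical
# and depend on your local layout):
#   python parser.py --rebuild_rides_table 'data/yellow_tripdata_*.csv'
#   python parser.py --rebuild_coordinates_table data/taxi_zones.geojson
#   python parser.py --rebuild_bridges_table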


def parse_files(file_list, convert_date_time=True):
    """Parses the contents of the files in file_list.
    :file_list: a list of files whose contents should be parsed
    :convert_date_time: whether to convert the pickup/dropoff columns
        to datetimes
    :returns: A pandas dataframe containing every row in the specified files
    """
    all_data_frames = (pd.read_csv(f, header=0) for f in file_list)
    merged_data = pd.concat(all_data_frames)
    if convert_date_time:
        for col in DATE_COLUMNS:
            merged_data[col] = pd.to_datetime(merged_data[col])
    return merged_data
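
# Illustrative usage (file names hypothetical):
#   rides = parse_files(['yellow_2019-01.csv', 'yellow_2019-02.csv'])
# yields a single dataframe holding both files' rows, with the pickup and
# dropoff columns converted to datetime64.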


def chunk_iter(source, chunk_size):
    """Iterates over a sequence, dividing it into chunks of size chunk_size.
    If the length of source is not a multiple of chunk_size, the last chunk
    yielded will be smaller and will contain all remaining elements.
    :source: the source sequence (must support len() and slicing)
    :chunk_size: the size of the chunks to yield
    :yields: slices of source of length at most chunk_size
    """
    for index in range(0, len(source), chunk_size):
        yield source[index:index + chunk_size]
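
# A minimal sketch of chunk_iter's behavior:
#   list(chunk_iter([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]
# The final chunk holds the leftovers when len(source) is not a multiple
# of chunk_size.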


def empty_table(conn, table_name):
    """Drops the table named table_name, if it exists, from the database
    associated with connection conn.
    :conn: a sqlite3 database connection
    :table_name: the name of the table to drop
    """
    # Table names cannot be bound as SQL parameters, so table_name is
    # interpolated directly; it must come from trusted input.
    deletion_command = f'DROP TABLE IF EXISTS {table_name};'
    cur = conn.cursor()
    cur.execute(deletion_command)
    conn.commit()


def write_to_db(dataframe, db_conn, table_name):
    """Writes the given dataframe to a database table. If the table
    already exists, the rows are appended to it.
    :dataframe: The dataframe to write to a database table
    :db_conn: The database connection object
    :table_name: The name of the table to which we will write
    """
    dataframe.to_sql(table_name, db_conn, if_exists='append')


def parse_geo_json(file_paths):
    """Parses a GeoJSON file into a dataframe of zone centers.
    :file_paths: a list of file paths; only the first GeoJSON file in the
        list is parsed
    :returns: A pandas dataframe containing the zone ID and approximate
        center lat/long for each zone.
    """
    with open(file_paths[0]) as opened_file:
        loaded_geojson = geojson.loads(opened_file.read())
    all_data = []
    for zone in loaded_geojson['features']:
        location_id = zone['properties']['objectid']
        coordinates = np.array(list(geojson.utils.coords(zone)))
        # Approximate the center of the zone as the center of its bounding
        # lat/long rectangle. GeoJSON orders coordinates (long, lat).
        maxes = np.max(coordinates, axis=0)
        mins = np.min(coordinates, axis=0)
        mean_long, mean_lat = (maxes + mins) / 2
        all_data.append([location_id, mean_lat, mean_long])
    return pd.DataFrame(all_data, columns=['LocationID', 'lat', 'long'])
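
# Worked example of the bounding-box approximation above (numbers are
# illustrative): a zone spanning longitudes [-74.02, -73.98] and latitudes
# [40.70, 40.74] gets center ((-74.02 + -73.98) / 2, (40.70 + 40.74) / 2)
# = (-74.00, 40.72).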


def parse_files_and_write_to_db(file_glob,
                                db_conn,
                                table_name,
                                chunk_size=5,
                                convert_date_time=True,
                                parse_as_geojson=False):
    """
    Parses the given files and writes them to a table in a database.
    :file_glob: a glob pattern matching the files whose contents we
        want to parse
    :db_conn: The database connection object
    :table_name: The name of the table to which we will write
    :chunk_size: The number of files to write to the db at once
    :convert_date_time: Whether or not to attempt to convert certain columns
        to datetimes
    :parse_as_geojson: Whether or not to parse the matched files as geojson
    """
    empty_table(db_conn, table_name)
    matching_files = glob(file_glob)
    num_chunks = ceil(len(matching_files) / chunk_size)
    with tqdm(total=num_chunks) as pbar:
        for chunk in chunk_iter(matching_files, chunk_size):
            if parse_as_geojson:
                all_data = parse_geo_json(chunk)
            else:
                all_data = parse_files(chunk, convert_date_time)
            write_to_db(all_data, db_conn, table_name)
            pbar.update(1)
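
# Illustrative pipeline call (paths hypothetical): rebuild the rides table
# from every matching CSV, writing five files per batch:
#   conn = sqlite3.connect('rides.db')
#   parse_files_and_write_to_db('data/yellow_*.csv', conn, 'rides')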


def parse_bridge_info():
    """Parses info about bridges from bridge_info into a pandas dataframe.
    :returns: a pandas dataframe with two columns, LocationID1 and LocationID2,
        representing the two endpoints of a crossing point between boroughs.
    """
    return pd.DataFrame(bridge_info.BRIDGES, columns=['LocationID1', 'LocationID2'])


def main():
    """Handles args and calls parse_files_and_write_to_db."""
    db_conn = sqlite3.connect('rides.db')
    provided_args = PARSER.parse_args()
    if provided_args.rebuild_rides_table:
        table_name = 'rides'
        parse_files_and_write_to_db(provided_args.rebuild_rides_table,
                                    db_conn,
                                    table_name)
    if provided_args.rebuild_locations_table:
        table_name = 'locations'
        parse_files_and_write_to_db(provided_args.rebuild_locations_table,
                                    db_conn,
                                    table_name,
                                    convert_date_time=False)
    if provided_args.rebuild_coordinates_table:
        table_name = 'coordinates'
        parse_files_and_write_to_db(provided_args.rebuild_coordinates_table,
                                    db_conn,
                                    table_name,
                                    convert_date_time=False,
                                    parse_as_geojson=True)
    if provided_args.rebuild_bridges_table:
        table_name = 'bridges'
        # Drop any existing table first so "rebuild" does not append
        # duplicate rows, and avoid shadowing the bridge_info module.
        empty_table(db_conn, table_name)
        bridges = parse_bridge_info()
        write_to_db(bridges, db_conn, table_name)


if __name__ == '__main__':
    main()