-
Notifications
You must be signed in to change notification settings - Fork 0
/
walkscore_calculator.py
355 lines (320 loc) · 14.7 KB
/
walkscore_calculator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
from osgeo import ogr
from osgeo import gdal
import os
import time
import shelve
import csv
from scipy import spatial
import numpy as np
from make_start_points import Make_Start_Points_Road
from compute_walkscore import Compute_Walkscore_Road
from networkx_readshp import read_shp_to_graph
import catch_road
import shp
'''防止编码出现问题'''
gdal.SetConfigOption('GDAL_FILENAME_IS_UTF8', 'YES')
gdal.SetConfigOption('SHAPE_ENCODING', 'UTF-8')
class WalkscoreCalculator:
all_road_info = []
all_start_point_info = []
def __init__(self, city):
self.road_linedata = ''
self.poi_pointdata = {}
self.all_road_info.clear()
self.all_start_point_info.clear()
self.MultiType_poi_points = {}
self.weight_table = {}
self.city = city
self.road_points = {}
self.shelve_path = './Shelve/' # kdtree和Graph的缓存文件,一个城市首次计算会创建,之后就直接读取
self.flag = '3'
def Set_Weights(self, filepath):
with open(filepath, 'r') as file:
reader = csv.reader(file)
lines = list(reader)
del lines[0]
for line in lines:
poi_code = line[0]
poi_code = del_last_letter(poi_code)
poi_type = line[1]
poi_weights = line[2].split('+')
self.weight_table[poi_code] = [[float(poi_weight) for poi_weight in poi_weights], poi_type]
ss = 0
print('权重表如下:')
for weight_poi_type in self.weight_table.values():
s = 0
for w in weight_poi_type[0]:
s += w
ss += s
print(weight_poi_type[1], s)
self.weight_sum = ss
print('权重和为:%f' % ss)
# print(self.weight_table)
def Set_Road_Linedata(self, filename:str):
self.road_linedata = filename
if os.path.exists(self.road_linedata):
print('道路数据路径设置完毕...')
else:
print('道路数据路径设置失败...')
def Set_New_Road_Linedata(self, filename):
self.new_road_linedata = filename
def Set_POI_Pointdata(self, filepath:str, new_csv_path:str):
print('##############################')
print('正在检测POI是否已经过抓路...')
MultiType_poi_points = {}
index = new_csv_path.find('.csv')
new_csv_path_list = list(new_csv_path)
new_csv_path_list.insert(index, '错误')
error_path = ''.join(new_csv_path_list)
lines = []
catch_lines = []
with open(filepath, 'r', encoding='utf-8') as file:
lines = list(csv.reader(file))
if os.path.exists(new_csv_path):
with open(new_csv_path, 'r') as file:
catch_lines = list(csv.reader(file))
if not os.path.exists(new_csv_path) or len(lines) != len(catch_lines):
print('POI尚未被抓取到道路上')
print('开始将POI抓取到道路上.........')
catch_road.catch(self.city,
filepath,
self.road_linedata,
new_csv_path, error_path, 50)
else:
print('POI已经成功被抓取到道路上')
del lines
del catch_lines
print('正在准备POI的数据...')
with open(new_csv_path, 'r') as file:
reader = csv.reader(file)
lines = list(reader)
coord_index = lines[0].index('lng')
code_index = lines[0].index('poi_code')
del lines[0]
for poi_code, weights_types in self.weight_table.items():
SingleType_poi_points = []
for line in lines:
if float(line[-1]) > 3000:
continue
current_code = str(line[code_index])
current_code = del_last_letter(current_code)
#为什么要去掉种类代码的最后一个字符呢,因为种类代码的后面有个A
#excel总是把字符串当成数字,然后把第一个0删了,有这个A可以强制为字符串
if is_included(current_code, poi_code):
coord = (float(line[coord_index]), float(line[coord_index+1]))
SingleType_poi_points.append(coord)
road_point = to_tuple(line[-4])
pre_point = to_tuple(line[-3])
next_point = to_tuple(line[-2])
dist = float(line[-1])
self.road_points[coord] = [road_point, pre_point, next_point, dist]
MultiType_poi_points[weights_types[1]] = SingleType_poi_points
self.MultiType_poi_points = MultiType_poi_points
#print(self.MultiType_poi_points)
'''
with open('test.csv', 'w', newline='') as file:
writer = csv.writer(file)
for k, v in MultiType_poi_points.items():
for i in v:
x = i[0]
y = i[1]
writer.writerows([[k, x, y]])
'''
print('所有POI的数据准备完成')
print('#################################')
def Compute_Road_Walkscore(self, seg_length, poi_num, part_num):
'''准备道路的数据以及计算步行指数'''
#Multi_roads_geos = read_shp_to_geo(self.road_linedata)
scale = 100.0 / self.weight_sum
print('正在检测Graph和KDtree是否已经生成过')
write_G = False
write_T = False
if not os.path.exists(self.shelve_path):
os.makedirs(self.shelve_path)
if not os.path.exists(self.shelve_path + self.city + '.dat'):
write_G = True
write_T = True
elif os.path.exists(self.shelve_path + self.city + '.dat'):
s = shelve.open(self.shelve_path + self.city, 'r')
if s.get('G') is None:
write_G = True
if s.get('T') is None:
write_T = True
s.close()
print('Graph: 未生成', end=' ') if write_G else print('Graph: 已生成', end=' ')
print('KDtree: 未生成') if write_T else print('KDtree: 已生成')
if write_G:
print('正在生成Graph...')
s = shelve.open(self.shelve_path + self.city, 'c')
G = read_shp_to_graph(self.road_linedata, simplify=False).to_undirected()
s['G'] = G
s.close()
print('Graph生成完成')
if write_T:
print('正在生成KDtree...')
s = shelve.open(self.shelve_path + self.city, 'c')
kdtrees = self.create_kdtree()
s['T'] = kdtrees
s.close()
print('KDtree生成完成')
with shelve.open(self.shelve_path + self.city, 'r') as s:
G = s['G']
kdtrees = s['T']
#temp_road_info = []
#准备路网的数据,temp_road_info是所有道路的数据,temp_roadInfo是一条道路的数据
'''读取shp文件'''
reader = shp.shp_reader(self.road_linedata)
head = reader.readhead()
spatial_reference = reader.readSRS()
# 获取id字段和name字段的index
for field in head:
if 'name' == field.lower():
name_index = list(head.keys()).index(field)
elif 'id' in field.lower():
id_index = list(head.keys()).index(field)
print('正在统计一共有多少个点...')
road_count, point_count = Counting(reader.layer, seg_length, name_index)
print('一共有%d条路,%d个点' % (road_count, point_count))
# 这个part是想把一个城市分成三个部分并行计算,还没有调试好
parts = [1, road_count // 3, road_count // 3 * 2, road_count]
flag = self.flag
#flag = '2'
if flag == '1':
writer = shp.shp_writer(self.new_road_linedata)
writer.createlayer(spatial_reference, ogr.wkbLineString)
new_head = {'real_id':ogr.OFTInteger}
for field, _OFT in head.items():
new_head[field] = _OFT
new_head['walkscore'] = ogr.OFTReal
for weights_types in self.weight_table.values():
poi_type = weights_types[1]
new_head[poi_type] = ogr.OFTReal
writer.writehead(new_head)
elif flag == '2':
print('不写入\n')
else:
return False
print('kdtree, seg_length=' + str(seg_length))
print('\n开始计算步行指数')
real_id = 0
start = time.time()
counting_up_to_now = 0
for row in reader.readrows():
real_id += 1
if not parts[part_num-1] < real_id <= parts[part_num]:
continue
start_point_info = {}
temp_geo = row[-1] #temp_geo可能是MultiLinestring
#if real_id > 8884: break
#if real_id != 8884: continue
start_time = time.time()
if row[name_index] is not None:
if '高速' in row[name_index]:
continue
Make_Start_Points_Road(temp_geo, start_point_info, seg_length)
counting = len(start_point_info)
counting_up_to_now += counting
print('第%d/%d条:名称为%s,%d个点' % (real_id, road_count, row[name_index], counting ))
ws = Compute_Walkscore_Road(start_point_info,
self.weight_table,
self.MultiType_poi_points,
self.road_points,
G,
kdtrees,
poi_num,
scale)
walkscore = ws['main']
if flag == '1':
# 如果flag等于1,则写入,否则只是计算,不会写入一个新的shp文件
new_row = [real_id] + row[:-1] + [walkscore]
for weights_types in self.weight_table.values():
poi_type = weights_types[1]
poi_type_ws = ws[poi_type]
new_row.append(poi_type_ws)
new_row.append(temp_geo)
writer.appendrows([new_row])
print('步行指数为 %f ' % walkscore, end='')
end_time = time.time()
time_left = (end_time-start) / counting_up_to_now * (point_count - counting_up_to_now)
print('步行指数计算%f秒,共%f秒, 预计还剩%.1f分钟' % ((end_time-start_time), (end_time-start), time_left/60))
#if real_id > 10:
#break
if flag == '1':
writer.ds.Destroy()
'''
self.all_road_info = temp_road_info
self.all_start_point_info = start_point_info
'''
print(counting_up_to_now, point_count)
return True
def create_kdtree(self):
kdtrees = {}
for weights_types in self.weight_table.values():
SingleType_poi_points = self.MultiType_poi_points[weights_types[1]]
kdtree = spatial.KDTree(np.array(SingleType_poi_points))
kdtrees[weights_types[1]] = kdtree
return kdtrees
def is_included(current_code:str, weight_table_poi_code:str):
#判断这个POI有没有包含在权重表里,是不是计算需要的
#current_code是爬到的,weight_table_poi_code是自己写的权重表里的
#poi_code: 050000/050101|050119 "/"后面是要排除掉的id 表示050000类的都会被提取,但是除了属于050101和050119的
needed_poi_codes = weight_table_poi_code.split('/')[0] #要包含的id
if '/' in weight_table_poi_code:
excepted_poi_codes = weight_table_poi_code.split('/')[1] #要排除掉的id
else:
excepted_poi_codes = None
poi_codes = [del_zeros(temp_code) for temp_code in needed_poi_codes.split('|')] #[05]
full_current_codes = [temp_code for temp_code in current_code.split('|')] #[050101, 050105]
if excepted_poi_codes is not None:
excepted_poi_codes = [del_zeros(temp_code) for temp_code in excepted_poi_codes.split('|')] #[050101, 050119]
else:
excepted_poi_codes = []
#先判断在不在需要排除掉的里面
for excepted_poi_code in excepted_poi_codes:
for full_current_code in full_current_codes:
if full_current_code[:len(excepted_poi_code)] == excepted_poi_code:
return False
for poi_code in poi_codes:
for full_current_code in full_current_codes:
if full_current_code[:len(poi_code)] == poi_code:
return True
return False
def del_zeros(code):
code = list(code)
while code and code[-1] == '0' and code[-2] == '0':
del code[-1]
del code[-1]
return ''.join(code)
def del_last_letter(code):
code = list(code)
code = code[:-1]
return ''.join(code)
def to_tuple(s):
coord = s.split(', ')
x = float(coord[0][1:])
y = float(coord[1][:-1])
return (x, y)
def Counting(layer, seg_length, name_index):
'''统计有多少条路多少个点'''
layer.ResetReading()
temp_road = layer.GetNextFeature()
road_count = 0
point_count = 0
while temp_road:
name = temp_road.GetFieldAsString(name_index)
if '高速' in name:
temp_road = layer.GetNextFeature()
continue
temp_geo = temp_road.GetGeometryRef().Clone()
start_point_info = {}
if temp_geo.GetGeometryName() == 'MULTILINESTRING':
roads_count = temp_geo.GetGeometryCount()
for index in range(roads_count):
temp_road_geo = temp_geo.GetGeometryRef(index)
Make_Start_Points_Road(temp_road_geo, start_point_info, seg_length)
elif temp_geo.GetGeometryName() == 'LINESTRING':
Make_Start_Points_Road(temp_geo, start_point_info, seg_length)
road_count += 1
point_count += len(start_point_info)
temp_road = layer.GetNextFeature()
return road_count, point_count