forked from geekori/virus_covid19
-
Notifications
You must be signed in to change notification settings - Fork 0
/
person.py
166 lines (142 loc) · 5.83 KB
/
person.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from point import *
from const import *
import scipy as sp
from params import *
from math import *
import personpool
from target import *
from hospital import *
import random
class Person(Point):
def __init__(self,city,x,y):
super(Person,self).__init__(x,y)
self.city = city
self.target_sigma = 50 # 方差
self.sigma = 1 # 方差
self.state = NORMAL # 当前状态
# 感染时间
self.infected_time = 0
# 确诊时间
self.confirmed_time = 0
# 死亡时间
self.dead_time = 0
self.need_bed = False
self.move_target = None
self.used_bed = None
# 是否有移动的意愿
def want_move(self):
return sp.random.normal(Params.average_flow_intention, self.sigma) > 0
# 是否被感染
def is_infected(self):
return self.state >= LATENCY
# 被感染
def be_infected(self):
self.state = LATENCY
personpool.Persons().latency_persons.append(self)
self.infected_time = Params.current_time
# 计算两点之间的直线距离
def distance(self,person):
return sqrt(pow(self.x - person.x,2) + pow(self.y - person.y,2))
def distance1(self,x,y):
return sqrt(pow(self.x - x,2) + pow(self.y - y,2))
# 住院
def freezy(self):
self.state = FREEZE
# 不同状态下的单个人实例运行行为
def action(self):
if self.state == FREEZE or self.state == DEATH:
# 处于隔离或死亡状态,无法行动
return
if not self.want_move():
return
# 存在流动意愿,将进行流动,流动同样遵循标准正态分布
if self.move_target == None or self.move_target.arrived:
target_x = sp.random.normal(self.x,self.target_sigma)
target_y = sp.random.normal(self.y, self.target_sigma)
self.move_target = MoveTarget(int(target_x),int(target_y))
# 计算运行位移
distance = self.distance1(self.move_target.x,self.move_target.y)
# 到达目标点
if distance < 1:
self.move_target.arrived = True
return
dx = self.move_target.x - self.x
dy = self.move_target.y - self.y
udx = dx // distance
if udx == 0 and dx != 0:
if dx > 0:
udx = 1
else:
udx = -1
udy = dy // distance
if udy == 0 and dy != 0:
if dy > 0:
udy = 1
else:
udy = -1
# 横向运动边界
if self.x > Params.city_width or self.x < 0:
self.move_target = None
if udx > 0:
udx = -udx
# 纵向运动边界
if self.y > Params.city_height or self.y < 0:
self.move_target = None
if udy > 0:
udy = -udy
self.move_to(udx,udy)
# 对各种状态的人进行不同的处理,更新发布市民健康状态
def update(self):
# 如果已经隔离或死亡,就不需要处理了
if self.state == FREEZE or self.state == DEATH:
return
# 处理已经确诊的感染者(患者)
if self.state == CONFIRMED and self.dead_time == 0:
destiny = random.randrange(1,10001) # 幸运数字
if destiny >= 1 and destiny <= int(Params.fatality_rate * 10000):
# 幸运数字落在死亡区间
self.dead_time = self.confirmed_time + self.dead_time
else:
self.dead_time = -1 # 未死亡
# 患者已经确诊,并且感染时间大于医院响应时间,这时就可以入院
if self.state == CONFIRMED and Params.current_time - self.confirmed_time >= Params.hospital_receive_time:
bed = Hospital().pick_bed() # 查找空床位
if bed == None:
# 没有空床位,报告需求的床位数
if not self.need_bed:
Hospital().need_bed_count += 1
self.need_bed = True
else:
# 安置病人
self.used_bed = bed
self.state = FREEZE
self.x = bed.x + Hospital().bed_size // 2
self.y = bed.y + Hospital().bed_size // 2
# 如果已经入院的患者曾经申请过床位,那么不再申请新的床位
if self.need_bed and Hospital().need_bed_count > 0:
Hospital().need_bed_count -= 1
bed.is_empty = False
# 处理病死者
if(self.state == CONFIRMED or self.state == FREEZE) and Params.current_time >= self.dead_time and self.dead_time > 0:
self.state = DEATH # 患者死亡
personpool.Persons().latency_persons.remove(self) # 已经死亡
Hospital().empty_bed(self.used_bed) # 腾出床位
if Hospital().need_bed_count > 0:
Hospital().need_bed_count -= 1
# 增加一个正态分布用于潜伏期内随机发病时间
latency_symptom_time = sp.random.normal(Params.virus_latency/2,25)
# 处理发病的潜伏期感染者
if self.state == LATENCY and Params.current_time - self.infected_time > latency_symptom_time:
self.state = CONFIRMED # 潜伏期发病
self.confirmed_time == Params.current_time # 刷新确诊时间
# 处理未隔离者的移动问题
self.action()
# 处理健康人被感染的问题
if self.state >= LATENCY:
return
latency_persons = personpool.Persons().latency_persons.copy()
for person in latency_persons:
random_value = random.random()
if random_value < Params.broad_rate and self.distance(person) < Params.safe_distance:
self.be_infected()
break