@@ -29,10 +29,10 @@ use stor_port::{
29
29
openapi:: { models, models:: NodeStatus } ,
30
30
store:: nexus_persistence:: { NexusInfo , NexusInfoKey } ,
31
31
transport:: {
32
- Child , ChildState , CreateVolume , DestroyVolume , Filter , GetNexuses , GetReplicas ,
32
+ self , Child , ChildState , CreateVolume , DestroyVolume , Filter , GetNexuses , GetReplicas ,
33
33
GetVolumes , Nexus , NodeId , PublishVolume , SetVolumeReplica , ShareVolume , Topology ,
34
- UnpublishVolume , UnshareVolume , Volume , VolumeId , VolumeShareProtocol , VolumeState ,
35
- VolumeStatus ,
34
+ UnpublishVolume , UnshareVolume , Volume , VolumeHealth , VolumeId , VolumeShareProtocol ,
35
+ VolumeState , VolumeStatus ,
36
36
} ,
37
37
} ,
38
38
} ;
@@ -1120,3 +1120,219 @@ async fn offline_node(cluster: &Cluster) {
1120
1120
}
1121
1121
}
1122
1122
}
1123
+
1124
+ #[ tokio:: test]
1125
+ async fn health ( ) {
1126
+ let cluster = ClusterBuilder :: builder ( )
1127
+ . with_rest ( true )
1128
+ . with_io_engines ( 3 )
1129
+ . with_tmpfs_pool ( 100 * 1024 * 1024 )
1130
+ . with_cache_period ( "150ms" )
1131
+ . with_reconcile_period ( Duration :: from_millis ( 100 ) , Duration :: from_secs ( 100 ) )
1132
+ . build ( )
1133
+ . await
1134
+ . unwrap ( ) ;
1135
+
1136
+ let volume_client = cluster. grpc_client ( ) . volume ( ) ;
1137
+ let volume = volume_client
1138
+ . create (
1139
+ & CreateVolume {
1140
+ uuid : "1e3cf927-80c2-47a8-adf0-95c486bdd7b7" . try_into ( ) . unwrap ( ) ,
1141
+ size : 5242880 ,
1142
+ replicas : 3 ,
1143
+ ..Default :: default ( )
1144
+ } ,
1145
+ None ,
1146
+ )
1147
+ . await
1148
+ . unwrap ( ) ;
1149
+
1150
+ let volume = volume_client
1151
+ . publish (
1152
+ & PublishVolume :: new (
1153
+ volume. spec ( ) . uuid . clone ( ) ,
1154
+ Some ( cluster. node ( 0 ) ) ,
1155
+ None ,
1156
+ HashMap :: new ( ) ,
1157
+ vec ! [ ] ,
1158
+ ) ,
1159
+ None ,
1160
+ )
1161
+ . await
1162
+ . unwrap ( ) ;
1163
+
1164
+ cluster. composer ( ) . kill ( & cluster. node ( 2 ) ) . await . unwrap ( ) ;
1165
+
1166
+ wait_for_health (
1167
+ & volume. state ( ) ,
1168
+ & volume_client,
1169
+ Duration :: from_secs ( 2 ) ,
1170
+ |h| {
1171
+ h. healthy_replicas == 2
1172
+ && !h. clean_shutdown
1173
+ && h. online_clean_replicas == 1
1174
+ && h. clean_replicas == 1
1175
+ && h. live_healthy_replicas == 2
1176
+ && h. online_healthy_replicas == 2
1177
+ } ,
1178
+ )
1179
+ . await
1180
+ . unwrap ( ) ;
1181
+
1182
+ let volume = volume_client
1183
+ . unpublish (
1184
+ & UnpublishVolume :: new ( & volume. spec ( ) . uuid , false , vec ! [ ] ) ,
1185
+ None ,
1186
+ )
1187
+ . await
1188
+ . unwrap ( ) ;
1189
+
1190
+ wait_for_health (
1191
+ & volume. state ( ) ,
1192
+ & volume_client,
1193
+ Duration :: from_secs ( 2 ) ,
1194
+ |h| {
1195
+ h. healthy_replicas == 2
1196
+ && h. clean_shutdown
1197
+ && h. online_clean_replicas == 2
1198
+ && h. clean_replicas == 2
1199
+ && h. live_healthy_replicas == 0
1200
+ && h. online_healthy_replicas == 2
1201
+ } ,
1202
+ )
1203
+ . await
1204
+ . unwrap ( ) ;
1205
+
1206
+ cluster. composer ( ) . stop ( & cluster. node ( 1 ) ) . await . unwrap ( ) ;
1207
+
1208
+ wait_for_health (
1209
+ & volume. state ( ) ,
1210
+ & volume_client,
1211
+ Duration :: from_secs ( 2 ) ,
1212
+ |h| {
1213
+ h. healthy_replicas == 2
1214
+ && h. clean_shutdown
1215
+ && h. online_clean_replicas == 1
1216
+ && h. clean_replicas == 2
1217
+ && h. live_healthy_replicas == 0
1218
+ && h. online_healthy_replicas == 1
1219
+ } ,
1220
+ )
1221
+ . await
1222
+ . unwrap ( ) ;
1223
+
1224
+ cluster. composer ( ) . stop ( & cluster. node ( 0 ) ) . await . unwrap ( ) ;
1225
+
1226
+ wait_for_health (
1227
+ & volume. state ( ) ,
1228
+ & volume_client,
1229
+ Duration :: from_secs ( 2 ) ,
1230
+ |h| {
1231
+ h. healthy_replicas == 2
1232
+ && h. clean_shutdown
1233
+ && h. online_clean_replicas == 0
1234
+ && h. clean_replicas == 2
1235
+ && h. live_healthy_replicas == 0
1236
+ && h. online_healthy_replicas == 0
1237
+ } ,
1238
+ )
1239
+ . await
1240
+ . unwrap ( ) ;
1241
+
1242
+ cluster. composer ( ) . start ( & cluster. node ( 1 ) ) . await . unwrap ( ) ;
1243
+ cluster
1244
+ . wait_node_status ( cluster. node ( 1 ) , transport:: NodeStatus :: Online )
1245
+ . await
1246
+ . unwrap ( ) ;
1247
+
1248
+ let volume = volume_client
1249
+ . publish (
1250
+ & PublishVolume :: new (
1251
+ volume. spec ( ) . uuid . clone ( ) ,
1252
+ Some ( cluster. node ( 1 ) ) ,
1253
+ None ,
1254
+ HashMap :: new ( ) ,
1255
+ vec ! [ ] ,
1256
+ ) ,
1257
+ None ,
1258
+ )
1259
+ . await
1260
+ . unwrap ( ) ;
1261
+
1262
+ wait_for_health (
1263
+ & volume. state ( ) ,
1264
+ & volume_client,
1265
+ Duration :: from_secs ( 2 ) ,
1266
+ |h| {
1267
+ h. healthy_replicas == 1
1268
+ && !h. clean_shutdown
1269
+ && h. online_clean_replicas == 1
1270
+ && h. clean_replicas == 1
1271
+ && h. live_healthy_replicas == 1
1272
+ && h. online_healthy_replicas == 1
1273
+ } ,
1274
+ )
1275
+ . await
1276
+ . unwrap ( ) ;
1277
+
1278
+ cluster. composer ( ) . start ( & cluster. node ( 2 ) ) . await . unwrap ( ) ;
1279
+ wait_for_health (
1280
+ & volume. state ( ) ,
1281
+ & volume_client,
1282
+ Duration :: from_secs ( 2 ) ,
1283
+ |h| {
1284
+ h. healthy_replicas == 2
1285
+ && !h. clean_shutdown
1286
+ && h. online_clean_replicas == 1
1287
+ && h. clean_replicas == 1
1288
+ && h. live_healthy_replicas == 2
1289
+ && h. online_healthy_replicas == 2
1290
+ } ,
1291
+ )
1292
+ . await
1293
+ . unwrap ( ) ;
1294
+
1295
+ // a stop is a clean shutdown and should be so marked in the health info!
1296
+ cluster. composer ( ) . stop ( & cluster. node ( 1 ) ) . await . unwrap ( ) ;
1297
+ cluster
1298
+ . wait_node_status ( cluster. node ( 1 ) , transport:: NodeStatus :: Unknown )
1299
+ . await
1300
+ . unwrap ( ) ;
1301
+
1302
+ wait_for_health (
1303
+ & volume. state ( ) ,
1304
+ & volume_client,
1305
+ Duration :: from_secs ( 2 ) ,
1306
+ |h| {
1307
+ h. healthy_replicas == 2
1308
+ && h. clean_shutdown
1309
+ && h. online_clean_replicas == 1
1310
+ && h. clean_replicas == 2
1311
+ && h. live_healthy_replicas == 0
1312
+ && h. online_healthy_replicas == 1
1313
+ } ,
1314
+ )
1315
+ . await
1316
+ . unwrap ( ) ;
1317
+ }
1318
+
1319
+ async fn wait_for_health < F : FnMut ( & VolumeHealth ) -> bool > (
1320
+ volume : & VolumeState ,
1321
+ client : & dyn VolumeOperations ,
1322
+ timeout : std:: time:: Duration ,
1323
+ mut cb : F ,
1324
+ ) -> Result < VolumeState , Option < VolumeHealth > > {
1325
+ let now = std:: time:: Instant :: now ( ) ;
1326
+ let mut health = None ;
1327
+ while now. elapsed ( ) < timeout {
1328
+ let volume = get_volume ( volume, client) . await ;
1329
+ health = volume. state ( ) . health ;
1330
+ if let Some ( health) = & health {
1331
+ if cb ( health) {
1332
+ return Ok ( volume. state ( ) ) ;
1333
+ }
1334
+ }
1335
+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
1336
+ }
1337
+ Err ( health)
1338
+ }
0 commit comments