40
40
41
41
-type stage_info () :: # stage_info {}.
42
42
43
- init (Stages ) ->
44
- PerStageProgress = dict :from_list (init_per_stage_progress (Stages )),
43
+ % % Need StageNodes as when rebalance starts we need to show a minimum stages of
44
+ % % rebalance that are expected to occur, usually the services involved.
45
+ init (StageNodes ) ->
46
+ PerStageProgress = dict :from_list (init_per_stage_progress (StageNodes )),
45
47
Aggregated = aggregate (PerStageProgress ),
46
- StageInfo = init_per_stage_info (Stages ),
48
+ StageInfo = init_per_stage_info (StageNodes ),
47
49
# stage_info {per_stage_progress = PerStageProgress ,
48
50
aggregated = Aggregated ,
49
51
per_stage_info = StageInfo }.
50
52
51
- init_per_stage_progress (Stages ) ->
52
- lists :flatten ([init_stage_progress (S , N , SS ) || {S , N , SS } <- Stages ]).
53
+ init_per_stage_progress (StageNodes ) ->
54
+ [{Stage , dict :from_list ([{N , 0 } || N <- Nodes ])} ||
55
+ {Stage , Nodes } <- StageNodes , Nodes =/= []].
53
56
54
- init_stage_progress (_Stage , [], _SubStage ) ->
55
- [];
56
- init_stage_progress (Stage , Nodes , SubStages ) ->
57
- SubStageNodes = init_per_stage_progress (SubStages ),
58
- [{Stage , dict :from_list ([{N , 0 } || N <- Nodes ])} | SubStageNodes ].
57
+ init_per_stage_info (StageNodes ) ->
58
+ [{Stage , # stage_details {}} || {Stage , Nodes } <- StageNodes , Nodes =/= []].
59
59
60
60
% % For backward compatibility.
61
61
get_progress (# stage_info {aggregated = Aggregated }) ->
62
62
Aggregated .
63
63
64
- init_per_stage_info (Stages ) ->
65
- [{Stage , # stage_details {
66
- start_time = false ,
67
- complete_time = false ,
68
- sub_stages = init_per_stage_info (SubStages ),
69
- notable_events = []
70
- }} || {Stage , Nodes , SubStages } <- Stages , Nodes =/= []].
71
-
72
64
update_progress (
73
65
Stage , StageProgress ,
74
66
# stage_info {per_stage_progress = OldPerStageProgress } = StageInfo ) ->
@@ -85,7 +77,7 @@ do_update_progress(Stage, StageProgress, PerStage) ->
85
77
dict :merge (fun (_ , _ , New ) ->
86
78
New
87
79
end , OldStageProgress , StageProgress )
88
- end , PerStage ).
80
+ end , StageProgress , PerStage ).
89
81
90
82
aggregate (PerStage ) ->
91
83
TmpAggr = dict :fold (
@@ -212,32 +204,39 @@ get_per_stage_progress(PerStageProgress) ->
212
204
dict :to_list (StageProgress )
213
205
end , PerStageProgress ).
214
206
215
- update_stage_info ({started , Time }, StageInfo ) ->
207
+ update_stage ({started , { Time , _ } }, StageInfo ) ->
216
208
StageInfo # stage_details {start_time = Time ,
217
209
complete_time = false };
218
- update_stage_info ({completed , Time }, StageInfo ) ->
210
+ update_stage ({completed , Time }, StageInfo ) ->
219
211
StageInfo # stage_details {complete_time = Time };
220
- update_stage_info ({notable_event , TS , Text },
221
- # stage_details {notable_events = NotableEvents } = StageInfo ) ->
212
+ update_stage ({notable_event , TS , Text },
213
+ # stage_details {notable_events = NotableEvents } = StageInfo ) ->
222
214
Time = binarify_timestamp (TS ),
223
215
Msg = list_to_binary (Text ),
224
216
StageInfo # stage_details {notable_events = [{Time , Msg } | NotableEvents ]}.
225
217
226
- update_stage_info (Stage , StageInfoUpdate ,
227
- # stage_info {per_stage_info = PerStageInfo } = StageInfo ) ->
218
+ update_stage_info (Stage , StageInfoUpdate , StageInfo ) ->
219
+ NewStageInfo = maybe_create (Stage , StageInfoUpdate , StageInfo ,
220
+ fun maybe_create_new_stage_progress /3 ),
221
+ update_stage_info_inner (Stage , StageInfoUpdate , NewStageInfo ).
222
+
223
+ update_stage_info_inner (Stage , StageInfoUpdate ,
224
+ # stage_info {per_stage_info = PerStageInfo } = StageInfo ) ->
228
225
NewPerStageInfo = update_stage_info_rec (Stage , StageInfoUpdate ,
229
226
PerStageInfo ),
230
227
StageInfo # stage_info {per_stage_info = NewPerStageInfo }.
231
228
232
- update_stage_info_rec ([Stage | SubStages ], StageInfoUpdate , AllStageInfo ) ->
229
+ update_stage_info_rec ([Stage | SubStages ] = AllStages , StageInfoUpdate ,
230
+ AllStageInfo ) ->
233
231
case lists :keysearch (Stage , 1 , AllStageInfo ) of
234
232
false ->
235
- AllStageInfo ;
233
+ maybe_create (AllStages , StageInfoUpdate , AllStageInfo ,
234
+ fun create_stage /3 );
236
235
{value , {Stage , OldStageInfo }} ->
237
236
NewStageInfo =
238
237
case SubStages of
239
238
[] ->
240
- update_stage_info (StageInfoUpdate , OldStageInfo );
239
+ update_stage (StageInfoUpdate , OldStageInfo );
241
240
_ ->
242
241
NewSubStages = update_stage_info_rec (
243
242
SubStages ,
@@ -248,3 +247,32 @@ update_stage_info_rec([Stage | SubStages], StageInfoUpdate, AllStageInfo) ->
248
247
end ,
249
248
lists :keyreplace (Stage , 1 , AllStageInfo , {Stage , NewStageInfo })
250
249
end .
250
+
251
+ create_new_field ({started , {_ , []}}) ->
252
+ false ;
253
+ create_new_field ({started , {_ , _ }}) ->
254
+ true ;
255
+ create_new_field (_ ) ->
256
+ false .
257
+
258
+ maybe_create (Stage , Info , Old , Fun ) ->
259
+ case create_new_field (Info ) of
260
+ true -> Fun (Stage , Info , Old );
261
+ false -> Old
262
+ end .
263
+
264
+ create_stage ([Stage | _ ] = AllStages , {started , {_ ,_ }} = Info , AllStageInfo ) ->
265
+ update_stage_info_rec (AllStages , Info ,
266
+ [{Stage , # stage_details {}} | AllStageInfo ]).
267
+
268
+ maybe_create_new_stage_progress (
269
+ Stage , {started , {_ , Nodes }},
270
+ # stage_info {per_stage_progress = PerStageProgress } = StageInfo ) ->
271
+ ProgressStage = lists :last (Stage ),
272
+ case dict :find (ProgressStage , PerStageProgress ) of
273
+ {ok , _ } ->
274
+ StageInfo ;
275
+ _ ->
276
+ [{ProgressStage , Dict }] = init_per_stage_progress ([{ProgressStage , Nodes }]),
277
+ update_progress (ProgressStage , Dict , StageInfo )
278
+ end .
0 commit comments