@@ -63,37 +63,63 @@ func (g *Gatherer) GatherWorkloadInfo(ctx context.Context) ([]record.Record, []e
6363 return gatherWorkloadInfo (ctx , gatherKubeClient .CoreV1 (), gatherOpenShiftClient )
6464}
6565
66- // nolint: funlen, gocyclo, gocritic
6766func gatherWorkloadInfo (
6867 ctx context.Context ,
6968 coreClient corev1client.CoreV1Interface ,
7069 imageClient imageclient.ImageV1Interface ,
7170) ([]record.Record , []error ) {
72- // load images as we find them
73- imageCh := make (chan string , workloadGatherPageSize )
74- imagesDoneCh := gatherWorkloadImageInfo (ctx , imageClient .Images (), imageCh )
71+ imageCh , imagesDoneCh := gatherWorkloadImageInfo (ctx , imageClient .Images ())
7572
76- // load pods in order
7773 start := time .Now ()
74+ limitReached , info , err := workloadInfo (ctx , coreClient , imageCh )
75+ if err != nil {
76+ return nil , []error {err }
77+ }
78+
79+ workloadImageResize (info .PodCount )
80+
81+ records := []record.Record {
82+ {
83+ Name : "config/workload_info" ,
84+ Item : record.JSONMarshaller {Object : & info },
85+ },
86+ }
87+
88+ // wait for as many images as we can find to load
89+ handleWorkloadImageInfo (ctx , & info , start , imagesDoneCh )
90+
91+ if limitReached {
92+ return records , []error {fmt .Errorf ("the %d limit for number of pods gathered was reached" , podsLimit )}
93+ }
94+ return records , nil
95+ }
96+
97+ // nolint: funlen, gocritic, gocyclo
98+ func workloadInfo (
99+ ctx context.Context ,
100+ coreClient corev1client.CoreV1Interface ,
101+ imageCh chan string ,
102+ ) (bool , workloadPods , error ) {
103+ defer close (imageCh )
78104 limitReached := false
79105
80106 var info workloadPods
81- var namespace string
82- var namespaceHash string
107+ var namespace , namespaceHash , continueValue string
83108 var namespacePods workloadNamespacePods
84109 h := sha256 .New ()
85110
86111 // Use the Limit and Continue fields to request the pod information in chunks.
87- var continueValue string
88112 for {
89113 pods , err := coreClient .Pods ("" ).List (ctx , metav1.ListOptions {
90114 Limit : workloadGatherPageSize ,
91115 Continue : continueValue ,
92116 })
93117 if err != nil {
94- return nil , [] error { err }
118+ return false , workloadPods {}, err
95119 }
96- for _ , pod := range pods .Items {
120+
121+ for podIdx := range pods .Items {
122+ pod := pods .Items [podIdx ]
97123 // initialize the running state, including the namespace hash
98124 if pod .Namespace != namespace {
99125 if len (namespace ) != 0 {
@@ -117,45 +143,35 @@ func gatherWorkloadInfo(
117143 namespacePods .Count ++
118144
119145 switch {
120- case pod . Status . Phase == corev1 . PodSucceeded , pod . Status . Phase == corev1 . PodFailed :
146+ case isPodTerminated ( & pod ) :
121147 // track terminal pods but do not report their data
122148 namespacePods .TerminalCount ++
123149 continue
124- case pod .Status .Phase != corev1 .PodRunning && pod .Status .Phase != corev1 .PodPending ,
125- len (pod .Status .InitContainerStatuses ) != len (pod .Spec .InitContainers ),
126- len (pod .Status .ContainerStatuses ) != len (pod .Spec .Containers ):
150+ case podCanBeIgnored (& pod ):
127151 // consider pods that are in a known state
128152 // or pods without filled out status are invalid
129153 namespacePods .IgnoredCount ++
130154 continue
131155 }
132156
133- var podShape workloadPodShape
134- var ok bool
135- podShape .InitContainers , ok = calculateWorkloadContainerShapes (h , pod .Spec .InitContainers , pod .Status .InitContainerStatuses )
157+ podShape , ok := calculatePodShape (h , & pod )
136158 if ! ok {
137159 namespacePods .InvalidCount ++
138160 continue
139161 }
140- podShape .Containers , ok = calculateWorkloadContainerShapes (h , pod .Spec .Containers , pod .Status .ContainerStatuses )
141- if ! ok {
142- namespacePods .InvalidCount ++
143- continue
144- }
145-
146- podShape .RestartsAlways = pod .Spec .RestartPolicy == corev1 .RestartPolicyAlways
147162
148163 if index := workloadPodShapeIndex (namespacePods .Shapes , podShape ); index != - 1 {
149164 namespacePods .Shapes [index ].Duplicates ++
150- } else {
151- namespacePods .Shapes = append (namespacePods .Shapes , podShape )
165+ continue
166+ }
167+ namespacePods .Shapes = append (namespacePods .Shapes , podShape )
152168
153- for _ , container := range podShape .InitContainers {
154- imageCh <- container .ImageID
155- }
156- for _ , container := range podShape . Containers {
157- imageCh <- container . ImageID
158- }
169+ for _ , container := range podShape .InitContainers {
170+ imageCh <- container .ImageID
171+ }
172+
173+ for _ , container := range podShape . Containers {
174+ imageCh <- container . ImageID
159175 }
160176 }
161177
@@ -166,6 +182,7 @@ func gatherWorkloadInfo(
166182 }
167183 continueValue = pods .Continue
168184 }
185+
169186 // add the last set of pods
170187 if len (namespace ) != 0 {
171188 if info .Namespaces == nil {
@@ -175,23 +192,53 @@ func gatherWorkloadInfo(
175192 info .PodCount += namespacePods .Count
176193 }
177194
178- workloadImageResize (info .PodCount )
195+ return limitReached , info , nil
196+ }
179197
180- records := []record.Record {
181- {
182- Name : "config/workload_info" ,
183- Item : record.JSONMarshaller {Object : & info },
184- },
198+ func isPodTerminated (pod * corev1.Pod ) bool {
199+ return pod .Status .Phase == corev1 .PodSucceeded ||
200+ pod .Status .Phase == corev1 .PodFailed
201+ }
202+
203+ func podCanBeIgnored (pod * corev1.Pod ) bool {
204+ return pod .Status .Phase != corev1 .PodRunning && pod .Status .Phase != corev1 .PodPending ||
205+ len (pod .Status .InitContainerStatuses ) != len (pod .Spec .InitContainers ) ||
206+ len (pod .Status .ContainerStatuses ) != len (pod .Spec .Containers )
207+ }
208+
209+ func calculatePodShape (h hash.Hash , pod * corev1.Pod ) (workloadPodShape , bool ) {
210+ var podShape workloadPodShape
211+ var ok bool
212+ podShape .InitContainers , ok = calculateWorkloadContainerShapes (h , pod .Spec .InitContainers , pod .Status .InitContainerStatuses )
213+ if ! ok {
214+ return workloadPodShape {}, false
185215 }
186216
187- // wait for as many images as we can find to load
217+ podShape .Containers , ok = calculateWorkloadContainerShapes (h , pod .Spec .Containers , pod .Status .ContainerStatuses )
218+ if ! ok {
219+ return workloadPodShape {}, false
220+ }
221+
222+ podShape .RestartsAlways = pod .Spec .RestartPolicy == corev1 .RestartPolicyAlways
223+
224+ return podShape , true
225+ }
226+
227+ func handleWorkloadImageInfo (
228+ ctx context.Context ,
229+ info * workloadPods ,
230+ start time.Time ,
231+ imagesDoneCh <- chan workloadImageInfo ,
232+ ) {
188233 var imageInfo workloadImageInfo
234+
189235 // wait proportional to the number of pods + a floor
190236 waitDuration := time .Second * time .Duration (info .PodCount )/ 10 + 15 * time .Second
237+
191238 klog .V (2 ).Infof ("Loaded pods in %s, will wait %s for image data" ,
192239 time .Since (start ).Round (time .Second ).String (),
193240 waitDuration .Round (time .Second ).String ())
194- close ( imageCh )
241+
195242 select {
196243 case <- ctx .Done ():
197244 select {
@@ -208,19 +255,15 @@ func gatherWorkloadInfo(
208255
209256 info .Images = imageInfo .images
210257 info .ImageCount = imageInfo .count
211- if limitReached {
212- return records , []error {fmt .Errorf ("the %d limit for number of pods gathered was reached" , podsLimit )}
213- }
214- return records , nil
215258}
216259
217- // nolint: gocyclo
260+ // nolint: gocritic
218261func gatherWorkloadImageInfo (
219262 ctx context.Context ,
220263 imageClient imageclient.ImageInterface ,
221- imageCh <- chan string ,
222- ) <- chan workloadImageInfo {
264+ ) (chan string , <- chan workloadImageInfo ) {
223265 images := make (map [string ]workloadImage )
266+ imageCh := make (chan string , workloadGatherPageSize )
224267 imagesDoneCh := make (chan workloadImageInfo )
225268
226269 go func () {
@@ -254,17 +297,8 @@ func gatherWorkloadImageInfo(
254297 for k := range pendingIDs {
255298 delete (pendingIDs , k )
256299 }
257- if _ , ok := images [imageID ]; ! ok {
258- pendingIDs [imageID ] = struct {}{}
259- }
260- for l := len (imageCh ); l > 0 ; l = len (imageCh ) {
261- for i := 0 ; i < l ; i ++ {
262- imageID := <- imageCh
263- if _ , ok := images [imageID ]; ! ok {
264- pendingIDs [imageID ] = struct {}{}
265- }
266- }
267- }
300+
301+ readImageSHAs (pendingIDs , images , imageID , imageCh )
268302
269303 for imageID := range pendingIDs {
270304 if _ , ok := images [imageID ]; ok {
@@ -274,29 +308,55 @@ func gatherWorkloadImageInfo(
274308 images [imageID ] = image
275309 continue
276310 }
311+
277312 images [imageID ] = workloadImage {}
278- start := time .Now ()
279- image , err := imageClient .Get (ctx , imageID , metav1.GetOptions {})
280- if errors .IsNotFound (err ) {
281- klog .V (4 ).Infof ("No image %s (%s)" , imageID , time .Since (start ).Round (time .Millisecond ).String ())
313+ image := imageFromID (ctx , imageClient , imageID )
314+ if image == nil {
282315 continue
283316 }
284- if err == context .Canceled {
285- return
286- }
287- if err != nil {
288- klog .Errorf ("Unable to retrieve image %s" , imageID )
289- continue
290- }
291- klog .V (4 ).Infof ("Found image %s (%s)" , imageID , time .Since (start ).Round (time .Millisecond ).String ())
292317 info := calculateWorkloadInfo (h , image )
293318 images [imageID ] = info
294319 workloadImageAdd (imageID , info )
295320 }
296321 }
297322 }
298323 }()
299- return imagesDoneCh
324+ return imageCh , imagesDoneCh
325+ }
326+
327+ // readImageSHAs drains the channel of any image IDs
328+ func readImageSHAs (pendingIDs map [string ]struct {}, images map [string ]workloadImage , id string , imageCh <- chan string ) {
329+ if _ , ok := images [id ]; ! ok {
330+ pendingIDs [id ] = struct {}{}
331+ }
332+ for l := len (imageCh ); l > 0 ; l = len (imageCh ) {
333+ for i := 0 ; i < l ; i ++ {
334+ imageID := <- imageCh
335+ if _ , ok := images [imageID ]; ! ok {
336+ pendingIDs [imageID ] = struct {}{}
337+ }
338+ }
339+ }
340+ }
341+
342+ // imageFromID gets the container image from given ID
343+ func imageFromID (ctx context.Context , client imageclient.ImageInterface , id string ) * imagev1.Image {
344+ start := time .Now ()
345+ image , err := client .Get (ctx , id , metav1.GetOptions {})
346+ if errors .IsNotFound (err ) {
347+ klog .V (4 ).Infof ("No image %s (%s)" , id , time .Since (start ).Round (time .Millisecond ).String ())
348+ return nil
349+ }
350+ if err == context .Canceled {
351+ return nil
352+ }
353+ if err != nil {
354+ klog .Errorf ("Unable to retrieve image %s" , id )
355+ return nil
356+ }
357+
358+ klog .V (4 ).Infof ("Found image %s (%s)" , id , time .Since (start ).Round (time .Millisecond ).String ())
359+ return image
300360}
301361
302362// workloadPodShapeIndex attempts to find an equivalent shape within the current
0 commit comments