@@ -199,9 +199,6 @@ def cleanup(self, container_id: str, log_handler: LogHandler) -> None:
199199 self ._delete_services (log_handler )
200200 self ._delete_job (log_handler )
201201
202- self ._wait_for_pod_to_be_deleted (container_id , log_handler )
203- log_handler .write (f"Job { container_id } is cleaned up." , flush = True )
204-
205202 def update_logs (self , container_id : str , log_handler : LogHandler ) -> None :
206203 try :
207204 pod = self ._get_job_pod (container_id )
@@ -460,7 +457,7 @@ def _create_job(
460457 job_result = self .client .batch_api .create_namespaced_job (
461458 namespace = self .namespace , body = job_spec
462459 )
463- LOGGER .info (f"Submitted Job template: { self .job_name } , " )
460+ LOGGER .info (f"Submitted Job template: { self .job_name } " )
464461 except ApiException as e :
465462 LOGGER .error (f"API Exception { e } " )
466463 raise ContainerStartError (str (e )) from e
@@ -475,27 +472,66 @@ def _delete_job(self, log_handler: LogHandler) -> None:
475472 timeout_seconds = 0 ,
476473 )
477474
478- if activation_job .items and activation_job .items [0 ].metadata :
479- activation_job_name = activation_job .items [0 ].metadata .name
480- result = self .client .batch_api .delete_namespaced_job (
481- name = activation_job_name ,
482- namespace = self .namespace ,
483- propagation_policy = "Background" ,
484- )
485-
486- if result .status == "Failure" :
487- raise ContainerCleanupError (f"{ result } " )
488- else :
475+ if not (activation_job .items and activation_job .items [0 ].metadata ):
489476 LOGGER .info (f"Job for { self .job_name } has been removed" )
490- log_handler .write (
491- f"Job for { self .job_name } has been removed." , True
492- )
477+ return
478+
479+ activation_job_name = activation_job .items [0 ].metadata .name
480+ self ._delete_job_resource (activation_job_name , log_handler )
493481
494482 except ApiException as e :
495483 raise ContainerCleanupError (
496484 f"Stop of { self .job_name } Failed: \n { e } "
497485 ) from e
498486
487+ def _delete_job_resource (
488+ self , job_name : str , log_handler : LogHandler
489+ ) -> None :
490+ result = self .client .batch_api .delete_namespaced_job (
491+ name = job_name ,
492+ namespace = self .namespace ,
493+ propagation_policy = "Background" ,
494+ )
495+
496+ if result .status == "Failure" :
497+ raise ContainerCleanupError (f"{ result } " )
498+
499+ watcher = watch .Watch ()
500+ try :
501+ for event in watcher .stream (
502+ self .client .core_api .list_namespaced_pod ,
503+ namespace = self .namespace ,
504+ label_selector = f"job-name={ self .job_name } " ,
505+ timeout_seconds = POD_DELETE_TIMEOUT ,
506+ ):
507+ if event ["type" ] == "DELETED" :
508+ log_handler .write (
509+ f"Pod '{ self .job_name } ' is deleted." ,
510+ flush = True ,
511+ )
512+ break
513+
514+ log_handler .write (
515+ f"Job { self .job_name } is cleaned up." ,
516+ flush = True ,
517+ )
518+ except ApiException as e :
519+ if e .status == status .HTTP_404_NOT_FOUND :
520+ message = (
521+ f"Pod '{ self .job_name } ' not found (404), "
522+ "assuming it's already deleted."
523+ )
524+ log_handler .write (message , flush = True )
525+ return
526+ log_handler .write (
527+ f"Error while waiting for deletion: { e } " , flush = True
528+ )
529+ raise ContainerCleanupError (
530+ f"Error during cleanup: { str (e )} "
531+ ) from e
532+ finally :
533+ watcher .stop ()
534+
499535 def _wait_for_pod_to_start (self , log_handler : LogHandler ) -> None :
500536 watcher = watch .Watch ()
501537 LOGGER .info ("Waiting for pod to start" )
@@ -532,50 +568,6 @@ def _wait_for_pod_to_start(self, log_handler: LogHandler) -> None:
532568 finally :
533569 watcher .stop ()
534570
535- def _wait_for_pod_to_be_deleted (
536- self , pod_name : str , log_handler : LogHandler
537- ) -> None :
538- log_handler .write (
539- f"Waiting for pod '{ pod_name } ' to be deleted..." ,
540- flush = True ,
541- )
542- watcher = watch .Watch ()
543- try :
544- for event in watcher .stream (
545- self .client .core_api .list_namespaced_pod ,
546- namespace = self .namespace ,
547- label_selector = f"job-name={ self .job_name } " ,
548- timeout_seconds = POD_DELETE_TIMEOUT ,
549- ):
550- LOGGER .debug (f"Received event: { event } " )
551- pod_name = event ["object" ].metadata .name
552- pod_phase = event ["object" ].status .phase
553- LOGGER .debug (f"Pod { pod_name } - { pod_phase } " )
554-
555- if event ["type" ] == "DELETED" :
556- # Pod successfully deleted
557- log_handler .write (
558- f"Pod '{ pod_name } ' has been deleted." , flush = True
559- )
560- break
561- except ApiException as e :
562- if e .status == status .HTTP_404_NOT_FOUND :
563- message = (
564- f"Pod '{ pod_name } ' not found (404), "
565- "assuming it's already deleted."
566- )
567- log_handler .write (message , flush = True )
568- return
569- log_handler .write (
570- f"Error while waiting for deletion: { e } " , flush = True
571- )
572- raise ContainerCleanupError (
573- f"Error during cleanup: { str (e )} "
574- ) from e
575-
576- finally :
577- watcher .stop ()
578-
579571 def _set_namespace (self ) -> None :
580572 namespace_file = (
581573 "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
0 commit comments