22
33import functools
44import logging
5+ import math
56import os
67import random
78import select
@@ -89,9 +90,11 @@ def __init__(self, auxiliary=False):
8990 self .wakeup_unblock = False
9091 self .wakeup_handle = False
9192 self .cancel_task = False
93+ self .unblocked_count = 0
9294
9395 self .ignored_task_ids = []
9496 self .ignored_task_countdown = IGNORED_TASKS_CLEANUP_INTERVAL
97+ self .false_alarms = 0
9598
9699 self .auxiliary = auxiliary
97100 self .task = None
@@ -158,6 +161,14 @@ def _signal_handler(self, thesignal, frame):
158161 self .shutdown_requested = True
159162
160163 def _pg_notify_handler (self , notification ):
164+ if notification .channel == "pulp_worker_broadcast" :
165+ key , value = notification .payload .split (":" , maxsplit = 1 )
166+ _logger .debug ("broadcast message recieved: %s: %s" , key , value )
167+ if key == "unblocked_count" :
168+ self .unblocked_count = int (value )
169+ self .wakeup_handle = self .unblocked_count > 0
170+ elif key == "metrics_heartbeat" :
171+ self .last_metric_heartbeat = datetime .fromisoformat (key )
161172 if notification .channel == "pulp_worker_wakeup" :
162173 if notification .payload == TASK_WAKEUP_UNBLOCK :
163174 # Auxiliary workers don't do this.
@@ -171,6 +182,7 @@ def _pg_notify_handler(self, notification):
171182 self .wakeup_handle = True
172183
173184 elif notification .channel == "pulp_worker_metrics_heartbeat" :
185+ # TODO (in one of the next releases) Remove that superseeded channel.
174186 self .last_metric_heartbeat = datetime .fromisoformat (notification .payload )
175187 elif self .task and notification .channel == "pulp_worker_cancel" :
176188 if notification .payload == str (self .task .pk ):
@@ -257,6 +269,7 @@ def record_unblocked_waiting_tasks_metric(self, now):
257269 )
258270
259271 self .cursor .execute (f"NOTIFY pulp_worker_metrics_heartbeat, '{ str (now )} '" )
272+ self .broadcast ("metrics_heartbeat" , now )
260273
261274 def beat (self ):
262275 now = timezone .now ()
@@ -278,6 +291,9 @@ def beat(self):
278291 if self .otel_enabled and now > self .last_metric_heartbeat + self .heartbeat_period :
279292 self .record_unblocked_waiting_tasks_metric (now )
280293
294+ def broadcast (self , key , value ):
295+ self .cursor .execute ("SELECT pg_notify('pulp_worker_broadcast', %s)" , (f"{ key } :{ value } " ,))
296+
281297 def notify_workers (self , reason ):
282298 self .cursor .execute ("SELECT pg_notify('pulp_worker_wakeup', %s)" , (reason ,))
283299
@@ -345,14 +361,17 @@ def unblock_tasks(self):
345361
346362 self .wakeup_unblock = False
347363 result = self ._unblock_tasks ()
348- if result is not None and (
349- Task .objects .filter (
350- state__in = [TASK_STATES .WAITING , TASK_STATES .CANCELING ], app_lock = None
364+ if result is not None :
365+ unblocked_count = (
366+ Task .objects .filter (
367+ state__in = [TASK_STATES .WAITING , TASK_STATES .CANCELING ], app_lock = None
368+ )
369+ .exclude (unblocked_at = None )
370+ .count ()
351371 )
352- .exclude (unblocked_at = None )
353- .exists ()
354- ):
355- self .notify_workers (TASK_WAKEUP_HANDLE )
372+ if unblocked_count > 0 :
373+ self .notify_workers (TASK_WAKEUP_HANDLE )
374+ self .broadcast ("unblocked_count" , unblocked_count )
356375 return True
357376
358377 return result
@@ -369,6 +388,7 @@ def _unblock_tasks(self):
369388 .order_by ("pulp_created" )
370389 .select_related ("pulp_domain" )
371390 ):
391+ _logger .debug ("Considering task %s for unblocking." , task .pk )
372392 reserved_resources_record = task .reserved_resources_record or []
373393 exclusive_resources = [
374394 resource
@@ -389,23 +409,26 @@ def _unblock_tasks(self):
389409 )
390410 task .unblock ()
391411
392- elif (
393- task .state == TASK_STATES .WAITING
394- and task .unblocked_at is None
395- # No exclusive resource taken?
396- and not any (
397- resource in taken_exclusive_resources or resource in taken_shared_resources
398- for resource in exclusive_resources
399- )
400- # No shared resource exclusively taken?
401- and not any (resource in taken_exclusive_resources for resource in shared_resources )
402- ):
403- _logger .debug (
404- "Marking waiting task %s in domain: %s unblocked." ,
405- task .pk ,
406- task .pulp_domain .name ,
407- )
408- task .unblock ()
412+ elif task .state == TASK_STATES .WAITING and task .unblocked_at is None :
413+ if (
414+ # No exclusive resource taken?
415+ not any (
416+ resource in taken_exclusive_resources or resource in taken_shared_resources
417+ for resource in exclusive_resources
418+ )
419+ # No shared resource exclusively taken?
420+ and not any (
421+ resource in taken_exclusive_resources for resource in shared_resources
422+ )
423+ ):
424+ _logger .debug (
425+ "Marking waiting task %s in domain: %s unblocked." ,
426+ task .pk ,
427+ task .pulp_domain .name ,
428+ )
429+ task .unblock ()
430+ else :
431+ _logger .debug ("Task %s is still blocked." , task .pk )
409432 elif task .state == TASK_STATES .RUNNING and task .unblocked_at is None :
410433 # This should not happen in normal operation.
411434 # And it is only an issue if the worker running that task died, because it will
@@ -426,8 +449,8 @@ def _unblock_tasks(self):
426449 def sleep (self ):
427450 """Wait for signals on the wakeup channel while heart beating."""
428451
429- _logger .debug (_ ( "Worker %s entering sleep state." ) , self .name )
430- while not self .shutdown_requested and not self . wakeup_handle :
452+ _logger .debug ("Worker %s entering sleep state." , self .name )
453+ while not self .shutdown_requested :
431454 r , w , x = select .select (
432455 [self .sentinel , connection .connection ],
433456 [],
@@ -441,7 +464,14 @@ def sleep(self):
441464 self .unblock_tasks ()
442465 if self .sentinel in r :
443466 os .read (self .sentinel , 256 )
444- _logger .debug (_ ("Worker %s leaving sleep state." ), self .name )
467+ if self .wakeup_handle :
468+ if not self .auxiliary or random .random () < math .exp (
469+ self .unblocked_count - self .false_alarms
470+ ):
471+ _logger .debug ("Worker %s leaving sleep state." , self .name )
472+ break
473+ else :
474+ _logger .debug ("Worker %s backing off" , self .name )
445475
446476 def supervise_task (self , task ):
447477 """Call and supervise the task process while heart beating.
@@ -586,8 +616,12 @@ def handle_unblocked_tasks(self):
586616 task = self .fetch_task ()
587617 if task is None :
588618 # No task found
619+ self .false_alarms += 1
620+ _logger .debug ("False Alarms: %s" , self .false_alarms )
589621 break
590622 try :
623+ self .false_alarms //= 2
624+ _logger .debug ("False Alarms: %s" , self .false_alarms )
591625 if task .state == TASK_STATES .CANCELING :
592626 # No worker picked this task up before being canceled.
593627 # Or the worker disappeared before handling the canceling.
@@ -615,6 +649,7 @@ def run(self, burst=False):
615649 signal .signal (signal .SIGHUP , self ._signal_handler )
616650 # Subscribe to pgsql channels
617651 connection .connection .add_notify_handler (self ._pg_notify_handler )
652+ self .cursor .execute ("LISTEN pulp_worker_broadcast" )
618653 self .cursor .execute ("LISTEN pulp_worker_cancel" )
619654 self .cursor .execute ("LISTEN pulp_worker_metrics_heartbeat" )
620655 if burst :
@@ -638,4 +673,5 @@ def run(self, burst=False):
638673 self .cursor .execute ("UNLISTEN pulp_worker_wakeup" )
639674 self .cursor .execute ("UNLISTEN pulp_worker_metrics_heartbeat" )
640675 self .cursor .execute ("UNLISTEN pulp_worker_cancel" )
676+ self .cursor .execute ("UNLISTEN pulp_worker_broadcast" )
641677 self .shutdown ()
0 commit comments