Skip to content

Commit

Permalink
WorkQueueExecutor: remove atexit handler (#3407)
Browse files Browse the repository at this point in the history
This PR removes the atexit handler of WorkQueueExecutor as discussed in #3334.

Co-authored-by: Ben Clifford <[email protected]>
  • Loading branch information
tphung3 and benclifford authored May 2, 2024
1 parent 9b7d199 commit 42c0e2e
Showing 1 changed file with 0 additions and 30 deletions.
30 changes: 0 additions & 30 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
high-throughput system for delegating Parsl tasks to thousands of remote machines
"""

import atexit
import threading
import multiprocessing
import logging
Expand Down Expand Up @@ -298,24 +297,6 @@ def __init__(self,
if self.init_command != "":
self.launch_cmd = self.init_command + "; " + self.launch_cmd

# register atexit handler to cleanup when Python shuts down
atexit.register(self.atexit_cleanup)

# Attribute indicating whether this executor was started to shut it down properly.
# This safeguards cases where an object of this executor is created but
# the executor never starts, so it shouldn't be shutdowned.
self.is_started = False

# Attribute indicating whether this executor was shutdown before.
# This safeguards cases where this object is automatically shut down (e.g.,
# via atexit) and the user also explicitly calls shut down. While this is
# permitted, the effect of an executor shutdown should happen only once.
self.is_shutdown = False

def atexit_cleanup(self):
# Calls this executor's shutdown method upon Python exiting the process.
self.shutdown()

def _get_launch_command(self, block_id):
# this executor uses different terminology for worker/launch
# commands than in htex
Expand All @@ -325,8 +306,6 @@ def start(self):
"""Create submit process and collector thread to create, send, and
retrieve Parsl tasks within the Work Queue system.
"""
# Mark this executor object as started
self.is_started = True
self.tasks_lock = threading.Lock()

# Create directories for data and results
Expand Down Expand Up @@ -713,14 +692,6 @@ def shutdown(self, *args, **kwargs):
"""Shutdown the executor. Sets flag to cancel the submit process and
collector thread, which shuts down the Work Queue system submission.
"""
if not self.is_started:
# Don't shutdown if the executor never starts.
return

if self.is_shutdown:
# Don't shutdown this executor again.
return

logger.debug("Work Queue shutdown started")
self.should_stop.value = True

Expand All @@ -741,7 +712,6 @@ def shutdown(self, *args, **kwargs):
self.collector_queue.close()
self.collector_queue.join_thread()

self.is_shutdown = True
logger.debug("Work Queue shutdown completed")

@wrap_with_logs
Expand Down

0 comments on commit 42c0e2e

Please sign in to comment.