From 938f2cdd60296e9c103e231fef3e7f588bda2fe6 Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Mon, 28 Oct 2024 17:09:23 +0100 Subject: [PATCH 1/7] Commit before properly working on it with debugger. --- src/aiida/engine/processes/calcjobs/tasks.py | 12 +- src/aiida/engine/transports.py | 157 ++++++++++++++++++- 2 files changed, 165 insertions(+), 4 deletions(-) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 8b8231634f..cc36d8415c 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -51,6 +51,12 @@ logger = logging.getLogger(__name__) +## def log_to_file(message): +# current_time = datetime.now().strftime("%H:%M:%S") +# with open("/home/geiger_j/aiida_projects/aiida-dev/ssh-alive-testing/transport-log.txt", "a") as f: +# f.write(f"{current_time}: {message}(tasks.py)\n") + + class PreSubmitException(Exception): # noqa: N818 """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`.""" @@ -72,7 +78,7 @@ async def task_upload_job(process: 'CalcJob', transport_queue: TransportQueue, c node = process.node if node.get_state() == CalcJobState.SUBMITTING: - logger.warning(f'CalcJob<{node.pk}> already marked as SUBMITTING, skipping task_update_job') + logger.warning(f'CalcJob<{node.pk}> already marked as SUBMITTING, skipping task_upload_job') return initial_interval = get_config_option(RETRY_INTERVAL_OPTION) @@ -496,9 +502,13 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override result: plumpy.process_states.State = self process_status = f'Waiting for transport task: {self._command}' + + # ## log_to_file(f'TRANSPORT_QUEUE: {transport_queue}') + node.set_process_status(process_status) try: + # ? Possibly implement here to keep connection open if self._command == UPLOAD_COMMAND: skip_submit = await self._launch_task(task_upload_job, self.process, transport_queue) if skip_submit: diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index fe32df7884..6bc796a7d4 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -13,8 +13,10 @@ import contextvars import logging import traceback +from datetime import datetime from typing import TYPE_CHECKING, Awaitable, Dict, Hashable, Iterator, Optional +from aiida.common import timezone from aiida.orm import AuthInfo if TYPE_CHECKING: @@ -22,6 +24,15 @@ _LOGGER = logging.getLogger(__name__) +# Open and append to the log file at different points + +# Open and append to the log file, prepending the current time +## def log_to_file(message): +# datetime.now() +# current_time = datetime.now().strftime("%H:%M:%S") +# with open("/home/geiger_j/aiida_projects/aiida-dev/ssh-alive-testing/transport-log.txt", "a") as f: +# f.write(f"{current_time}: {message} (transports.py)\n") + class TransportRequest: """Information kept about request for a transport object""" @@ -30,6 +41,7 @@ def __init__(self): super().__init__() self.future: asyncio.Future = asyncio.Future() self.count = 0 + self.transport_closer = None class TransportQueue: @@ -47,6 +59,11 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): """:param loop: An asyncio event, will use `asyncio.get_event_loop()` if not supplied""" self._loop = loop if loop is not None else asyncio.get_event_loop() self._transport_requests: Dict[Hashable, TransportRequest] = {} + self._last_open_time = None + # self._last_submission_time = None + self._last_close_time = None + # self._last_transport_request: Dict[Hashable, str] = {} + # self._was_last_request_special: bool = False @property def loop(self) -> asyncio.AbstractEventLoop: @@ -68,7 +85,11 @@ async def transport_task(transport_queue, authinfo): :return: A future that can be yielded to give the transport """ open_callback_handle = None + close_callback_handle = None transport_request = self._transport_requests.get(authinfo.pk, None) + # ## log_to_file(f'EventLoop: {asyncio.all_tasks(self.loop)}') + + # ## log_to_file(f'transport_request: {transport_request}') if transport_request is None: # There is no existing request for this transport (i.e. on this authinfo) @@ -76,8 +97,12 @@ async def transport_task(transport_queue, authinfo): self._transport_requests[authinfo.pk] = transport_request transport = authinfo.get_transport() - safe_open_interval = transport.get_safe_open_interval() + # safe_open_interval = transport.get_safe_open_interval() + safe_open_interval = 15 + # Check here if last_open_time > safe_interval, one could immediately open the transport + # This should be the very first request, after a while + ## log_to_file(f'OPEN_CALLBACK_HANDLE BEFORE DO_OPEN: {open_callback_handle}') def do_open(): """Actually open the transport""" if transport_request.count > 0: @@ -85,6 +110,8 @@ def do_open(): _LOGGER.debug('Transport request opening transport for %s', authinfo) try: transport.open() + self._last_open_time = datetime.now() + ## log_to_file(f'LAST_OPEN_TIME: {self._last_open_time}') except Exception as exception: _LOGGER.error('exception occurred while trying to open transport:\n %s', exception) transport_request.future.set_exception(exception) @@ -99,10 +126,88 @@ def do_open(): # passed around to many places, including outside aiida-core (e.g. paramiko). Anyone keeping a reference # to this handle would otherwise keep the Process context (and thus the process itself) in memory. # See https://github.com/aiidateam/aiida-core/issues/4698 - open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + + # Pseudocode + # get_metadata from authinfo + # see if there is a last_close, if not None, compute seconds from then to now + # if < safe_interval, wait for difference + # if larger, call call_later, open call_later but with 0 + metadata = authinfo.get_metadata() + last_close_time = metadata.get('last_close_time') + + log_file_path = '/home/geiger_j/aiida_projects/thor-dev/transport-log.txt' + + debug_info = '\nDEBUG START\n' + + # if last_close_time is None: + # # Submit immediately -> This is not ever being triggered + # open_callback_handle = self._loop.call_later(1, do_open, context=contextvars.Context()) + + # debug_info += ( + # f"LAST_CLOSE_TIME_NONE: submit immediately\n" + # f"LAST_CLOSE_TIME: {last_close_time}\n" + # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" + # ) + + last_close_time = datetime.strptime(last_close_time, '%Y-%m-%dT%H:%M:%S.%f%z') + timedelta_seconds = (timezone.localtime(timezone.now()) - last_close_time).total_seconds() + + if timedelta_seconds > safe_open_interval: + debug_info += ( + f'TIMEDELTA > SAFE_OPEN_INTERVAL: submit immediately\n' f'LAST_CLOSE_TIME: {last_close_time}\n' + # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" + ) + + open_callback_handle = self._loop.call_later(0, do_open, context=contextvars.Context()) + # self._was_last_request_special = True + + else: + # If the last one was a special request, wait the safe_open_interval + debug_info += ( + f'TIMEDELTA < SAFE_OPEN_INTERVAL and last request special: submit after safe_open_interval\n' + f'LAST_CLOSE_TIME: {last_close_time}\n' + # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" + ) + + open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + + # if self._was_last_request_special: + + # # If the last one was a special request, wait the safe_open_interval + # debug_info += ( + # f"TIMEDELTA < SAFE_OPEN_INTERVAL and last request special: submit after safe_open_interval\n" + # f"LAST_CLOSE_TIME: {last_close_time}\n" + # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" + # ) + + # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + # self._was_last_request_special = False + + # else: + + # # This is also a special request + # # Or, should it be? Could also remove the if/else, and just wait the safe_open_interval, as is the default + # debug_info += ( + # f"TIMEDELTA < SAFE_OPEN_INTERVAL and last request not special: submit after timedelta\n" + # f"LAST_CLOSE_TIME: {last_close_time}\n" + # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" + # f"TIMEDELTA: {timedelta_seconds}\n" + # ) + + # open_callback_handle = self._loop.call_later(timedelta_seconds, do_open, context=contextvars.Context()) + # self._was_last_request_special = True + # open_callback_handle = self._loop.call_later(5, do_open, context=contextvars.Context()) + + # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + + with open(log_file_path, 'a') as log_file: + log_file.write(debug_info) try: transport_request.count += 1 + self._last_submission_time = datetime.now() + ## log_to_file(f'LAST_SUBMISSION_TIME: {self._last_submission_time}') + ## log_to_file(f'TRANSPORT_REQUEST_COUNT: {transport_request.count}') yield transport_request.future except asyncio.CancelledError: # note this is only required in python<=3.7, @@ -113,14 +218,60 @@ def do_open(): _LOGGER.error('Exception whilst using transport:\n%s', traceback.format_exc()) raise finally: + ## log_to_file(f'FINALLY BLOCK - TRANSPORT_REQUEST.COUNT: {transport_request.count}') transport_request.count -= 1 assert transport_request.count >= 0, 'Transport request count dropped below 0!' # Check if there are no longer any users that want the transport if transport_request.count == 0: + ## log_to_file(f'TRANSPORT_REQUEST.FUTURE.DONE(): {transport_request.future.done()}') if transport_request.future.done(): - _LOGGER.debug('Transport request closing transport for %s', authinfo) + ## log_to_file(f'DATETIME: {(datetime.now() - self._last_open_time).total_seconds() > 5}') + + if (datetime.now() - self._last_open_time).total_seconds() > 5: + + def close_transport(): + transport_request.future.result().close() + """Close the transport if conditions are met.""" + ## log_to_file("CLOSE_TRANSPORT") + ## log_to_file(f"LAST_CLOSE_TIME: {self._last_close_time}") + + close_callback_handle = self._loop.call_later(5, close_transport, context=contextvars.Context()) + transport_request.transport_closer = close_callback_handle + # transport_request.transport_closer = None + # else: + # If not yet time to close, schedule again + # close_callback_handle = self._loop.call_later( + # 5, close_transport, context=contextvars.Context()) + # transport_request.transport_closer = close_callback_handle + + # ## log_to_file(f"TRANSPORT_REQUEST.TRANSPORT_CLOSER: {transport_request.transport_closer}") + # if transport_request.transport_closer is None: + # ## log_to_file("INSIDE") + # self._last_close_time = datetime.now() + # else: + # return + + # This should be replaced with the call_later close_callback_handle invocation + # ## log_to_file(f"TRANSPORT_REQUEST.TRANSPORT_CLOSER: {transport_request.transport_closer}") + transport_request.future.result().close() + + # Get old metadata from authinfo, and set variable last_close_time to datetime now in isoformat + # Need to convert to str, otherwise not json-serializable when setting authinfo metadata + # if self._was_last_request_special is True: + last_close_time = timezone.localtime(timezone.now()).strftime('%Y-%m-%dT%H:%M:%S.%f%z') + authinfo.set_metadata({'last_close_time': last_close_time}) + # else: + # # asyncio.sleep(5) + # transport_request.count += 1 + # self._was_last_request_special = True + # yield transport_request.future + elif open_callback_handle is not None: open_callback_handle.cancel() self._transport_requests.pop(authinfo.pk, None) + + +# Should wait first time 0, then always ~30 +# Try out with manual waiting times in between From d2220d34c4ce06b1846707528c588d958a3ac82e Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Mon, 28 Oct 2024 18:03:36 +0100 Subject: [PATCH 2/7] Clean up from stash. --- src/aiida/engine/processes/calcjobs/tasks.py | 8 - src/aiida/engine/transports.py | 160 +++++-------------- 2 files changed, 37 insertions(+), 131 deletions(-) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index cc36d8415c..b1ea8c482c 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -51,12 +51,6 @@ logger = logging.getLogger(__name__) -## def log_to_file(message): -# current_time = datetime.now().strftime("%H:%M:%S") -# with open("/home/geiger_j/aiida_projects/aiida-dev/ssh-alive-testing/transport-log.txt", "a") as f: -# f.write(f"{current_time}: {message}(tasks.py)\n") - - class PreSubmitException(Exception): # noqa: N818 """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`.""" @@ -503,8 +497,6 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override process_status = f'Waiting for transport task: {self._command}' - # ## log_to_file(f'TRANSPORT_QUEUE: {transport_queue}') - node.set_process_status(process_status) try: diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 6bc796a7d4..56baeb9237 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -24,15 +24,6 @@ _LOGGER = logging.getLogger(__name__) -# Open and append to the log file at different points - -# Open and append to the log file, prepending the current time -## def log_to_file(message): -# datetime.now() -# current_time = datetime.now().strftime("%H:%M:%S") -# with open("/home/geiger_j/aiida_projects/aiida-dev/ssh-alive-testing/transport-log.txt", "a") as f: -# f.write(f"{current_time}: {message} (transports.py)\n") - class TransportRequest: """Information kept about request for a transport object""" @@ -41,7 +32,8 @@ def __init__(self): super().__init__() self.future: asyncio.Future = asyncio.Future() self.count = 0 - self.transport_closer = None + # ? What do I need this for? + # self.transport_closer = None class TransportQueue: @@ -60,10 +52,10 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): self._loop = loop if loop is not None else asyncio.get_event_loop() self._transport_requests: Dict[Hashable, TransportRequest] = {} self._last_open_time = None - # self._last_submission_time = None self._last_close_time = None + self._last_request_special: bool = False + # self._last_submission_time = None # self._last_transport_request: Dict[Hashable, str] = {} - # self._was_last_request_special: bool = False @property def loop(self) -> asyncio.AbstractEventLoop: @@ -85,11 +77,8 @@ async def transport_task(transport_queue, authinfo): :return: A future that can be yielded to give the transport """ open_callback_handle = None - close_callback_handle = None + # close_callback_handle = None transport_request = self._transport_requests.get(authinfo.pk, None) - # ## log_to_file(f'EventLoop: {asyncio.all_tasks(self.loop)}') - - # ## log_to_file(f'transport_request: {transport_request}') if transport_request is None: # There is no existing request for this transport (i.e. on this authinfo) @@ -102,7 +91,6 @@ async def transport_task(transport_queue, authinfo): # Check here if last_open_time > safe_interval, one could immediately open the transport # This should be the very first request, after a while - ## log_to_file(f'OPEN_CALLBACK_HANDLE BEFORE DO_OPEN: {open_callback_handle}') def do_open(): """Actually open the transport""" if transport_request.count > 0: @@ -111,7 +99,6 @@ def do_open(): try: transport.open() self._last_open_time = datetime.now() - ## log_to_file(f'LAST_OPEN_TIME: {self._last_open_time}') except Exception as exception: _LOGGER.error('exception occurred while trying to open transport:\n %s', exception) transport_request.future.set_exception(exception) @@ -127,87 +114,45 @@ def do_open(): # to this handle would otherwise keep the Process context (and thus the process itself) in memory. # See https://github.com/aiidateam/aiida-core/issues/4698 - # Pseudocode - # get_metadata from authinfo - # see if there is a last_close, if not None, compute seconds from then to now - # if < safe_interval, wait for difference - # if larger, call call_later, open call_later but with 0 - metadata = authinfo.get_metadata() - last_close_time = metadata.get('last_close_time') - - log_file_path = '/home/geiger_j/aiida_projects/thor-dev/transport-log.txt' + # if self._last_request_special: + # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + # self._last_request_special = False - debug_info = '\nDEBUG START\n' - - # if last_close_time is None: - # # Submit immediately -> This is not ever being triggered + # # First request, submit immediately + # # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task? + # if self._last_close_time is None: # open_callback_handle = self._loop.call_later(1, do_open, context=contextvars.Context()) + # self._last_request_special = True - # debug_info += ( - # f"LAST_CLOSE_TIME_NONE: submit immediately\n" - # f"LAST_CLOSE_TIME: {last_close_time}\n" - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - # ) - - last_close_time = datetime.strptime(last_close_time, '%Y-%m-%dT%H:%M:%S.%f%z') - timedelta_seconds = (timezone.localtime(timezone.now()) - last_close_time).total_seconds() - - if timedelta_seconds > safe_open_interval: - debug_info += ( - f'TIMEDELTA > SAFE_OPEN_INTERVAL: submit immediately\n' f'LAST_CLOSE_TIME: {last_close_time}\n' - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - ) + # # self._last_close_time = datetime.strptime(self._last_close_time, '%Y-%m-%dT%H:%M:%S.%f%z') + # else: + # timedelta_seconds = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() - open_callback_handle = self._loop.call_later(0, do_open, context=contextvars.Context()) - # self._was_last_request_special = True + # if timedelta_seconds > safe_open_interval: + # # ! This could also be `_loop.call_soon` which has an implicit delay of 0s - else: - # If the last one was a special request, wait the safe_open_interval - debug_info += ( - f'TIMEDELTA < SAFE_OPEN_INTERVAL and last request special: submit after safe_open_interval\n' - f'LAST_CLOSE_TIME: {last_close_time}\n' - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - ) + # open_callback_handle = self._loop.call_later(timedelta_seconds-safe_open_interval, do_open, context=contextvars.Context()) + # self._last_request_special = True - open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + # else: + # # If the last one was a special request, wait the safe_open_interval + # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # if self._was_last_request_special: - - # # If the last one was a special request, wait the safe_open_interval - # debug_info += ( - # f"TIMEDELTA < SAFE_OPEN_INTERVAL and last request special: submit after safe_open_interval\n" - # f"LAST_CLOSE_TIME: {last_close_time}\n" - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - # ) + # if self._last_request_special: # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # self._was_last_request_special = False + # self._last_request_special = False # else: - - # # This is also a special request - # # Or, should it be? Could also remove the if/else, and just wait the safe_open_interval, as is the default - # debug_info += ( - # f"TIMEDELTA < SAFE_OPEN_INTERVAL and last request not special: submit after timedelta\n" - # f"LAST_CLOSE_TIME: {last_close_time}\n" - # f"LAST_REQUEST_SPECIAL: {self._was_last_request_special}\n" - # f"TIMEDELTA: {timedelta_seconds}\n" - # ) - # open_callback_handle = self._loop.call_later(timedelta_seconds, do_open, context=contextvars.Context()) - # self._was_last_request_special = True + # self._last_request_special = True # open_callback_handle = self._loop.call_later(5, do_open, context=contextvars.Context()) # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - with open(log_file_path, 'a') as log_file: - log_file.write(debug_info) - try: transport_request.count += 1 self._last_submission_time = datetime.now() - ## log_to_file(f'LAST_SUBMISSION_TIME: {self._last_submission_time}') - ## log_to_file(f'TRANSPORT_REQUEST_COUNT: {transport_request.count}') yield transport_request.future except asyncio.CancelledError: # note this is only required in python<=3.7, @@ -218,60 +163,29 @@ def do_open(): _LOGGER.error('Exception whilst using transport:\n%s', traceback.format_exc()) raise finally: - ## log_to_file(f'FINALLY BLOCK - TRANSPORT_REQUEST.COUNT: {transport_request.count}') transport_request.count -= 1 assert transport_request.count >= 0, 'Transport request count dropped below 0!' # Check if there are no longer any users that want the transport if transport_request.count == 0: - ## log_to_file(f'TRANSPORT_REQUEST.FUTURE.DONE(): {transport_request.future.done()}') if transport_request.future.done(): - ## log_to_file(f'DATETIME: {(datetime.now() - self._last_open_time).total_seconds() > 5}') - - if (datetime.now() - self._last_open_time).total_seconds() > 5: - - def close_transport(): - transport_request.future.result().close() - """Close the transport if conditions are met.""" - ## log_to_file("CLOSE_TRANSPORT") - ## log_to_file(f"LAST_CLOSE_TIME: {self._last_close_time}") - - close_callback_handle = self._loop.call_later(5, close_transport, context=contextvars.Context()) - transport_request.transport_closer = close_callback_handle - # transport_request.transport_closer = None - # else: - # If not yet time to close, schedule again - # close_callback_handle = self._loop.call_later( - # 5, close_transport, context=contextvars.Context()) - # transport_request.transport_closer = close_callback_handle - # ## log_to_file(f"TRANSPORT_REQUEST.TRANSPORT_CLOSER: {transport_request.transport_closer}") - # if transport_request.transport_closer is None: - # ## log_to_file("INSIDE") - # self._last_close_time = datetime.now() - # else: - # return + # if (datetime.now() - self._last_open_time).total_seconds() > 5: - # This should be replaced with the call_later close_callback_handle invocation - # ## log_to_file(f"TRANSPORT_REQUEST.TRANSPORT_CLOSER: {transport_request.transport_closer}") + # def close_transport(): + # """Close the transport if conditions are met.""" + # transport_request.future.result().close() - transport_request.future.result().close() + # Also here logic when transport should be closed immediately, or when via call_later? + # close_callback_handle = self._loop.call_later(5, close_transport, context=contextvars.Context()) + # transport_request.transport_closer = close_callback_handle - # Get old metadata from authinfo, and set variable last_close_time to datetime now in isoformat - # Need to convert to str, otherwise not json-serializable when setting authinfo metadata - # if self._was_last_request_special is True: - last_close_time = timezone.localtime(timezone.now()).strftime('%Y-%m-%dT%H:%M:%S.%f%z') - authinfo.set_metadata({'last_close_time': last_close_time}) - # else: - # # asyncio.sleep(5) - # transport_request.count += 1 - # self._was_last_request_special = True - # yield transport_request.future + # This should be replaced with the call_later close_callback_handle invocation + transport_request.future.result().close() + # When storing in `AuthInfo`, had to convert to str to be storeable in the DB + # self._last_close_time = timezone.localtime(timezone.now()).strftime('%Y-%m-%dT%H:%M:%S.%f%z') + self._last_close_time = timezone.localtime(timezone.now()) elif open_callback_handle is not None: open_callback_handle.cancel() self._transport_requests.pop(authinfo.pk, None) - - -# Should wait first time 0, then always ~30 -# Try out with manual waiting times in between From e818824345cd51c4c5cca2a12432d61590f871e8 Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Tue, 29 Oct 2024 08:29:56 +0100 Subject: [PATCH 3/7] Let's see if this works... --- src/aiida/engine/transports.py | 74 ++++++++++++++++------------------ 1 file changed, 34 insertions(+), 40 deletions(-) diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 56baeb9237..e5311e3830 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -87,7 +87,7 @@ async def transport_task(transport_queue, authinfo): transport = authinfo.get_transport() # safe_open_interval = transport.get_safe_open_interval() - safe_open_interval = 15 + safe_open_interval = 30 # Check here if last_open_time > safe_interval, one could immediately open the transport # This should be the very first request, after a while @@ -98,7 +98,7 @@ def do_open(): _LOGGER.debug('Transport request opening transport for %s', authinfo) try: transport.open() - self._last_open_time = datetime.now() + self._last_open_time = timezone.localtime(timezone.now()) except Exception as exception: _LOGGER.error('exception occurred while trying to open transport:\n %s', exception) transport_request.future.set_exception(exception) @@ -114,45 +114,34 @@ def do_open(): # to this handle would otherwise keep the Process context (and thus the process itself) in memory. # See https://github.com/aiidateam/aiida-core/issues/4698 - # if self._last_request_special: - # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # self._last_request_special = False + # First request, submit immediately + # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task? + if self._last_close_time is None: + open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context()) + self._last_request_special = True - # # First request, submit immediately - # # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task? - # if self._last_close_time is None: - # open_callback_handle = self._loop.call_later(1, do_open, context=contextvars.Context()) - # self._last_request_special = True + elif self._last_request_special: + open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + self._last_request_special = False - # # self._last_close_time = datetime.strptime(self._last_close_time, '%Y-%m-%dT%H:%M:%S.%f%z') - # else: - # timedelta_seconds = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() + else: + timedelta_seconds = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() - # if timedelta_seconds > safe_open_interval: - # # ! This could also be `_loop.call_soon` which has an implicit delay of 0s + if timedelta_seconds > safe_open_interval: + # ! This could also be `_loop.call_soon` which has an implicit delay of 0s - # open_callback_handle = self._loop.call_later(timedelta_seconds-safe_open_interval, do_open, context=contextvars.Context()) - # self._last_request_special = True + open_timedelta = timedelta_seconds-safe_open_interval + open_callback_handle = self._loop.call_later(open_timedelta, do_open, context=contextvars.Context()) + self._last_request_special = True - # else: - # # If the last one was a special request, wait the safe_open_interval - # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - - # if self._last_request_special: - - # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # self._last_request_special = False - - # else: - # open_callback_handle = self._loop.call_later(timedelta_seconds, do_open, context=contextvars.Context()) - # self._last_request_special = True - # open_callback_handle = self._loop.call_later(5, do_open, context=contextvars.Context()) + else: + # If the last one was a special request, wait the safe_open_interval + open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) try: transport_request.count += 1 - self._last_submission_time = datetime.now() yield transport_request.future except asyncio.CancelledError: # note this is only required in python<=3.7, @@ -169,21 +158,26 @@ def do_open(): if transport_request.count == 0: if transport_request.future.done(): - # if (datetime.now() - self._last_open_time).total_seconds() > 5: + def do_close(): + """Close the transport if conditions are met.""" + transport_request.future.result().close() + # self._last_close_time = timezone.localtime(timezone.now()) - # def close_transport(): - # """Close the transport if conditions are met.""" - # transport_request.future.result().close() + close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() + + if close_timedelta > safe_open_interval: # Also here logic when transport should be closed immediately, or when via call_later? - # close_callback_handle = self._loop.call_later(5, close_transport, context=contextvars.Context()) + close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context()) + self._last_close_time = timezone.localtime(timezone.now()) + else: + close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) + # transport_request.transport_closer = close_callback_handle # This should be replaced with the call_later close_callback_handle invocation - transport_request.future.result().close() - # When storing in `AuthInfo`, had to convert to str to be storeable in the DB - # self._last_close_time = timezone.localtime(timezone.now()).strftime('%Y-%m-%dT%H:%M:%S.%f%z') - self._last_close_time = timezone.localtime(timezone.now()) + # transport_request.future.result().close() + elif open_callback_handle is not None: open_callback_handle.cancel() From 9468f1e6836f2bad0b239bc366e72d6ebc0c95de Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Tue, 29 Oct 2024 09:45:47 +0100 Subject: [PATCH 4/7] Modify transport(_request) handling in tasks.py?? --- src/aiida/engine/processes/calcjobs/tasks.py | 25 +++++++-- src/aiida/engine/transports.py | 56 ++++++++++++-------- src/aiida/engine/utils.py | 1 + 3 files changed, 58 insertions(+), 24 deletions(-) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index b1ea8c482c..ca8dbf2d72 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -54,6 +54,23 @@ class PreSubmitException(Exception): # noqa: N818 """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`.""" +async def get_transport(authinfo, transport_queue, cancellable): + transport_requests = transport_queue._transport_requests + last_transport_request = transport_requests.get(authinfo.pk, None) + + # ? Refactor this into `obtain_transport` function + # ? Returns last transport if open, and awaits close callback handle, otherwise request new transport + if last_transport_request is None or transport_queue._last_request_special: + # This is the previous behavior + with transport_queue.request_transport(authinfo) as request: + transport = await cancellable.with_interrupt(request) + else: + transport = authinfo.get_transport() + if not transport.is_open: + with transport_queue.request_transport(authinfo) as request: + transport = await cancellable.with_interrupt(request) + else: + transport_queue._last_request_special = True async def task_upload_job(process: 'CalcJob', transport_queue: TransportQueue, cancellable: InterruptableFuture): """Transport task that will attempt to upload the files of a job calculation to the remote. @@ -143,9 +160,11 @@ async def task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, ca authinfo = node.get_authinfo() async def do_submit(): - with transport_queue.request_transport(authinfo) as request: - transport = await cancellable.with_interrupt(request) - return execmanager.submit_calculation(node, transport) + + transport = get_transport(authinfo=authinfo, transport_queue=transport_queue, cancellable=cancellable) + print('a') + + return execmanager.submit_calculation(node, transport) try: logger.info(f'scheduled request to submit CalcJob<{node.pk}>') diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index e5311e3830..24b3d97435 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -29,12 +29,15 @@ class TransportRequest: """Information kept about request for a transport object""" def __init__(self): - super().__init__() self.future: asyncio.Future = asyncio.Future() self.count = 0 - # ? What do I need this for? - # self.transport_closer = None +class TransportCloseRequest: + """Information kept about close request for a transport object""" + + def __init__(self): + self.future: asyncio.Future = asyncio.Future() + self.count = 0 class TransportQueue: """A queue to get transport objects from authinfo. This class allows clients @@ -54,7 +57,7 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): self._last_open_time = None self._last_close_time = None self._last_request_special: bool = False - # self._last_submission_time = None + self._close_callback_handle = None # self._last_transport_request: Dict[Hashable, str] = {} @property @@ -77,8 +80,10 @@ async def transport_task(transport_queue, authinfo): :return: A future that can be yielded to give the transport """ open_callback_handle = None - # close_callback_handle = None + close_callback_handle = None transport_request = self._transport_requests.get(authinfo.pk, None) + # safe_open_interval = transport.get_safe_open_interval() + safe_open_interval = 30 if transport_request is None: # There is no existing request for this transport (i.e. on this authinfo) @@ -86,8 +91,6 @@ async def transport_task(transport_queue, authinfo): self._transport_requests[authinfo.pk] = transport_request transport = authinfo.get_transport() - # safe_open_interval = transport.get_safe_open_interval() - safe_open_interval = 30 # Check here if last_open_time > safe_interval, one could immediately open the transport # This should be the very first request, after a while @@ -116,30 +119,36 @@ def do_open(): # First request, submit immediately # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task? - if self._last_close_time is None: - open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context()) - self._last_request_special = True - - elif self._last_request_special: + if self._last_request_special: open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) self._last_request_special = False + elif self._last_close_time is None: + open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context()) + self._last_request_special = True + else: - timedelta_seconds = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() + close_timedelta = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() + open_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() - if timedelta_seconds > safe_open_interval: + if open_timedelta > safe_open_interval: # ! This could also be `_loop.call_soon` which has an implicit delay of 0s - open_timedelta = timedelta_seconds-safe_open_interval - open_callback_handle = self._loop.call_later(open_timedelta, do_open, context=contextvars.Context()) + # open_timedelta = close_timedelta-safe_open_interval + open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context()) self._last_request_special = True else: - # If the last one was a special request, wait the safe_open_interval - open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + # If the last one was a special request, wait the difference between safe_open_interval and lost + open_callback_handle = self._loop.call_later(safe_open_interval-open_timedelta, do_open, context=contextvars.Context()) # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + # ? This logic is implemented in `tasks.py` instead. + # else: + # transport = authinfo.get_transport() + # return transport + # If transport_request is open already try: transport_request.count += 1 yield transport_request.future @@ -161,7 +170,7 @@ def do_open(): def do_close(): """Close the transport if conditions are met.""" transport_request.future.result().close() - # self._last_close_time = timezone.localtime(timezone.now()) + self._last_close_time = timezone.localtime(timezone.now()) close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() @@ -170,16 +179,21 @@ def do_close(): # Also here logic when transport should be closed immediately, or when via call_later? close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context()) self._last_close_time = timezone.localtime(timezone.now()) + self._transport_requests.pop(authinfo.pk, None) else: close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) + self._transport_requests.pop(authinfo.pk, None) # transport_request.transport_closer = close_callback_handle # This should be replaced with the call_later close_callback_handle invocation # transport_request.future.result().close() - + # ? When should the transport_request be popped? + # ? If it is always popped as soon as the task is done, there is no way to re-use it... + # self._transport_requests.pop(authinfo.pk, None) elif open_callback_handle is not None: open_callback_handle.cancel() - self._transport_requests.pop(authinfo.pk, None) + # ? Somewhere I still need to `pop` the transport_request... or do I? + # self._transport_requests.pop(authinfo.pk, None) diff --git a/src/aiida/engine/utils.py b/src/aiida/engine/utils.py index 4053156a97..88fc9e80f9 100644 --- a/src/aiida/engine/utils.py +++ b/src/aiida/engine/utils.py @@ -198,6 +198,7 @@ async def exponential_backoff_retry( result: Any = None coro = ensure_coroutine(fct) + print('a') interval = initial_interval for iteration in range(max_attempts): From b0d077e0d2ca0ef14559be41be681033f9aee332 Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Tue, 29 Oct 2024 10:38:57 +0100 Subject: [PATCH 5/7] Transport closing logic in `tasks.py` rather than `transports.py`? Otherwise we _always_ close the transport after requesting it?? --- src/aiida/engine/processes/calcjobs/tasks.py | 45 +++++++++++--------- src/aiida/engine/transports.py | 32 ++++++++------ src/aiida/engine/utils.py | 1 - 3 files changed, 45 insertions(+), 33 deletions(-) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index ca8dbf2d72..b51d638d1e 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -54,24 +54,6 @@ class PreSubmitException(Exception): # noqa: N818 """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`.""" -async def get_transport(authinfo, transport_queue, cancellable): - transport_requests = transport_queue._transport_requests - last_transport_request = transport_requests.get(authinfo.pk, None) - - # ? Refactor this into `obtain_transport` function - # ? Returns last transport if open, and awaits close callback handle, otherwise request new transport - if last_transport_request is None or transport_queue._last_request_special: - # This is the previous behavior - with transport_queue.request_transport(authinfo) as request: - transport = await cancellable.with_interrupt(request) - else: - transport = authinfo.get_transport() - if not transport.is_open: - with transport_queue.request_transport(authinfo) as request: - transport = await cancellable.with_interrupt(request) - else: - transport_queue._last_request_special = True - async def task_upload_job(process: 'CalcJob', transport_queue: TransportQueue, cancellable: InterruptableFuture): """Transport task that will attempt to upload the files of a job calculation to the remote. @@ -158,11 +140,34 @@ async def task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, ca max_attempts = get_config_option(MAX_ATTEMPTS_OPTION) authinfo = node.get_authinfo() + authinfo_pk = authinfo.pk + transport_request = transport_queue._transport_requests.get(authinfo.pk, None) + open_transport = transport_queue._open_transports.get(authinfo.pk, None) + + if open_transport is not None: # and not transport_queue._last_request_special: + transport = open_transport + transport_queue._last_request_special = True + elif transport_request is None: # or transport_queue._last_request_special: + # This is the previous behavior + with transport_queue.request_transport(authinfo) as request: + transport = await cancellable.with_interrupt(request) + else: + pass + async def do_submit(): + transport_request = transport_queue._transport_requests.get(authinfo.pk, None) + open_transport = transport_queue._open_transports.get(authinfo.pk, None) - transport = get_transport(authinfo=authinfo, transport_queue=transport_queue, cancellable=cancellable) - print('a') + if open_transport is not None: # and not transport_queue._last_request_special: + transport = open_transport + transport_queue._last_request_special = True + elif transport_request is None: # or transport_queue._last_request_special: + # This is the previous behavior + with transport_queue.request_transport(authinfo) as request: + transport = await cancellable.with_interrupt(request) + else: + pass return execmanager.submit_calculation(node, transport) diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 24b3d97435..5080ada817 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -58,6 +58,7 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): self._last_close_time = None self._last_request_special: bool = False self._close_callback_handle = None + self._open_transports: Dict[Hashable, Transport] = {} # self._last_transport_request: Dict[Hashable, str] = {} @property @@ -102,6 +103,7 @@ def do_open(): try: transport.open() self._last_open_time = timezone.localtime(timezone.now()) + self._open_transports[authinfo.pk] = transport except Exception as exception: _LOGGER.error('exception occurred while trying to open transport:\n %s', exception) transport_request.future.set_exception(exception) @@ -167,22 +169,28 @@ def do_open(): if transport_request.count == 0: if transport_request.future.done(): - def do_close(): - """Close the transport if conditions are met.""" - transport_request.future.result().close() - self._last_close_time = timezone.localtime(timezone.now()) + # ? Why is all this logic in the `request_transport` method? + # ? Shouldn't the logic to close a transport be outside, such that the transport is being closed + # ? once it was actually used??? + pass + # def do_close(): + # """Close the transport if conditions are met.""" + # transport_request.future.result().close() + # self._last_close_time = timezone.localtime(timezone.now()) - close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() + # close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() - if close_timedelta > safe_open_interval: + # if close_timedelta < safe_open_interval: # Also here logic when transport should be closed immediately, or when via call_later? - close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context()) - self._last_close_time = timezone.localtime(timezone.now()) - self._transport_requests.pop(authinfo.pk, None) - else: - close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) - self._transport_requests.pop(authinfo.pk, None) + # self._last_close_time = timezone.localtime(timezone.now()) + # self._transport_requests.pop(authinfo.pk, None) + # close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) + # if close_timedelta > safe_open_interval: + # close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context()) + # self._last_close_time = timezone.localtime(timezone.now()) + # self._transport_requests.pop(authinfo.pk, None) + # self._transport_requests.pop(authinfo.pk, None) # transport_request.transport_closer = close_callback_handle diff --git a/src/aiida/engine/utils.py b/src/aiida/engine/utils.py index 88fc9e80f9..4053156a97 100644 --- a/src/aiida/engine/utils.py +++ b/src/aiida/engine/utils.py @@ -198,7 +198,6 @@ async def exponential_backoff_retry( result: Any = None coro = ensure_coroutine(fct) - print('a') interval = initial_interval for iteration in range(max_attempts): From 9a9eb1696f6e9726cf69fb49aa4e4a928a2b1d41 Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Mon, 4 Nov 2024 11:34:12 +0100 Subject: [PATCH 6/7] Save state before cleanup --- src/aiida/engine/processes/calcjobs/tasks.py | 30 ++++++++++---------- src/aiida/engine/transports.py | 2 +- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index b51d638d1e..58647f4a70 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -140,20 +140,20 @@ async def task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, ca max_attempts = get_config_option(MAX_ATTEMPTS_OPTION) authinfo = node.get_authinfo() - authinfo_pk = authinfo.pk - - transport_request = transport_queue._transport_requests.get(authinfo.pk, None) - open_transport = transport_queue._open_transports.get(authinfo.pk, None) - - if open_transport is not None: # and not transport_queue._last_request_special: - transport = open_transport - transport_queue._last_request_special = True - elif transport_request is None: # or transport_queue._last_request_special: - # This is the previous behavior - with transport_queue.request_transport(authinfo) as request: - transport = await cancellable.with_interrupt(request) - else: - pass + # authinfo_pk = authinfo.pk + + # transport_request = transport_queue._transport_requests.get(authinfo.pk, None) + # open_transport = transport_queue._open_transports.get(authinfo.pk, None) + + # if open_transport is not None: # and not transport_queue._last_request_special: + # transport = open_transport + # transport_queue._last_request_special = True + # elif transport_request is None: # or transport_queue._last_request_special: + # # This is the previous behavior + # with transport_queue.request_transport(authinfo) as request: + # transport = await cancellable.with_interrupt(request) + # else: + # pass async def do_submit(): transport_request = transport_queue._transport_requests.get(authinfo.pk, None) @@ -161,7 +161,7 @@ async def do_submit(): if open_transport is not None: # and not transport_queue._last_request_special: transport = open_transport - transport_queue._last_request_special = True + # transport_queue._last_request_special = True elif transport_request is None: # or transport_queue._last_request_special: # This is the previous behavior with transport_queue.request_transport(authinfo) as request: diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 5080ada817..4efedd84ff 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -54,11 +54,11 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): """:param loop: An asyncio event, will use `asyncio.get_event_loop()` if not supplied""" self._loop = loop if loop is not None else asyncio.get_event_loop() self._transport_requests: Dict[Hashable, TransportRequest] = {} + self._open_transports: Dict[Hashable, Transport] = {} self._last_open_time = None self._last_close_time = None self._last_request_special: bool = False self._close_callback_handle = None - self._open_transports: Dict[Hashable, Transport] = {} # self._last_transport_request: Dict[Hashable, str] = {} @property From 3bc355017799fd30c740bbbbe544cce9c2e9bc36 Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Mon, 4 Nov 2024 14:31:45 +0100 Subject: [PATCH 7/7] Add `_last_close_time` attribute to `TransportQueue`. --- src/aiida/engine/processes/calcjobs/tasks.py | 36 ++------ src/aiida/engine/transports.py | 87 +++----------------- 2 files changed, 18 insertions(+), 105 deletions(-) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 58647f4a70..2c030ba878 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -54,6 +54,7 @@ class PreSubmitException(Exception): # noqa: N818 """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`.""" + async def task_upload_job(process: 'CalcJob', transport_queue: TransportQueue, cancellable: InterruptableFuture): """Transport task that will attempt to upload the files of a job calculation to the remote. @@ -140,36 +141,11 @@ async def task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, ca max_attempts = get_config_option(MAX_ATTEMPTS_OPTION) authinfo = node.get_authinfo() - # authinfo_pk = authinfo.pk - - # transport_request = transport_queue._transport_requests.get(authinfo.pk, None) - # open_transport = transport_queue._open_transports.get(authinfo.pk, None) - - # if open_transport is not None: # and not transport_queue._last_request_special: - # transport = open_transport - # transport_queue._last_request_special = True - # elif transport_request is None: # or transport_queue._last_request_special: - # # This is the previous behavior - # with transport_queue.request_transport(authinfo) as request: - # transport = await cancellable.with_interrupt(request) - # else: - # pass - - async def do_submit(): - transport_request = transport_queue._transport_requests.get(authinfo.pk, None) - open_transport = transport_queue._open_transports.get(authinfo.pk, None) - - if open_transport is not None: # and not transport_queue._last_request_special: - transport = open_transport - # transport_queue._last_request_special = True - elif transport_request is None: # or transport_queue._last_request_special: - # This is the previous behavior - with transport_queue.request_transport(authinfo) as request: - transport = await cancellable.with_interrupt(request) - else: - pass - return execmanager.submit_calculation(node, transport) + async def do_submit(): + with transport_queue.request_transport(authinfo) as request: + transport = await cancellable.with_interrupt(request) + return execmanager.submit_calculation(node, transport) try: logger.info(f'scheduled request to submit CalcJob<{node.pk}>') @@ -520,11 +496,9 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override result: plumpy.process_states.State = self process_status = f'Waiting for transport task: {self._command}' - node.set_process_status(process_status) try: - # ? Possibly implement here to keep connection open if self._command == UPLOAD_COMMAND: skip_submit = await self._launch_task(task_upload_job, self.process, transport_queue) if skip_submit: diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 4efedd84ff..a4b36a4bc8 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -32,12 +32,6 @@ def __init__(self): self.future: asyncio.Future = asyncio.Future() self.count = 0 -class TransportCloseRequest: - """Information kept about close request for a transport object""" - - def __init__(self): - self.future: asyncio.Future = asyncio.Future() - self.count = 0 class TransportQueue: """A queue to get transport objects from authinfo. This class allows clients @@ -54,12 +48,7 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): """:param loop: An asyncio event, will use `asyncio.get_event_loop()` if not supplied""" self._loop = loop if loop is not None else asyncio.get_event_loop() self._transport_requests: Dict[Hashable, TransportRequest] = {} - self._open_transports: Dict[Hashable, Transport] = {} - self._last_open_time = None - self._last_close_time = None - self._last_request_special: bool = False - self._close_callback_handle = None - # self._last_transport_request: Dict[Hashable, str] = {} + self._last_close_time: Optional[datetime] = None @property def loop(self) -> asyncio.AbstractEventLoop: @@ -81,10 +70,7 @@ async def transport_task(transport_queue, authinfo): :return: A future that can be yielded to give the transport """ open_callback_handle = None - close_callback_handle = None transport_request = self._transport_requests.get(authinfo.pk, None) - # safe_open_interval = transport.get_safe_open_interval() - safe_open_interval = 30 if transport_request is None: # There is no existing request for this transport (i.e. on this authinfo) @@ -92,9 +78,8 @@ async def transport_task(transport_queue, authinfo): self._transport_requests[authinfo.pk] = transport_request transport = authinfo.get_transport() + safe_open_interval = transport.get_safe_open_interval() - # Check here if last_open_time > safe_interval, one could immediately open the transport - # This should be the very first request, after a while def do_open(): """Actually open the transport""" if transport_request.count > 0: @@ -102,8 +87,6 @@ def do_open(): _LOGGER.debug('Transport request opening transport for %s', authinfo) try: transport.open() - self._last_open_time = timezone.localtime(timezone.now()) - self._open_transports[authinfo.pk] = transport except Exception as exception: _LOGGER.error('exception occurred while trying to open transport:\n %s', exception) transport_request.future.set_exception(exception) @@ -120,37 +103,22 @@ def do_open(): # See https://github.com/aiidateam/aiida-core/issues/4698 # First request, submit immediately - # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task? - if self._last_request_special: - open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - self._last_request_special = False - - elif self._last_close_time is None: + if self._last_close_time is None: open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context()) - self._last_request_special = True else: close_timedelta = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() - open_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() - - if open_timedelta > safe_open_interval: - # ! This could also be `_loop.call_soon` which has an implicit delay of 0s - # open_timedelta = close_timedelta-safe_open_interval + if close_timedelta > safe_open_interval: + # If time since last close > `safe_open_interval`, open immediately open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context()) - self._last_request_special = True else: - # If the last one was a special request, wait the difference between safe_open_interval and lost - open_callback_handle = self._loop.call_later(safe_open_interval-open_timedelta, do_open, context=contextvars.Context()) + # Otherwise, wait only the difference required until the `safe_open_interval` is reached + open_callback_handle = self._loop.call_later( + safe_open_interval - close_timedelta, do_open, context=contextvars.Context() + ) - # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - - # ? This logic is implemented in `tasks.py` instead. - # else: - # transport = authinfo.get_transport() - # return transport - # If transport_request is open already try: transport_request.count += 1 yield transport_request.future @@ -168,40 +136,11 @@ def do_open(): # Check if there are no longer any users that want the transport if transport_request.count == 0: if transport_request.future.done(): - - # ? Why is all this logic in the `request_transport` method? - # ? Shouldn't the logic to close a transport be outside, such that the transport is being closed - # ? once it was actually used??? - pass - # def do_close(): - # """Close the transport if conditions are met.""" - # transport_request.future.result().close() - # self._last_close_time = timezone.localtime(timezone.now()) - - # close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() - - # if close_timedelta < safe_open_interval: - - # Also here logic when transport should be closed immediately, or when via call_later? - # self._last_close_time = timezone.localtime(timezone.now()) - # self._transport_requests.pop(authinfo.pk, None) - # close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) - # if close_timedelta > safe_open_interval: - # close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context()) - # self._last_close_time = timezone.localtime(timezone.now()) - # self._transport_requests.pop(authinfo.pk, None) - # self._transport_requests.pop(authinfo.pk, None) - - # transport_request.transport_closer = close_callback_handle - - # This should be replaced with the call_later close_callback_handle invocation - # transport_request.future.result().close() - # ? When should the transport_request be popped? - # ? If it is always popped as soon as the task is done, there is no way to re-use it... - # self._transport_requests.pop(authinfo.pk, None) + _LOGGER.debug('Transport request closing transport for %s', authinfo) + transport_request.future.result().close() elif open_callback_handle is not None: open_callback_handle.cancel() - # ? Somewhere I still need to `pop` the transport_request... or do I? - # self._transport_requests.pop(authinfo.pk, None) + self._last_close_time = timezone.localtime(timezone.now()) + self._transport_requests.pop(authinfo.pk, None)