From e0dff995b888fe5dd5a187e2ca0ba458ffe062dd Mon Sep 17 00:00:00 2001 From: Romain Cledat Date: Mon, 9 Dec 2024 13:43:40 -0800 Subject: [PATCH 1/4] Fix issue where run_command blocks when it shouldn't In cases where we need to read from a file (for the result of run_command), the current code used to make it sequential (so the timeout was actually never triggered). This started causing problems on MacOS when writing large amounts of data (8KB FIFO) --- .../argo/argo_workflows_deployer_objects.py | 6 ++++- .../step_functions_deployer_objects.py | 3 +++ metaflow/runner/deployer.py | 2 +- metaflow/runner/deployer_impl.py | 4 ++-- metaflow/runner/metaflow_runner.py | 4 +++- metaflow/runner/nbdeploy.py | 2 ++ metaflow/runner/nbrun.py | 2 +- metaflow/runner/subprocess_manager.py | 4 +++- metaflow/runner/utils.py | 24 +++++++++---------- 9 files changed, 32 insertions(+), 19 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows_deployer_objects.py b/metaflow/plugins/argo/argo_workflows_deployer_objects.py index d2ec33bfff3..004fce552f2 100644 --- a/metaflow/plugins/argo/argo_workflows_deployer_objects.py +++ b/metaflow/plugins/argo/argo_workflows_deployer_objects.py @@ -97,6 +97,7 @@ def suspend(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 def unsuspend(self, **kwargs) -> bool: @@ -131,6 +132,7 @@ def unsuspend(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 def terminate(self, **kwargs) -> bool: @@ -165,6 +167,7 @@ def terminate(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 @property @@ -319,6 +322,7 @@ def delete(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 def trigger(self, **kwargs) -> ArgoWorkflowsTriggeredRun: @@ -361,7 +365,7 @@ def trigger(self, **kwargs) -> ArgoWorkflowsTriggeredRun: content = handle_timeout( attribute_file_fd, command_obj, self.deployer.file_read_timeout ) - + command_obj.sync_wait() if command_obj.process.returncode == 0: return ArgoWorkflowsTriggeredRun( deployer=self.deployer, content=content diff --git a/metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py b/metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py index 394d8739327..7e85e83e48d 100644 --- a/metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py +++ b/metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py @@ -46,6 +46,7 @@ def terminate(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 @@ -174,6 +175,7 @@ def delete(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 def trigger(self, **kwargs) -> StepFunctionsTriggeredRun: @@ -217,6 +219,7 @@ def trigger(self, **kwargs) -> StepFunctionsTriggeredRun: attribute_file_fd, command_obj, self.deployer.file_read_timeout ) + command_obj.sync_wait() if command_obj.process.returncode == 0: return StepFunctionsTriggeredRun( deployer=self.deployer, content=content diff --git a/metaflow/runner/deployer.py b/metaflow/runner/deployer.py index e49761b3f9e..a0fde09584d 100644 --- a/metaflow/runner/deployer.py +++ b/metaflow/runner/deployer.py @@ -64,7 +64,7 @@ class Deployer(metaclass=DeployerMeta): The directory to run the subprocess in; if not specified, the current directory is used. file_read_timeout : int, default 3600 - The timeout until which we try to read the deployer attribute file. + The timeout until which we try to read the deployer attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` before the deployment command. diff --git a/metaflow/runner/deployer_impl.py b/metaflow/runner/deployer_impl.py index f9374ef8913..64a9d5cab4c 100644 --- a/metaflow/runner/deployer_impl.py +++ b/metaflow/runner/deployer_impl.py @@ -37,7 +37,7 @@ class variable that matches the name of the CLI group. The directory to run the subprocess in; if not specified, the current directory is used. file_read_timeout : int, default 3600 - The timeout until which we try to read the deployer attribute file. + The timeout until which we try to read the deployer attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` before the deployment command. @@ -144,7 +144,7 @@ def _create( # Additional info is used to pass additional deployer specific information. # It is used in non-OSS deployers (extensions). self.additional_info = content.get("additional_info", {}) - + command_obj.sync_wait() if command_obj.process.returncode == 0: return create_class(deployer=self) diff --git a/metaflow/runner/metaflow_runner.py b/metaflow/runner/metaflow_runner.py index 3a0d16552aa..cbcd77c6e24 100644 --- a/metaflow/runner/metaflow_runner.py +++ b/metaflow/runner/metaflow_runner.py @@ -221,7 +221,7 @@ class Runner(object): The directory to run the subprocess in; if not specified, the current directory is used. file_read_timeout : int, default 3600 - The timeout until which we try to read the runner attribute file. + The timeout until which we try to read the runner attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` before the `run` command. @@ -326,6 +326,7 @@ def run(self, **kwargs) -> ExecutingRun: show_output=self.show_output, ) command_obj = self.spm.get(pid) + command_obj.sync_wait() return self.__get_executing_run(attribute_file_fd, command_obj) @@ -357,6 +358,7 @@ def resume(self, **kwargs): show_output=self.show_output, ) command_obj = self.spm.get(pid) + command_obj.sync_wait() return self.__get_executing_run(attribute_file_fd, command_obj) diff --git a/metaflow/runner/nbdeploy.py b/metaflow/runner/nbdeploy.py index 355047d2a46..4b733ebef1e 100644 --- a/metaflow/runner/nbdeploy.py +++ b/metaflow/runner/nbdeploy.py @@ -46,6 +46,8 @@ class NBDeployer(object): base_dir : str, optional, default None The directory to run the subprocess in; if not specified, the current working directory is used. + file_read_timeout : int, default 3600 + The timeout until which we try to read the deployer attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` i.e. options listed in `python myflow.py --help` diff --git a/metaflow/runner/nbrun.py b/metaflow/runner/nbrun.py index 10fc473eff3..055c1b5c7c1 100644 --- a/metaflow/runner/nbrun.py +++ b/metaflow/runner/nbrun.py @@ -44,7 +44,7 @@ class NBRunner(object): The directory to run the subprocess in; if not specified, the current working directory is used. file_read_timeout : int, default 3600 - The timeout until which we try to read the runner attribute file. + The timeout until which we try to read the runner attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` before the `run` command. diff --git a/metaflow/runner/subprocess_manager.py b/metaflow/runner/subprocess_manager.py index 37e69ce06bf..eed9eea8165 100644 --- a/metaflow/runner/subprocess_manager.py +++ b/metaflow/runner/subprocess_manager.py @@ -120,6 +120,9 @@ def run_command( """ Run a command synchronously and return its process ID. + Note: in no case does this wait for the process to *finish*. Use sync_wait() + to wait for the command to finish. + Parameters ---------- command : List[str] @@ -145,7 +148,6 @@ def run_command( command_obj = CommandManager(command, env, cwd) pid = command_obj.run(show_output=show_output) self.commands[pid] = command_obj - command_obj.sync_wait() return pid async def async_run_command( diff --git a/metaflow/runner/utils.py b/metaflow/runner/utils.py index ec0f1865d7c..6a7b3d299ad 100644 --- a/metaflow/runner/utils.py +++ b/metaflow/runner/utils.py @@ -91,7 +91,7 @@ def read_from_fifo_when_ready( encoding : str, optional Encoding to use while reading the file, by default "utf-8". timeout : int, optional - Timeout for reading the file in milliseconds, by default 3600. + Timeout for reading the file in seconds, by default 3600. Returns ------- @@ -113,24 +113,24 @@ def read_from_fifo_when_ready( while True: poll_begin = time.time() - poll.poll(timeout) - timeout -= 1000 * (time.time() - poll_begin) - - if timeout <= 0: - raise TimeoutError("Timeout while waiting for the file content") + poll.poll(1000 * timeout) + timeout -= time.time() - poll_begin try: - data = os.read(fifo_fd, 128) + data = os.read(fifo_fd, 8192) while data: content += data - data = os.read(fifo_fd, 128) + data = os.read(fifo_fd, 8192) # Read from a non-blocking closed FIFO returns an empty byte array break except BlockingIOError: # FIFO is open but no data is available yet - continue + pass + finally: + if timeout <= 0: + raise TimeoutError("Timeout while waiting for the file content") if not content and check_process_exited(command_obj): raise CalledProcessError(command_obj.process.returncode, command_obj.command) @@ -156,7 +156,7 @@ async def async_read_from_fifo_when_ready( encoding : str, optional Encoding to use while reading the file, by default "utf-8". timeout : int, optional - Timeout for reading the file in milliseconds, by default 3600. + Timeout for reading the file in seconds, by default 3600. Returns ------- @@ -206,7 +206,7 @@ def handle_timeout( command_obj : CommandManager Command manager object that encapsulates the running command details. file_read_timeout : int - Timeout for reading the file. + Timeout for reading the file, in seconds Returns ------- @@ -243,7 +243,7 @@ async def async_handle_timeout( command_obj : CommandManager Command manager object that encapsulates the running command details. file_read_timeout : int - Timeout for reading the file. + Timeout for reading the file, in seconds Returns ------- From f352bc02b0a2273fde3ff4a65948e956c8fbc7d7 Mon Sep 17 00:00:00 2001 From: Romain Cledat Date: Mon, 9 Dec 2024 14:28:26 -0800 Subject: [PATCH 2/4] Try to deal with race on MacOS --- metaflow/runner/utils.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/metaflow/runner/utils.py b/metaflow/runner/utils.py index 6a7b3d299ad..342ab25aa7f 100644 --- a/metaflow/runner/utils.py +++ b/metaflow/runner/utils.py @@ -110,7 +110,7 @@ def read_from_fifo_when_ready( poll = select.poll() poll.register(fifo_fd, select.POLLIN) - + read_data = False while True: poll_begin = time.time() poll.poll(1000 * timeout) @@ -119,6 +119,7 @@ def read_from_fifo_when_ready( try: data = os.read(fifo_fd, 8192) while data: + read_data = True content += data data = os.read(fifo_fd, 8192) @@ -126,8 +127,10 @@ def read_from_fifo_when_ready( break except BlockingIOError: - # FIFO is open but no data is available yet - pass + if read_data: + break + # FIFO is open but no data is available yet (spurious POLLIN?) + # so we loop around to poll again finally: if timeout <= 0: raise TimeoutError("Timeout while waiting for the file content") From f57bda3ffb757687d907c80e8a2a03c07351b62c Mon Sep 17 00:00:00 2001 From: Romain Cledat Date: Mon, 9 Dec 2024 16:38:10 -0800 Subject: [PATCH 3/4] Gah -- MacOS really doesn't make it easy --- metaflow/runner/utils.py | 46 ++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/metaflow/runner/utils.py b/metaflow/runner/utils.py index 342ab25aa7f..72a1e565fed 100644 --- a/metaflow/runner/utils.py +++ b/metaflow/runner/utils.py @@ -107,33 +107,47 @@ def read_from_fifo_when_ready( content to the FIFO. """ content = bytearray() - poll = select.poll() poll.register(fifo_fd, select.POLLIN) - read_data = False + max_timeout = 3 # Wait for 10 * 3 = 30 ms after last write while True: + if timeout < 0: + raise TimeoutError("Timeout while waiting for the file content") + poll_begin = time.time() - poll.poll(1000 * timeout) + # We poll for a very short time to be also able to check if the file was closed + # If the file is closed, we assume that we only have one writer so if we have + # data, we break out. This is to work around issues in macos + events = poll.poll(min(10, timeout * 1000)) timeout -= time.time() - poll_begin try: data = os.read(fifo_fd, 8192) - while data: - read_data = True + if data: content += data - data = os.read(fifo_fd, 8192) - - # Read from a non-blocking closed FIFO returns an empty byte array - break - + else: + if len(events): + # We read an EOF -- consider the file done + break + else: + # We had no events (just a timeout) and the read didn't return + # an exception so the file is still open; we continue waiting for data + # Unfortunately, on MacOS, it seems that even *after* the file is + # closed on the other end, we still don't get a BlockingIOError so + # we hack our way and timeout if there is no write in 30ms which is + # a relative eternity for file writes. + if content: + if max_timeout <= 0: + break + max_timeout -= 1 + continue except BlockingIOError: - if read_data: + has_blocking_error = True + if content: + # The file was closed break - # FIFO is open but no data is available yet (spurious POLLIN?) - # so we loop around to poll again - finally: - if timeout <= 0: - raise TimeoutError("Timeout while waiting for the file content") + # else, if we have no content, we continue waiting for the file to be open + # and written to. if not content and check_process_exited(command_obj): raise CalledProcessError(command_obj.process.returncode, command_obj.command) From 746422bcd65217b95c84b177874cb5f7d75ec934 Mon Sep 17 00:00:00 2001 From: Romain Cledat Date: Mon, 9 Dec 2024 17:11:16 -0800 Subject: [PATCH 4/4] Address comment --- metaflow/runner/metaflow_runner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/metaflow/runner/metaflow_runner.py b/metaflow/runner/metaflow_runner.py index cbcd77c6e24..098e645a00c 100644 --- a/metaflow/runner/metaflow_runner.py +++ b/metaflow/runner/metaflow_runner.py @@ -272,6 +272,9 @@ async def __aenter__(self) -> "Runner": def __get_executing_run(self, attribute_file_fd, command_obj): content = handle_timeout(attribute_file_fd, command_obj, self.file_read_timeout) + + command_obj.sync_wait() + content = json.loads(content) pathspec = "%s/%s" % (content.get("flow_name"), content.get("run_id")) @@ -326,7 +329,6 @@ def run(self, **kwargs) -> ExecutingRun: show_output=self.show_output, ) command_obj = self.spm.get(pid) - command_obj.sync_wait() return self.__get_executing_run(attribute_file_fd, command_obj) @@ -358,7 +360,6 @@ def resume(self, **kwargs): show_output=self.show_output, ) command_obj = self.spm.get(pid) - command_obj.sync_wait() return self.__get_executing_run(attribute_file_fd, command_obj)