diff --git a/metaflow/plugins/argo/argo_workflows_deployer_objects.py b/metaflow/plugins/argo/argo_workflows_deployer_objects.py index d2ec33bfff..004fce552f 100644 --- a/metaflow/plugins/argo/argo_workflows_deployer_objects.py +++ b/metaflow/plugins/argo/argo_workflows_deployer_objects.py @@ -97,6 +97,7 @@ def suspend(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 def unsuspend(self, **kwargs) -> bool: @@ -131,6 +132,7 @@ def unsuspend(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 def terminate(self, **kwargs) -> bool: @@ -165,6 +167,7 @@ def terminate(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 @property @@ -319,6 +322,7 @@ def delete(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 def trigger(self, **kwargs) -> ArgoWorkflowsTriggeredRun: @@ -361,7 +365,7 @@ def trigger(self, **kwargs) -> ArgoWorkflowsTriggeredRun: content = handle_timeout( attribute_file_fd, command_obj, self.deployer.file_read_timeout ) - + command_obj.sync_wait() if command_obj.process.returncode == 0: return ArgoWorkflowsTriggeredRun( deployer=self.deployer, content=content diff --git a/metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py b/metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py index 394d873932..7e85e83e48 100644 --- a/metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py +++ b/metaflow/plugins/aws/step_functions/step_functions_deployer_objects.py @@ -46,6 +46,7 @@ def terminate(self, **kwargs) -> bool: ) command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 @@ -174,6 +175,7 @@ def delete(self, **kwargs) -> bool: ) 
command_obj = self.deployer.spm.get(pid) + command_obj.sync_wait() return command_obj.process.returncode == 0 def trigger(self, **kwargs) -> StepFunctionsTriggeredRun: @@ -217,6 +219,7 @@ def trigger(self, **kwargs) -> StepFunctionsTriggeredRun: attribute_file_fd, command_obj, self.deployer.file_read_timeout ) + command_obj.sync_wait() if command_obj.process.returncode == 0: return StepFunctionsTriggeredRun( deployer=self.deployer, content=content diff --git a/metaflow/runner/deployer.py b/metaflow/runner/deployer.py index e49761b3f9..a0fde09584 100644 --- a/metaflow/runner/deployer.py +++ b/metaflow/runner/deployer.py @@ -64,7 +64,7 @@ class Deployer(metaclass=DeployerMeta): The directory to run the subprocess in; if not specified, the current directory is used. file_read_timeout : int, default 3600 - The timeout until which we try to read the deployer attribute file. + The timeout until which we try to read the deployer attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` before the deployment command. diff --git a/metaflow/runner/deployer_impl.py b/metaflow/runner/deployer_impl.py index f9374ef891..64a9d5cab4 100644 --- a/metaflow/runner/deployer_impl.py +++ b/metaflow/runner/deployer_impl.py @@ -37,7 +37,7 @@ class variable that matches the name of the CLI group. The directory to run the subprocess in; if not specified, the current directory is used. file_read_timeout : int, default 3600 - The timeout until which we try to read the deployer attribute file. + The timeout until which we try to read the deployer attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` before the deployment command. @@ -144,7 +144,7 @@ def _create( # Additional info is used to pass additional deployer specific information. # It is used in non-OSS deployers (extensions). 
self.additional_info = content.get("additional_info", {}) - + command_obj.sync_wait() if command_obj.process.returncode == 0: return create_class(deployer=self) diff --git a/metaflow/runner/metaflow_runner.py b/metaflow/runner/metaflow_runner.py index 3a0d16552a..098e645a00 100644 --- a/metaflow/runner/metaflow_runner.py +++ b/metaflow/runner/metaflow_runner.py @@ -221,7 +221,7 @@ class Runner(object): The directory to run the subprocess in; if not specified, the current directory is used. file_read_timeout : int, default 3600 - The timeout until which we try to read the runner attribute file. + The timeout until which we try to read the runner attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` before the `run` command. @@ -272,6 +272,9 @@ async def __aenter__(self) -> "Runner": def __get_executing_run(self, attribute_file_fd, command_obj): content = handle_timeout(attribute_file_fd, command_obj, self.file_read_timeout) + + command_obj.sync_wait() + content = json.loads(content) pathspec = "%s/%s" % (content.get("flow_name"), content.get("run_id")) diff --git a/metaflow/runner/nbdeploy.py b/metaflow/runner/nbdeploy.py index 355047d2a4..4b733ebef1 100644 --- a/metaflow/runner/nbdeploy.py +++ b/metaflow/runner/nbdeploy.py @@ -46,6 +46,8 @@ class NBDeployer(object): base_dir : str, optional, default None The directory to run the subprocess in; if not specified, the current working directory is used. + file_read_timeout : int, default 3600 + The timeout until which we try to read the deployer attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` i.e. 
options listed in `python myflow.py --help` diff --git a/metaflow/runner/nbrun.py b/metaflow/runner/nbrun.py index 10fc473eff..055c1b5c7c 100644 --- a/metaflow/runner/nbrun.py +++ b/metaflow/runner/nbrun.py @@ -44,7 +44,7 @@ class NBRunner(object): The directory to run the subprocess in; if not specified, the current working directory is used. file_read_timeout : int, default 3600 - The timeout until which we try to read the runner attribute file. + The timeout until which we try to read the runner attribute file (in seconds). **kwargs : Any Additional arguments that you would pass to `python myflow.py` before the `run` command. diff --git a/metaflow/runner/subprocess_manager.py b/metaflow/runner/subprocess_manager.py index 37e69ce06b..eed9eea816 100644 --- a/metaflow/runner/subprocess_manager.py +++ b/metaflow/runner/subprocess_manager.py @@ -120,6 +120,9 @@ def run_command( """ Run a command synchronously and return its process ID. + Note: in no case does this wait for the process to *finish*. Use sync_wait() + to wait for the command to finish. + Parameters ---------- command : List[str] @@ -145,7 +148,6 @@ def run_command( command_obj = CommandManager(command, env, cwd) pid = command_obj.run(show_output=show_output) self.commands[pid] = command_obj - command_obj.sync_wait() return pid async def async_run_command( diff --git a/metaflow/runner/utils.py b/metaflow/runner/utils.py index ec0f1865d7..72a1e565fe 100644 --- a/metaflow/runner/utils.py +++ b/metaflow/runner/utils.py @@ -91,7 +91,7 @@ def read_from_fifo_when_ready( encoding : str, optional Encoding to use while reading the file, by default "utf-8". timeout : int, optional - Timeout for reading the file in milliseconds, by default 3600. + Timeout for reading the file in seconds, by default 3600. Returns ------- @@ -107,30 +107,47 @@ def read_from_fifo_when_ready( content to the FIFO. 
""" content = bytearray() - poll = select.poll() poll.register(fifo_fd, select.POLLIN) - + max_timeout = 3 # Wait for 10 * 3 = 30 ms after last write while True: - poll_begin = time.time() - poll.poll(timeout) - timeout -= 1000 * (time.time() - poll_begin) - - if timeout <= 0: + if timeout < 0: raise TimeoutError("Timeout while waiting for the file content") + poll_begin = time.time() + # We poll for a very short time to be also able to check if the file was closed + # If the file is closed, we assume that we only have one writer so if we have + # data, we break out. This is to work around issues in macos + events = poll.poll(min(10, timeout * 1000)) + timeout -= time.time() - poll_begin + try: - data = os.read(fifo_fd, 128) - while data: + data = os.read(fifo_fd, 8192) + if data: content += data - data = os.read(fifo_fd, 128) - - # Read from a non-blocking closed FIFO returns an empty byte array - break - + else: + if len(events): + # We read an EOF -- consider the file done + break + else: + # We had no events (just a timeout) and the read didn't return + # an exception so the file is still open; we continue waiting for data + # Unfortunately, on MacOS, it seems that even *after* the file is + # closed on the other end, we still don't get a BlockingIOError so + # we hack our way and timeout if there is no write in 30ms which is + # a relative eternity for file writes. + if content: + if max_timeout <= 0: + break + max_timeout -= 1 + continue except BlockingIOError: - # FIFO is open but no data is available yet - continue + has_blocking_error = True + if content: + # The file was closed + break + # else, if we have no content, we continue waiting for the file to be open + # and written to. if not content and check_process_exited(command_obj): raise CalledProcessError(command_obj.process.returncode, command_obj.command) @@ -156,7 +173,7 @@ async def async_read_from_fifo_when_ready( encoding : str, optional Encoding to use while reading the file, by default "utf-8". 
timeout : int, optional - Timeout for reading the file in milliseconds, by default 3600. + Timeout for reading the file in seconds, by default 3600. Returns ------- @@ -206,7 +223,7 @@ def handle_timeout( command_obj : CommandManager Command manager object that encapsulates the running command details. file_read_timeout : int - Timeout for reading the file. + Timeout for reading the file, in seconds. Returns ------- @@ -243,7 +260,7 @@ async def async_handle_timeout( command_obj : CommandManager Command manager object that encapsulates the running command details. file_read_timeout : int - Timeout for reading the file. + Timeout for reading the file, in seconds. Returns -------