Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where run_command blocks when it shouldn't #2169

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion metaflow/plugins/argo/argo_workflows_deployer_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def suspend(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

def unsuspend(self, **kwargs) -> bool:
Expand Down Expand Up @@ -131,6 +132,7 @@ def unsuspend(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

def terminate(self, **kwargs) -> bool:
Expand Down Expand Up @@ -165,6 +167,7 @@ def terminate(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

@property
Expand Down Expand Up @@ -319,6 +322,7 @@ def delete(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

def trigger(self, **kwargs) -> ArgoWorkflowsTriggeredRun:
Expand Down Expand Up @@ -361,7 +365,7 @@ def trigger(self, **kwargs) -> ArgoWorkflowsTriggeredRun:
content = handle_timeout(
attribute_file_fd, command_obj, self.deployer.file_read_timeout
)

command_obj.sync_wait()
if command_obj.process.returncode == 0:
return ArgoWorkflowsTriggeredRun(
deployer=self.deployer, content=content
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def terminate(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0


Expand Down Expand Up @@ -174,6 +175,7 @@ def delete(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

def trigger(self, **kwargs) -> StepFunctionsTriggeredRun:
Expand Down Expand Up @@ -217,6 +219,7 @@ def trigger(self, **kwargs) -> StepFunctionsTriggeredRun:
attribute_file_fd, command_obj, self.deployer.file_read_timeout
)

command_obj.sync_wait()
if command_obj.process.returncode == 0:
return StepFunctionsTriggeredRun(
deployer=self.deployer, content=content
Expand Down
2 changes: 1 addition & 1 deletion metaflow/runner/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Deployer(metaclass=DeployerMeta):
The directory to run the subprocess in; if not specified, the current
directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the deployer attribute file.
The timeout until which we try to read the deployer attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` before
the deployment command.
Expand Down
4 changes: 2 additions & 2 deletions metaflow/runner/deployer_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class variable that matches the name of the CLI group.
The directory to run the subprocess in; if not specified, the current
directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the deployer attribute file.
The timeout until which we try to read the deployer attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` before
the deployment command.
Expand Down Expand Up @@ -144,7 +144,7 @@ def _create(
# Additional info is used to pass additional deployer specific information.
# It is used in non-OSS deployers (extensions).
self.additional_info = content.get("additional_info", {})

command_obj.sync_wait()
if command_obj.process.returncode == 0:
return create_class(deployer=self)

Expand Down
4 changes: 3 additions & 1 deletion metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class Runner(object):
The directory to run the subprocess in; if not specified, the current
directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the runner attribute file.
The timeout until which we try to read the runner attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` before
the `run` command.
Expand Down Expand Up @@ -326,6 +326,7 @@ def run(self, **kwargs) -> ExecutingRun:
show_output=self.show_output,
)
command_obj = self.spm.get(pid)
command_obj.sync_wait()

return self.__get_executing_run(attribute_file_fd, command_obj)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess command_obj.sync_wait() for Runner should happen inside self.__get_executing_run since we wanna do it after content = handle_timeout(...)


Expand Down Expand Up @@ -357,6 +358,7 @@ def resume(self, **kwargs):
show_output=self.show_output,
)
command_obj = self.spm.get(pid)
command_obj.sync_wait()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment here


return self.__get_executing_run(attribute_file_fd, command_obj)

Expand Down
2 changes: 2 additions & 0 deletions metaflow/runner/nbdeploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class NBDeployer(object):
base_dir : str, optional, default None
The directory to run the subprocess in; if not specified, the current
working directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the deployer attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` i.e. options
listed in `python myflow.py --help`
Expand Down
2 changes: 1 addition & 1 deletion metaflow/runner/nbrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class NBRunner(object):
The directory to run the subprocess in; if not specified, the current
working directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the runner attribute file.
The timeout until which we try to read the runner attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` before
the `run` command.
Expand Down
4 changes: 3 additions & 1 deletion metaflow/runner/subprocess_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ def run_command(
"""
Run a command synchronously and return its process ID.

Note: in no case does this wait for the process to *finish*. Use sync_wait()
to wait for the command to finish.

Parameters
----------
command : List[str]
Expand All @@ -145,7 +148,6 @@ def run_command(
command_obj = CommandManager(command, env, cwd)
pid = command_obj.run(show_output=show_output)
self.commands[pid] = command_obj
command_obj.sync_wait()
return pid

async def async_run_command(
Expand Down
24 changes: 12 additions & 12 deletions metaflow/runner/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def read_from_fifo_when_ready(
encoding : str, optional
Encoding to use while reading the file, by default "utf-8".
timeout : int, optional
Timeout for reading the file in milliseconds, by default 3600.
Timeout for reading the file in seconds, by default 3600.

Returns
-------
Expand All @@ -113,24 +113,24 @@ def read_from_fifo_when_ready(

while True:
poll_begin = time.time()
poll.poll(timeout)
timeout -= 1000 * (time.time() - poll_begin)

if timeout <= 0:
raise TimeoutError("Timeout while waiting for the file content")
poll.poll(1000 * timeout)
timeout -= time.time() - poll_begin

try:
data = os.read(fifo_fd, 128)
data = os.read(fifo_fd, 8192)
while data:
content += data
data = os.read(fifo_fd, 128)
data = os.read(fifo_fd, 8192)

# Read from a non-blocking closed FIFO returns an empty byte array
break

except BlockingIOError:
# FIFO is open but no data is available yet
continue
pass
finally:
if timeout <= 0:
raise TimeoutError("Timeout while waiting for the file content")

if not content and check_process_exited(command_obj):
raise CalledProcessError(command_obj.process.returncode, command_obj.command)
Expand All @@ -156,7 +156,7 @@ async def async_read_from_fifo_when_ready(
encoding : str, optional
Encoding to use while reading the file, by default "utf-8".
timeout : int, optional
Timeout for reading the file in milliseconds, by default 3600.
Timeout for reading the file in seconds, by default 3600.

Returns
-------
Expand Down Expand Up @@ -206,7 +206,7 @@ def handle_timeout(
command_obj : CommandManager
Command manager object that encapsulates the running command details.
file_read_timeout : int
Timeout for reading the file.
Timeout for reading the file, in seconds

Returns
-------
Expand Down Expand Up @@ -243,7 +243,7 @@ async def async_handle_timeout(
command_obj : CommandManager
Command manager object that encapsulates the running command details.
file_read_timeout : int
Timeout for reading the file.
Timeout for reading the file, in seconds

Returns
-------
Expand Down
Loading