From 1bfc02565b91648110385215b7ca301646229779 Mon Sep 17 00:00:00 2001 From: eric Date: Mon, 10 Jul 2023 02:15:08 -0400 Subject: [PATCH] version 2.3.0 - add support for multiple processes in separate windows via multiplexer --- README.md | 3 +++ pyxargs.py | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index a4e93f6..6f29811 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,9 @@ options: executes 'from import *' for each library --pre "code" runs exec(code) before execution --post "code" runs exec(code) after execution + -P P, --procs P split into P chunks and execute each chunk in parallel + as a separate process and window with byobu or tmux + -c c, --chunk c runs chunk c of P (0 <= c < P) (without multiplexer) -i, --interactive prompt the user before executing each command, only proceeds if response starts with 'y' or 'Y' -n, --dry-run prints commands without executing them diff --git a/pyxargs.py b/pyxargs.py index 0097753..c1c258e 100644 --- a/pyxargs.py +++ b/pyxargs.py @@ -17,16 +17,21 @@ import argparse import os +import pickle +import pty import re import shlex +import shutil import signal import subprocess import sys +import tempfile import textwrap +import time import typing -__version__: typing.Final[str] = "2.2.0" +__version__: typing.Final[str] = "2.3.0" def replace_surrogates(string: str) -> str: @@ -312,6 +317,12 @@ def _split_lines(self, text, width): help="runs exec(code) before execution") parser.add_argument("--post", type=str, default="", metavar=("\"code\""), dest="post", help="runs exec(code) after execution") + parser.add_argument("-P", "--procs", type=int, default=None, metavar="P", dest="procs", + help="split into P chunks and execute each chunk in parallel as a separate process and window with byobu or tmux") + parser.add_argument("-c", "--chunk", type=int, default=None, metavar="c", dest="chunk", + help="runs chunk c of P (0 <= c < P) (without multiplexer)") + parser.add_argument("--_command_pickle", nargs=2, default=None, dest="command_pickle", + help=argparse.SUPPRESS) parser.add_argument("-i", "--interactive", action="store_true", dest="interactive", help="prompt the user before executing each command, only proceeds if response starts with 'y' or 'Y'") parser.add_argument("-n", "--dry-run", action="store_true", dest="dry_run", @@ -327,7 +338,9 @@ def _split_lines(self, text, width): if args.input_mode in ["f", "p", "a", "s"]: short_forms = {"f": "file", "p": "path", "a": "abspath", "s": "stdin"} args.input_mode = short_forms[args.input_mode] - if args.input_mode is None: + if args.command_pickle is not None: + args.input_mode = args.command_pickle[0] + elif args.input_mode is None: if not sys.stdin.isatty(): stdin = sys.stdin.read() args.input_mode = "stdin" @@ -354,10 +367,45 @@ def _split_lines(self, text, width): else: assert not args.null, f"invalid option --null for input mode: {args.input_mode}" assert not args.delim, f"invalid option --delimiter for input mode: {args.input_mode}" + assert args.procs is None or args.procs > 0, "invalid option --procs: requires P > 0" + assert args.chunk is None or args.procs is not None, "invalid option --chunk: --procs must be specified" + assert args.chunk is None or 0 <= args.chunk < args.procs, "invalid option --chunk: requires 0 <= c < P" + assert args.command_pickle is None or args.chunk is not None, "invalid option --_command_pickle: --chunk must be specified" # build and run commands if len(args.command) >= 1: - command_dicts = build_commands(args, stdin) - return execute_commands(args, command_dicts) + # build commands or load them from pickle if available + if args.command_pickle is None: + command_dicts = build_commands(args, stdin) + else: + with open(args.command_pickle[1], "rb") as f: + command_dicts = pickle.load(f) + # run subprocesses with multiplexer if requested + if args.procs is not None and args.chunk is None: + multiplexer = "byobu" if shutil.which("byobu") else "tmux" if shutil.which("tmux") else None + assert multiplexer is not None, "multiplexer not found: install byobu or tmux" + session = time.strftime("pyxargs_%Y%m%d_%H%M%S") + command_pickle = tempfile.NamedTemporaryFile() + pickle.dump(command_dicts, command_pickle.file) + command_pickle.file.flush() + pyxargs_command = [sys.executable, os.path.abspath(__file__), "--chunk", "0", "--_command_pickle", args.input_mode, command_pickle.name] + sys.argv[1:] + subprocess.run([multiplexer, "new-session", "-d", "-s", session, shlex.join(pyxargs_command)]) + for i in range(1, args.procs): + pyxargs_command[3] = str(i) + subprocess.run([multiplexer, "new-window", "-t", f"{session}:{i}", shlex.join(pyxargs_command)]) + if stdin: + sys.stdin = sys.__stdin__ = open("/dev/tty") + os.dup2(sys.stdin.fileno(), 0) + pty.spawn([multiplexer, "attach-session", "-t", session]) + else: + subprocess.run([multiplexer, "attach-session", "-t", session]) + return 0 + # execute commands (in chunks if requested) + if args.chunk is None: + return execute_commands(args, command_dicts) + else: + _ = execute_commands(args, command_dicts[args.chunk::args.procs]) + _ = input(f"Chunk {args.chunk} complete. Press enter to exit. ") + return 0 else: parser.print_usage() return 2