-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
616 lines (468 loc) · 19.7 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
import contextlib
import dis
import glob
import importlib
import inspect
import itertools
import os
import pickle
import re
import shlex
import shutil
import signal
import subprocess
import sys
import textwrap
import threading
import time
import traceback
import typing
from collections.abc import Iterable, Iterator, Mapping, MutableSequence, Sequence
from ctypes import pythonapi as libpy, c_long, py_object
from dataclasses import dataclass
from enum import Enum
from inspect import FullArgSpec
from os import path
from subprocess import CompletedProcess
from threading import ExceptHookArgs
from time import time_ns as ns
from types import CodeType as Code, FrameType as Frame, FunctionType as Function, TracebackType as Traceback
from typing import Any, Callable, Generator, Optional, Self, TypeVar
if sys.version_info < (3, 12): exit(print("bt requires Python 3.12 or newer."))
__version__ = 6
assert __name__ == "bt" or "bt" not in sys.modules, f'bt\'s module name is "{__name__}" but "bt" is already in sys.modules'
bt = sys.modules[__name__]
"bt's main module."
sys.modules["bt"] = bt
type Runnable = Callable[[], Any]
Runnable = Runnable.__value__
"A function that can be called without arguments."
type FileSpecifier = str | typing.Iterable[FileSpecifier] | Callable[[], FileSpecifier]
FileSpecifier = FileSpecifier.__value__
"""A path or collection of paths."""
PY314 = sys.version_info >= (3, 14)
class Getter:
def __init__(this, getter): this.getter = getter
def __get__(this, owner, type = None): return this(owner)
def __call__(this, owner): return this.getter(owner)
class State(Enum):
NORMAL = 0
RUNNING = 1
DONE = 2
SKIPPED = 3
def isof(this, task): return this == task.state
class FlatList(list):
def transform(this, x):
return x
def copy(this):
copy = type(this)()
copy += this
return copy
def append(this, x):
if x := this.transform(x):
if isIterable(x): this.extend(x)
elif x: super().append(x)
def insert(this, i, x):
if x := this.transform(x):
if isIterable(x): this[i:i] = x
elif x: super().insert(i, x)
def extend(this, x):
if x := this.transform(x):
assert isIterable(x), f"{x!r} is a string or not iterable."
super().extend(x)
return this
def __setitem__(this, i, x):
if x := this.transform(x):
if isinstance(x, Iterable):
if not isinstance(i, slice): i = slice(i, i + 1)
if isinstance(x, str): x = [x]
super().__setitem__(i, x)
def __iadd__(this, x):
return this.extend(x)
def __add__(this, x):
return this.copy().__iadd__(x)
class Arguments(FlatList):
"""`Arguments` is a `list` derivative that stores a full or partial command line.
Only `None`, strings and `Iterable`s may be added;
`None` is discarded and every `Iterable` is flattened.
```python
source = ["main.c"]
exe = "foo"
options = "-Ofast -std=c2x"
command = Arguments("gcc", source, "-o", exe, options, parameter("o"))
print(command) # gcc main.c -o foo -Ofast -std=c2x
print(command.split()) # ['gcc', 'main.c', '-o', 'foo', '-Ofast', '-std=c2x']
"""
def __init__(this, *arguments):
for arg in arguments: this.append(arg)
def set(this, *arguments): this[:] = Arguments(arguments)
def transform(this, args):
if isinstance(args, str): return args.strip()
if isinstance(args, Arguments): return args
if isinstance(args, Iterable): return Arguments(*args)
if args: raise TypeError(f"{args!r} is not iterable or a string")
def split(this):
"Split this argument list's `__str__` into a list."
return shlex.split(str(this))
def __str__(this):
"Return all elements joined by spaces."
return " ".join(this)
def __iadd__(this, arguments):
this.append(arguments)
return this
@dataclass
class Files:
def __init__(this, *files):
this.files = {}
def flatten(f):
if isinstance(f, str): this.files[f] = None
elif isinstance(f, Mapping): flatten(f.values())
elif isinstance(f, Iterable):
for e in f: flatten(e)
elif callable(f): flatten(f())
else: raise AssertionError(f"{output!r} cannot be converted to a file (is not a string, a list, or callable).")
flatten(files)
def __iter__(this): return iter(this.files)
def __repr__(this): return f"Files({", ".join(this.files)})"
class Task:
def __init__(this, task: Runnable, options: dict[str, object]):
this._name: str = None
this.options = options
this.lazyopts: dict[str, TypeVar] = {}
this.spec: FullArgSpec
this.dependencies: list[Self | Runnable] = []
this.state = State.NORMAL
this.force = False
this.args = []
this.sourceFiles = []
this.outputFiles = []
this.cache = []
this.setFunction(task)
def __repr__(this): return f"<Task {this.name}>"
def __call__(this, *args, **kw):
if started:
for name, option in this.lazyopts.items(): getattr(this, name)
return this.fn(*args, *this.args[len(args):], **this.lazyopts, **kw)
del tasks[this.name]
this.dependencies.insert(0, this.fn)
this.setFunction(args[0])
tasks[this.name] = this
return this
@property
def name(this):
if "name" in this.lazyopts:
n = this.lazyopts["name"]
if callable(n): this.lazyopts["name"] = (n := n())
return n
if (name := this.options["name"]) is None: return this._name
return name
def setFunction(this, fn: Runnable):
this.fn = fn
this._name = getattr(fn, "__name__", f"#{len(tasks)}")
co: Code
if co := getattr(fn, "__code__", None):
vn = co.co_varnames[:co.co_argcount + co.co_kwonlyargcount]
kw = vn[len(vn) - co.co_kwonlyargcount:]
this.spec = inspect.FullArgSpec(
args = vn[:len(vn) - len(kw)],
varargs = bool(co.co_flags & inspect.CO_VARARGS),
varkw = bool(co.co_flags & inspect.CO_VARKEYWORDS),
defaults = fn.__defaults__,
kwonlyargs = kw,
kwonlydefaults = fn.__kwdefaults__,
annotations = this.lazyopts
)
options = vn[len(vn) - (len(kw) if (this.spec.varargs or kw) else co.co_argcount - co.co_posonlyargcount):]
options = [o for o in options if o in allOptions or error(this, f'"{o}" is not the name of an option')]
if annotate := getattr(fn, "__annotate__", None):
co = annotate.__code__
code = co.co_code
prefix = fn.__name__ + ".annotate."
i = 0
while i < len(code):
if dis.opname[code[i]] == "LOAD_CONST" and (name := co.co_consts[code[i + 1]]) in options:
start = i + 2
end = start
stack = 1
while (i := i + 2) < len(code):
op = code[i]
stack += dis.stack_effect(op, code[i + 1])
if stack == 2: end = i + 2
i = end
this.lazyopts[name] = Function(co.replace(
co_name = (n := prefix + name),
co_qualname = co.co_qualname.replace("__annotate__", n),
co_code = bytes([dis.opmap["RESUME"], 0, *code[start : end], dis.opmap["RETURN_VALUE"], 0])
), annotate.__globals__, n, ((".format", 1),))
else: i += 2
elif allOptions & getattr(fn, "__annotations__", {}).keys():
return error(this, "option annotations require Python 3.14 or newer")
else: this.spec = inspect.getfullargspec()
defaults = this.spec.defaults or ()
for o in options[:-len(defaults) or len(options)]:
o in this.lazyopts or error(this, f"option `{o}` does not have a value")
for o, value in zip(options[-len(defaults):], defaults):
this.options[o] = (this.options[o], value) if o in ["source", "input", "output"] else value
@staticmethod
def option(o: str, task: Task if PY314 else Self) -> Any:
v = vars(task)
v[o] = task.options[o]
if o in task.lazyopts:
value = task.lazyopts[o]()
v[o] = (v[o], value) if o in ["source", "input", "output"] else value
task.lazyopts[o] = v[o]
return v[o]
for state in State: vars()[state.name.lower()] = property(state.isof)
@contextlib.contextmanager
def measure(precision = 1e3):
t0 = ns()
try: yield None
finally: print((ns() - t0) / (1e9 / precision))
def isIterable(x): return isinstance(x, Iterable) and not isinstance(x, str)
def first[A](iterator: Iterator[A]) -> Optional[A]:
return next(iterator, None)
def group[A, B](iterable: Iterable[A], key: Callable[[A], B]) -> dict[list[B]]:
return {it[0]: list(it[1]) for it in itertools.groupby(sorted(iterable, key = key), key)}
def error(task: Optional[Task], message: str = None):
global errors
errors += not print(f"Task {task.name}: {message}." if task else message + ".")
def findTask(task: str | Runnable | Task, depender: Task = None, command = False) -> Optional[Task]:
if callable(task): return task
if (match := tasks.get(task, None)) and (not command or match.export):
return match
if task[-1:] == "!" and (match := tasks.get(task[:-1], None)) and (not command or match.export):
match.force = True
return match
error(depender, f'{"nN"[not depender]}o {["", "exported "][command]}task matched {task!r}')
global notFound
notFound = True
def registerTask(fn: Runnable, dependencies: Iterable, options):
task = Task(fn, options)
task.dependencies = [findTask(d, task) for d in dependencies]
tasks[task.name] = task
return task
def require(version: int):
"Exit with an error message if the version of bt is older than `version`."
if __version__ < version: exit(print(f"bt is version {__version__} but version {version} or newer is required."))
def task(*dependencies: str | Task | Runnable, name: Optional[str] = None, default = False, export = True, pure = False,
source: FileSpecifier = [], input: Optional[Any] = None, output: FileSpecifier = []) -> Task:
"""Declare a task named `name` to be run at most once from the command line or as a dependency.
Each dependency will run before the task.
If `default`, then the task will run when no tasks are specified in the command line.\n
If `export`, then it will be available in the command line.\n
If `pure`, then dependent tasks may be skipped even if this task runs.
If `source` or `output` is not an empty list or `input` is not `None`, then caching will be enabled.
`source` and `output` will be searched for files recursively.
Callables found therein will be converted into their results.
`source` may contain glob patterns.
The nonexistence of an exact file in `source` is an error.
All routines (as determined by `inspect.isroutine`) found recursively in `input`
will be evaluated just before the task runs.
The task will be skipped if
- caching is enabled
- no task dependency runs
- `input` and the source files' mtimes are the same values from the task's previous run
- and all output files exist."""
options = dict(list(locals().items())[:-1])
if dependencies and callable(dependencies[0]) and not isinstance(dependencies[0], Task):
return registerTask(dependencies[0], dependencies[1:], options)
return lambda fn: registerTask(fn, dependencies, options)
def parameter(name: str, default = None, require = False) -> str:
"""Return the value of the parameter `name` if it's set or else `default`.
If it's unset and not `require`, then print an error message and exit."""
assert isinstance(name, str), f"Parameter name ({name!r}) must be a string."
value = parameters.get(name, default)
if not value and require: exit(print(f'Parameter "{name}" must be set.'))
return value
def sh(*commandLine: Optional[str | Arguments | Iterable], shell = True, text = True, **kwargs) -> CompletedProcess[str]:
"""Wrap `subprocess.run` with the defaults `shell = True` and `text = True`.
Convert `commandLine` into an `Arguments` and then a string."""
return subprocess.run(str(Arguments(commandLine)), shell = shell, text = text, **kwargs)
def shout(*args, capture_output = True, **kwargs) -> str:
"Wrap `sh` with `capture_output = True` and return the command's `stdout`."
return sh(*args, capture_output = capture_output, **kwargs).stdout
def outdent(text: str) -> str:
"""Outdent `text` and strip leading and trailing whitespace.
If the last line contains only whitespace, then include a trailing newline."""
return textwrap.dedent(text).strip() + "\n"[not re.search(r"\n\s*\Z", text):]
def read(file: str) -> str:
"`open`, read and close the `file` and return its contents."
with open(file) as fo: return fo.read()
def write(file: str, contents: str):
"`open`, write `contents` to and close the `file`."
with open(file, "w") as fo: fo.write(contents)
def mkdir(path: str, new = False):
"""Make a directory with the given `path`, also making its ancestors if necessary.
If `new` and the path already exists, then raise an exception."""
os.makedirs(path, exist_ok = not new)
def rm(path: str):
"Remove the specified path recursively if it exists."
if os.path.isdir(path) and not os.path.islink(path): shutil.rmtree(path)
elif os.path.exists(path): os.remove(path)
def start():
global started
started = True
if errors: return
for task in tasks.values():
if not isinstance(task.default, int): error(task, f"default ({task.default!r}) is not a Boolean value")
if not isinstance(task.export, int): error(task, f"export ({task.export!r}) is not a Boolean value")
e = errors
initialTasks = [findTask(task, command = True) for task in cmdTasks] or [task for task in tasks.values() if task.default]
if notFound or errors > e: return print("Exported tasks are listed below.", *(name for name, task in tasks.items() if task.export), sep = "\n")
if initialTasks: initialTasks[-1].args = args
def recurse(depth: int, all: dict[Task, int], tasks: Iterable[Task]):
for task in tasks:
if all.get(task, -1) < depth:
all[task] = depth
recurse(depth + 1, all, (d for d in task.dependencies if isinstance(d, Task)))
selectedTasks: dict[Task, int] = {}
recurse(0, selectedTasks, initialTasks)
initialTasks.sort(key = lambda t: selectedTasks[t])
for task in selectedTasks:
arity = len(task.spec.args or ()) + len(task.spec.kwonlyargs or ()) - len(task.lazyopts)
min = arity - len(task.spec.defaults or ())
count = len(task.args)
if count < min or (count > arity and not task.spec.varargs):
error(task, f"received {count} argument{["s", ""][count == 1]} instead of {min}{[[f"-{arity}", ""][min == arity], " or more"][task.spec.varargs]}")
if errors: return
cache = {}
if path.exists(CACHE):
with open(CACHE, "br") as file:
try:
c = pickle.load(file)
assert isinstance(c, Mapping)
cache = c
except Exception as e:
print(CACHE + " is corrupt.")
print(e)
linesWritten = 0
def run(task: Task, parent: Task = None, initial = False):
if task.running: error(None, f'Circular dependency detected between tasks "{parent.name}" and "{task.name}".')
if not task.normal: return
task.state = State.RUNNING
skip = True
for dependency in task.dependencies:
if isinstance(dependency, Task):
run(dependency, task)
if dependency.done and not dependency.pure: skip = False
else: dependency()
global current
current = task
def getFiles(source, flat, errorMessage, container = None):
if container is None: container = source
if isinstance(source, str): flat.append(source)
elif isinstance(source, Mapping): getFiles(source.values(), flat, errorMessage, container)
elif isinstance(source, Iterable):
for o in source: getFiles(o, flat, errorMessage, container)
elif callable(source): getFiles(source(), flat, errorMessage, container)
else: error(task, errorMessage(source))
if task.source != []:
files = []
getFiles(task.source, files, lambda source: f"source file {source!r} is not a string, iterable, or callable")
for file in files:
if glob.has_magic(file): task.sourceFiles += glob.glob(file, include_hidden = True, recursive = True)
elif path.exists(file): task.sourceFiles.append(file)
else: error(task, f'source file "{file}" does not exist')
if task.input is not None:
def flatten(inputs):
if inspect.isroutine(inputs): inputs = inputs()
if isinstance(inputs, Mapping): inputs = list(inputs.values())
elif isinstance(inputs, Iterable) and not isinstance(inputs, str | MutableSequence): inputs = list(inputs)
if isIterable(inputs):
for i, input in enumerate(inputs):
inputs[i] = flatten(input)
return inputs
task.cache = flatten(task.input or 0)
task.cache = task.cache, [path.getmtime(source) for source in task.sourceFiles]
if task.output != []: getFiles(task.output, task.outputFiles, lambda o: f"output {o!r} is not a file (a string, iterable, or callable)")
if errors: return
if (skip and not (task.force or force == 1 and initial or force >= 2) and task.cache == cache.get(task.name, None)
and (task.source != [] or task.input is not None or task.outputFiles) and all(path.exists(output) for output in task.outputFiles)):
task.state = State.SKIPPED
return
for directory in {path.dirname(path.abspath(output)) for output in task.outputFiles}:
mkdir(directory)
nonlocal linesWritten
if debug:
if linesWritten > 1: print()
print(">", task.name)
linesWritten = 0
def redirect(stream):
write0 = stream.write
def write(s):
nonlocal linesWritten
linesWritten += s.count("\n")
write0(s)
stream.write = write
return write0
write10, write20 = redirect(sys.stdout), redirect(sys.stderr)
try: task()
finally: sys.stdout.write, sys.stderr.write = write10, write20
task.state = State.DONE
for task in initialTasks: run(task, initial = True)
cache.update((task.name, task.cache) for task in tasks.values() if task.done)
with open(CACHE, "bw") as file:
pickle.dump(cache, file)
def walkTrace(tb: Traceback) -> Generator[Traceback, None, None]:
yield tb
while tb := tb.tb_next: yield tb
def defer():
def handleException(type, value: BaseException, tb: Traceback, thread, hook):
if thread == caller: start.cancel = True
if type != KeyboardInterrupt:
tb = next((tb1 for tb1 in walkTrace(tb) if tb1.tb_frame.f_code.co_filename == bs), tb)
hook(type, value.with_traceback(tb), tb, thread)
def sigint(signal: int, frame: Frame):
sae = libpy.PyThreadState_SetAsyncExc
sae.argtypes = (c_long, py_object)
for thread in threading.enumerate():
if thread.is_alive(): sae(thread.ident, py_object(KeyboardInterrupt))
signal.signal(signal.SIGINT, sigint)
start.cancel = False
caller = threading.current_thread()
thread = threading.Thread(target = lambda: (caller.join(), start.cancel or start()), daemon = False)
sys.excepthook = lambda *a, hook = sys.excepthook: handleException(*a, threading.current_thread(), lambda *a: hook(*a[:3]))
threading.excepthook = lambda a, hook = threading.excepthook: handleException(*a, lambda *a: hook(ExceptHookArgs(a)))
thread.start()
mkdir(DIR)
def main(loadModule):
defer()
if entry := first(entry for entry in ["bs.py", "bs"] if path.exists(entry)):
global bs
bs = DIR + "/bs"
if not os.path.lexists(bs): os.symlink(os.path.abspath(entry), bs)
loadModule("bs", bs)
else: exit(print("No build script (bs or bs.py) was found."))
allOptions = {o: None for o in task.__code__.co_varnames[:task.__code__.co_kwonlyargcount]}
for o in allOptions:
if o != "name": setattr(Task, o, Getter(Task.option.__get__(o)))
debug = False
"""Whether to print debugging information.
Currently only names of tasks before they run are printed."""
current: Task = None
"The task that is currently running."
exports = bt, Arguments, Files, Task, mkdir, outdent, parameter, require, read, rm, sh, shout, task, write
exports = {export.__name__: export for export in exports} | {"FileSpecifier": FileSpecifier, "Runnable": Runnable, "path": path}
__all__ = list(exports)
DIR = ".bt"
CACHE = DIR + "/cache"
tasks: dict[str, Task] = {}
started = False
errors = 0
notFound = False
bs = ()
args0 = sys.argv[1:]
if "--" in args0 and ~(split := args0.index("--")):
args0, args = args0[:split], args0[split + 1:]
else: args = []
args1 = [a for a in args0 if a != "!"]
force = len(args0) - len(args1)
args1 = group(args1, lambda a: "=" in a)
cmdTasks = args1.get(False, [])
parameters: dict[str, str] = dict(arg.split("=", 2) for arg in args1.get(True, []))
f: Frame = sys._getframe()
while f := f.f_back:
if dis.opname[(co := f.f_code).co_code[i := f.f_lasti]] in ["IMPORT_NAME", "IMPORT_FROM"] and "__main__" not in co.co_names[co.co_code[i + 1]]:
os.chdir(path.dirname(path.realpath(sys.argv[0])))
defer()
break