-
Notifications
You must be signed in to change notification settings - Fork 2
/
stripe.nim
192 lines (178 loc) · 9.98 KB
/
stripe.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
{.push hint[Performance]: off.} # No warn about token copy in bu/execstr
import std/[strutils, parseutils, os, posix, random],
cligen, cligen/posixUt, bu/execstr
when not declared(stderr): import std/syncio
when defined(release): randomize()
proc `$`(tv: Timeval): string =
  # Render a Timeval (used for Rusage.ru_utime / Rusage.ru_stime) as
  # "<seconds>.<microseconds>", microseconds zero-padded to six digits.
  result = $clong(tv.tv_sec)
  result.add "."
  result.add intToStr(tv.tv_usec, 6)
proc timeOfDay(): Timespec =
  # Read CLOCK_REALTIME.  The clock_gettime() status is deliberately ignored,
  # so a failed call yields whatever the zero-initialised Timespec holds.
  var now: Timespec
  discard clock_gettime(CLOCK_REALTIME, now)
  now
proc `$`(t: Timespec): string =
  # Render a Timespec as its seconds immediately followed by the nanosecond
  # field truncated to microseconds, zero-padded to six digits.
  # NOTE(review): unlike the Timeval `$`, no "." separates the two parts, so
  # the text reads as integer microseconds - confirm this is intended.
  let usecs = t.tv_nsec div 1000
  $clong(t.tv_sec) & intToStr(usecs, 6)
proc ERR(x: string) =
  # Write `x` verbatim to standard error; no newline is appended here.
  write(stderr, x)
const
  # Built-in message templates; CLI substitutes them when the matching option
  # is given as "d" or "D".  `$name` fields are filled in via `%`.
  BefDfl = "$tm \e[1mslot: $nm $cmd\e[m"            # before a job is launched
  AftDfl = "$tm \e[7mslot: $nm usr: $u sys: $s\e[m" # after a job is reaped
  IrpDfl = "$tm interrupt $nm after $w: $cmd"       # per live job on interrupt
var
  bef, aft, irp: string             # Before/after/interrupt message templates
  binsh, fancy, numMo, putSN: bool  # How prog called
  sh_cmd: int                       # Shared between bg_setup() & bg()
  sh_av: cstringArray               # argv handed to execvp by bg()
  dSlot = 0                         # Signal handlers access these globals
  sumSt, nKid: int                  # sum(exCodes) & num live kids
  # One entry per run slot: kid pid (0 => free), slot name, $STRIPE_SUB
  # value, saved command text and launch time.
  rs: seq[tuple[pid: Pid; nm, sub, cmd: string; t0: Timespec]]
proc rsFind(p: Pid): int =
  # Index of the first run slot in `rs` whose pid equals `p`; -1 if no slot
  # matches.  Called with 0.Pid to locate a free slot.
  result = -1
  for idx in 0 ..< rs.len:
    if rs[idx].pid == p:
      return idx
proc overloaded(loadSlot, n: int): bool =
  # Report whether the load average selected by `loadSlot` exceeds `n`.
  #   loadSlot: index into getloadavg()'s samples (0/1/2 per the CLI help:
  #             1/5/15-minute average); anything else disables the check.
  #   n:        threshold the chosen average is compared against.
  # Returns false when the check is disabled, when getloadavg() fails, or when
  # it returned too few samples to cover `loadSlot`.
  proc getloadavg(avs: pointer, nelem: int): int {.importc, header: "stdlib.h".}
  if loadSlot notin 0..2: return false  # Could raise ValueError
  var avs: array[3, float]
  let got = getloadavg(avs[0].addr, 3)  # Count of samples actually filled in
  # Valid samples are avs[0 ..< got], so index `got` itself was never written.
  # The former `loadSlot > got` test accepted that one unfilled (zero) entry.
  if got < 0 or loadSlot >= got: return false
  avs[loadSlot] > n.float       # Fastest=300s-avg means sleep >= 3 sec
proc maybeSleep(secs: float; seqNo, load: int) =
  # Optional pause before launching job number `seqNo`.
  #   secs > 0: sleep that many seconds; secs < 0: sleep a random time in
  #   0..|secs| seconds.  Neither applies to the very first job (seqNo == 1).
  #   load: load-average slot handed to `overloaded` along with the current
  #   number of run slots.
  if seqNo > 1:
    if secs > 0.0:
      discard usleep(Useconds(secs * 1000000))
    elif secs < 0.0:
      discard usleep(Useconds(rand(-secs * 1000000)))
  # While the machine counts as overloaded, re-check no more often than every
  # max(3, |secs|) seconds.
  while overloaded(load, rs.len):
    discard usleep(Useconds(max(3.0, secs.abs)*1e6))
iterator lines2(f: File, tot: var int): string =
  # Yield the lines of `f`.  When the `bef` template mentions "$tot", every
  # line is read up front so `tot` can be set to the total count before the
  # first line is yielded; otherwise lines are streamed and `tot` is untouched.
  if "$tot" notin bef:
    for ln in f.lines: yield ln
  else:
    var buffered: seq[string]           # std/sugar.collect fails in iterators?
    for ln in f.lines: buffered.add ln
    tot = buffered.len                  # Provide for $tot interpretation
    for ln in buffered: yield ln        # ..as in $seq/$tot message with -b
proc bg_setup(run: string): File =
  # Prepare for launching kids and return a File reading the job lines.
  # NOTE: stdin is dup()ed immediately and that duplicate backs the result;
  # fd 0 is then closed and /dev/null opened in its place so kids are kept
  # away from the job stream.  All three return codes are ignored.
  # When `run` is not "/bin/sh", an argv for execvp is pre-built in `sh_av`
  # with a placeholder at index `sh_cmd` that bg() overwrites per job.
  discard open(result, dup(0), fmRead)  # Keep kids away from stdin; FD_CLOEXEC
  discard close(0)                      #..borks due to misinterp.of reused fd0
  discard open("/dev/null", O_RDONLY)   # Replace original fd0 with /dev/null
  binsh = run == "/bin/sh"
  if not binsh:
    var argv = run.split()              # PRE-ARRANGE so can do _av[_cmd]=cmd
    if argv[^1] != "-c":                # Assume -c option has sh meaning
      argv.add("-c")
    sh_cmd = argv.len                   # array slot of cmd string
    argv.add("dummyArg")
    sh_av = allocCStringArray(argv)
proc bg(cmd: string; seqNo, i, tot: int): Pid =
  # Launch `cmd` in the background for run slot `i` and return the kid's pid.
  #   seqNo: 1-based job sequence number ($seq, optionally $STRIPE_SEQ).
  #   tot:   total job count for $tot (only meaningful if read up front).
  # Side effects in the parent: may stamp rs[i].t0, save rs[i].cmd, print the
  # `bef` message and export STRIPE_SEQ / STRIPE_SLOT / STRIPE_SUB.
  # A failed vfork exits the whole program with status 3; a kid whose exec
  # fails reports on stderr and exits with status 113.
  #
  # bef/aft/irp always carry an appended "\n", so a format was requested only
  # when its OWN length exceeds 1.  Summing the lengths and testing `> 1` (as
  # before) was always true and so never skipped the work below.
  let report = aft.len > 1 or irp.len > 1
  if bef.len > 1 or report:             # Want t0 for wall if any fmt nontrivial
    rs[i].t0 = timeOfDay()
  if bef.len > 1:                       # `> 1` since \n is always appended.
    ERR bef%["tm",$rs[i].t0, "i",$i, "nm",rs[i].nm, "sub",rs[i].sub, "cmd",cmd,
             "seq",$seqNo, "tot",$tot]
  if report: rs[i].cmd = cmd            # Maybe save for `wait`|interrupt report
  if putSN: putEnv "STRIPE_SEQ", $seqNo # Sequence number export was requested
  putEnv("STRIPE_SLOT", $i)             # This & next both cycle over small sets
  if rs[i].sub.len != 0:                #..So, could maybe be optimized to save
    putEnv("STRIPE_SUB", rs[i].sub)     #..0.5-1 microsec so per kid launch.
  var pid: Pid = vfork()                # MUST BE C-ish between vfork & exec
  if pid == -1: exitnow(3)              # vfork failed! => DIE NOW
  if pid != 0: return pid               # Parent returns
  if binsh:                             # Replace kid w/my "execstr" of cmd
    if execStr(cmd) == 0: exitnow(0)    # Auto-fallback to /bin/sh -c 'cmd'
  else:
    sh_av[sh_cmd] = cstring(cmd)
    discard execvp(sh_av[0], sh_av)     # Replace kid w/shell to run cmd
  ERR("Cannot run \"$#\"\n" % cmd)      # Only reached when exec did not happen
  exitnow(113)                          # ..or exit kid on failed exec
proc wait(): int =
  # Block until one of our kids terminates and return its run-slot index.
  # Updates nKid, frees the slot, adds a normal exit status to sumSt and, when
  # an `aft` template was requested, prints the after-job report.
  var usage: Rusage
  var status: cint
  var slot = -1
  while slot < 0:                       # 1st|pid from prior prog of same parent
    let pid = wait4(Pid(-1), addr status, cint(0), addr usage)
    slot = rsFind(pid)                  # Unfound => not our kid! (Pre-exec)
  dec nKid                              # Count kid & accum exit status
  rs[slot].pid = 0.Pid                  # Mark run slot free (Maybe unneeded)
  if WIFEXITED(status):
    sumSt += WEXITSTATUS(status)
  if aft.len > 1:                       # Maybe report rusage
    let t1 = timeOfDay()
    var w, ct: Timeval
    var pc, mr: string
    if fancy:                           # $w, $pcpu or $m appear in `aft`
      let dt = t1 - rs[slot].t0         # wall time since launch
      w = dt.nsToTimeval
      let usrUs = usage.ru_utime.tv_sec.int*1_000_000 + usage.ru_utime.tv_usec
      let sysUs = usage.ru_stime.tv_sec.int*1_000_000 + usage.ru_stime.tv_usec
      let tSched = usrUs + sysUs        # user+system CPU time, microseconds
      ct.tv_sec = Time(tSched div 1_000_000)
      ct.tv_usec = tSched mod 1_000_000
      pc = formatFloat(tSched.float * 1e5 / dt.float, ffDecimal, 1) #%cpu
      mr = formatFloat(usage.ru_maxrss.float/1024.0, ffDecimal, 1)  #MiB RSS
    ERR(aft % ["tm",$t1, "i",$slot, "nm",rs[slot].nm, "sub",rs[slot].sub,
               "cmd",rs[slot].cmd, "w",$w, "pcpu",pc, "m",mr,
               "u",$usage.ru_utime, "s",$usage.ru_stime, "ct",$ct])
  rs[slot].cmd.setLen 0                 # Finished => no longer interruptible
  slot
proc stripe(jobs: File, secs = 0.0, load = -1): int =
  # Main loop: read one command per line from `jobs` and run each in a free
  # run slot, waiting for a kid to finish whenever every slot is busy.
  #   secs, load: forwarded to maybeSleep before each launch.
  # Returns sumSt, the accumulated exit statuses of all kids.
  var slots = rs.len
  var seqNo = 1
  var tot = 0
  for cmd in lines2(jobs, tot):         # Get a cmd, maybe wait for slot
    var slot =
      if nKid == slots: wait()
      else: rsFind(0.Pid)
    if numMo:                           # MAYBE ADJUST NUMBER OF RUN SLOTS
      let delta = dSlot                 # Minimize real time window for..
      dSlot = 0                         # ..signal deliveries to be lost.
      if delta > 0:                     # At least one SIGUSR1 during wait
        let target = min(1024, slots + delta)   # never more than 1024 slots
        for k in slots ..< target:
          rs.add (0.Pid, $k, "", "", rs[0].t0)
      elif delta < 0:                   # At least one SIGUSR2 during wait
        let target = max(1, slots + delta) # NOTE: wait does the nKid -= 1
        while nKid + 1 > target:        # nKid usually starts @nSlot-1, but SIG
          rs.delete slot                # ..can hit while still ramping up kids.
          slot = wait()                 # ..Either way wait until nKid=n is ok.
      slots = rs.len
    maybeSleep(secs, seqNo, load)       # MAYBE SLEEP BEFORE LAUNCH
    rs[slot].pid = bg(cmd, seqNo, slot, tot)
    inc nKid                            # Count kid as spawned
    inc seqNo
  while nKid > 0:                       # No more new=>Wait for any until 0 kids
    discard wait()
  sumSt                                 # Exit w/informative status
proc CLI(run="/bin/sh", nums=false, secs=0.0, load = -1, before="", after="",
         irupt="", posArgs: seq[string]) =
  ## where `posArgs` is either a number `<N>` *or* `<sub1 sub2..subM>`, reads
  ## job lines from *stdin* and keeps up to `N` | `M` running at once.
  ##
  ## In sub mode, each job has **$STRIPE_SUB** set, in turn, to `subJ`. Eg.:
  ## ``find . -printf "ssh $STRIPE_SUB FileJob \'%P\'\\n" | stripe X Y``
  ## runs `FileJob`\s first on host X then on host Y then on whichever finishes
  ## first. Repeat `X` or `Y` to keep more jobs running on each host.
  ##
  ## **$STRIPE_SLOT** (arg slot index) & optionally **$STRIPE_SEQ** (job seqNum)
  ## are also provided to jobs. In `N`-mode `SIGUSR[12]` (in|de)creases `N`.
  ## If `before` uses `$tot`, job lines are read upfront to provide that count.
  # NOTE(review): the `##` text above is presumably rendered by cligen's
  # `dispatch` as the command's help, so it is kept verbatim.
  if posArgs.len < 1:
    raise newException(ValueError, "Too few posArgs; need { num | 2+ slots }")

  proc withDefault(given, dflt: string): string =
    # "d"/"D" selects the built-in template; a newline is ALWAYS appended,
    # which is why other code tests `.len > 1` for "format was requested".
    (if given == "d" or given == "D": dflt else: given) & "\n"

  putSN = nums
  bef = withDefault(before, BefDfl)
  aft = withDefault(after, AftDfl)
  irp = withDefault(irupt, IrpDfl)
  fancy = false                         # Any of $w/$pcpu/$m wanted in `aft`?
  for key in ["$w", "$pcpu", "$m"]:
    if key in aft:
      fancy = true
      break
  numMo = posArgs.len == 1
  if numMo:                             # FIXED NUM JOBS MODE
    var n: int
    if parseInt(posArgs[0], n) == 0 or n <= 0:
      raise newException(ValueError, "Only one slot but not a positive int.")
    rs.setLen n                         # impossible zero PIDs
    for i in 0 ..< n:                   # slot names == nums
      rs[i].nm = $i
    if n == 1:                          # lone slot inherits any $STRIPE_SUB
      let ss = getEnv("STRIPE_SUB", "")
      if ss.len > 0: rs[0].sub = ss
  else:                                 # STRIPE ID SUBST MODE
    rs.setLen posArgs.len               # impossible zero PIDs
    for i in 0 ..< posArgs.len:         # $STRIPE_SLOT,_SUB
      rs[i].nm = posArgs[i]
      rs[i].sub = posArgs[i]
  try:
    let jobs = bg_setup(run)
    quit(min(127, stripe(jobs, secs, load)))
  except IOError:
    stderr.write "No file descrip 0/stdin | stdout/err output space issue.\n"
    quit(min(127, sumSt))
when isMainModule:
  proc ctrlC() {.noconv.} =
    # Ctrl-C hook: if an `irp` template was requested, print one interrupt
    # report per slot that still has a saved command, then quit with the
    # accumulated exit status.
    if irp.len > 1:                     # interrupt reports requested
      let t1 = timeOfDay()
      for i, r in rs:
        if r.cmd.len > 0:
          let w = nsToTimeval(t1 - r.t0)  # wall time since that job's launch
          ERR(irp % ["tm",$t1, "i",$i, "nm",r.nm, "w",$w, "cmd",r.cmd,
                     "sub",r.sub])
    quit(min(127, sumSt))               # stdlib saturates at 127
  setControlCHook(ctrlC)

  proc sigu12(signo: cint) {.noconv.} =
    # Only records the requested change in `dSlot`; stripe() applies it later.
    if signo == SIGUSR1: inc(dSlot)     # SIGUSR1 increases N
    elif signo == SIGUSR2: dec(dSlot)   # SIGUSR2 decreases N
  signal(SIGUSR1, sigu12)
  signal(SIGUSR2, sigu12)

  include cligen/mergeCfgEnv
  # The triple-quoted help strings below are runtime text: any whitespace at
  # the start of their continuation lines would become part of the help.
  dispatch CLI, cmdName = "stripe",
    help={"run": "run job lines via this interpreter",
          "nums": "provide **STRIPE_SEQ** to job procs",
          "secs": "sleep `SECS` before running each job",
          "load": "0/1/2: 1/5/15-minute load average < `N`",
          "before": """\"D\": $tm \\e[1mslot: $nm $cmd\\e[m
also: slot \$i \$seq \$tot""",
          "after": """\"D\": $tm \\e[7mslot: $nm usr: $u sys: $s\\e[m
also: slot \$i wall \$w MiBRSS \$m \$ct \$pcpu \$cmd""",
          "irupt": """\"D\": $tm interrupt $nm after $w: $cmd
also: slot \$i substitution \$sub"""}