Skip to content

Commit

Permalink
refactor: util for pooled subprocesses
Browse files Browse the repository at this point in the history
  • Loading branch information
jesteria committed Mar 14, 2023
1 parent 7cfd54f commit 3f1b222
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 36 deletions.
34 changes: 19 additions & 15 deletions src/netrics/measurement/hops_traceroute.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from schema import Optional

from netrics import task
from netrics.util.iterutils import sequence
from netrics.util import (
iterutils,
procutils,
)

from .common import (
default,
Expand Down Expand Up @@ -56,8 +59,8 @@ def main(traceroute, params):
"""
# parallelize traceroutes
processes = {
destination: subprocess.Popen(
pool = [
subprocess.Popen(
(
traceroute,
'--max-hop', params.max_hop,
Expand All @@ -69,36 +72,37 @@ def main(traceroute, params):
stderr=subprocess.PIPE,
text=True,
) for destination in params.destinations
}
]

# wait and collect outputs
outputs = {destination: process.communicate() for (destination, process) in processes.items()}
# wait and map to completed processes
processes = {
destination: procutils.complete(process)
for (destination, process) in zip(params.destinations, pool)
}

# parse results
hop_results = [
HopResult.extract(
destination,
process,
*outputs[destination],
)
for (destination, process) in processes.items()
]

# check for exceptions
(successes, failures) = sequence(operator.attrgetter('hops'), hop_results)
(successes, failures) = iterutils.sequence(operator.attrgetter('hops'), hop_results)

fail_total = len(failures)

for (fail_count, hop_result) in enumerate(failures, 1):
process = processes[hop_result.dest]
(stdout, stderr) = outputs[hop_result.dest]

task.log.error(
dest=hop_result.dest,
status=f'Error ({process.returncode})',
failure=f"({fail_count}/{fail_total})",
stdout=stdout,
stderr=stderr,
stdout=process.stdout,
stderr=process.stderr,
)

if not successes:
Expand Down Expand Up @@ -143,11 +147,11 @@ class HopResult(typing.NamedTuple):
hops: typing.Optional[int]

@classmethod
def extract(cls, dest, process, stdout, stderr):
def extract(cls, destination, process):
"""Construct object from traceroute result."""
if process.returncode == 0:
try:
(*_earlier_lines, last_line) = stdout.splitlines()
(*_earlier_lines, last_line) = process.stdout.splitlines()

(hop_count, _line_remainder) = last_line.strip().split(' ', 1)

Expand All @@ -164,6 +168,6 @@ def extract(cls, dest, process, stdout, stderr):
#
pass
else:
return cls(dest, hop_int)
return cls(destination, hop_int)

return cls(dest, None)
return cls(destination, None)
45 changes: 24 additions & 21 deletions src/netrics/measurement/ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from schema import Optional

from netrics import task
from netrics.util import procutils

from .common import default, output, require_lan

Expand Down Expand Up @@ -69,8 +70,8 @@ def main(params):
"""
# parallelize pings
processes = {
destination: subprocess.Popen(
pool = [
subprocess.Popen(
(
'ping',
'-c', params.count,
Expand All @@ -82,37 +83,39 @@ def main(params):
stderr=subprocess.PIPE,
text=True,
) for destination in params.destinations
}
]

# wait and collect outputs
outputs = {destination: process.communicate() for (destination, process) in processes.items()}
# wait and map to completed processes
processes = {
destination: procutils.complete(process)
for (destination, process) in zip(params.destinations, pool)
}

# check for exceptions
failures = [
(destination, process, outputs[destination])
for (destination, process) in processes.items()
if process.returncode not in PING_CODES
]
failures = [(destination, process) for (destination, process) in processes.items()
if process.returncode not in PING_CODES]

if failures:
total_failures = len(failures)
fail_total = len(failures)

# directly log first 3 failures
for (fail_count, (destination, process, (stdout, stderr))) in enumerate(failures[:3], 1):
# directly log first few failures
some_failures = failures[:3]

for (fail_count, (destination, process)) in enumerate(some_failures, 1):
task.log.critical(
dest=destination,
status=f'Error ({process.returncode})',
failure=f"({fail_count}/{total_failures})",
args=process.args[:-1],
stdout=stdout,
stderr=stderr,
failure=f"({fail_count}/{fail_total})",
args=process.args,
stdout=process.stdout,
stderr=process.stderr,
)

if fail_count < total_failures:
if fail_count < fail_total:
task.log.critical(
dest='...',
status='Error (...)',
failure=f"(.../{total_failures})",
failure=f"(.../{fail_total})",
args='...',
stdout='...',
stderr='...',
Expand All @@ -129,8 +132,8 @@ def main(params):

# parse detailed results
results = {
destination: output.parse_ping(stdout)
for (destination, (stdout, _stderr)) in outputs.items()
destination: output.parse_ping(process.stdout)
for (destination, process) in processes.items()
}

# label results
Expand Down
28 changes: 28 additions & 0 deletions src/netrics/util/procutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import subprocess


def complete(process: subprocess.Popen) -> subprocess.CompletedProcess:
    """Wait on a launched subprocess and package it as a `CompletedProcess`.

    `subprocess.run` is the only high-level interface that produces a
    `CompletedProcess`; this helper recreates that behavior for an
    already-constructed `Popen` handle:

    * blocks the calling thread until the process finishes
    * gathers standard output and standard error (via `communicate()`)
    * returns a `CompletedProcess` carrying the process's command-line
      arguments, return code, standard output and standard error

    Intended primarily for subprocesses started directly with `Popen`
    in order to run in parallel. (In the sequential case, the
    higher-level interfaces suffice.)
    """
    # communicate() both drains the pipes and waits for exit,
    # avoiding the deadlock possible with wait() + full pipe buffers
    (stdout, stderr) = process.communicate()

    return subprocess.CompletedProcess(
        args=process.args,
        returncode=process.returncode,
        stdout=stdout,
        stderr=stderr,
    )

0 comments on commit 3f1b222

Please sign in to comment.