Skip to content

Commit

Permalink
output redirection for different types of output
Browse files Browse the repository at this point in the history
  • Loading branch information
zardus committed Feb 1, 2024
1 parent 64d8186 commit d6aa8ea
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 26 deletions.
63 changes: 44 additions & 19 deletions chio.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/python3 -I
#!/usr/bin/env python3

import subprocess
import argparse
Expand All @@ -20,22 +20,33 @@
# Output
#

_last_type = None
def print_msg(mtype, *msgs):
global _last_type #pylint:disable=global-statement

fd = getattr(_args, f"chio_{mtype}_fd")
f = sys.stdin if fd == 0 else sys.stdout if fd == 1 else sys.stderr if fd == 2 else os.fdopen(fd)
if _last_type and _last_type != mtype:
print("", file=f)
_last_type = mtype
print(f"[{mtype.upper()}]", *msgs, file=f)

def print_info(*msgs):
print("[INFO]", *msgs)
print_msg("info", *msgs)
def print_warn(*msgs):
print("[WARN]", *msgs)
print_msg("warn", *msgs)
def print_hint(*msgs):
print("[HINT]", *msgs)
print_msg("hint", *msgs)
def print_test(*msgs):
print("[TEST]", *msgs)
print_msg("test", *msgs)
def print_pass(*msgs):
print("[PASS]", *msgs)
print_msg("pass", *msgs)
def print_fail(*msgs):
print("[FAIL]", *msgs)
print_msg("fail", *msgs)
def print_flag(*msgs):
print("[FLAG]", *msgs)
print_msg("flag", *msgs)
def print_hype(*msgs):
print("[HYPE]", *msgs)
print_msg("hype", *msgs)

#
# Checking processes.
Expand All @@ -58,12 +69,10 @@ def check_exe_basename(process, basename):

def check_ipython(process):
print_test("We will now check that that the process is an interactive ipython instance.")
print_info("")
print_info("Since ipython runs as a script inside python, this will check a few things:")
print_info("1. That the process itself is python.")
print_info("2. That the module being run in python is ipython.")
print_info("If the process being checked is just a normal 'ipython', you'll be okay!")
print_info("")
check_exe_basename(process, 'python')
assert os.path.basename(process.cmdline()[1]).startswith('ipython'), "It does not look like the module being run is ipython."
assert len(process.cmdline()) == 2, "ipython must be running in its default, interactive mode (i.e., ipython with no commandline arguments)."
Expand Down Expand Up @@ -143,9 +152,10 @@ def resolve_fd_path(pid, fd):
def name_fd(fd):
return "stdin" if fd == 0 else "stdout" if fd == 1 else "stderr" if fd == 2 else f"file descriptor {fd}"

def check_fd_path(fd, path):
print_test(f"I will now check that you redirected {path} to/from my {name_fd(fd)}.")
print_hint("")
def check_fd_path(fd, path, verbose=False):
if verbose:
print_test(f"I will now check that you redirected {path} to/from my {name_fd(fd)}.")

print_hint("File descriptors are inherited from the parent, unless the FD_CLOEXEC is set by the parent on the file descriptor.")
print_hint("For security reasons, some programs, such as python, do this by default in certain cases. Be careful if you are")
print_hint("creating and trying to pass in FDs in python.")
Expand Down Expand Up @@ -317,8 +327,11 @@ def check_signals(num, myrand=random):
#

def do_checks(args):
print_info("This challenge will now perform a bunch of checks.")
print_info("If you pass these checks, you will receive the flag.")
print_info("This challenge will perform a bunch of checks.")
if args.reward:
print_info(f"If you pass these checks, you will receive the {args.reward} file.")
else:
print_info("Good luck!")

if args.parent:
print_test("Performing checks on the parent process of this process.")
Expand Down Expand Up @@ -502,18 +515,31 @@ def add_argument(parser, arg, **kwargs):
add_argument(_parser, "--num_signals", type=int, nargs='?', help="the challenge will require the parent to send number of signals")
add_argument(_parser, "--reward", type=str, nargs='?', help="the challenge will output a reward file if all the tests pass")

# chio behaviors
#pylint:disable=consider-using-with
add_argument(_parser, "--chio_info_fd", type=int, default=2, help="file to write info to")
add_argument(_parser, "--chio_warn_fd", type=int, default=2, help="file to write warnings to")
add_argument(_parser, "--chio_hint_fd", type=int, default=2, help="file to write hints to")
add_argument(_parser, "--chio_test_fd", type=int, default=2, help="file to write things we're about to test to")
add_argument(_parser, "--chio_pass_fd", type=int, default=2, help="file to write pass messages to")
add_argument(_parser, "--chio_fail_fd", type=int, default=2, help="file to write fail messages to")
add_argument(_parser, "--chio_flag_fd", type=int, default=2, help="file to write the flag to")
add_argument(_parser, "--chio_hype_fd", type=int, default=2, help="file to write hype to")


# remaining arguments
_parser.add_argument("old_args", nargs=argparse.REMAINDER)

_args = _parser.parse_args()

assert sys.executable == '/usr/bin/python3', "ERROR: unexpected python runtime. Contact the profs (unless you're trying to be sneaky)."
assert (not _args.old_args) or _args.old_args[0] == "--", "ERROR: INVALID OLD_ARGV. Contact the profs."

print_info("WELCOME! This challenge makes the following asks of you:")
for _a,_v in vars(_args).items():
if _a == 'old_args':
continue
if _a.startswith("chio"):
continue
if _v in ( None, False ):
continue
if _a in [ "challenge_ops", "challenge_depth" ] and not _args.num_challenges:
Expand All @@ -522,9 +548,8 @@ def add_argument(parser, arg, **kwargs):
print_info("-", ARG_HELP[_a])
else:
print_info("-", ARG_HELP[_a],":",_v)
print_hype("")

print_hype("ONWARDS TO GREATNESS!")
print_hype("")

try:
setup_input(_args)
Expand Down
17 changes: 10 additions & 7 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,24 @@ def test_redirection():
# test stdout
with pwn.process(f"{CHAL} --check_stdout_path /etc/passwd", shell=True) as p:
assert b'Success!' not in p.readall()
with pwn.process(f"{CHAL} --check_stdout_path /tmp/out", stdout=open("/tmp/out", "wb"), shell=True) as p:
with pwn.process(f"{CHAL} --check_stdout_path /tmp/out --chio_pass_fd 1".split(), stdout=open("/tmp/out", "wb")) as p:
p.clean() # readall triggers bizarre pwntools behavior with a kill -9
p.wait()
with open("/tmp/out") as f:
assert 'Success!' in f.read()

# test stderr
with pwn.process(f"{CHAL} --check_stderr_path /etc/passwd", shell=True) as p:
assert b'Success!' not in p.readall()
with pwn.process(f"{CHAL} --check_stderr_path /tmp/err", stderr=open("/tmp/err", "wb"), shell=True) as p:
with pwn.process(f"{CHAL} --check_stderr_path /tmp/err --chio_pass_fd 1", stderr=open("/tmp/err", "wb"), shell=True) as p:
assert b'Success!' in p.readuntil("Success!") #readall triggers bizarre pwntools behavior

def test_networking():
with pwn.process(f"{CHAL} --listen_dup 1337 --client netcat", shell=True) as p:
with pwn.process(f"{CHAL} --listen_dup 1337 --client netcat --chio_pass_fd 1", shell=True) as p:
p.readuntil("TCP port")
assert b"Success!" in pwn.process("nc -vv -w 1 localhost 1337", shell=True).readall()
output = pwn.process("nc -vv -w 1 localhost 1337".split()).readall()
p.readall()
assert b"Success!" in output

with pwn.process(f"{CHAL} --listen_dup 1337 --client socat", shell=True) as p:
p.readuntil("TCP port")
Expand All @@ -103,15 +106,15 @@ def test_pipes():
assert b'Success!' not in pwn.process(f"{CHAL} --check_stdout_pipe cat", shell=True).readall()
assert b'Success!' in pwn.process(f"{CHAL} --check_stdout_pipe grep | grep .", shell=True).readall()
assert b'Success!' in pwn.process(f"{CHAL} --check_stdout_pipe sed | sed -e 's/X/Y/'", shell=True).readall()
assert b'Success!' in pwn.process(f"{CHAL} --check_stdout_pipe rev | rev", shell=True).readall()[::-1]
assert b'Success!' in pwn.process(f"{CHAL} --check_stdout_pipe rev --chio_pass_fd 1 | rev", shell=True).readall()[::-1]

# test shellscript
with open("/tmp/x.sh", "w") as o:
o.write("""#!/bin/bash\ncat""")
assert b'Success!' in pwn.process(f"{CHAL} --check_stdout_pipe shellscript | bash /tmp/x.sh", shell=True).readall()

# test parent communication
assert b'Success!' in pwn.process(f"{CHAL} --check_stdout_parent".split(), stdout=pwn.PIPE).readuntil("Success!")
assert b'Success!' in pwn.process(f"{CHAL} --check_stdout_parent --chio_pass_fd 1".split(), stdout=pwn.PIPE).readuntil("Success!")
pwn.process(f"{CHAL} --check_stdout_parent".split(), stdout=open("/tmp/fdsa", "w")).wait()
assert "Success!" not in open("/tmp/fdsa").read()
assert b'Success!' in pwn.process(f"{CHAL} --check_stdin_parent".split()).readall()
Expand Down Expand Up @@ -144,7 +147,7 @@ def test_fifo():
with open("/tmp/testfifo", "w") as _:
assert b'Success!' in p.readall()

with pwn.process(f"{CHAL} --check_stdout_fifo >/tmp/testfifo", shell=True) as p:
with pwn.process(f"{CHAL} --check_stdout_fifo --chio_pass_fd 1 >/tmp/testfifo", shell=True) as p:
with open("/tmp/testfifo", "r") as f:
assert 'Success!' in f.read()

Expand Down

0 comments on commit d6aa8ea

Please sign in to comment.