Skip to content

Commit

Permalink
merged with upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
cnvogelg committed Feb 1, 2025
2 parents 107a9de + f906eee commit 108e486
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 37 deletions.
12 changes: 6 additions & 6 deletions amitools/vamos/lib/DosLibrary.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,12 +766,12 @@ def Seek(self, ctx, fh_b_addr, pos, mode):
def FGetC(self, ctx, fh_b_addr):
fh = self.file_mgr.get_by_b_addr(fh_b_addr, False)
ch = fh.getc()
if ch == -1:
self.setioerr(ctx, 0)
log_dos.info("FGetC(%s) -> EOF", fh, ch)
elif ch == -2:
self.setioerr(ctx, ERROR_NO_FREE_STORE)
log_dos.info("FGetC(%s) -> Error", fh, ch)
if ch == -2:
log_dos.info("FGetC(%s) -> EOF (%d)", fh, ch)
# EOF is also -1
ch = -1
elif ch == -1:
log_dos.info("FGetC(%s) -> Error (%d)", fh, ch)
else:
self.setioerr(ctx, 0)
log_dos.info("FGetC(%s) -> '%c' (%d)", fh, ch, ch)
Expand Down
66 changes: 38 additions & 28 deletions amitools/vamos/lib/dos/FileHandle.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ def __init__(
self.interactive = self.obj.isatty()
# tty stuff
if self.interactive:
fd = self.obj.fileno()
self.terminal = Terminal(fd)
self.terminal = Terminal(obj)
else:
self.terminal = None

Expand Down Expand Up @@ -83,52 +82,63 @@ def wait_for_char(self, timeout):
return self.terminal.wait_for_char(timeout)

def write(self, data):
"""write a byte array of data
"""write data
return -1 on error, 0 on EOF or >0 for written data
"""
return -1 on error, 0=EOF, >0 written bytes"""
assert isinstance(data, (bytes, bytearray))
try:
num = self.obj.write(data)
if num > 0 and self.auto_flush:
self.obj.flush()
return num
except IOError:
return -1

def read(self, size):
"""read up to len bytes of data
# read from terminal or direct
if self.terminal:
got = self.terminal.write(data)
else:
try:
got = self.obj.write(data)
except IOError:
got = -1

return -1 on error, 0 on EOF, or data bytes on success
"""
# do auto flush?
if got > 0 and self.auto_flush:
self.obj.flush()

# return got bytes
return got

def read(self, len):
"""read data
return -1 on error, 0=EOF, >0 written bytes"""
if self.terminal:
return self.terminal.read(len)
try:
if self.interactive:
d = self.obj.read1(size)
else:
d = self.obj.read(size)
return d
return self.obj.read(len)
except IOError:
return -1

def getc(self):
"""read one char
"""read character
return -1 on error, -2 on EOF or char code 0-255 on success
return char 0-255 or -1 on Error and -2 on EOF
"""
if len(self.unch) > 0:
# first unget char
self.ch = self.unch.pop(0)
else:
# handle NIL:
if self.is_nil:
return -1
d = self.read(1)
# error

# read from terminal or direct
if self.terminal:
d = self.terminal.read(1)
else:
d = self.obj.read(1)

# -1 on Error
if d == -1:
return -1
# EOF
elif len(d) == 0:
# -2 on EOF
elif d == b"":
return -2
# valid byte
self.ch = d[0]
return self.ch

Expand Down
145 changes: 142 additions & 3 deletions amitools/vamos/lib/dos/terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,121 @@
except ImportError:
select = None

import io

ESC = b"\x1b"
ESC_CODE = 0x1B
CSI = b"\x9b"
BRACKET = b"\x5b"
CSI_ESC_SEQ = b"\x1b\x5b"


class CsiEscConverter:
def convert(self, buffer):
return buffer.replace(CSI, CSI_ESC_SEQ)


class EscCsiConverter:
def __init__(self, obj, esc_next_timeout=0.2):
self.obj = obj
self.esc_next_timeout = esc_next_timeout
try:
self.fd = obj.fileno()
except io.UnsupportedOperation:
self.fd = None
self.next_ch = None

def read(self, size):
assert size > 0

# if a character from last read is available
if self.next_ch is not None:
next_ch = self.next_ch
self.next_ch = None

# only a single char read? return it
if size == 1:
return next_ch

left = size - 1
else:
next_ch = b""
left = size

# now read in 'left' bytes
try:
read_data = self.obj.read1(left)
except IOError:
# Error
return -1
if len(read_data) == 0:
# EOF
return next_ch

# add extra char to include in conversion
if next_ch:
read_data = next_ch + read_data

# convert ESC -> CSI
data = self._convert(read_data)

# check if last char is ESC
last_ch = data[-1]
if last_ch != ESC_CODE:
# No ESC -> we are done
return data

# we need another char to decide if '[' follows ESC
# if we have select we wait for it a bit
# otherwise we simply read with possibly blocking
if select and self.fd is not None:
rx, _, _ = select.select([self.fd], [], [], self.esc_next_timeout)
if self.fd not in rx:
# no char arrived. assume isolated ESC in data and return
return data

# read next char
try:
next_ch = self.obj.read1(1)
except IOError:
# Error
return -1
if len(read_data) == 0:
# EOF
return data

# if we got the '[' then last data byte is also a CSI
if next_ch == BRACKET:
return data[:-1] + CSI

# keep char for next read
self.next_ch = next_ch
return data

def _convert(self, buffer):
return buffer.replace(CSI_ESC_SEQ, CSI)


class Terminal:
def __init__(self, fd):
self.fd = fd
def __init__(self, obj, out_csi_conv=True, in_esc_conv=True):
self.fd = obj.fileno()
self.obj = obj

# have termios on this platform
if termios:
self.tty_state = termios.tcgetattr(fd)
self.tty_state = termios.tcgetattr(self.fd)

# do CSI -> ESC '[' conversion on output
if out_csi_conv:
self.out_conv = CsiEscConverter()
else:
self.out_conv = None

# do ESC '[' to CSI conversion
if in_esc_conv:
self.in_conv = EscCsiConverter(self.obj)
else:
self.in_conv = None

def close(self):
if termios:
Expand Down Expand Up @@ -54,3 +163,33 @@ def wait_for_char(self, timeout):
return self.fd in rx
else:
return None

def read(self, size):
"""read from terminal and do some covnersions.
return -1 on Error, or data bytes with len = 0: EOF
"""

# do ESC to CSI conversion
if self.in_conv:
return self.in_conv.read(size)
else:
try:
data = self.obj.read1(size)
except IOError:
return -1

def write(self, data):
"""write to terminal and do some conversions.
return -1 on Error, 0 on EOF, and >0 written bytes
"""

# do CSI to ESC conversion
if self.out_conv:
data = self.out_conv.convert(data)

try:
return self.obj.write(data)
except IOError:
return -1
68 changes: 68 additions & 0 deletions test/unit/term_csi_conv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from amitools.vamos.lib.dos.terminal import (
CsiEscConverter,
EscCsiConverter,
ESC,
CSI,
CSI_ESC_SEQ,
)
from io import BytesIO


def term_csi_conv_csi_to_esc_test():
conv = CsiEscConverter()
# no conversion
txt = b"hello, world!"
assert conv.convert(txt) == txt
# only esc
txt = ESC
assert conv.convert(txt) == txt
# CSI to ESC
txt = CSI
assert conv.convert(txt) == CSI_ESC_SEQ
# misc text
txt = b"hello" + CSI + b"world!"
out = b"hello" + CSI_ESC_SEQ + b"world!"
assert conv.convert(txt) == out


def setup(data):
reader = BytesIO(data)
conv = EscCsiConverter(reader)
return conv


def term_csi_conv_esc_to_csi_test():
# no conversion
txt = b"hello, world!"
conv = setup(txt)
assert conv.read(100) == txt
# esc no conversion
txt = b"hello" + ESC + b"world!"
conv = setup(txt)
assert conv.read(100) == txt
# esc -> CSI
txt = CSI_ESC_SEQ
conv = setup(txt)
assert conv.read(100) == CSI
# esc inside
txt = b"hello" + CSI_ESC_SEQ + b"world!"
out = b"hello" + CSI + b"world!"
conv = setup(txt)
assert conv.read(100) == out
# esc -> CSI at the border
txt = b"hello" + CSI_ESC_SEQ + b"world!"
out = b"hello" + CSI
conv = setup(txt)
assert conv.read(6) == out
assert conv.read(100) == b"world!"
# esc no CSI at the border
txt = b"hello" + ESC + b"world!"
out = b"hello" + ESC
conv = setup(txt)
assert conv.read(6) == out
assert conv.read(100) == b"world!"
# esc at end
txt = b"hello" + ESC
conv = setup(txt)
assert conv.read(6) == txt
assert conv.read(100) == b""

0 comments on commit 108e486

Please sign in to comment.