-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add LEB128 variable-length integer support (#69)
- Loading branch information
Showing
4 changed files
with
256 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Any, BinaryIO | ||
|
||
from dissect.cstruct.types.base import BaseType | ||
|
||
|
||
class LEB128(int, BaseType):
    """Variable-length code compression to store an arbitrarily large integer in a small number of bytes.

    See https://en.wikipedia.org/wiki/LEB128 for more information and an explanation of the algorithm.
    """

    # Selects signed vs. unsigned encoding. It is only declared here, never assigned in this class;
    # presumably the concrete types (e.g. ``uleb128`` / ``ileb128``) provide the value - confirm at the definition site.
    signed: bool

    @classmethod
    def _read(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> LEB128:
        """Decode a single LEB128 value from ``stream``.

        Bytes are consumed one at a time. The low 7 bits of every byte are payload (least significant
        group first) and a set high bit means another byte follows.

        Args:
            stream: Binary stream positioned at the first byte of the encoded value.
            context: Unused by this method.

        Returns:
            The decoded value as an instance of this class.

        Raises:
            EOFError: If the stream ends before a byte with a cleared high bit was read.
        """
        result = 0
        shift = 0
        while True:
            raw = stream.read(1)
            if raw == b"":
                raise EOFError("EOF reached, while final LEB128 byte was not yet read")

            byte = ord(raw)
            result |= (byte & 0x7F) << shift
            shift += 7
            if (byte & 0x80) == 0:
                break

        # Bit 0x40 of the final byte is the sign bit in signed mode: sign-extend above the bits read so far.
        if cls.signed and byte & 0x40 != 0:
            result |= ~0 << shift

        # Return an instance of this int subclass, as the return annotation promises, instead of a bare int.
        return cls.__new__(cls, result)

    @classmethod
    def _read_0(cls, stream: BinaryIO, context: dict[str, Any] | None = None) -> list[LEB128]:
        """Decode consecutive LEB128 values from ``stream`` until a value equal to zero is read.

        The terminating zero is consumed from the stream but is not part of the returned list.

        Raises:
            EOFError: If the stream ends before the terminating zero was read.
        """
        result = []
        while (value := cls._read(stream, context)) != 0:
            result.append(value)

        return result

    @classmethod
    def _write(cls, stream: BinaryIO, data: int) -> int:
        """Encode ``data`` as LEB128 and write it to ``stream``.

        Args:
            stream: Binary stream the encoded bytes are written to.
            data: The integer to encode.

        Returns:
            The number of bytes in the encoded representation.

        Raises:
            ValueError: If ``data`` is negative while the type is unsigned.
        """
        # only write negative numbers when in signed mode
        if data < 0 and not cls.signed:
            raise ValueError("Attempt to encode a negative integer using unsigned LEB128 encoding")

        result = bytearray()
        while True:
            # low-order 7 bits of value
            byte = data & 0x7F
            data >>= 7

            # Encoding works the same for signed and unsigned integers, except for the stop condition.
            if cls.signed:
                # Stop once the remaining value is pure sign extension that matches the sign bit (0x40)
                # of the byte that was just produced.
                sign_bit_set = byte & 0x40 != 0
                done = (data == 0 and not sign_bit_set) or (data == -1 and sign_bit_set)
            else:
                done = data == 0

            if done:
                result.append(byte)
                break

            # Set high-order bit of byte to signal that more bytes follow
            result.append(0x80 | byte)

        stream.write(result)
        return len(result)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
import io | ||
|
||
import pytest | ||
|
||
from dissect.cstruct.cstruct import cstruct | ||
|
||
|
||
def test_leb128_unsigned_read_EOF(cs: cstruct):
    """A buffer ending on a byte with the high bit still set must raise ``EOFError``."""
    truncated = b"\x8b"
    expected_message = "EOF reached, while final LEB128 byte was not yet read"

    with pytest.raises(EOFError, match=expected_message):
        cs.uleb128(truncated)
|
||
|
||
def test_leb128_unsigned_read(cs: cstruct):
    """Decode known byte strings as unsigned LEB128 and compare against the expected integers."""
    vectors = [
        (b"\x02", 2),
        (b"\x8b\x25", 4747),
        (b"\xc9\x8f\xb0\x06", 13371337),
        (b"\x7e", 126),
        (b"\xf5\x5a", 11637),
        (b"\xde\xd6\xcf\x7c", 261352286),
    ]

    for encoded, expected in vectors:
        assert cs.uleb128(encoded) == expected
|
||
|
||
def test_leb128_signed_read(cs: cstruct):
    """Decode known byte strings as signed LEB128, including values that decode as negative."""
    vectors = [
        (b"\x02", 2),
        (b"\x8b\x25", 4747),
        (b"\xc9\x8f\xb0\x06", 13371337),
        (b"\x7e", -2),
        (b"\xf5\x5a", -4747),
        (b"\xde\xd6\xcf\x7c", -7083170),
    ]

    for encoded, expected in vectors:
        assert cs.ileb128(encoded) == expected
|
||
|
||
def test_leb128_struct_unsigned(cs: cstruct):
    """Use a ``uleb128`` field as the length of the ``char`` array that follows it."""
    cdef = """
    struct test {
        uleb128 len;
        char data[len];
    };
    """
    cs.load(cdef)

    # Two-byte length prefix followed by the payload it describes.
    payload = b"\x41" * 3119
    buf = b"\xaf\x18" + payload
    obj = cs.test(buf)

    assert obj.len == 3119
    assert obj.data == payload
    assert len(obj.data) == 3119
    assert len(buf) == 3119 + 2
|
||
|
||
def test_leb128_struct_unsigned_zero(cs: cstruct):
    """Parse an unsized ``uleb128`` array whose data ends with a zero value."""
    cdef = """
    struct test {
        uleb128 numbers[];
    };
    """
    cs.load(cdef)

    # Three encoded values followed by a single zero byte.
    obj = cs.test(b"\xaf\x18\x8b\x25\xc9\x8f\xb0\x06\x00")
    expected_numbers = (3119, 4747, 13371337)

    assert len(obj.numbers) == 3
    for index, expected in enumerate(expected_numbers):
        assert obj.numbers[index] == expected
|
||
|
||
def test_leb128_struct_signed_zero(cs: cstruct):
    """Parse an unsized ``ileb128`` array (with negative values) whose data ends with a zero value."""
    cdef = """
    struct test {
        ileb128 numbers[];
    };
    """
    cs.load(cdef)

    # Three encoded values followed by a single zero byte.
    obj = cs.test(b"\xaf\x18\xf5\x5a\xde\xd6\xcf\x7c\x00")
    expected_numbers = (3119, -4747, -7083170)

    assert len(obj.numbers) == 3
    for index, expected in enumerate(expected_numbers):
        assert obj.numbers[index] == expected
|
||
|
||
def test_leb128_nested_struct_unsigned(cs: cstruct):
    """Parse a struct whose name length, entry count and per-entry data length are all ``uleb128``.

    Besides the header fields, every parsed entry is checked so that a decoding error inside the
    ``entries`` array cannot go unnoticed.
    """
    cdef = """
    struct entry {
        uleb128 len;
        char data[len];
        uint32 crc;
    };
    struct nested {
        uleb128 name_len;
        char name[name_len];
        uleb128 n_entries;
        entry entries[n_entries];
    };
    """
    cs.load(cdef)

    # Dummy file format specifying 300 entries
    buf = b"\x08\x54\x65\x73\x74\x66\x69\x6c\x65\xac\x02"

    # Each entry has 4 byte data + 4 byte CRC
    buf += b"\x04\x41\x41\x41\x41\x42\x42\x42\x42" * 300

    obj = cs.nested(buf)

    assert obj.name_len == 8
    assert obj.name == b"\x54\x65\x73\x74\x66\x69\x6c\x65"
    assert obj.n_entries == 300

    # The entries themselves must have been decoded as well, not only counted.
    assert len(obj.entries) == 300
    for entry in obj.entries:
        assert entry.len == 4
        assert entry.data == b"\x41\x41\x41\x41"
        # All four CRC bytes are 0x42, so the expected value does not depend on byte order.
        assert entry.crc == 0x42424242
|
||
|
||
def test_leb128_nested_struct_signed(cs: cstruct):
    """Parse a struct whose name length, entry count and per-entry data length are all ``ileb128``.

    Besides the header fields, every parsed entry is checked so that a decoding error inside the
    ``entries`` array cannot go unnoticed.
    """
    cdef = """
    struct entry {
        ileb128 len;
        char data[len];
        uint32 crc;
    };
    struct nested {
        ileb128 name_len;
        char name[name_len];
        ileb128 n_entries;
        entry entries[n_entries];
    };
    """
    cs.load(cdef)

    # Dummy file format specifying 300 entries
    buf = b"\x08\x54\x65\x73\x74\x66\x69\x6c\x65\xac\x02"

    # Each entry has 4 byte data + 4 byte CRC
    buf += b"\x04\x41\x41\x41\x41\x42\x42\x42\x42" * 300

    obj = cs.nested(buf)

    assert obj.name_len == 8
    assert obj.name == b"\x54\x65\x73\x74\x66\x69\x6c\x65"
    assert obj.n_entries == 300

    # The entries themselves must have been decoded as well, not only counted.
    assert len(obj.entries) == 300
    for entry in obj.entries:
        assert entry.len == 4
        assert entry.data == b"\x41\x41\x41\x41"
        # All four CRC bytes are 0x42, so the expected value does not depend on byte order.
        assert entry.crc == 0x42424242
|
||
|
||
def test_leb128_unsigned_write(cs: cstruct):
    """Encode known integers as unsigned LEB128 and compare against the expected byte strings."""
    vectors = [
        (2, b"\x02"),
        (4747, b"\x8b\x25"),
        (13371337, b"\xc9\x8f\xb0\x06"),
        (126, b"\x7e"),
        (11637, b"\xf5\x5a"),
        (261352286, b"\xde\xd6\xcf\x7c"),
    ]

    for value, expected in vectors:
        assert cs.uleb128(value).dumps() == expected
|
||
|
||
def test_leb128_signed_write(cs: cstruct):
    """Encode known integers, including negative ones, as signed LEB128."""
    vectors = [
        (2, b"\x02"),
        (4747, b"\x8b\x25"),
        (13371337, b"\xc9\x8f\xb0\x06"),
        (-2, b"\x7e"),
        (-4747, b"\xf5\x5a"),
        (-7083170, b"\xde\xd6\xcf\x7c"),
    ]

    for value, expected in vectors:
        assert cs.ileb128(value).dumps() == expected
|
||
|
||
def test_leb128_write_negatives(cs: cstruct):
    """Negative values are rejected by the unsigned type and accepted by the signed type."""
    expected_message = "Attempt to encode a negative integer using unsigned LEB128 encoding"

    with pytest.raises(ValueError, match=expected_message):
        cs.uleb128(-2).dumps()

    # The same value is valid for the signed variant.
    encoded = cs.ileb128(-2).dumps()
    assert encoded == b"\x7e"
|
||
|
||
def test_leb128_unsigned_write_amount_written(cs: cstruct):
    """The count returned by ``write`` must equal the stream position after writing to a fresh stream."""
    for value in (2, 4747, 13371337):
        # A fresh stream per value, so ``tell()`` is exactly the number of bytes this write produced.
        out = io.BytesIO()
        bytes_written = cs.uleb128(value).write(out)
        assert bytes_written == out.tell()