Skip to content

Commit

Permalink
Merge branch 'main' into improvement/elastic-adapter-metadata-fields
Browse files Browse the repository at this point in the history
  • Loading branch information
JSCU-CNI authored May 14, 2024
2 parents 6fd1d19 + e0586ef commit 0ffc805
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 14 deletions.
127 changes: 120 additions & 7 deletions flow/record/fieldtypes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import os
import pathlib
import re
import shlex
import sys
import warnings
from binascii import a2b_hex, b2a_hex
from datetime import datetime as _dt
from datetime import timezone
from posixpath import basename, dirname
from typing import Any, Optional, Tuple
from typing import Any, Optional
from urllib.parse import urlparse

try:
Expand All @@ -34,8 +35,8 @@
PY_311 = sys.version_info >= (3, 11, 0)
PY_312 = sys.version_info >= (3, 12, 0)

PATH_POSIX = 0
PATH_WINDOWS = 1
TYPE_POSIX = 0
TYPE_WINDOWS = 1

string_type = str
varint_type = int
Expand Down Expand Up @@ -694,15 +695,15 @@ def __repr__(self) -> str:
return repr(str(self))

def _pack(self):
path_type = PATH_WINDOWS if isinstance(self, windows_path) else PATH_POSIX
path_type = TYPE_WINDOWS if isinstance(self, windows_path) else TYPE_POSIX
return (str(self), path_type)

@classmethod
def _unpack(cls, data: Tuple[str, str]):
def _unpack(cls, data: tuple[str, str]):
path_, path_type = data
if path_type == PATH_POSIX:
if path_type == TYPE_POSIX:
return posix_path(path_)
elif path_type == PATH_WINDOWS:
elif path_type == TYPE_WINDOWS:
return windows_path(path_)
else:
# Catch all: default to posix_path
Expand Down Expand Up @@ -734,3 +735,115 @@ def __repr__(self) -> str:
quote = '"'

return f"{quote}{s}{quote}"


class command(FieldType):
executable: Optional[path] = None
args: Optional[list[str]] = None

_path_type: type[path] = None
_posix: bool

def __new__(cls, value: str) -> command:
if cls is not command:
return super().__new__(cls)

if not isinstance(value, str):
raise ValueError(f"Expected a value of type 'str' not {type(value)}")

# pre checking for windows like paths
# This checks for windows like starts of a path:
# an '%' for an environment variable
# r'\\' for a UNC path
# the strip and check for ":" on the second line is for `<drive_letter>:`
windows = value.startswith((r"\\", "%")) or value.lstrip("\"'")[1] == ":"

if windows:
cls = windows_command
else:
cls = posix_command
return super().__new__(cls)

def __init__(self, value: str | tuple[str, tuple[str]] | None):
if value is None:
return

if isinstance(value, str):
self.executable, self.args = self._split(value)
return

executable, self.args = value
self.executable = self._path_type(executable)
self.args = list(self.args)

def __repr__(self) -> str:
return f"(executable={self.executable!r}, args={self.args})"

def __eq__(self, other: Any) -> bool:
if isinstance(other, command):
return self.executable == other.executable and self.args == other.args
elif isinstance(other, str):
return self._join() == other
elif isinstance(other, (tuple, list)):
return self.executable == other[0] and self.args == list(other[1:])

return False

def _split(self, value: str) -> tuple[str, list[str]]:
executable, *args = shlex.split(value, posix=self._posix)
executable = executable.strip("'\" ")

return self._path_type(executable), args

def _join(self) -> str:
return shlex.join([str(self.executable)] + self.args)

def _pack(self) -> tuple[tuple[str, list], str]:
command_type = TYPE_WINDOWS if isinstance(self, windows_command) else TYPE_POSIX
if self.executable:
_exec, _ = self.executable._pack()
return ((_exec, self.args), command_type)
else:
return (None, command_type)

@classmethod
def _unpack(cls, data: tuple[tuple[str, tuple] | None, int]) -> command:
_value, _type = data
if _type == TYPE_WINDOWS:
return windows_command(_value)

return posix_command(_value)

@classmethod
def from_posix(cls, value: str) -> command:
return posix_command(value)

@classmethod
def from_windows(cls, value: str) -> command:
return windows_command(value)


class posix_command(command):
_posix = True
_path_type = posix_path


class windows_command(command):
_posix = False
_path_type = windows_path

def _split(self, value: str) -> tuple[str, list[str]]:
executable, args = super()._split(value)
if args:
args = [" ".join(args)]

return executable, args

def _join(self) -> str:
arg = f" {self.args[0]}" if self.args else ""
executable_str = str(self.executable)

if " " in executable_str:
return f"'{executable_str}'{arg}"

return f"{executable_str}{arg}"
5 changes: 5 additions & 0 deletions flow/record/jsonpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ def pack_obj(self, obj):
return base64.b64encode(obj).decode()
if isinstance(obj, fieldtypes.path):
return str(obj)
if isinstance(obj, fieldtypes.command):
return {
"executable": obj.executable,
"args": obj.args,
}

raise Exception("Unpackable type " + str(type(obj)))

Expand Down
1 change: 1 addition & 0 deletions flow/record/whitelist.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
WHITELIST = [
"boolean",
"command",
"dynamic",
"datetime",
"filesize",
Expand Down
148 changes: 141 additions & 7 deletions tests/test_fieldtypes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# coding: utf-8
from __future__ import annotations

import hashlib
import os
Expand All @@ -12,14 +13,22 @@
import flow.record.fieldtypes
from flow.record import RecordDescriptor, RecordReader, RecordWriter
from flow.record.fieldtypes import (
PATH_POSIX,
PATH_WINDOWS,
PY_312,
TYPE_POSIX,
TYPE_WINDOWS,
_is_posixlike_path,
_is_windowslike_path,
command,
)
from flow.record.fieldtypes import datetime as dt
from flow.record.fieldtypes import fieldtype_for_value, net, uri, windows_path
from flow.record.fieldtypes import (
fieldtype_for_value,
net,
posix_command,
uri,
windows_command,
windows_path,
)

UTC = timezone.utc

Expand Down Expand Up @@ -639,16 +648,16 @@ def test_path():
assert isinstance(test_path, flow.record.fieldtypes.windows_path)

test_path = flow.record.fieldtypes.path.from_posix(posix_path_str)
assert test_path._pack() == (posix_path_str, PATH_POSIX)
assert test_path._pack() == (posix_path_str, TYPE_POSIX)

test_path = flow.record.fieldtypes.path._unpack((posix_path_str, PATH_POSIX))
test_path = flow.record.fieldtypes.path._unpack((posix_path_str, TYPE_POSIX))
assert str(test_path) == posix_path_str
assert isinstance(test_path, flow.record.fieldtypes.posix_path)

test_path = flow.record.fieldtypes.path.from_windows(windows_path_str)
assert test_path._pack() == (windows_path_str, PATH_WINDOWS)
assert test_path._pack() == (windows_path_str, TYPE_WINDOWS)

test_path = flow.record.fieldtypes.path._unpack((windows_path_str, PATH_WINDOWS))
test_path = flow.record.fieldtypes.path._unpack((windows_path_str, TYPE_WINDOWS))
assert str(test_path) == windows_path_str
assert isinstance(test_path, flow.record.fieldtypes.windows_path)

Expand Down Expand Up @@ -998,5 +1007,130 @@ def test_datetime_comparisions():
assert dt("2023-01-02") != datetime(2023, 3, 4, tzinfo=UTC)


def test_command_record() -> None:
TestRecord = RecordDescriptor(
"test/command",
[
("command", "commando"),
],
)

record = TestRecord(commando="help.exe -h")
assert isinstance(record.commando, posix_command)
assert record.commando.executable == "help.exe"
assert record.commando.args == ["-h"]

record = TestRecord(commando="something.so -h -q -something")
assert isinstance(record.commando, posix_command)
assert record.commando.executable == "something.so"
assert record.commando.args == ["-h", "-q", "-something"]


def test_command_integration(tmp_path: pathlib.Path) -> None:
TestRecord = RecordDescriptor(
"test/command",
[
("command", "commando"),
],
)

with RecordWriter(tmp_path / "command_record") as writer:
record = TestRecord(commando=r"\\.\\?\some_command.exe -h,help /d quiet")
writer.write(record)
assert record.commando.executable == r"\\.\\?\some_command.exe"
assert record.commando.args == [r"-h,help /d quiet"]

with RecordReader(tmp_path / "command_record") as reader:
for record in reader:
assert record.commando.executable == r"\\.\\?\some_command.exe"
assert record.commando.args == [r"-h,help /d quiet"]


def test_command_integration_none(tmp_path: pathlib.Path) -> None:
TestRecord = RecordDescriptor(
"test/command",
[
("command", "commando"),
],
)

with RecordWriter(tmp_path / "command_record") as writer:
record = TestRecord(commando=command.from_posix(None))
writer.write(record)
with RecordReader(tmp_path / "command_record") as reader:
for record in reader:
assert record.commando.executable is None
assert record.commando.args is None


@pytest.mark.parametrize(
"command_string, expected_executable, expected_argument",
[
# Test relative windows paths
("windows.exe something,or,somethingelse", "windows.exe", ["something,or,somethingelse"]),
# Test weird command strings for windows
("windows.dll something,or,somethingelse", "windows.dll", ["something,or,somethingelse"]),
# Test environment variables
(r"%WINDIR%\\windows.dll something,or,somethingelse", r"%WINDIR%\\windows.dll", ["something,or,somethingelse"]),
# Test a quoted path
(r"'c:\path to some exe' /d /a", r"c:\path to some exe", [r"/d /a"]),
# Test a unquoted path
(r"'c:\Program Files\hello.exe'", r"c:\Program Files\hello.exe", []),
# Test an unquoted path with a path as argument
(r"'c:\Program Files\hello.exe' c:\startmepls.exe", r"c:\Program Files\hello.exe", [r"c:\startmepls.exe"]),
(None, None, None),
],
)
def test_command_windows(command_string: str, expected_executable: str, expected_argument: list[str]) -> None:
cmd = windows_command(command_string)

assert cmd.executable == expected_executable
assert cmd.args == expected_argument


@pytest.mark.parametrize(
"command_string, expected_executable, expected_argument",
[
# Test relative posix command
("some_file.so -h asdsad -f asdsadas", "some_file.so", ["-h", "asdsad", "-f", "asdsadas"]),
# Test command with spaces
(r"/bin/hello\ world -h -word", r"/bin/hello world", ["-h", "-word"]),
],
)
def test_command_posix(command_string: str, expected_executable: str, expected_argument: list[str]) -> None:
cmd = posix_command(command_string)

assert cmd.executable == expected_executable
assert cmd.args == expected_argument


def test_command_equal() -> None:
assert command("hello.so -h") == command("hello.so -h")
assert command("hello.so -h") != command("hello.so")

# Test different types with the comparitor
assert command("hello.so -h") == ["hello.so", "-h"]
assert command("hello.so -h") == ("hello.so", "-h")
assert command("hello.so -h") == "hello.so -h"
assert command("c:\\hello.dll -h -b") == "c:\\hello.dll -h -b"

# Compare paths that contain spaces
assert command("'/home/some folder/file' -h") == "'/home/some folder/file' -h"
assert command("'c:\\Program files\\some.dll' -h -q") == "'c:\\Program files\\some.dll' -h -q"
assert command("'c:\\program files\\some.dll' -h -q") == ["c:\\program files\\some.dll", "-h -q"]
assert command("'c:\\Program files\\some.dll' -h -q") == ("c:\\Program files\\some.dll", "-h -q")

# Test failure conditions
assert command("hello.so -h") != 1
assert command("hello.so") != "hello.so -h"
assert command("hello.so") != ["hello.so", ""]
assert command("hello.so") != ("hello.so", "")


def test_command_failed() -> None:
with pytest.raises(ValueError):
command(b"failed")


if __name__ == "__main__":
__import__("standalone_test").main(globals())

0 comments on commit 0ffc805

Please sign in to comment.