Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle .npz Numpy files #32

Merged
merged 1 commit into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*.pkl binary
*.pickle binary
*.bin binary
*.pt binary
*.zip binary
*.npy binary
*.npz binary
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
"python.testing.pytestEnabled": true,
"sarif-viewer.connectToGithubCodeScanning": "off"
}
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = picklescan
version = 0.0.17
version = 0.0.18
author = Matthieu Maitre
author_email = [email protected]
description = Security scanner detecting Python Pickle files performing suspicious actions
Expand Down
19 changes: 15 additions & 4 deletions src/picklescan/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ def __str__(self) -> str:
"IntStorage",
"ByteStorage",
},
"numpy": {
"dtype",
"ndarray",
},
"numpy.core.multiarray": {
"_reconstruct",
},
"torch._utils": {"_rebuild_tensor_v2"},
}

Expand Down Expand Up @@ -141,8 +148,7 @@ def __str__(self) -> str:
# https://www.tensorflow.org/api_docs/python/tf/keras/models/load_model
#

# TODO: support .npz files
_numpy_file_extensions = {".npy"}
_numpy_file_extensions = {".npy"} # Note: .npz files are handled as zip archives (see _zip_file_extensions)
_pytorch_file_extensions = {".bin", ".pt", ".pth", ".ckpt"}
_pickle_file_extensions = {".pkl", ".pickle", ".joblib", ".dat", ".data"}
_zip_file_extensions = {".zip", ".npz"}
Expand Down Expand Up @@ -301,10 +307,15 @@ def scan_zip_bytes(data: IO[bytes], file_id) -> ScanResult:
file_names = zip.namelist()
_log.debug("Files in archive %s: %s", file_id, file_names)
for file_name in file_names:
if os.path.splitext(file_name)[1] in _pickle_file_extensions:
file_ext = os.path.splitext(file_name)[1]
if file_ext in _pickle_file_extensions:
_log.debug("Scanning file %s in zip archive %s", file_name, file_id)
with zip.open(file_name, "r") as file:
result.merge(scan_pickle_bytes(file, f"{file_id}:{file_name}"))
elif file_ext in _numpy_file_extensions:
_log.debug("Scanning file %s in zip archive %s", file_name, file_id)
with zip.open(file_name, "r") as file:
result.merge(scan_numpy(file, f"{file_id}:{file_name}"))

return result

Expand All @@ -323,7 +334,7 @@ def scan_numpy(data: IO[bytes], file_id) -> ScanResult:
data.seek(-min(N, len(magic)), 1) # back-up
if magic.startswith(_ZIP_PREFIX) or magic.startswith(_ZIP_SUFFIX):
# .npz file
raise NotImplementedError("Scanning of .npz files is not implemented yet")
raise ValueError(f".npz file not handled as zip file: {file_id}")
elif magic == np.lib.format.MAGIC_PREFIX:
# .npy file

Expand Down
Binary file added tests/data2/int_array.npy
Binary file not shown.
Binary file added tests/data2/int_arrays.npz
Binary file not shown.
Binary file added tests/data2/int_arrays_compressed.npz
Binary file not shown.
File renamed without changes.
Binary file added tests/data2/object_arrays.npz
Binary file not shown.
Binary file added tests/data2/object_arrays_compressed.npz
Binary file not shown.
140 changes: 120 additions & 20 deletions tests/test_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,50 @@ def initialize_zip_file(path, file_name, data):
zip.writestr(file_name, data)


def initialize_numpy_file(path):
def initialize_numpy_files():
import numpy as np

# create numpy object array
with open(path, "wb") as f:
data = [(1, 2), (3, 4)]
os.makedirs(f"{_root_path}/data2", exist_ok=True)

path = f"{_root_path}/data2/object_array.npy"
if not os.path.exists(path):
x = np.empty((2, 2), dtype=object)
x[:] = data
np.save(f, x)
x[:] = [(1, 2), (3, 4)]
np.save(path, x)

path = f"{_root_path}/data2/int_array.npy"
if not os.path.exists(path):
x = np.empty((2, 2), dtype=int)
x[:] = [(1, 2), (3, 4)]
np.save(path, x)

path = f"{_root_path}/data2/object_arrays.npz"
if not os.path.exists(path):
np.savez(
path,
a=np.array([0, 1, 2], dtype=object),
b=np.array([3, 4, 5], dtype=object),
)

path = f"{_root_path}/data2/int_arrays.npz"
if not os.path.exists(path):
np.savez(
path, a=np.array([0, 1, 2], dtype=int), b=np.array([3, 4, 5], dtype=int)
)

path = f"{_root_path}/data2/object_arrays_compressed.npz"
if not os.path.exists(path):
np.savez_compressed(
path,
a=np.array([0, 1, 2], dtype=object),
b=np.array([3, 4, 5], dtype=object),
)

path = f"{_root_path}/data2/int_arrays_compressed.npz"
if not os.path.exists(path):
np.savez_compressed(
path, a=np.array([0, 1, 2], dtype=int), b=np.array([3, 4, 5], dtype=int)
)


def initialize_pickle_files():
Expand Down Expand Up @@ -364,13 +399,12 @@ def initialize_pickle_files():
pickle.dumps(Malicious1(), protocol=4),
)

initialize_numpy_file(f"{_root_path}/data/object_array.npy")

# Fake PyTorch file (PNG file format) simulating https://huggingface.co/RectalWorm/loras_new/blob/main/Owl_Mage_no_background.pt
initialize_data_file(f"{_root_path}/data/bad_pytorch.pt", b"\211PNG\r\n\032\n")


initialize_pickle_files()
initialize_numpy_files()


def compare_scan_results(sr1: ScanResult, sr2: ScanResult):
Expand Down Expand Up @@ -411,19 +445,32 @@ def test_scan_zip_bytes():


def test_scan_numpy():
scan_result = ScanResult(
[
Global("numpy.core.multiarray", "_reconstruct", SafetyLevel.Suspicious),
Global("numpy", "ndarray", SafetyLevel.Suspicious),
Global("numpy", "dtype", SafetyLevel.Suspicious),
],
1,
0,
0,
)
with open(f"{_root_path}/data/object_array.npy", "rb") as f:
with open(f"{_root_path}/data2/object_array.npy", "rb") as f:
compare_scan_results(
scan_numpy(io.BytesIO(f.read()), "object_array.npy"), scan_result
scan_numpy(io.BytesIO(f.read()), "object_array.npy"),
ScanResult(
[
Global(
"numpy.core.multiarray", "_reconstruct", SafetyLevel.Innocuous
),
Global("numpy", "ndarray", SafetyLevel.Innocuous),
Global("numpy", "dtype", SafetyLevel.Innocuous),
],
1,
0,
0,
),
)

with open(f"{_root_path}/data2/int_array.npy", "rb") as f:
compare_scan_results(
scan_numpy(io.BytesIO(f.read()), "int_array.npy"),
ScanResult(
[],
1,
0,
0,
),
)


Expand Down Expand Up @@ -581,6 +628,59 @@ def test_scan_file_path():
)


def test_scan_file_path_npz():
    """Check scan_file_path() results for the .npz fixtures under data2.

    Four archives are scanned in turn: object-dtype and int-dtype arrays,
    each in a plain and a compressed variant. The object-dtype archives are
    expected to report the three numpy globals twice, all marked
    SafetyLevel.Innocuous; the int-dtype archives are expected to report no
    globals. Every expected ScanResult carries the trailing counters 2, 0, 0.
    """

    def object_array_globals():
        # Build a fresh list per case so no expected result shares a list
        # object with another one.
        return [
            Global("numpy.core.multiarray", "_reconstruct", SafetyLevel.Innocuous),
            Global("numpy", "ndarray", SafetyLevel.Innocuous),
            Global("numpy", "dtype", SafetyLevel.Innocuous),
        ] * 2

    def no_globals():
        return []

    # (archive file name, factory for the expected globals), in scan order.
    cases = (
        ("object_arrays.npz", object_array_globals),
        ("int_arrays.npz", no_globals),
        ("object_arrays_compressed.npz", object_array_globals),
        ("int_arrays_compressed.npz", no_globals),
    )

    for archive_name, make_expected_globals in cases:
        actual = scan_file_path(f"{_root_path}/data2/{archive_name}")
        expected = ScanResult(make_expected_globals(), 2, 0, 0)
        compare_scan_results(actual, expected)


def test_scan_directory_path():
sr = ScanResult(
globals=[
Expand Down
Loading