From 2d852d24b42251e0210dee107f437127a8cb8cf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Mon, 24 Oct 2022 16:17:24 +0200 Subject: [PATCH 1/9] added android sparse image unpacker --- fact_extractor/install/unpacker.py | 4 ++- .../unpacking/android_simg/__init__.py | 0 .../unpacking/android_simg/code/__init__.py | 0 .../android_simg/code/android_simg.py | 26 ++++++++++++++++++ .../unpacking/android_simg/test/__init__.py | 0 .../unpacking/android_simg/test/data/simg.img | Bin 0 -> 127284 bytes .../test/test_plugin_android_simg.py | 23 ++++++++++++++++ fact_extractor/version.py | 2 +- 8 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 fact_extractor/plugins/unpacking/android_simg/__init__.py create mode 100644 fact_extractor/plugins/unpacking/android_simg/code/__init__.py create mode 100644 fact_extractor/plugins/unpacking/android_simg/code/android_simg.py create mode 100644 fact_extractor/plugins/unpacking/android_simg/test/__init__.py create mode 100644 fact_extractor/plugins/unpacking/android_simg/test/data/simg.img create mode 100644 fact_extractor/plugins/unpacking/android_simg/test/test_plugin_android_simg.py diff --git a/fact_extractor/install/unpacker.py b/fact_extractor/install/unpacker.py index 1495a59b..4720411f 100644 --- a/fact_extractor/install/unpacker.py +++ b/fact_extractor/install/unpacker.py @@ -134,7 +134,9 @@ 'gcc-multilib', 'lib32stdc++6', 'gawk', - 'pkg-config' + 'pkg-config', + # android sparse image + 'simg2img', ], 'pip3': [ 'pluginbase', diff --git a/fact_extractor/plugins/unpacking/android_simg/__init__.py b/fact_extractor/plugins/unpacking/android_simg/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fact_extractor/plugins/unpacking/android_simg/code/__init__.py b/fact_extractor/plugins/unpacking/android_simg/code/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fact_extractor/plugins/unpacking/android_simg/code/android_simg.py b/fact_extractor/plugins/unpacking/android_simg/code/android_simg.py new file mode 100644 index 00000000..19b76509 --- /dev/null +++ b/fact_extractor/plugins/unpacking/android_simg/code/android_simg.py @@ -0,0 +1,26 @@ +''' +This plugin "unpacks" Android sparse image by converting them to regular filesystem images using the simg2img tool +''' +import logging +from pathlib import Path + +from common_helper_process import execute_shell_command + +NAME = 'Android-sparse-image' +MIME_PATTERNS = ['filesystem/android-simg'] +VERSION = '0.1' + + +def unpack_function(file_path, tmp_dir): + extract_dir = Path(tmp_dir) + file_to_unpack = Path(file_path) + output = execute_shell_command(f'simg2img {file_path} {extract_dir / file_to_unpack.name}.raw') + '\n' + meta_data = {'output': output} + logging.debug(output) + return meta_data + + +# ----> Do not edit below this line <---- +def setup(unpack_tool): + for item in MIME_PATTERNS: + unpack_tool.register_plugin(item, (unpack_function, NAME, VERSION)) diff --git a/fact_extractor/plugins/unpacking/android_simg/test/__init__.py b/fact_extractor/plugins/unpacking/android_simg/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fact_extractor/plugins/unpacking/android_simg/test/data/simg.img b/fact_extractor/plugins/unpacking/android_simg/test/data/simg.img new file mode 100644 index 0000000000000000000000000000000000000000..ac0ee5ce9f9205657d56009c7e242fff972e2442 GIT binary patch literal 127284 zcmeI*2VfM{!iM45Y#<~A2))D5dqT5N1Olii(z_^#us|?Hlb~P&Y*=L%jHchI&-(y);IV;q%!Z;FQ zS;aUo%e)1aIoArEYffoV0Tsx#1$bPrY9he0x>UBTgTa<%9yd-bHph%SmNm}Xsn;fW ztfZuF2RzBwYszUZOMy| zh9yKq#S9qYi%9e(B=(Pq8y=gO@oHI|s_~!8^QAi{w9KM9``Sl_&8)tlXwc!9NggI_ z9tX{0wqreq8?X_Zuo+vh72B{KJFpYGup4{QHk#up546cgPW&gs#V5|y5f$){0=1b& z9n?jIvWEh_R$zvS)wa1DbO;?ju0TpntKt9{S!8DW#sDP^q z1lz8ft|qGjD&Sy&5Zl4QG?WUcfU64Rw_P<|O;!a|z`+8cwu6IdC>2lvR~0B=yK1_c ztO}@rg9QrO4i2WFR6qq>RiKdVs_AO7Dxd-m76`K)985#0fC{*(Kw;Ze)74~EKm{Bu zP{ej{Fb$;wD&VRDMQvA2SCdr%6>zXXG26kxG?WUcfU61=w_P<|O;!a|z`+6~YzGI^ zP%5ATt}0N{cGYw>Srt$L2Md(49UM$UselT&sz7PmRnygERX_zCEKtUFa4-#}0xICD z0%dJiO;?ju0TpntKsnpN!8DW#sDP^ql($_qT}@U6RKURk6>J9w(@-j)02lvR~4vayK1_ctO}@rg9R$v4i2WFR6qq>RiKLPs_AO7Dxd-m z7N}}FIGBb~0Tpmnfois^rmM-SfC@NRpt|kgU>ZsVRKQgQYS^xtt|qGjD&Sy&nzn<3 zX($y?0aq2MWxHy+nyd<_fP)2+{C63Zoy`o&&Xsf>6;Odw7D%di%Cr6EO#gj&(#%xA z{RPtQ=uN$ydZkG_FDDN7N3xbc%Nob}r=?0+Z=e;BIsp%d3saBlKn0vEFnSE5)ej6z z`?GHjQ^xmLQ_gWk>#}kC!&)~VJXlJDdI>p8js8?gzSu?1VP4coB;JFyG9vB%tT zw4AoU+(<62Z=IrlfK@8pEf;ZoF_vH{mSH(oU?o;zHP&D)vb*e$-0L-uf5#3U?O`7O z%%QpMUT?sO$3OEpxP|MS?b5S#n^u|KcIueD-eO$%-}QS=tl#`G^5o5Igsji+b(-~O zeQRAq1yn!-5$^zzo zp8o!S*qd_ycjd-t(kh?=XI|hOo_9L)mFX5LpaOnXK=*&Y`f+G7Dv+ZEbpOxM+*+9m z_*DVj|NZL6p~$^e7?LmN}wdn=juzN49cP$%A*3z?ucwu zLS-5!UC!He{&x;AOEe3bbBOn;Rb2SDxd;qT;RX^COhLH&gur~ z_YIgfhm?W5C-zNcPHW$QXO&ks%A!DS-#1|D-~Z{sK9wFV4myu=f(YdU)L6>fd3XSe=(cCtj%BG70Mn8Ox{s-x(9^XHkYF= z8U-B>^ql%-fpoXfQ58@Dw-iX*H~r+)@8vkTZ~BwZznK>T7usjPMLk-#X_eWn&A#bd zaA8*arVluWebe{OV*Td530bY*dk*XGbDH&M)jD+X z1qIALz0>diH*Nplq1^XNS!Q3~vA+1If%f3I=s~`Cp1kAw#jIeD+4HyA2Y3Un=jFUP z-jnAwyzn14w1p~gK@~`Q?cE&aj@4zy5Dtj62oe|M@vz-~VC$ zglJKL3%9@?Hf=A=n|b!(XZ(Ww_!S3m5WnGf9KvB7!5=t^WB3z);cxtd8KuMHBX_P@(ltXz`Kt^h1B#fC0D>12G6b48{-) zMHGhNCPX6!v53RXxCQY@Kq7|YR@{aW7>V0)2kyjOxErG|8e?z|?!|q$9}i$G9>ha< z7>{5a9>rsL9OLl>Cg4dtg{LtQ&tMXs#dCNbFJLmJ;6=QIm+=Z-#Z6n4H@eba_dw3ro;6u#BNB9_@U=}{bXPAvS_#9v0OU%VQ%*R*w8sFese24Gx0~X*% zEW{!##u6;WGAzdmti&p;#u}`}I;_VAY{VvP#ujYFHf+ZZ?8GkYMw0n3o*yr{;3*hw zq6(b10)Bkg&ylIkjbt_T_1)lEz5c212G5Z{jyb-P^Y*}#^@>MjCx*Yje=$4TpzEl> z`6^&u=SzG3FXgqsl<)s6!1;TkK?sL!($~SbvximzFzv*3)60AUOO0#<}h!YGv}XT zzagA^a)}rk7vqZ!o0npe+*wcer69iX3s$xm-IU`2raXy&c61MVKb{Q zC>nG)W|D`Ana4k~nC)24;RbBPCTxcJz|vN1!*=YzPVB;N>`B{bj-UL7_2d)D|H*Li z$+LA#1^l8wy3hZb-GF6l;q$*UJR{n+%GQ;2B^6Ks6;J^cPyrQC0ToaI6;J^cP=WJa zVEzK$tHvzM{=CgHkJHKT3AU3@>OU%=0xF;aDxd-?paLqO0=cBXUS7QZiGK|2%l0dS zxg?A>Qw4HCfuFel@59gd1^e+UcQ3X{| z4b@QtHBk$-Q3rKV58;SFeKbHrG(uxsgo|+rE=3bGMKd(VWoUtxXoX0$MjN!n<+uV@ zq8+Zn)o71v&;cFM37ydeUC|BQ(E~lv3%$_?*P<`3!}aKg{knK0d&Qn2C?@F+Ra8e2UL78*}hEzQC85i+Pxjukba#!MFGh-{S`?z>ipn zMOcg_Sc+v>julvmRalKRSc`R7j}6#}P1uYr*otk~jvd&EUD%C1*b6NxpaLqO0xF;a zDxd-?paQw4fO&5~OEx$Cdjrfr-9yvP$UP~vr7EBT=?d)Q`GKGD3-;qz9Kb>RhTm}r zhj9dd;3$saPyB_y@ehvUUnIfewFVCY;00euV&y?z1R)=S5rX^(MFA8u^2#p+9cG0NjXy7z7^%V+e*K3d3*{q7j2w#NlS#f_Nk#5yNpSZo>$S#O=5P zcj7MGjZqklF}Mf!;y&Du2QU^7;vqbYM=%bL;xRmq@pu9g@FbqX)0l{7FbU7%IXsUS zFd0+uB3{DFcm=OwDqh1hypA{UCf>qy%)r}t2k+uNypIp?A!gzue2h;p3!maM%*Gsi zjxX>f=3*Y^<12iPZ}2U?!}s_B3-BWrVi6W&36^3RmSY80Vii_n4c1~E)?))UViPuF z3$|h#wqpl&Vi$H}5BB0GXi)(bPyrQC0Tsx71@zoN?)#Z*i&a1cR6qq(;B*Ck=J|nN zuphtT01o0e{EkC7j3f91M{x{);xGJ-e{dZCA_*2>Lh3;Pya+@dt53Zn>$q8N&!1WKY5N}~+Qq8!Sj0xH5rB~(TgR7EvZM-9|OE!0LG)I~jnBLel& z01eRyjd2k!#wEBEP0$q0&>WYc1zMsNBGDRc&=!~D3S5bHxC&RJJ+46qbVMg~Mi+EN zH*`l2^h7W8Mju>@zPJw8qaXU?1`NQB7>GgeVK9bZD55Y7Hz67^h(#Q3#x00P0unJC zx8gR8z)0MVJ8&oN!rd5!(HMh!a4+t|{dfRl@gN?;!*~Sa@F*U`;~0-8Fab~EDLjpd zcm|X3ES|&jcmb0!1uxu7DWLZaWVfEKrvfUV0xF;aDxd-?paK_8 z0e#=dh4Zu2mZ-p)7BJ5ZwBts4Zs1JI>G~>gVHfB=)MqCSjfx*+o3H;#{?@Td7xorw z>r_Am(h8io*EbCr7#Ex9i%o25cNp27$AYfIqkVRR@P^@yLVEh*6Qbf`o7$b@w#zm|(^UZ#PyrQC0TpnhKvHH)AxDPy_d!Nyw)r|=1#&}yAV0r{ z=7yhvwowJnSAqSJ?3I|O`zL!l^_qrb2aonJ4+(p{Udud3U>5Ugz(U&`JG-Sv>o%=2 zyXDj|d%Z2V(5yG(`aLJsZ(dsr;JpoJyY$Xt{pA=qtMz-&Vf}qhv;MQ)Cf!a2R6qq( zKm}Al1yn!e~z^~FaGv Date: Fri, 11 Nov 2022 15:53:21 +0100 Subject: [PATCH 2/9] fixed srec parsing issues for files with additional data at the end --- .../plugins/unpacking/srec/code/srec.py | 43 +++++++++++-------- .../srec/test/data/bad_testfile.srec | 2 +- .../srec/test/data/bad_testfile2.srec | 6 +-- .../plugins/unpacking/srec/test/test_srec.py | 3 +- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/fact_extractor/plugins/unpacking/srec/code/srec.py b/fact_extractor/plugins/unpacking/srec/code/srec.py index 2bdd62b2..e6244514 100755 --- a/fact_extractor/plugins/unpacking/srec/code/srec.py +++ b/fact_extractor/plugins/unpacking/srec/code/srec.py @@ -1,41 +1,50 @@ ''' This plugin decodes / unpacks Motorola SRecord files (.srec) ''' +from __future__ import annotations +import re from pathlib import Path -from typing import Dict import bincopy NAME = 'Motorola S-Record' MIME_PATTERNS = ['firmware/srecord'] -VERSION = '0.1' +VERSION = '0.2' +SREC_REGEX = b'(S[0-6][0-9A-F]+[\n\r]{1,2})+(S[7-9][0-9A-F]+)?' -def unpack_function(file_path: str, tmp_dir: str) -> Dict[str, str]: +def unpack_function(file_path: str | Path, tmp_dir: str | Path) -> dict[str, str]: ''' file_path specifies the input file. tmp_dir should be used to store the extracted files. ''' - target_file = Path(tmp_dir) / _get_unpacked_filename(file_path) - decoded = b'' try: - srec = Path(file_path).read_bytes().splitlines() - for line in srec: - try: - _, _, _, data = bincopy.unpack_srec(line.decode()) - decoded += data - except UnicodeDecodeError: - break - target_file.write_bytes(decoded) - - except bincopy.Error as srec_error: - return {'output': 'Unknown error in srec decoding: {}'.format(str(srec_error))} + file_data = Path(file_path).read_bytes() except FileNotFoundError as fnf_error: - return {'output': 'Failed to open file: {}'.format(str(fnf_error))} + return {'output': f'Failed to open file: {str(fnf_error)}'} + + match = re.match(SREC_REGEX, file_data) + if match is None: + return {'output': 'Error: no valid srec data found'} + + try: + decoded = _decode_srec(file_data[0:match.end()]) + target_file = Path(tmp_dir) / _get_unpacked_filename(file_path) + target_file.write_bytes(decoded) + except (bincopy.Error, ValueError) as srec_error: + return {'output': f'Unknown error in srec decoding: {str(srec_error)}'} return {'output': 'Successfully decoded srec file'} +def _decode_srec(srec_data: bytes) -> bytes: + decoded = b'' + for line in srec_data.splitlines(): + _, _, _, data = bincopy.unpack_srec(line.decode()) + decoded += data + return decoded + + def _get_unpacked_filename(file_path: str) -> str: return Path(file_path).with_suffix('.bin').name diff --git a/fact_extractor/plugins/unpacking/srec/test/data/bad_testfile.srec b/fact_extractor/plugins/unpacking/srec/test/data/bad_testfile.srec index 02d07c2c..06f741d8 100755 --- a/fact_extractor/plugins/unpacking/srec/test/data/bad_testfile.srec +++ b/fact_extractor/plugins/unpacking/srec/test/data/bad_testfile.srec @@ -1,6 +1,6 @@ S00F000068656C6C6F212020202000003B S11F00007C0802A6900100049421FFF07C6C1B787C8C23783C6000003863000026 -SF1F001C4BFFFFE5398000007D83637880010014382100107C0803A64E800020E9 +S11F001C4BFFFFE5398000007D83637880010014382100107C0803A64E800020E8 S111003848656C6C6F20776F726C642E0A0042 S5030003F9 S9030000FC \ No newline at end of file diff --git a/fact_extractor/plugins/unpacking/srec/test/data/bad_testfile2.srec b/fact_extractor/plugins/unpacking/srec/test/data/bad_testfile2.srec index ae8855eb..0b7151d0 100755 --- a/fact_extractor/plugins/unpacking/srec/test/data/bad_testfile2.srec +++ b/fact_extractor/plugins/unpacking/srec/test/data/bad_testfile2.srec @@ -1,6 +1,6 @@ -SX0F000068656C6C6F212020202000003B -S11F00007C0802A6900100049421FFF07C6C1B787C8C23783C6000003863000026 -SF1F001C4BFFFFE5398000007D83637880010014382100107C0803A64E800020E9 +S00F000068656C6C6F212020202000003B +S11F00007C0802A6900100049421FFF07C6C1B787C8C23783C60003863000026 +S11F001C4BFFFFE5398000007D83637880010014382100107C0803A64E800020E9 S111003848656C6C6F20776F726C642E0A0042 S5030003F9 S9030000FC \ No newline at end of file diff --git a/fact_extractor/plugins/unpacking/srec/test/test_srec.py b/fact_extractor/plugins/unpacking/srec/test/test_srec.py index e319517f..de5bd7d4 100755 --- a/fact_extractor/plugins/unpacking/srec/test/test_srec.py +++ b/fact_extractor/plugins/unpacking/srec/test/test_srec.py @@ -18,6 +18,7 @@ def successful_extraction(files, meta_data): content = Path(files[0]).read_bytes() assert b'Hello world.' in content assert 'Success' in meta_data['output'] + assert files[0].endswith('.bin') class TestMotorolaSRecord(TestUnpackerBase): @@ -37,7 +38,7 @@ def test_extraction_bad_file(): with TemporaryDirectory() as tmp_dir: meta_data = unpack_function(file_path, tmp_dir) - assert 'not starting with an \'S\'' in meta_data['output'] + assert 'Error: no valid srec data found' in meta_data['output'] def test_extraction_decoding_error(): From 2ffbd89f38f73b780eed9ee935a87e50c96351f9 Mon Sep 17 00:00:00 2001 From: rhelmke Date: Wed, 16 Nov 2022 09:36:32 +0100 Subject: [PATCH 3/9] add github action for ci build --- .github/workflows/build_ci.yml | 45 ++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 .github/workflows/build_ci.yml diff --git a/.github/workflows/build_ci.yml b/.github/workflows/build_ci.yml new file mode 100644 index 00000000..3074bde9 --- /dev/null +++ b/.github/workflows/build_ci.yml @@ -0,0 +1,45 @@ +name: Build CI +run-name: Build CI +on: + pull_request: + branches: [ master ] + schedule: + - cron: "0 2 * * *" + workflow_dispatch: + +jobs: + build-ci: + runs-on: [ self-hosted, linux, x64, focal ] + timeout-minutes: 45 + steps: + - name: Add Masks + run: | + echo "::add-mask::${{ secrets.NETWORK_MASK_1 }}" + echo "::add-mask::${{ secrets.NETWORK_MASK_2 }}" + echo "::add-mask::${{ secrets.NETWORK_MASK_3 }}" + echo "::add-mask::${{ secrets.INTERNAL_NODE_1 }}" + echo "::add-mask::${{ secrets.INTERNAL_NODE_2 }}" + echo "::add-mask::${{ secrets.INTERNAL_NODE_3 }}" + echo "::add-mask::${{ secrets.INTERNAL_NODE_4 }}" + echo "::add-mask::${{ secrets.INTERNAL_NODE_5 }}" + echo "::add-mask::${{ secrets.INTERNAL_NODE_6 }}" + echo "::add-mask::${{ secrets.INTERNAL_NODE_7 }}" + echo "::add-mask::${{ secrets.INTERNAL_NODE_8 }}" + echo "::add-mask::${{ secrets.SECRET_STRING_1 }}" + echo "::add-mask::${{ secrets.SECRET_STRING_2 }}" + echo "::add-mask::${{ secrets.SECRET_STRING_3 }}" + - name: Checkout Branch + uses: actions/checkout@v3 + with: + ref: ${{ github.ref }} + - name: Set ulimit + run: ulimit -n 9999 + - name: Pre-Installation + shell: 'script -q -e -c "bash {0}"' + run: ./fact_extractor/install/pre_install.sh + - name: Install FACT + shell: 'script -q -e -c "bash {0}"' + run: ./fact_extractor/install.py + - name: Unit Tests + shell: 'script -q -e -c "bash {0}"' + run: pytest From 94b6a88a9ecc0574c54394f33a162a070acbdeca Mon Sep 17 00:00:00 2001 From: rhelmke Date: Wed, 16 Nov 2022 10:35:04 +0100 Subject: [PATCH 4/9] pytest privileges -_- --- .github/workflows/build_ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_ci.yml b/.github/workflows/build_ci.yml index 3074bde9..3f86af78 100644 --- a/.github/workflows/build_ci.yml +++ b/.github/workflows/build_ci.yml @@ -37,9 +37,9 @@ jobs: - name: Pre-Installation shell: 'script -q -e -c "bash {0}"' run: ./fact_extractor/install/pre_install.sh - - name: Install FACT + - name: Install fact_extractor shell: 'script -q -e -c "bash {0}"' run: ./fact_extractor/install.py - name: Unit Tests shell: 'script -q -e -c "bash {0}"' - run: pytest + run: sudo -EH pytest From 3226717539aadfd9c046b425495b281ee4de11b0 Mon Sep 17 00:00:00 2001 From: rhelmke Date: Wed, 16 Nov 2022 13:59:26 +0100 Subject: [PATCH 5/9] remove sudo --- .github/workflows/build_ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_ci.yml b/.github/workflows/build_ci.yml index 3f86af78..d4e28805 100644 --- a/.github/workflows/build_ci.yml +++ b/.github/workflows/build_ci.yml @@ -42,4 +42,4 @@ jobs: run: ./fact_extractor/install.py - name: Unit Tests shell: 'script -q -e -c "bash {0}"' - run: sudo -EH pytest + run: pytest From 1415efc63601ddb1ab83282dc51e22ed4041e881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Wed, 16 Nov 2022 17:31:13 +0100 Subject: [PATCH 6/9] moved unpacking of supported file systems to 7z from generic_fs plugin: ext2/3/4, fat, ntfs, hfs, cramfs --- .pre-commit-config.yaml | 3 +- fact_extractor/helperFunctions/file_system.py | 12 ++++ .../unpacking/generic_fs/code/generic_fs.py | 14 ++--- .../generic_fs/test/test_plugin_generic_fs.py | 45 +------------- .../plugins/unpacking/sevenz/code/sevenz.py | 26 ++++++-- .../test/data/cramfs.img | Bin .../test/data/ext2.img.xz | Bin .../test/data/ext3.img.xz | Bin .../test/data/ext4.img.xz | Bin .../test/data/fat.img.xz | Bin .../test/data/hfs.img.xz | Bin .../test/data/ntfs.img.xz | Bin .../sevenz/test/test_plugin_sevenz.py | 57 +++++++++++++++--- .../test/unit/unpacker/test_unpacker.py | 43 ++++++++----- 14 files changed, 115 insertions(+), 85 deletions(-) rename fact_extractor/plugins/unpacking/{generic_fs => sevenz}/test/data/cramfs.img (100%) rename fact_extractor/plugins/unpacking/{generic_fs => sevenz}/test/data/ext2.img.xz (100%) rename fact_extractor/plugins/unpacking/{generic_fs => sevenz}/test/data/ext3.img.xz (100%) rename fact_extractor/plugins/unpacking/{generic_fs => sevenz}/test/data/ext4.img.xz (100%) rename fact_extractor/plugins/unpacking/{generic_fs => sevenz}/test/data/fat.img.xz (100%) rename fact_extractor/plugins/unpacking/{generic_fs => sevenz}/test/data/hfs.img.xz (100%) rename fact_extractor/plugins/unpacking/{generic_fs => sevenz}/test/data/ntfs.img.xz (100%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4c05dd88..9b215d37 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,10 +33,9 @@ repos: types: [python] - repo: https://gitlab.com/pycqa/flake8 - rev: a7be77f7 + rev: 3.9.2 hooks: - id: flake8 - args: [--select=W504, --ignore=E501,W503] - repo: https://github.com/pre-commit/mirrors-pylint rev: v2.2.2 diff --git a/fact_extractor/helperFunctions/file_system.py b/fact_extractor/helperFunctions/file_system.py index d7c0dd7c..53168a22 100644 --- a/fact_extractor/helperFunctions/file_system.py +++ b/fact_extractor/helperFunctions/file_system.py @@ -1,4 +1,7 @@ +import lzma +from contextlib import contextmanager from pathlib import Path +from tempfile import TemporaryDirectory SRC_DIR_PATH = Path(__file__).parent.parent.absolute() @@ -40,3 +43,12 @@ def file_name_sanitize(file_path) -> str: Returns file path without directory traversal ''' return file_path.replace('../', '') + + +@contextmanager +def decompress_test_file(test_file: Path) -> Path: + with TemporaryDirectory() as tmp_dir: + target_file = Path(tmp_dir) / 'fs.img' + with lzma.open(test_file) as decompressed_file: + target_file.write_bytes(decompressed_file.read()) + yield target_file diff --git a/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py b/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py index 8522d98d..d28a6e97 100644 --- a/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py +++ b/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py @@ -12,14 +12,12 @@ NAME = 'genericFS' MIME_PATTERNS = [ - 'filesystem/btrfs', 'filesystem/cramfs', 'filesystem/dosmbr', 'filesystem/ext2', 'filesystem/ext3', - 'filesystem/ext4', 'filesystem/f2fs', 'filesystem/fat', 'filesystem/hfs', 'filesystem/jfs', 'filesystem/minix', - 'filesystem/ntfs', 'filesystem/reiserfs', 'filesystem/romfs', 'filesystem/udf', 'filesystem/xfs', 'generic/fs', + 'filesystem/btrfs', 'filesystem/dosmbr', 'filesystem/f2fs', 'filesystem/jfs', 'filesystem/minix', + 'filesystem/reiserfs', 'filesystem/romfs', 'filesystem/udf', 'filesystem/xfs', 'generic/fs', ] -VERSION = '0.6' +VERSION = '0.6.1' TYPES = { 'filesystem/btrfs': 'btrfs', - 'filesystem/cramfs': 'cramfs', 'filesystem/f2fs': 'f2fs', 'filesystem/jfs': 'jfs', 'filesystem/minix': 'minix', @@ -41,7 +39,7 @@ def unpack_function(file_path, tmp_dir): def _mount_single_filesystem(file_path, mime_type, tmp_dir): - type_parameter = '-t {}'.format(TYPES[mime_type]) if mime_type in TYPES else '' + type_parameter = f'-t {TYPES[mime_type]}' if mime_type in TYPES else '' mount_dir = TemporaryDirectory() output = execute_shell_command(f'sudo mount {type_parameter} -v -o ro,loop {file_path} {mount_dir.name}') output += execute_shell_command(f'sudo cp -av {mount_dir.name}/* {tmp_dir}/') @@ -51,7 +49,7 @@ def _mount_single_filesystem(file_path, mime_type, tmp_dir): def _mount_from_boot_record(file_path, tmp_dir): - output, return_code = execute_shell_command_get_return_code('sudo kpartx -a -v {}'.format(file_path)) + output, _ = execute_shell_command_get_return_code(f'sudo kpartx -a -v {file_path}') sleep(1) # Necessary since initialization of special devices seem to take some time # kpartx may return an error on one partition but others are still loaded correctly. loop_devices = _extract_loop_devices(output) @@ -67,7 +65,7 @@ def _mount_from_boot_record(file_path, tmp_dir): # Bug in kpartx doesn't allow -d to work on long file names (as in /storage/path//_) # thus "host" loop device is used instead of filename - k_output, return_code = execute_shell_command_get_return_code(f'sudo kpartx -d -v {_get_host_loop(loop_devices)}') + k_output, _ = execute_shell_command_get_return_code(f'sudo kpartx -d -v {_get_host_loop(loop_devices)}') execute_shell_command(f'sudo losetup -d {_get_host_loop(loop_devices)}') return output + k_output diff --git a/fact_extractor/plugins/unpacking/generic_fs/test/test_plugin_generic_fs.py b/fact_extractor/plugins/unpacking/generic_fs/test/test_plugin_generic_fs.py index 53d67c5b..30002df4 100644 --- a/fact_extractor/plugins/unpacking/generic_fs/test/test_plugin_generic_fs.py +++ b/fact_extractor/plugins/unpacking/generic_fs/test/test_plugin_generic_fs.py @@ -1,8 +1,6 @@ -import lzma -from contextlib import contextmanager from pathlib import Path -from tempfile import TemporaryDirectory +from helperFunctions.file_system import decompress_test_file from test.unit.unpacker.test_unpacker import TestUnpackerBase from ..code.generic_fs import _extract_loop_devices @@ -17,13 +15,8 @@ class TestGenericFsUnpacker(TestUnpackerBase): def test_unpacker_selection_generic(self): self.check_unpacker_selection('filesystem/btrfs', 'genericFS') - self.check_unpacker_selection('filesystem/cramfs', 'genericFS') self.check_unpacker_selection('filesystem/dosmbr', 'genericFS') - self.check_unpacker_selection('filesystem/ext2', 'genericFS') - self.check_unpacker_selection('filesystem/ext3', 'genericFS') - self.check_unpacker_selection('filesystem/ext4', 'genericFS') self.check_unpacker_selection('filesystem/f2fs', 'genericFS') - self.check_unpacker_selection('filesystem/hfs', 'genericFS') self.check_unpacker_selection('filesystem/jfs', 'genericFS') self.check_unpacker_selection('filesystem/minix', 'genericFS') self.check_unpacker_selection('filesystem/reiserfs', 'genericFS') @@ -31,9 +24,6 @@ def test_unpacker_selection_generic(self): self.check_unpacker_selection('filesystem/udf', 'genericFS') self.check_unpacker_selection('filesystem/xfs', 'genericFS') - def test_extraction_cramfs(self): - self.check_unpacking_of_standard_unpack_set(TEST_DATA_DIR / 'cramfs.img') - def test_extraction_romfs(self): self.check_unpacking_of_standard_unpack_set(TEST_DATA_DIR / 'romfs.img') @@ -41,30 +31,6 @@ def test_extraction_btrfs(self): with decompress_test_file(TEST_DATA_DIR / 'btrfs.img.xz') as test_file: self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test') - def test_extraction_ext2(self): - with decompress_test_file(TEST_DATA_DIR / 'ext2.img.xz') as test_file: - self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test') - - def test_extraction_ext3(self): - with decompress_test_file(TEST_DATA_DIR / 'ext3.img.xz') as test_file: - self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test') - - def test_extraction_ext4(self): - with decompress_test_file(TEST_DATA_DIR / 'ext4.img.xz') as test_file: - self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test') - - def test_extraction_fat(self): - with decompress_test_file(TEST_DATA_DIR / 'fat.img.xz') as test_file: - self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test') - - def test_extraction_ntfs(self): - with decompress_test_file(TEST_DATA_DIR / 'ntfs.img.xz') as test_file: - self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test') - - def test_extraction_hfs(self): - with decompress_test_file(TEST_DATA_DIR / 'hfs.img.xz') as test_file: - self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test') - def test_extraction_jfs(self): with decompress_test_file(TEST_DATA_DIR / 'jfs.img.xz') as test_file: self.check_unpacking_of_standard_unpack_set(test_file, additional_prefix_folder='get_files_test') @@ -109,12 +75,3 @@ def test_extract_loop_devices(): loop_devices = _extract_loop_devices(KPARTX_OUTPUT) assert loop_devices assert loop_devices == ['loop7p1', 'loop7p2'] - - -@contextmanager -def decompress_test_file(test_file: Path) -> Path: - with TemporaryDirectory() as tmp_dir: - target_file = Path(tmp_dir) / 'fs.img' - with lzma.open(test_file) as decompressed_file: - target_file.write_bytes(decompressed_file.read()) - yield target_file diff --git a/fact_extractor/plugins/unpacking/sevenz/code/sevenz.py b/fact_extractor/plugins/unpacking/sevenz/code/sevenz.py index 9c18076f..4fb7fcd8 100644 --- a/fact_extractor/plugins/unpacking/sevenz/code/sevenz.py +++ b/fact_extractor/plugins/unpacking/sevenz/code/sevenz.py @@ -9,10 +9,24 @@ from helperFunctions.file_system import get_src_dir NAME = '7z' -MIME_PATTERNS = ['application/x-lzma', 'application/x-7z-compressed', 'application/zip', 'application/x-zip-compressed'] -VERSION = '0.7' - -UNPACKER_EXECUTEABLE = '7z' +MIME_PATTERNS = [ + # compressed archives + 'application/x-lzma', + 'application/x-7z-compressed', + 'application/zip', + 'application/x-zip-compressed', + # file systems + 'filesystem/cramfs', + 'filesystem/ext2', + 'filesystem/ext3', + 'filesystem/ext4', + 'filesystem/fat', + 'filesystem/hfs', + 'filesystem/ntfs', +] +VERSION = '0.8' + +UNPACKER_EXECUTABLE = '7z' PW_LIST = get_merged_password_set(os.path.join(get_src_dir(), 'unpacker/passwords')) @@ -23,7 +37,7 @@ def unpack_function(file_path, tmp_dir): ''' meta = {} for password in PW_LIST: - execution_string = 'fakeroot {} x -y -p{} -o{} {}'.format(UNPACKER_EXECUTEABLE, password, tmp_dir, file_path) + execution_string = f'fakeroot {UNPACKER_EXECUTABLE} x -y -p{password} -o{tmp_dir} {file_path}' output = execute_shell_command(execution_string) meta['output'] = output @@ -34,7 +48,7 @@ def unpack_function(file_path, tmp_dir): # Inform the user if not correct password was found if 'Wrong password' in meta['output']: - logging.warn('Password for {} not found in fact_extractor/unpacker/passwords directory'.format(file_path)) + logging.warning(f'Password for {file_path} not found in fact_extractor/unpacker/passwords directory') return meta diff --git a/fact_extractor/plugins/unpacking/generic_fs/test/data/cramfs.img b/fact_extractor/plugins/unpacking/sevenz/test/data/cramfs.img similarity index 100% rename from fact_extractor/plugins/unpacking/generic_fs/test/data/cramfs.img rename to fact_extractor/plugins/unpacking/sevenz/test/data/cramfs.img diff --git a/fact_extractor/plugins/unpacking/generic_fs/test/data/ext2.img.xz b/fact_extractor/plugins/unpacking/sevenz/test/data/ext2.img.xz similarity index 100% rename from fact_extractor/plugins/unpacking/generic_fs/test/data/ext2.img.xz rename to fact_extractor/plugins/unpacking/sevenz/test/data/ext2.img.xz diff --git a/fact_extractor/plugins/unpacking/generic_fs/test/data/ext3.img.xz b/fact_extractor/plugins/unpacking/sevenz/test/data/ext3.img.xz similarity index 100% rename from fact_extractor/plugins/unpacking/generic_fs/test/data/ext3.img.xz rename to fact_extractor/plugins/unpacking/sevenz/test/data/ext3.img.xz diff --git a/fact_extractor/plugins/unpacking/generic_fs/test/data/ext4.img.xz b/fact_extractor/plugins/unpacking/sevenz/test/data/ext4.img.xz similarity index 100% rename from fact_extractor/plugins/unpacking/generic_fs/test/data/ext4.img.xz rename to fact_extractor/plugins/unpacking/sevenz/test/data/ext4.img.xz diff --git a/fact_extractor/plugins/unpacking/generic_fs/test/data/fat.img.xz b/fact_extractor/plugins/unpacking/sevenz/test/data/fat.img.xz similarity index 100% rename from fact_extractor/plugins/unpacking/generic_fs/test/data/fat.img.xz rename to fact_extractor/plugins/unpacking/sevenz/test/data/fat.img.xz diff --git a/fact_extractor/plugins/unpacking/generic_fs/test/data/hfs.img.xz b/fact_extractor/plugins/unpacking/sevenz/test/data/hfs.img.xz similarity index 100% rename from fact_extractor/plugins/unpacking/generic_fs/test/data/hfs.img.xz rename to fact_extractor/plugins/unpacking/sevenz/test/data/hfs.img.xz diff --git a/fact_extractor/plugins/unpacking/generic_fs/test/data/ntfs.img.xz b/fact_extractor/plugins/unpacking/sevenz/test/data/ntfs.img.xz similarity index 100% rename from fact_extractor/plugins/unpacking/generic_fs/test/data/ntfs.img.xz rename to fact_extractor/plugins/unpacking/sevenz/test/data/ntfs.img.xz diff --git a/fact_extractor/plugins/unpacking/sevenz/test/test_plugin_sevenz.py b/fact_extractor/plugins/unpacking/sevenz/test/test_plugin_sevenz.py index 17d9464f..cb2d895f 100644 --- a/fact_extractor/plugins/unpacking/sevenz/test/test_plugin_sevenz.py +++ b/fact_extractor/plugins/unpacking/sevenz/test/test_plugin_sevenz.py @@ -1,20 +1,57 @@ -import os -from test.unit.unpacker.test_unpacker import TestUnpackerBase +from pathlib import Path +from helperFunctions.file_system import decompress_test_file +from plugins.unpacking.sevenz.code.sevenz import MIME_PATTERNS +from test.unit.unpacker.test_unpacker import TestUnpackerBase -TEST_DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data') +TEST_DATA_DIR = Path(__file__).parent / 'data' class TestSevenZUnpacker(TestUnpackerBase): - def test_unpacker_selection_generic(self): - mimes = ['application/x-7z-compressed', 'application/x-lzma', 'application/zip', 'application/x-zip-compressed'] - for item in mimes: + for item in MIME_PATTERNS: self.check_unpacker_selection(item, '7z') - def test_extraction_sevenz(self): - self.check_unpacking_of_standard_unpack_set(os.path.join(TEST_DATA_DIR, "test.7z"), additional_prefix_folder="get_files_test", output=True) + def test_extraction_7z(self): + self.check_unpacking_of_standard_unpack_set( + TEST_DATA_DIR / 'test.7z', + additional_prefix_folder='get_files_test', + output=True, + ) + + def test_extraction_cramfs(self): + self.check_unpacking_of_standard_unpack_set( + TEST_DATA_DIR / 'cramfs.img', + output=True, + ) + + def test_extraction_fat(self): + with decompress_test_file(TEST_DATA_DIR / 'fat.img.xz') as test_file: + self.check_unpacking_of_standard_unpack_set( + test_file, output=True, additional_prefix_folder='get_files_test' + ) + + def test_extraction_hfs(self): + with decompress_test_file(TEST_DATA_DIR / 'hfs.img.xz') as test_file: + self.check_unpacking_of_standard_unpack_set( + test_file, output=True, additional_prefix_folder='untitled/get_files_test' + ) + + def test_extraction_ntfs(self): + with decompress_test_file(TEST_DATA_DIR / 'ntfs.img.xz') as test_file: + self.check_unpacking_of_standard_unpack_set( + test_file, output=True, additional_prefix_folder='get_files_test', ignore={'[SYSTEM]'} + ) + + def test_extraction_ext(self): + for index in [2, 3, 4]: + with decompress_test_file(TEST_DATA_DIR / f'ext{index}.img.xz') as test_file: + self.check_unpacking_of_standard_unpack_set( + test_file, output=True, additional_prefix_folder='get_files_test', ignore={'Journal'} + ) def test_extraction_password(self): - meta = self.check_unpacking_of_standard_unpack_set(os.path.join(TEST_DATA_DIR, "test_password.7z"), additional_prefix_folder="get_files_test", output=True) - self.assertEqual(meta['password'], 'test', "password info not set") + meta = self.check_unpacking_of_standard_unpack_set( + TEST_DATA_DIR / 'test_password.7z', additional_prefix_folder="get_files_test", output=True + ) + assert meta['password'] == 'test', 'password info not set' diff --git a/fact_extractor/test/unit/unpacker/test_unpacker.py b/fact_extractor/test/unit/unpacker/test_unpacker.py index bc46c3c0..90fde0f8 100644 --- a/fact_extractor/test/unit/unpacker/test_unpacker.py +++ b/fact_extractor/test/unit/unpacker/test_unpacker.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import gc import json import os @@ -6,8 +8,7 @@ from configparser import ConfigParser from pathlib import Path from tempfile import TemporaryDirectory -from typing import Union -from unittest.mock import patch, Mock +from unittest.mock import Mock, patch from helperFunctions.file_system import get_test_data_dir from unpacker.unpack import Unpacker @@ -38,30 +39,39 @@ def tearDown(self): gc.collect() def get_unpacker_meta(self): - return json.loads(Path(self.unpacker._report_folder, 'meta.json').read_text()) # pylint: disable=protected-access + return json.loads( + Path(self.unpacker._report_folder, 'meta.json').read_text() + ) # pylint: disable=protected-access def check_unpacker_selection(self, mime_type, plugin_name): name = self.unpacker.get_unpacker(mime_type)[1] self.assertEqual(name, plugin_name, 'wrong unpacker plugin selected') - def check_unpacking_of_standard_unpack_set(self, in_file: Union[Path, str], additional_prefix_folder='', output=True): + def check_unpacking_of_standard_unpack_set( + self, + in_file: Path | str, + additional_prefix_folder: str = '', + output: bool = True, + ignore: set[str] | None = None, + ): files, meta_data = self.unpacker.extract_files_from_file(str(in_file), self.tmp_dir.name) - files = set(files) - self.assertEqual(len(files), 3, 'file number incorrect') - self.assertEqual(files, { + files = {f for f in files if not any(rule in f for rule in ignore or set())} + assert len(files) == 3, 'file number incorrect' + assert files == { os.path.join(self.tmp_dir.name, additional_prefix_folder, 'testfile1'), os.path.join(self.tmp_dir.name, additional_prefix_folder, 'testfile2'), - os.path.join(self.tmp_dir.name, additional_prefix_folder, 'generic folder/test file 3_.txt') - }, 'not all files found') + os.path.join(self.tmp_dir.name, additional_prefix_folder, 'generic folder/test file 3_.txt'), + }, 'not all files found' if output: - self.assertIn('output', meta_data) + assert 'output' in meta_data return meta_data class TestUnpackerCore(TestUnpackerBase): - def test_generic_carver_found(self): - self.assertTrue('generic/carver' in list(self.unpacker.unpacker_plugins.keys()), 'generic carver plugin not found') + self.assertTrue( + 'generic/carver' in list(self.unpacker.unpacker_plugins.keys()), 'generic carver plugin not found' + ) name = self.unpacker.unpacker_plugins['generic/carver'][1] self.assertEqual(name, 'generic_carver', 'generic_carver plugin not found') @@ -143,15 +153,18 @@ def _unpack_fallback_check(self, fallback_mime, fallback_plugin_name): class TestUnpackerCoreMain(TestUnpackerBase): - def main_unpack_check(self, file_path, number_unpacked_files, number_of_excluded_files, first_unpacker): extracted_files = self.unpacker.unpack(file_path) meta_data = self.get_unpacker_meta() self.assertEqual(len(extracted_files), number_unpacked_files, 'not all files found') self.assertEqual(meta_data['plugin_used'], first_unpacker, 'Wrong plugin in Meta') - self.assertEqual(meta_data['number_of_unpacked_files'], number_unpacked_files, 'Number of unpacked files wrong in Meta') - self.assertEqual(meta_data['number_of_excluded_files'], number_of_excluded_files, 'Number of excluded files wrong in Meta') + self.assertEqual( + meta_data['number_of_unpacked_files'], number_unpacked_files, 'Number of unpacked files wrong in Meta' + ) + self.assertEqual( + meta_data['number_of_excluded_files'], number_of_excluded_files, 'Number of excluded files wrong in Meta' + ) def test_main_unpack_function(self): test_file_path = Path(get_test_data_dir(), 'container/test.zip') From 8e247d67b9a0a81814508512551afe5b62866853 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Wed, 16 Nov 2022 17:34:31 +0100 Subject: [PATCH 7/9] restore pylint config parameters --- .pre-commit-config.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9b215d37..da5ba7be 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -36,6 +36,7 @@ repos: rev: 3.9.2 hooks: - id: flake8 + args: [--select=W504, --ignore=E501,W503] - repo: https://github.com/pre-commit/mirrors-pylint rev: v2.2.2 From 85ce4ae5046610e9bbbcfc1ee846ca24b2ed3e07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Thu, 17 Nov 2022 10:32:43 +0100 Subject: [PATCH 8/9] Update README.md mention extra kernel module dependencies of generic_fs unpacker --- README.md | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9cdccd19..62738f54 100644 --- a/README.md +++ b/README.md @@ -30,9 +30,15 @@ fact_extractor/install/pre_install.sh fact_extractor/install.py ``` -:warning: **We no longer support Ubuntu 16.04 and Python 3.5** +:warning: **We no longer support Ubuntu 16.04 and Python <3.7** (It may still work with a bit of tinkering, though) +:warning: For the `generic_fs` unpacker plugin to work with all file system types, you may need to install extra kernel modules + +```sh +sudo apt install linux-modules-extra-$(uname -r) +``` + The tool can then be run with ```bash @@ -92,7 +98,7 @@ This project is partly financed by [German Federal Office for Information Securi ## License ``` Firmware Analysis and Comparison Tool (FACT) extractor - Copyright (C) 2015-2019 Fraunhofer FKIE + Copyright (C) 2015-2022 Fraunhofer FKIE This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by From 0025eb0bcd14868cfd796870b8739367c1bfe959 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Thu, 17 Nov 2022 12:07:22 +0100 Subject: [PATCH 9/9] added warning in case fs kernel modules seem to be missing and also refactored the code and removed the common_helper_process dependency --- .../unpacking/generic_fs/code/generic_fs.py | 46 +++++++++++-------- .../generic_fs/test/test_plugin_generic_fs.py | 12 ++++- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py b/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py index 8522d98d..d4920e1b 100644 --- a/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py +++ b/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py @@ -2,12 +2,11 @@ This plugin mounts filesystem images and extracts their content ''' import re +from shlex import split +from subprocess import run, PIPE, STDOUT from tempfile import TemporaryDirectory from time import sleep -from common_helper_process import ( - execute_shell_command, execute_shell_command_get_return_code -) from fact_helper_file import get_file_type_from_path NAME = 'genericFS' @@ -41,17 +40,28 @@ def unpack_function(file_path, tmp_dir): def _mount_single_filesystem(file_path, mime_type, tmp_dir): - type_parameter = '-t {}'.format(TYPES[mime_type]) if mime_type in TYPES else '' - mount_dir = TemporaryDirectory() - output = execute_shell_command(f'sudo mount {type_parameter} -v -o ro,loop {file_path} {mount_dir.name}') - output += execute_shell_command(f'sudo cp -av {mount_dir.name}/* {tmp_dir}/') - output += execute_shell_command(f'sudo umount -v {mount_dir.name}') - mount_dir.cleanup() + type_parameter = f'-t {TYPES[mime_type]}' if mime_type in TYPES else '' + with TemporaryDirectory() as mount_dir: + output = _get_output(f'sudo mount {type_parameter} -v -o ro,loop {file_path} {mount_dir}') + output += _get_output(f'sudo cp -av {mount_dir}/* {tmp_dir}/') + output += _get_output(f'sudo umount -v {mount_dir}') + + if 'unknown filesystem type' in output: + output += '\nwarning: you may need to install additional kernel modules' return output +def _get_output(command: str) -> str: + environment = {'LANG': 'en_US.UTF-8'} # use LANG env variable to get unified localization output + return run(command, shell=True, env=environment, check=False, text=True, stdout=PIPE, stderr=STDOUT).stdout + + +def _run(command: str): + run(split(command), check=False) + + def _mount_from_boot_record(file_path, tmp_dir): - output, return_code = execute_shell_command_get_return_code('sudo kpartx -a -v {}'.format(file_path)) + output = _get_output(f'sudo kpartx -a -v {file_path}') sleep(1) # Necessary since initialization of special devices seem to take some time # kpartx may return an error on one partition but others are still loaded correctly. loop_devices = _extract_loop_devices(output) @@ -63,21 +73,20 @@ def _mount_from_boot_record(file_path, tmp_dir): if loop_devices: # Occasionally device mapping isn't removed correctly and results in losetup -d to fail, so remove explicitly for loop_dev in loop_devices: - execute_shell_command(f'sudo dmsetup remove /dev/mapper/{loop_dev}') + _run(f'sudo dmsetup remove /dev/mapper/{loop_dev}') # Bug in kpartx doesn't allow -d to work on long file names (as in /storage/path//_) # thus "host" loop device is used instead of filename - k_output, return_code = execute_shell_command_get_return_code(f'sudo kpartx -d -v {_get_host_loop(loop_devices)}') - execute_shell_command(f'sudo losetup -d {_get_host_loop(loop_devices)}') - return output + k_output + output += _get_output(f'sudo kpartx -d -v {_get_host_loop(loop_devices)}') + _run(f'sudo losetup -d {_get_host_loop(loop_devices)}') return output def _process_loop_device(loop_device, mount_point, target_directory, index): - output = execute_shell_command(f'sudo mount -o ro -v /dev/mapper/{loop_device} {mount_point}') - output += execute_shell_command(f'sudo cp -av {mount_point}/ {target_directory}/partition_{index}/') - return output + execute_shell_command(f'sudo umount -v {mount_point}') + output = _get_output(f'sudo mount -o ro -v /dev/mapper/{loop_device} {mount_point}') + output += _get_output(f'sudo cp -av {mount_point}/ {target_directory}/partition_{index}/') + return output + _get_output(f'sudo umount -v {mount_point}') def _extract_loop_devices(kpartx_output): @@ -85,7 +94,8 @@ def _extract_loop_devices(kpartx_output): def _get_host_loop(devices): - return '/dev/{}'.format(re.findall(r'(loop\d{1,2})', devices[0])[0]) + device = re.findall(r'(loop\d{1,2})', devices[0])[0] + return f'/dev/{device}' # ----> Do not edit below this line <---- diff --git a/fact_extractor/plugins/unpacking/generic_fs/test/test_plugin_generic_fs.py b/fact_extractor/plugins/unpacking/generic_fs/test/test_plugin_generic_fs.py index 53d67c5b..3c5589e7 100644 --- a/fact_extractor/plugins/unpacking/generic_fs/test/test_plugin_generic_fs.py +++ b/fact_extractor/plugins/unpacking/generic_fs/test/test_plugin_generic_fs.py @@ -4,7 +4,7 @@ from tempfile import TemporaryDirectory from test.unit.unpacker.test_unpacker import TestUnpackerBase -from ..code.generic_fs import _extract_loop_devices +from ..code.generic_fs import _extract_loop_devices, _mount_single_filesystem, TYPES TEST_DATA_DIR = Path(__file__).parent / 'data' KPARTX_OUTPUT = ''' @@ -111,6 +111,16 @@ def test_extract_loop_devices(): assert loop_devices == ['loop7p1', 'loop7p2'] +def test_unknown_filesystem(): + try: + TYPES['foobar'] = 'foobar' + with TemporaryDirectory() as tmp_dir: + output = _mount_single_filesystem(TEST_DATA_DIR / 'cramfs.img', 'foobar', tmp_dir) + assert 'you may need to install additional kernel modules' in output + finally: + TYPES.pop('foobar') + + @contextmanager def decompress_test_file(test_file: Path) -> Path: with TemporaryDirectory() as tmp_dir: