|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# This file is part of Checkbox. |
| 3 | +# |
| 4 | +# Copyright 2024-2025 Canonical Ltd. |
| 5 | +# Authors: |
| 6 | +# Fernando Bravo <[email protected]> |
| 7 | +# Jeffery Yen <songpao2262gmail.com> |
| 8 | +# |
| 9 | +# Checkbox is free software: you can redistribute it and/or modify |
| 10 | +# it under the terms of the GNU General Public License version 3, |
| 11 | +# as published by the Free Software Foundation. |
| 12 | +# |
| 13 | +# Checkbox is distributed in the hope that it will be useful, |
| 14 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 15 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 16 | +# GNU General Public License for more details. |
| 17 | +# |
| 18 | +# You should have received a copy of the GNU General Public License |
| 19 | +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. |
| 20 | + |
| 21 | +import unittest |
| 22 | +import subprocess |
| 23 | +import contextlib |
| 24 | +from unittest.mock import patch, MagicMock, call |
| 25 | + |
| 26 | +from checkbox_support.scripts.usb_read_write import ( |
| 27 | + mount_usb_storage, |
| 28 | + write_test_unit, |
| 29 | +) |
| 30 | + |
| 31 | + |
| 32 | +class TestUsbReadWrite(unittest.TestCase): |
| 33 | + |
| 34 | + @patch("os.path") |
| 35 | + @patch("subprocess.Popen") |
| 36 | + @patch("subprocess.check_output") |
| 37 | + @patch("subprocess.run") |
| 38 | + def test_write_test_unit( |
| 39 | + self, mock_run, mock_check_output, mock_popen, mock_os |
| 40 | + ): |
| 41 | + mock_os.join.return_value = "output_file" |
| 42 | + |
| 43 | + mock_process = MagicMock() |
| 44 | + mock_process.communicate.return_value = ( |
| 45 | + b"2048+1 records in\n2048+1 records out\n1049076 bytes (1.0 MB) " |
| 46 | + b"copied, 0.00473357 s, 222 MB/s\n", |
| 47 | + None, |
| 48 | + ) |
| 49 | + mock_popen.return_value = mock_process |
| 50 | + |
| 51 | + random_file = MagicMock() |
| 52 | + random_file.tfile.name = "random_file" |
| 53 | + write_test_unit(random_file) |
| 54 | + |
| 55 | + mock_popen.assert_called_once_with( |
| 56 | + [ |
| 57 | + "dd", |
| 58 | + "if=random_file", |
| 59 | + "of=output_file", |
| 60 | + "bs=1M", |
| 61 | + "oflag=sync", |
| 62 | + ], |
| 63 | + stderr=subprocess.STDOUT, |
| 64 | + stdout=subprocess.PIPE, |
| 65 | + env={"LC_NUMERIC": "C"}, |
| 66 | + ) |
| 67 | + mock_popen.return_value.communicate.assert_called_with() |
| 68 | + |
| 69 | + @patch("os.path") |
| 70 | + @patch("subprocess.Popen") |
| 71 | + @patch("subprocess.check_output") |
| 72 | + @patch("subprocess.run") |
| 73 | + def test_write_test_unit_wrong_units( |
| 74 | + self, mock_run, mock_check_output, mock_popen, mock_os |
| 75 | + ): |
| 76 | + mock_os.join.return_value = "output_file" |
| 77 | + |
| 78 | + mock_process = MagicMock() |
| 79 | + mock_process.communicate.return_value = ( |
| 80 | + b"2048+1 records in\n2048+1 records out\n1049076 bytes (1.0 MB) " |
| 81 | + b"copied, 0.00473357 s, 222 ***/s\n", |
| 82 | + None, |
| 83 | + ) |
| 84 | + mock_popen.return_value = mock_process |
| 85 | + |
| 86 | + random_file = MagicMock() |
| 87 | + random_file.tfile.name = "random_file" |
| 88 | + with self.assertRaises(SystemExit): |
| 89 | + write_test_unit(random_file) |
| 90 | + |
| 91 | + @patch("os.path") |
| 92 | + @patch("subprocess.Popen") |
| 93 | + @patch("subprocess.check_output") |
| 94 | + @patch("subprocess.run") |
| 95 | + def test_write_test_unit_io_error( |
| 96 | + self, mock_run, mock_check_output, mock_popen, mock_os |
| 97 | + ): |
| 98 | + mock_os.join.return_value = "output_file" |
| 99 | + |
| 100 | + mock_process = MagicMock() |
| 101 | + mock_process.communicate.return_value = ( |
| 102 | + b"2048+1 records in\n2048+1 records out\n1049076 bytes (1.0 MB) " |
| 103 | + b"copied, 0.00473357 s, 222 MBs\n", |
| 104 | + None, |
| 105 | + ) |
| 106 | + mock_popen.return_value = mock_process |
| 107 | + |
| 108 | + dmesg = MagicMock() |
| 109 | + dmesg.stdout.decode.return_value = "I/O error" |
| 110 | + mock_run.return_value = dmesg |
| 111 | + |
| 112 | + random_file = MagicMock() |
| 113 | + random_file.tfile.name = "random_file" |
| 114 | + with self.assertRaises(SystemExit): |
| 115 | + write_test_unit(random_file) |
| 116 | + |
| 117 | + |
| 118 | +class TestMountUsbStorage(unittest.TestCase): |
| 119 | + @patch("checkbox_support.scripts.usb_read_write.subprocess.call") |
| 120 | + @patch( |
| 121 | + "checkbox_support.scripts.usb_read_write.os.path.join", |
| 122 | + return_value="/dev/sda1", |
| 123 | + ) |
| 124 | + @patch("checkbox_support.scripts.usb_read_write.sys.exit") |
| 125 | + def test_mount_usb_storage_success(self, mock_exit, mock_join, mock_call): |
| 126 | + """ |
| 127 | + Test the success scenario: |
| 128 | + - Simulate that all subprocess.call calls return 0 (success), |
| 129 | + with the mount command returning 0 to indicate a successful mount. |
| 130 | + - Verify that the following commands are called in order: |
| 131 | + 1. ["sync"] |
| 132 | + 2. ["udevadm", "settle", "--timeout=10"] |
| 133 | + 3. ["umount", FOLDER_TO_MOUNT] (here we patch FOLDER_TO_MOUNT to |
| 134 | + "/mnt/usb") |
| 135 | + 4. ["umount", "/dev/sda1"] |
| 136 | + 5. ["mount", "/dev/sda1", "/mnt/usb"] |
| 137 | + - Upon exiting the context, the finally block should also call |
| 138 | + unmount on the folder (["umount", "/mnt/usb"]). |
| 139 | + - And sys.exit should not be called. |
| 140 | + """ |
| 141 | + |
| 142 | + # Simulate that all the calls are passing |
| 143 | + mock_call.return_value = 0 |
| 144 | + |
| 145 | + # Patch FOLDER_TO_MOUNT to be "/mnt/usb" |
| 146 | + with patch( |
| 147 | + "checkbox_support.scripts.usb_read_write.FOLDER_TO_MOUNT", |
| 148 | + "/mnt/usb", |
| 149 | + ): |
| 150 | + # Enter the mount_usb_storage context using a context manager |
| 151 | + with contextlib.ExitStack() as stack: |
| 152 | + stack.enter_context(mount_usb_storage("sda1")) |
| 153 | + # When entering the context, the following commands should be |
| 154 | + # executed in order: |
| 155 | + expected_calls_entry = [ |
| 156 | + call(["sync"]), |
| 157 | + call(["udevadm", "settle", "--timeout=10"]), |
| 158 | + call(["umount", "/mnt/usb"], stderr=subprocess.PIPE), |
| 159 | + call(["umount", "/dev/sda1"], stderr=subprocess.PIPE), |
| 160 | + call(["mount", "/dev/sda1", "/mnt/usb"]), |
| 161 | + ] |
| 162 | + for expected in expected_calls_entry: |
| 163 | + self.assertIn(expected, mock_call.call_args_list) |
| 164 | + # When exiting the context, the finally block should execute: |
| 165 | + expected_final_call = call(["umount", "/mnt/usb"]) |
| 166 | + self.assertIn(expected_final_call, mock_call.call_args_list) |
| 167 | + # Ensure that sys.exit was not called |
| 168 | + mock_exit.assert_not_called() |
| 169 | + |
| 170 | + @patch("checkbox_support.scripts.usb_read_write.subprocess.call") |
| 171 | + @patch("logging.error") |
| 172 | + def test_mount_usb_storage_failure(self, mock_log, mock_call): |
| 173 | + """ |
| 174 | + Test the failure scenario: |
| 175 | + - Simulate that the mount command returns 1 (failure). |
| 176 | + - The program should call sys.exit(1) and raise a SystemExit. |
| 177 | + """ |
| 178 | + |
| 179 | + def call_side_effect(args, **kwargs): |
| 180 | + if args[0] == "mount": |
| 181 | + return 1 # Simulate mount failure |
| 182 | + else: |
| 183 | + return 0 |
| 184 | + |
| 185 | + mock_call.side_effect = call_side_effect |
| 186 | + |
| 187 | + with patch( |
| 188 | + "checkbox_support.scripts.usb_read_write.FOLDER_TO_MOUNT", |
| 189 | + "/mnt/usb", |
| 190 | + ): |
| 191 | + with self.assertRaises(SystemExit) as context: |
| 192 | + with mount_usb_storage("sda1"): |
| 193 | + # As soon as we enter the context, a non-zero return from |
| 194 | + # mount should trigger sys.exit(1) |
| 195 | + pass |
| 196 | + |
| 197 | + self.assertEqual(context.exception.code, 1) |
| 198 | + |
| 199 | + # Verify that the program logs the error message |
| 200 | + mock_log.assert_called_once_with("mount /dev/sda1 on /mnt/usb failed.") |
| 201 | + |
| 202 | + |
| 203 | +if __name__ == "__main__": |
| 204 | + unittest.main() |
0 commit comments