diff --git a/.env b/.env new file mode 100644 index 0000000..d28a0ee --- /dev/null +++ b/.env @@ -0,0 +1 @@ +PYTHONPATH=src \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..a37cbaf --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,42 @@ +name: MicroPython CI + +on: + push: + branches: + - main + pull_request: + branches: + - "*" + +jobs: + test: + name: Run Tests and Linting + runs-on: ubuntu-latest + + steps: + # Step 1: Checkout the repository + - name: Checkout Code + uses: actions/checkout@v3 + + # Step 2: Set up Python + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: 3.11 + + # Step 3: Install dependencies + - name: Install Dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + # Step 4: Run linting + - name: Run Linting + run: | + ruff format src tests + ruff check src tests --fix --exit-zero --line-length 100 --target-version py38 + + # Step 5: Run tests + - name: Run Tests + run: | + PYTHONPATH=src pytest -svv \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cc5edae --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +.mypy_cache +.coverage +htmlcov +wifi.dat \ No newline at end of file diff --git a/.justfile b/.justfile new file mode 100644 index 0000000..fe41062 --- /dev/null +++ b/.justfile @@ -0,0 +1,19 @@ +run-test: + PYTHONPATH=src pytest -svv +run-test-filter TEST: + PYTHONPATH=src pytest -svv -k "{{TEST}}" +lint: + ruff format src tests + ruff check src tests --fix --exit-zero --line-length 100 --target-version py38 + +install-requirement: + pip install -r requirements.txt +list: + mpremote ls +upload: + mpremote mkdir :lib || echo "Directory already exists." + mpremote cp src/wifi_manager/*.py :lib/wifi_manager/ + # just to check if the files are uploaded + mpremote ls :lib/wifi_manager/ +mount_and_run: + mpremote mount src/ run src/main.py \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..78c7e53 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,20 @@ +{ + "editor.rulers": [ + 100, + ], + "[python]": { + "editor.formatOnSave": true, + "editor.defaultFormatter": "charliermarsh.ruff", + }, + "pylint.enabled": false, + "ruff.lint.enable": true, + "ruff.lint.select": ["E501"], + "python.testing.pytestArgs": [ + "tests", + ], + "python.envFile": "${workspaceFolder}/.env", + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true, + "editor.fontSize": 14, + "flake8.enabled": false, +} \ No newline at end of file diff --git a/LICENSE b/LICENSE index 42d8de5..ca44e48 100644 --- a/LICENSE +++ b/LICENSE @@ -1,5 +1,6 @@ MIT License +Copyright (c) 2025 Youngmin Kim Copyright (c) 2021 Igor Ferreira Permission is hereby granted, free of charge, to any person obtaining a copy diff --git a/README.md b/README.md index c772469..214f603 100644 --- a/README.md +++ b/README.md @@ -1,50 +1,29 @@ # WiFi Manager -WiFi Manager for ESP8266 and ESP32 using MicroPython. It might work in any other board since it only uses standard MicroPython libraries, but that's not tested. +WiFi Manager for ESP32 using MicroPython. It might work in any other board since it only uses standard MicroPython libraries, but that's not tested. -![ESP8266](https://img.shields.io/badge/ESP-8266-000000.svg?longCache=true&style=flat&colorA=CC101F) ![ESP32](https://img.shields.io/badge/ESP-32-000000.svg?longCache=true&style=flat&colorA=CC101F) - -## What's new? - -Version 2.0 comes with some improvements: -- Better documentation (I hope); -- Some aesthetical changes in the code; -- Removal of unnecessary messages; -- Removal of the ability to set the ip address (to avoid unexpected problems); -- Option to reboot after network configuration (needs improvement); - -## Wishlist - -- [ ] Allow user to customize CSS; -- [ ] Custom fields for extra configuration (like mqtt server, etc) -- [ ] Turn this into a real python library with the option to be installed using pip; +![CI](https://github.com/ymkim92/micropython-wifi_manager/actions/workflows/ci.yml/badge.svg) ## How It Works - When your device starts up, it will try to connect to a previously saved wifi. - If there is no saved network or if it fails to connect, it will start an access point; -- By connecting to the access point and going to the address 192.168.4.1 you be able to find your network and input the credentials; +- By connecting to the access point and going to the address `192.168.4.1` you be able to find your network and input the credentials; - It will try to connect to the desired network, and if it's successful, it will save the credentials for future usage; - Be aware that the wifi credentials will be saved in a plain text file, and this can be a security fault depending on your application; ## Installation and Usage + +I use `justfile` to upload the scripts to a target device: -```python -# Download the "wifi_manager.py" file to your device; - -# Import the library: -from wifi_manager import WifiManager - -# Initialize it -wm = WifiManager() - -# By default the SSID is WiFiManager and the password is wifimanager. -# You can customize the SSID and password of the AP for your needs: -wm = WifiManager(ssid="my ssid",password="my password") +```sh +$ just upload +``` -# Start the connection: -wm.connect() +You may want to use `mount` of mpremote for test or debugging: +```sh +$ just mount_and_run ``` ## Methods diff --git a/just.bash b/just.bash new file mode 100644 index 0000000..4647ceb --- /dev/null +++ b/just.bash @@ -0,0 +1,134 @@ +_just() { + local i cur prev opts cmds + COMPREPLY=() + cur="${COMP_WORDS[COMP_CWORD]}" + prev="${COMP_WORDS[COMP_CWORD-1]}" + cmd="" + opts="" + + for i in ${COMP_WORDS[@]} + do + case "${i}" in + "$1") + cmd="just" + ;; + + *) + ;; + esac + done + + case "${cmd}" in + just) + opts=" -n -q -u -v -e -l -h -V -f -d -c -s --check --yes --dry-run --highlight --no-dotenv --no-highlight --quiet --shell-command --clear-shell-args --unsorted --unstable --verbose --changelog --choose --dump --edit --evaluate --fmt --init --list --summary --variables --help --version --chooser --color --command-color --dump-format --list-heading --list-prefix --justfile --set --shell --shell-arg --working-directory --command --completions --show --dotenv-filename --dotenv-path ... " + if [[ ${cur} == -* ]] ; then + COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) + return 0 + elif [[ ${COMP_CWORD} -eq 1 ]]; then + local recipes=$(just --summary 2> /dev/null) + + if echo "${cur}" | \grep -qF '/'; then + local path_prefix=$(echo "${cur}" | sed 's/[/][^/]*$/\//') + local recipes=$(just --summary 2> /dev/null -- "${path_prefix}") + local recipes=$(printf "${path_prefix}%s\t" $recipes) + fi + + if [[ $? -eq 0 ]]; then + COMPREPLY=( $(compgen -W "${recipes}" -- "${cur}") ) + return 0 + fi + fi + case "${prev}" in + + --chooser) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --color) + COMPREPLY=($(compgen -W "auto always never" -- "${cur}")) + return 0 + ;; + --command-color) + COMPREPLY=($(compgen -W "black blue cyan green purple red yellow" -- "${cur}")) + return 0 + ;; + --dump-format) + COMPREPLY=($(compgen -W "just json" -- "${cur}")) + return 0 + ;; + --list-heading) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --list-prefix) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --justfile) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + -f) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --set) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --shell) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --shell-arg) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --working-directory) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + -d) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --command) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + -c) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --completions) + COMPREPLY=($(compgen -W "zsh bash fish powershell elvish" -- "${cur}")) + return 0 + ;; + --show) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + -s) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --dotenv-filename) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + --dotenv-path) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + *) + COMPREPLY=() + ;; + esac + COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) + return 0 + ;; + + esac +} + +complete -F _just -o bashdefault -o default just diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..91c6456 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.ruff] +line-length = 100 +# keep normal rules and add this extra one. +lint.extend-select = ["E501"] \ No newline at end of file diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..35904de --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +pythonpath = src +addopts = -svv \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5cf7972 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +pytest +mpremote +ruff diff --git a/main.py b/src/main.py similarity index 56% rename from main.py rename to src/main.py index 32229e5..a036acd 100644 --- a/main.py +++ b/src/main.py @@ -1,4 +1,4 @@ -from wifi_manager import WifiManager +from wifi_manager.manager import WifiManager import utime # Example of usage @@ -8,7 +8,7 @@ while True: if wm.is_connected(): - print('Connected!') + print("Connected!") else: - print('Disconnected!') + print("Disconnected!") utime.sleep(10) diff --git a/src/wifi_manager/__init__.py b/src/wifi_manager/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/wifi_manager/manager.py b/src/wifi_manager/manager.py new file mode 100644 index 0000000..907affd --- /dev/null +++ b/src/wifi_manager/manager.py @@ -0,0 +1,67 @@ +import network +import time +from wifi_manager.network_utils import read_credentials +from wifi_manager.webserver import WebServer + + +class WifiManager: + def __init__(self, ssid="WifiManager", password="wifimanager", reboot=True, debug=False): + self.wlan_sta = network.WLAN(network.STA_IF) + self.wlan_sta.active(True) + self.wlan_ap = network.WLAN(network.AP_IF) + + if len(ssid) > 32: + raise Exception("The SSID cannot be longer than 32 characters.") + else: + self.ap_ssid = ssid + if len(password) < 8: + raise Exception("The password cannot be less than 8 characters long.") + else: + self.ap_password = password + + self.ap_authmode = 3 + self.wifi_credentials = "wifi.dat" + self.wlan_sta.disconnect() + self.reboot: bool = reboot + self.debug: bool = debug + + def connect(self): + if self.wlan_sta.isconnected(): + return + profiles = read_credentials(self.wifi_credentials, self.debug) + for ssid, *_ in self.wlan_sta.scan(): + ssid = ssid.decode("utf-8") + if ssid in profiles: + password = profiles[ssid] + if self.wifi_connect(ssid, password): + return + print("Could not connect to any WiFi network. Starting the configuration portal...") + self.web_server() + + def disconnect(self): + if self.wlan_sta.isconnected(): + self.wlan_sta.disconnect() + + def is_connected(self): + return self.wlan_sta.isconnected() + + def get_address(self): + return self.wlan_sta.ifconfig() + + def wifi_connect(self, ssid, password): + print("Trying to connect to:", ssid) + self.wlan_sta.connect(ssid, password) + for _ in range(100): + if self.wlan_sta.isconnected(): + print("\nConnected! Network information:", self.wlan_sta.ifconfig()) + return True + else: + print(".", end="") + time.sleep_ms(100) + print("\nConnection failed!") + self.wlan_sta.disconnect() + return False + + def web_server(self): + server = WebServer(self) + server.run() diff --git a/src/wifi_manager/network_utils.py b/src/wifi_manager/network_utils.py new file mode 100644 index 0000000..0a59f55 --- /dev/null +++ b/src/wifi_manager/network_utils.py @@ -0,0 +1,40 @@ +def write_credentials(wifi_credentials, profiles): + lines = [] + for ssid, password in profiles.items(): + lines.append("{0};{1}\n".format(ssid, password)) + with open(wifi_credentials, "w") as file: + file.write("".join(lines)) + + +def read_credentials(wifi_credentials, debug=False): + lines = [] + try: + with open(wifi_credentials) as file: + lines = file.readlines() + except Exception as error: + if debug: + print(error) + profiles = {} + for line in lines: + ssid, password = line.strip().split(";") + profiles[ssid] = password + return profiles + + +def url_decode(data): + if isinstance(data, str): + data = data.encode("utf-8") + result = bytearray() + i = 0 + while i < len(data): + if data[i : i + 1] == b"%": + if i + 2 < len(data) and data[i + 1 : i + 3].isalnum(): + try: + result.append(int(data[i + 1 : i + 3], 16)) + i += 3 + continue + except ValueError: + pass + result.append(data[i]) + i += 1 + return bytes(result) diff --git a/src/wifi_manager/webserver.py b/src/wifi_manager/webserver.py new file mode 100644 index 0000000..38f8fa4 --- /dev/null +++ b/src/wifi_manager/webserver.py @@ -0,0 +1,185 @@ +import re +import socket +import time +import machine +from .network_utils import url_decode, read_credentials, write_credentials + + +class WebServer: + def __init__(self, manager, sleep_fn=time.sleep, reset_fn=machine.reset, debug=False): + self.manager = manager + self.wlan_ap = manager.wlan_ap + self.wlan_sta = manager.wlan_sta + self.ap_ssid = manager.ap_ssid + self.ap_password = manager.ap_password + self.ap_authmode = manager.ap_authmode + self.reboot = manager.reboot + self.debug = debug + self.wifi_credentials = manager.wifi_credentials + self.sleep_fn = sleep_fn # Dependency injection for time.sleep + self.reset_fn = reset_fn # Dependency injection for machine.reset + + def _reboot_device(self): + """Reboot the device after a delay.""" + if self.reboot: + print("The device will reboot in 5 seconds.") + self.sleep_fn(5) + self.reset_fn() + + def _create_server_socket(self): + """Create and configure the server socket.""" + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(("", 80)) + server_socket.listen(1) + return server_socket + + def _parse_request(self, request): + """Parse the HTTP request and extract the URL.""" + try: + url = ( + re.search(b"(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP", request) + .group(1) + .decode("utf-8") + .rstrip("/") + ) + return url + except Exception as error: + if self.debug: + print(f"Error parsing request: {error}") + return None + + def _handle_client(self, client): + """Handle a single client connection.""" + try: + client.settimeout(5.0) + request = b"" + while True: + chunk = client.recv(128) + if not chunk: + break + request += chunk + if b"\r\n\r\n" in request: + break + + if self.debug: + print(f"Request received: {request}") + + url = self._parse_request(request) + if url == "": + self.handle_root(client) + elif url == "configure": + print(f"##########request {request}") + self.handle_configure(client, request) + else: + self.handle_not_found(client) + except Exception as error: + if self.debug: + print(f"Error handling client: {error}") + finally: + client.close() + + def run(self): + """Start the web server.""" + self.wlan_ap.active(True) + self.wlan_ap.config( + essid=self.ap_ssid, password=self.ap_password, authmode=self.ap_authmode + ) + print( + f"Connect to {self.ap_ssid} with the password {self.ap_password} " + f"and access the captive portal at {self.wlan_ap.ifconfig()[0]}" + ) + + server_socket = self._create_server_socket() + while True: + if self.wlan_sta.isconnected(): + self.wlan_ap.active(False) + self._reboot_device() + return # just for testing + + client, _ = server_socket.accept() + self._handle_client(client) + + def send_header(self, client, status_code=200): + """Send HTTP headers to the client.""" + client.send(f"HTTP/1.1 {status_code} OK\r\n".encode("utf-8")) + client.send("Content-Type: text/html\r\n".encode("utf-8")) + client.send("Connection: close\r\n\r\n".encode("utf-8")) + + def send_response(self, client, payload, status_code=200): + """Send an HTTP response with HTML content.""" + self.send_header(client, status_code) + client.sendall( + f""" + + + + WiFi Manager + + + + + + {payload} + + + """.encode("utf-8") + ) + client.close() + + def handle_root(self, client): + """Handle the root URL.""" + ssid_options = "".join( + f""" +

+

+ """ + for ssid, *_ in self.wlan_sta.scan() + ) + self.send_response( + client, + f""" +

WiFi Manager

+
+ {ssid_options} +

+

+

+
+ """, + ) + + def handle_configure(self, client, request): + """Handle the configure URL.""" + match = re.search(b"ssid=([^&]*)&password=(.*)", url_decode(request)) + if not match: + self.send_response(client, "

Parameters not found!

", 400) + return + + ssid = match.group(1).decode("utf-8") + password = match.group(2).decode("utf-8") + + if not ssid: + self.send_response( + client, "

SSID must be provided!

Go back and try again!

", 400 + ) + elif self.manager.wifi_connect(ssid, password): + self.send_response( + client, + f"

Successfully connected to

{ssid}

IP address: " + f"{self.wlan_sta.ifconfig()[0]}

", + ) + profiles = read_credentials(self.wifi_credentials, self.debug) + profiles[ssid] = password + write_credentials(self.wifi_credentials, profiles) + self._reboot_device() + else: + self.send_response( + client, f"

Could not connect to

{ssid}

Go back and try again!

" + ) + self.sleep_fn(5) + + def handle_not_found(self, client): + """Handle unknown URLs.""" + self.send_response(client, "

Page not found!

", 404) diff --git a/tests/test_manager.py b/tests/test_manager.py new file mode 100644 index 0000000..ef378b8 --- /dev/null +++ b/tests/test_manager.py @@ -0,0 +1,117 @@ +import sys +import types + +if "machine" not in sys.modules: + sys.modules["machine"] = types.ModuleType("machine") + sys.modules["machine"].reset = lambda: None +if "network" not in sys.modules: + sys.modules["network"] = types.ModuleType("network") + sys.modules["network"].WLAN = lambda iface: None + +import pytest +from unittest.mock import patch, Mock +from wifi_manager.manager import WifiManager +from wifi_manager.network_utils import write_credentials + + +@pytest.fixture(autouse=True) +def patch_network(monkeypatch): + import wifi_manager.manager as manager_mod + + # Create mock WLAN class + mock_wlan = Mock() + mock_wlan._connected = False + mock_wlan.isconnected.side_effect = lambda: mock_wlan._connected + mock_wlan.scan.return_value = [(b"ssid1",), (b"ssid2",)] + mock_wlan.ifconfig.return_value = ("192.168.1.2", "255.255.255.0", "192.168.1.1", "8.8.8.8") + mock_wlan.connect.side_effect = lambda ssid, password: setattr( + mock_wlan, "_connected", ssid == "ssid1" and password == "pass1" + ) + mock_wlan.disconnect.side_effect = lambda: setattr(mock_wlan, "_connected", False) + + # Mock the network module + sys.modules["network"] = types.ModuleType("network") + sys.modules["network"].STA_IF = 0 + sys.modules["network"].AP_IF = 1 + sys.modules["network"].WLAN = lambda iface: mock_wlan + + # Patch the network module + monkeypatch.setattr(manager_mod, "network", sys.modules["network"]) + + +def test_wifi_manager_init(): + wm = WifiManager(ssid="TestSSID", password="TestPass123") + assert wm.ap_ssid == "TestSSID" + assert wm.ap_password == "TestPass123" + assert wm.reboot is True + + +def test_wifi_manager_ssid_length(): + with pytest.raises(Exception): + WifiManager(ssid="x" * 33) + + +def test_wifi_manager_password_length(): + with pytest.raises(Exception): + WifiManager(password="short") + + +def test_wifi_manager_connect(tmp_path): + wm = WifiManager(ssid="TestSSID", password="TestPass123") + wm.wifi_credentials = str(tmp_path / "wifi.dat") + + # Save credentials for ssid1 + write_credentials(wm.wifi_credentials, {"ssid1": "pass1"}) + wm.connect() + assert wm.is_connected() + + +@patch("wifi_manager.manager.read_credentials", return_value={}) +def test_wifi_manager_connected_already(mock_read_credentials): + wm = WifiManager(ssid="TestSSID", password="TestPass123") + wm.wlan_sta._connected = True # Simulate already connected + wm.connect() + mock_read_credentials.assert_not_called() + + +@patch("wifi_manager.manager.WebServer") +def test_wifi_manager_connect_not_connected_and_no_credentials(mock_webserver, tmp_path): + mock_instance = Mock() + mock_webserver.return_value = mock_instance + wm = WifiManager(ssid="TestSSID", password="TestPass123") + wm.wifi_credentials = str(tmp_path / "wifi.dat") + + write_credentials(wm.wifi_credentials, {"ssid3": "pass3"}) + wm.connect() + assert not wm.is_connected() + mock_webserver.assert_called_once_with(wm) + mock_instance.run.assert_called_once() + + +def test_wifi_manager_disconnect(): + wm = WifiManager(ssid="TestSSID", password="TestPass123") + wm.wlan_sta._connected = True + wm.disconnect() + assert wm.is_connected() is False + + +@patch("wifi_manager.manager.time") +def test_manager_wifi_connect_100times_failure(mock_time): + wm = WifiManager(ssid="TestSSID", password="TestPass123") + assert wm.is_connected() is False + wm.wlan_sta.isconnected.side_effect = [False] * 100 + ret = wm.wifi_connect("TestSSID", "TestPass123") + assert ret is False + mock_time.sleep_ms.assert_called_with(100) + assert mock_time.sleep_ms.call_count == 100 + + +@patch("wifi_manager.manager.time") +def test_manager_wifi_connect_99times_failure_then_ok(mock_time): + wm = WifiManager(ssid="TestSSID", password="TestPass123") + assert wm.is_connected() is False + wm.wlan_sta.isconnected.side_effect = [False] * 99 + [True] + ret = wm.wifi_connect("TestSSID", "TestPass123") + assert ret is True + mock_time.sleep_ms.assert_called_with(100) + assert mock_time.sleep_ms.call_count == 99 diff --git a/tests/test_network_utils.py b/tests/test_network_utils.py new file mode 100644 index 0000000..8c4d3d0 --- /dev/null +++ b/tests/test_network_utils.py @@ -0,0 +1,46 @@ +from wifi_manager.network_utils import write_credentials, read_credentials, url_decode + + +def test_write_and_read_credentials(tmp_path): + file_path = tmp_path / "wifi.dat" + profiles = {"ssid1": "pass1", "ssid2": "pass2"} + write_credentials(str(file_path), profiles) + loaded = read_credentials(str(file_path)) + assert loaded == profiles + + +def test_write_and_read_credentials_exception(tmp_path): + file_path = tmp_path / "wifi.dat" + loaded = read_credentials(str(file_path), debug=True) + assert loaded == {} + + +def test_write_and_read_empty_credentials(tmp_path): + file_path = tmp_path / "wifi.dat" + profiles = {} + write_credentials(str(file_path), profiles) + loaded = read_credentials(str(file_path)) + assert loaded == profiles + + +def test_url_decode_basic(): + assert url_decode("abc%20def") == b"abc def" + assert url_decode(b"abc%20def") == b"abc def" + assert url_decode("") == b"" + + +def test_url_decode_non_encoded(): + assert url_decode("plainstring") == b"plainstring" + assert url_decode(b"plainstring") == b"plainstring" + + +def test_url_decode_partial_percent(): + # Should not raise, just return as-is + assert url_decode("abc%2") == b"abc%2" + assert url_decode(b"abc%2") == b"abc%2" + + +def test_url_decode_invalid_percent(): + # Should not raise, just return as-is + assert url_decode("abc%zz") == b"abc%zz" + assert url_decode(b"abc%zz") == b"abc%zz" diff --git a/tests/test_webserver.py b/tests/test_webserver.py new file mode 100644 index 0000000..0694ef0 --- /dev/null +++ b/tests/test_webserver.py @@ -0,0 +1,257 @@ +import sys +import types + +if "machine" not in sys.modules: + sys.modules["machine"] = types.ModuleType("machine") + sys.modules["machine"].reset = lambda: None + +import pytest +from unittest.mock import patch, Mock +from wifi_manager.webserver import WebServer + + +@pytest.fixture +def mock_manager(): + mock = Mock() + mock.wlan_ap.ifconfig.return_value = ["192.168.4.1"] + mock.wlan_sta.isconnected.return_value = False + mock.wlan_sta.scan.return_value = [(b"TestSSID",)] + mock.ap_ssid = "MyAP" + mock.ap_password = "password123" + mock.ap_authmode = 3 + mock.reboot = False + mock.wifi_credentials = "wifi.dat" + mock.wifi_connect.return_value = True + return mock + + +def test_reboot_device_true(mock_manager): + mock_sleep = Mock() + mock_reset = Mock() + server = WebServer(mock_manager, sleep_fn=mock_sleep, reset_fn=mock_reset) + server.reboot = True + server._reboot_device() + mock_sleep.assert_called_once_with(5) + + +def test_reboot_device_false(mock_manager): + mock_sleep = Mock() + mock_reset = Mock() + server = WebServer(mock_manager, sleep_fn=mock_sleep, reset_fn=mock_reset) + + server.reboot = False + server._reboot_device() + mock_sleep.assert_not_called() + + +def test_run_no_connection_then_ok(mock_manager): + server = WebServer(mock_manager) + mock_socket = Mock() + server._create_server_socket = Mock(return_value=mock_socket) + server._handle_client = Mock() + client = Mock() + mock_socket.accept.return_value = (client, None) + mock_manager.wlan_sta.isconnected.side_effect = [False, True] + + server.run() + mock_socket.accept.assert_called_once() + server._handle_client.assert_called_once_with(client) + + +def test_parse_request_valid(mock_manager): + server = WebServer(mock_manager) + request = b"GET /configure HTTP/1.1\r\n\r\n" + url = server._parse_request(request) + assert url == "configure" + + +def test_parse_request_invalid(mock_manager, capsys): + server = WebServer(mock_manager, debug=True) + request = b"BAD REQUEST" + url = server._parse_request(request) + captured = capsys.readouterr() + assert url is None + assert "Error parsing request" in captured.out + + +def test_send_header(mock_manager): + server = WebServer(mock_manager) + client = Mock() + server.send_header(client) + assert client.send.call_count == 3 + + +def test_send_response(mock_manager): + server = WebServer(mock_manager) + client = Mock() + server.send_response(client, "

Hello

") + client.sendall.assert_called() + client.close.assert_called() + + +def test_handle_root(mock_manager): + server = WebServer(mock_manager) + client = Mock() + server.handle_root(client) + client.sendall.assert_called() + + +@patch("wifi_manager.webserver.write_credentials") +def test_handle_configure_success(mock_write, mock_manager): + server = WebServer(mock_manager, sleep_fn=lambda x: None, reset_fn=lambda: None) + client = Mock() + mock_manager.wlan_sta.ifconfig.return_value = ["192.168.4.1"] + test_request = ( + b"POST /configure HTTP/1.1\r\n" + b"Host: 192.168.4.1\r\nConnection: keep-alive\r\n" + b"Content-Length: 26\r\nCache-Control: max-age=0\r\n" + b"Origin: http://192.168.4.1\r\nContent-Type: application/x-www-form-urlencoded\r\n" + b"Upgrade-Insecure-Requests: 1\r\n" + b"User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " + b"(KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36\r\n" + b"Accept: text/html,application/xhtml+xml,application/xml;q=0.9," + b"image/avif,image/webp,image/apng,*/*;q=0.8\r\nSec-GPC: 1\r\n" + b"Referer: http://192.168.4.1/\r\nAccept-Encoding: gzip, deflate\r\n" + b"Accept-Language: en-GB,en-US;q=0.9,en;q=0.8,ko;q=0.7\r\n\r\nssid=Kimmies&password=1234" + ) + server.handle_configure(client, test_request) + client.sendall.assert_called() + mock_write.assert_called() + + +@patch("wifi_manager.webserver.socket") +def test_create_server_socket(mock_socket, mock_manager): + mock_socket.socket.return_value = mock_socket + mock_socket.bind = Mock() + mock_socket.listen = Mock() + mock_socket.setsockopt = Mock() + + server = WebServer(mock_manager) + server._create_server_socket() + + assert mock_socket.socket.call_count == 1 + assert mock_socket.bind.call_count == 1 + assert mock_socket.listen.call_count == 1 + assert mock_socket.setsockopt.call_count == 1 + assert mock_socket.bind.call_args[0][0] == ("", 80) + assert mock_socket.listen.call_args[0][0] == 1 + + +def test_handle_client_root(mock_manager): + """Test handling a client request for the root URL.""" + mock_client = Mock() + mock_client.recv.side_effect = [ + b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n", # Simulate HTTP GET request + ] + server = WebServer(mock_manager, debug=True) + + with patch.object(server, "handle_root") as mock_handle_root: + server._handle_client(mock_client) + + # Verify the root handler was called + mock_handle_root.assert_called_once_with(mock_client) + + # Verify the client connection was closed + mock_client.close.assert_called_once() + + +def test_handle_client_connection_closed(mock_manager): + server = WebServer(mock_manager) + mock_client = Mock() + # Empty chunk simulates closed connection + mock_client.recv.side_effect = [b"GET /", b"", b"more data"] + + server._handle_client(mock_client) + + # Should stop after receiving empty chunk + assert mock_client.recv.call_count == 2 + # from handle_not_found + mock_client.sendall.assert_called_once() + mock_client.close.assert_called() + assert mock_client.close.call_count == 2 + + +def test_handle_client_configure(mock_manager): + """Test handling a client request for the configure URL.""" + mock_client = Mock() + mock_client.recv.side_effect = [ + b"POST /configure HTTP/1.1\r\nHost: localhost\r\n\r\nssid=TestSSID&password=TestPass123", + b"", + ] + server = WebServer(mock_manager, debug=True) + + with patch.object(server, "handle_configure") as mock_handle_configure: + server._handle_client(mock_client) + + # Verify the configure handler was called + mock_handle_configure.assert_called_once_with( + mock_client, + b"POST /configure HTTP/1.1\r\nHost: localhost\r\n\r\n" + b"ssid=TestSSID&password=TestPass123", + ) + + # Verify the client connection was closed + mock_client.close.assert_called_once() + + +def test_handle_client_not_found(mock_manager): + """Test handling a client request for an unknown URL.""" + mock_client = Mock() + mock_client.recv.side_effect = [ + b"GET /unknown HTTP/1.1\r\nHost: localhost\r\n\r\n", + b"", + ] + server = WebServer(mock_manager, debug=True) + + with patch.object(server, "handle_not_found") as mock_handle_not_found: + server._handle_client(mock_client) + + # Verify the not found handler was called + mock_handle_not_found.assert_called_once_with(mock_client) + + # Verify the client connection was closed + mock_client.close.assert_called_once() + + +def test_handle_client_timeout(mock_manager): + """Test handling a client request with a timeout.""" + mock_client = Mock() + mock_client.recv.side_effect = TimeoutError # Simulate a timeout + server = WebServer(mock_manager, debug=True) + + server._handle_client(mock_client) + + # Verify the client connection was closed even on timeout + mock_client.close.assert_called_once() + + +def test_handle_configure_missing_ssid(mock_manager): + """Test handle_configure when SSID is empty""" + server = WebServer(mock_manager) + mock_client = Mock() + + # Test with empty SSID + with patch("wifi_manager.webserver.url_decode", return_value=b"ssid=&password=test123"): + server.handle_configure(mock_client, b"") + mock_client.sendall.assert_called() + # from send_header + assert mock_client.send.call_count == 3 + assert b"HTTP/1.1 400" in mock_client.send.call_args_list[0][0][0] + # Verify error message was sent + assert b"SSID must be provided!" in mock_client.sendall.call_args[0][0] + + +def test_handle_configure_missing_parameters(mock_manager): + """Test handle_configure when parameters are missing from the request""" + server = WebServer(mock_manager) + mock_client = Mock() + + # Test with empty request + with patch("wifi_manager.webserver.url_decode", return_value=b""): + server.handle_configure(mock_client, b"") + mock_client.sendall.assert_called() + # from send_header + assert mock_client.send.call_count == 3 + assert b"HTTP/1.1 400" in mock_client.send.call_args_list[0][0][0] + # Verify error message was sent + assert b"Parameters not found!" in mock_client.sendall.call_args[0][0] diff --git a/wifi_manager.py b/wifi_manager.py deleted file mode 100644 index 6eb63af..0000000 --- a/wifi_manager.py +++ /dev/null @@ -1,295 +0,0 @@ -# Author: Igor Ferreira -# License: MIT -# Version: 2.1.0 -# Description: WiFi Manager for ESP8266 and ESP32 using MicroPython. - -import machine -import network -import socket -import re -import time - - -class WifiManager: - - def __init__(self, ssid = 'WifiManager', password = 'wifimanager', reboot = True, debug = False): - self.wlan_sta = network.WLAN(network.STA_IF) - self.wlan_sta.active(True) - self.wlan_ap = network.WLAN(network.AP_IF) - - # Avoids simple mistakes with wifi ssid and password lengths, but doesn't check for forbidden or unsupported characters. - if len(ssid) > 32: - raise Exception('The SSID cannot be longer than 32 characters.') - else: - self.ap_ssid = ssid - if len(password) < 8: - raise Exception('The password cannot be less than 8 characters long.') - else: - self.ap_password = password - - # Set the access point authentication mode to WPA2-PSK. - self.ap_authmode = 3 - - # The file were the credentials will be stored. - # There is no encryption, it's just a plain text archive. Be aware of this security problem! - self.wifi_credentials = 'wifi.dat' - - # Prevents the device from automatically trying to connect to the last saved network without first going through the steps defined in the code. - self.wlan_sta.disconnect() - - # Change to True if you want the device to reboot after configuration. - # Useful if you're having problems with web server applications after WiFi configuration. - self.reboot = reboot - - self.debug = debug - - - def connect(self): - if self.wlan_sta.isconnected(): - return - profiles = self.read_credentials() - for ssid, *_ in self.wlan_sta.scan(): - ssid = ssid.decode("utf-8") - if ssid in profiles: - password = profiles[ssid] - if self.wifi_connect(ssid, password): - return - print('Could not connect to any WiFi network. Starting the configuration portal...') - self.web_server() - - - def disconnect(self): - if self.wlan_sta.isconnected(): - self.wlan_sta.disconnect() - - - def is_connected(self): - return self.wlan_sta.isconnected() - - - def get_address(self): - return self.wlan_sta.ifconfig() - - - def write_credentials(self, profiles): - lines = [] - for ssid, password in profiles.items(): - lines.append('{0};{1}\n'.format(ssid, password)) - with open(self.wifi_credentials, 'w') as file: - file.write(''.join(lines)) - - - def read_credentials(self): - lines = [] - try: - with open(self.wifi_credentials) as file: - lines = file.readlines() - except Exception as error: - if self.debug: - print(error) - pass - profiles = {} - for line in lines: - ssid, password = line.strip().split(';') - profiles[ssid] = password - return profiles - - - def wifi_connect(self, ssid, password): - print('Trying to connect to:', ssid) - self.wlan_sta.connect(ssid, password) - for _ in range(100): - if self.wlan_sta.isconnected(): - print('\nConnected! Network information:', self.wlan_sta.ifconfig()) - return True - else: - print('.', end='') - time.sleep_ms(100) - print('\nConnection failed!') - self.wlan_sta.disconnect() - return False - - - def web_server(self): - self.wlan_ap.active(True) - self.wlan_ap.config(essid = self.ap_ssid, password = self.ap_password, authmode = self.ap_authmode) - server_socket = socket.socket() - server_socket.close() - server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('', 80)) - server_socket.listen(1) - print('Connect to', self.ap_ssid, 'with the password', self.ap_password, 'and access the captive portal at', self.wlan_ap.ifconfig()[0]) - while True: - if self.wlan_sta.isconnected(): - self.wlan_ap.active(False) - if self.reboot: - print('The device will reboot in 5 seconds.') - time.sleep(5) - machine.reset() - self.client, addr = server_socket.accept() - try: - self.client.settimeout(5.0) - self.request = b'' - try: - while True: - if '\r\n\r\n' in self.request: - # Fix for Safari browser - self.request += self.client.recv(512) - break - self.request += self.client.recv(128) - except Exception as error: - # It's normal to receive timeout errors in this stage, we can safely ignore them. - if self.debug: - print(error) - pass - if self.request: - if self.debug: - print(self.url_decode(self.request)) - url = re.search('(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP', self.request).group(1).decode('utf-8').rstrip('/') - if url == '': - self.handle_root() - elif url == 'configure': - self.handle_configure() - else: - self.handle_not_found() - except Exception as error: - if self.debug: - print(error) - return - finally: - self.client.close() - - - def send_header(self, status_code = 200): - self.client.send("""HTTP/1.1 {0} OK\r\n""".format(status_code)) - self.client.send("""Content-Type: text/html\r\n""") - self.client.send("""Connection: close\r\n""") - - - def send_response(self, payload, status_code = 200): - self.send_header(status_code) - self.client.sendall(""" - - - - WiFi Manager - - - - - - {0} - - - """.format(payload)) - self.client.close() - - - def handle_root(self): - self.send_header() - self.client.sendall(""" - - - - WiFi Manager - - - - - -

WiFi Manager

-
- """.format(self.ap_ssid)) - for ssid, *_ in self.wlan_sta.scan(): - ssid = ssid.decode("utf-8") - self.client.sendall(""" -

- """.format(ssid)) - self.client.sendall(""" -

-

-
- - - """) - self.client.close() - - - def handle_configure(self): - match = re.search('ssid=([^&]*)&password=(.*)', self.url_decode(self.request)) - if match: - ssid = match.group(1).decode('utf-8') - password = match.group(2).decode('utf-8') - if len(ssid) == 0: - self.send_response(""" -

SSID must be providaded!

-

Go back and try again!

- """, 400) - elif self.wifi_connect(ssid, password): - self.send_response(""" -

Successfully connected to

-

{0}

-

IP address: {1}

- """.format(ssid, self.wlan_sta.ifconfig()[0])) - profiles = self.read_credentials() - profiles[ssid] = password - self.write_credentials(profiles) - time.sleep(5) - else: - self.send_response(""" -

Could not connect to

-

{0}

-

Go back and try again!

- """.format(ssid)) - time.sleep(5) - else: - self.send_response(""" -

Parameters not found!

- """, 400) - time.sleep(5) - - - def handle_not_found(self): - self.send_response(""" -

Page not found!

- """, 404) - - - def url_decode(self, url_string): - - # Source: https://forum.micropython.org/viewtopic.php?t=3076 - # unquote('abc%20def') -> b'abc def' - # Note: strings are encoded as UTF-8. This is only an issue if it contains - # unescaped non-ASCII characters, which URIs should not. - - if not url_string: - return b'' - - if isinstance(url_string, str): - url_string = url_string.encode('utf-8') - - bits = url_string.split(b'%') - - if len(bits) == 1: - return url_string - - res = [bits[0]] - appnd = res.append - hextobyte_cache = {} - - for item in bits[1:]: - try: - code = item[:2] - char = hextobyte_cache.get(code) - if char is None: - char = hextobyte_cache[code] = bytes([int(code, 16)]) - appnd(char) - appnd(item[2:]) - except Exception as error: - if self.debug: - print(error) - appnd(b'%') - appnd(item) - - return b''.join(res)