diff --git a/adafruit_wsgi/esp32spi_wsgiserver.py b/adafruit_wsgi/esp32spi_wsgiserver.py
new file mode 100644
index 0000000..d43a27a
--- /dev/null
+++ b/adafruit_wsgi/esp32spi_wsgiserver.py
@@ -0,0 +1,255 @@
+# SPDX-FileCopyrightText: Copyright (c) 2019 Matt Costi for Adafruit Industries
+# SPDX-License-Identifier: MIT
+A simple WSGI (Web Server Gateway Interface) server that interfaces with the ESP32 over SPI.
+Opens a specified port on the ESP32 to listen for incoming HTTP Requests and
+Accepts an Application object that must be callable, which gets called
+whenever a new HTTP Request has been received.
+The Application MUST accept 2 ordered parameters:
+ 1. environ object (incoming request data)
+ 2. start_response function. Must be called before the Application
+ callable returns, in order to set the response status and headers.
+The Application MUST return a single string in a list,
+which is the response data
+Requires update_poll being called in the applications main event loop.
+For more details about Python WSGI see:
+* Author(s): Matt Costi
+# pylint: disable=no-name-in-module, protected-access
+import io
+import gc
+import time
+from micropython import const
+import adafruit_esp32spi.adafruit_esp32spi_socket as socket
+_the_interface = None # pylint: disable=invalid-name
+def set_interface(iface):
+ """Helper to set the global internet interface"""
+ global _the_interface # pylint: disable=global-statement, invalid-name
+ _the_interface = iface
+ socket.set_interface(iface)
+NO_SOCK_AVAIL = const(255)
+def parse_headers(client):
+ """
+ Parses the header portion of an HTTP request from the socket.
+ Expects first line of HTTP request to have been read already.
+ """
+ headers = {}
+ while True:
+ # line = str(client.readline(), "utf-8")
+ line = str(socket_readline(client), "utf-8")
+ if not line:
+ break
+ title, content = line.split(":", 1)
+ headers[title.strip().lower()] = content.strip()
+ return headers
+def socket_readline(_socket, eol=b"\r\n"):
+ """Attempt to return as many bytes as we can up to but not including
+ end-of-line character (default is '\\r\\n')"""
+ # print("Socket readline")
+ stamp = time.monotonic()
+ while eol not in _socket._buffer:
+ # there's no line already in there, read some more
+ avail = _socket._available()
+ if avail:
+ _socket._buffer += _the_interface.socket_read(_socket._socknum, avail)
+ elif _socket._timeout > 0 and time.monotonic() - stamp > _socket._timeout:
+ _socket.close() # Make sure to close socket so that we don't exhaust sockets.
+ raise OSError("Didn't receive full response, failing out")
+ firstline, _socket._buffer = _socket._buffer.split(eol, 1)
+ gc.collect()
+ return firstline
+# pylint: disable=invalid-name
+class WSGIServer:
+ """
+ A simple server that implements the WSGI interface
+ """
+ def __init__(self, port=80, debug=False, application=None):
+ self.application = application
+ self.port = port
+ self._server_sock = socket.socket(socknum=NO_SOCK_AVAIL)
+ self._client_sock = socket.socket(socknum=NO_SOCK_AVAIL)
+ self._debug = debug
+ self._response_status = None
+ self._response_headers = []
+ def start(self):
+ """
+ starts the server and begins listening for incoming connections.
+ Call update_poll in the main loop for the application callable to be
+ invoked on receiving an incoming request.
+ """
+ self._server_sock = socket.socket()
+ _the_interface.start_server(self.port, self._server_sock._socknum)
+ if self._debug:
+ ip = _the_interface.pretty_ip(_the_interface.ip_address)
+ print("Server available at {0}:{1}".format(ip, self.port))
+ print(
+ "Server status: ",
+ _the_interface.server_state(self._server_sock._socknum),
+ )
+ def update_poll(self):
+ """
+ Call this method inside your main event loop to get the server
+ check for new incoming client requests. When a request comes in,
+ the application callable will be invoked.
+ """
+ self.client_available()
+ if self._client_sock and self._client_sock._available():
+ environ = self._get_environ(self._client_sock)
+ result = self.application(environ, self._start_response)
+ self.finish_response(result)
+ def finish_response(self, result):
+ """
+ Called after the application callbile returns result data to respond with.
+ Creates the HTTP Response payload from the response_headers and results data,
+ and sends it back to client.
+ :param string result: the data string to send back in the response to the client.
+ """
+ try:
+ response = "HTTP/1.1 {0}\r\n".format(self._response_status or "500 ISE")
+ for header in self._response_headers:
+ response += "{0}: {1}\r\n".format(*header)
+ response += "\r\n"
+ self._client_sock.send(response.encode("utf-8"))
+ if isinstance(result, bytes): # send whole response if possible (see #174)
+ self._client_sock.send(result)
+ elif isinstance(result, str):
+ self._client_sock.send(result.encode("utf-8"))
+ else: # fall back to sending byte-by-byte
+ for data in result:
+ if isinstance(data, bytes):
+ self._client_sock.send(data)
+ else:
+ self._client_sock.send(data.encode("utf-8"))
+ gc.collect()
+ finally:
+ if self._debug > 2:
+ print("closing")
+ self._client_sock.close()
+ def client_available(self):
+ """
+ returns a client socket connection if available.
+ Otherwise, returns None
+ :return: the client
+ :rtype: Socket
+ """
+ sock = None
+ if self._server_sock._socknum != NO_SOCK_AVAIL:
+ if self._client_sock._socknum != NO_SOCK_AVAIL:
+ # check previous received client socket
+ if self._debug > 2:
+ print("checking if last client sock still valid")
+ if self._client_sock._connected() and self._client_sock._available():
+ sock = self._client_sock
+ if not sock:
+ # check for new client sock
+ if self._debug > 2:
+ print("checking for new client sock")
+ client_sock_num = _the_interface.socket_available(
+ self._server_sock._socknum
+ )
+ sock = socket.socket(socknum=client_sock_num)
+ else:
+ print("Server has not been started, cannot check for clients!")
+ if sock and sock._socknum != NO_SOCK_AVAIL:
+ if self._debug > 2:
+ print("client sock num is: ", sock._socknum)
+ self._client_sock = sock
+ return self._client_sock
+ return None
+ def _start_response(self, status, response_headers):
+ """
+ The application callable will be given this method as the second param
+ This is to be called before the application callable returns, to signify
+ the response can be started with the given status and headers.
+ :param string status: a status string including the code and reason. ex: "200 OK"
+ :param list response_headers: a list of tuples to represent the headers.
+ ex ("header-name", "header value")
+ """
+ self._response_status = status
+ self._response_headers = [
+ ("Server", "esp32WSGIServer"),
+ ("Connection", "close"),
+ ] + response_headers
+ def _get_environ(self, client):
+ """
+ The application callable will be given the resulting environ dictionary.
+ It contains metadata about the incoming request and the request body ("wsgi.input")
+ :param Socket client: socket to read the request from
+ """
+ env = {}
+ # line = str(client.readline(), "utf-8")
+ line = str(socket_readline(client), "utf-8")
+ (method, path, ver) = line.rstrip("\r\n").split(None, 2)
+ env["wsgi.version"] = (1, 0)
+ env["wsgi.url_scheme"] = "http"
+ env["wsgi.multithread"] = False
+ env["wsgi.multiprocess"] = False
+ env["wsgi.run_once"] = False
+ env["REQUEST_METHOD"] = method
+ env["SCRIPT_NAME"] = ""
+ env["SERVER_NAME"] = _the_interface.pretty_ip(_the_interface.ip_address)
+ env["SERVER_PROTOCOL"] = ver
+ env["SERVER_PORT"] = self.port
+ if path.find("?") >= 0:
+ env["PATH_INFO"] = path.split("?")[0]
+ env["QUERY_STRING"] = path.split("?")[1]
+ else:
+ env["PATH_INFO"] = path
+ headers = parse_headers(client)
+ if "content-type" in headers:
+ env["CONTENT_TYPE"] = headers.get("content-type")
+ if "content-length" in headers:
+ env["CONTENT_LENGTH"] = headers.get("content-length")
+ body = client.recv(int(env["CONTENT_LENGTH"]))
+ env["wsgi.input"] = io.StringIO(body)
+ else:
+ body = client.recv(0)
+ env["wsgi.input"] = io.StringIO(body)
+ for name, value in headers.items():
+ key = "HTTP_" + name.replace("-", "_").upper()
+ if key in env:
+ value = "{0},{1}".format(env[key], value)
+ env[key] = value
+ return env
diff --git a/examples/static/index.html b/examples/static/index.html
new file mode 100755
index 0000000..df08ec7
--- /dev/null
+++ b/examples/static/index.html
@@ -0,0 +1,16 @@
+ LED color picker demo!
diff --git a/examples/static/led_color_picker_example.js b/examples/static/led_color_picker_example.js
new file mode 100755
index 0000000..810ca44
--- /dev/null
+++ b/examples/static/led_color_picker_example.js
@@ -0,0 +1,129 @@
+// SPDX-FileCopyrightText: 2019 ladyada for Adafruit Industries
+// SPDX-License-Identifier: MIT
+let canvas = document.getElementById('colorPicker');
+let ctx = canvas.getContext("2d");
+ctx.width = 300;
+ctx.height = 300;
+function drawColorPicker() {
+ /**
+ * Color picker inspired by:
+ * https://medium.com/@bantic/hand-coding-a-color-wheel-with-canvas-78256c9d7d43
+ */
+ let radius = 150;
+ let image = ctx.createImageData(2*radius, 2*radius);
+ let data = image.data;
+ for (let x = -radius; x < radius; x++) {
+ for (let y = -radius; y < radius; y++) {
+ let [r, phi] = xy2polar(x, y);
+ if (r > radius) {
+ // skip all (x,y) coordinates that are outside of the circle
+ continue;
+ }
+ let deg = rad2deg(phi);
+ // Figure out the starting index of this pixel in the image data array.
+ let rowLength = 2*radius;
+ let adjustedX = x + radius; // convert x from [-50, 50] to [0, 100] (the coordinates of the image data array)
+ let adjustedY = y + radius; // convert y from [-50, 50] to [0, 100] (the coordinates of the image data array)
+ let pixelWidth = 4; // each pixel requires 4 slots in the data array
+ let index = (adjustedX + (adjustedY * rowLength)) * pixelWidth;
+ let hue = deg;
+ let saturation = r / radius;
+ let value = 1.0;
+ let [red, green, blue] = hsv2rgb(hue, saturation, value);
+ let alpha = 255;
+ data[index] = red;
+ data[index+1] = green;
+ data[index+2] = blue;
+ data[index+3] = alpha;
+ }
+ }
+ ctx.putImageData(image, 0, 0);
+function xy2polar(x, y) {
+ let r = Math.sqrt(x*x + y*y);
+ let phi = Math.atan2(y, x);
+ return [r, phi];
+// rad in [-π, π] range
+// return degree in [0, 360] range
+function rad2deg(rad) {
+ return ((rad + Math.PI) / (2 * Math.PI)) * 360;
+ // hue in range [0, 360]
+ // saturation, value in range [0,1]
+ // return [r,g,b] each in range [0,255]
+ // See: https://en.wikipedia.org/wiki/HSL_and_HSV#From_HSV
+function hsv2rgb(hue, saturation, value) {
+ let chroma = value * saturation;
+ let hue1 = hue / 60;
+ let x = chroma * (1- Math.abs((hue1 % 2) - 1));
+ let r1, g1, b1;
+ if (hue1 >= 0 && hue1 <= 1) {
+ ([r1, g1, b1] = [chroma, x, 0]);
+ } else if (hue1 >= 1 && hue1 <= 2) {
+ ([r1, g1, b1] = [x, chroma, 0]);
+ } else if (hue1 >= 2 && hue1 <= 3) {
+ ([r1, g1, b1] = [0, chroma, x]);
+ } else if (hue1 >= 3 && hue1 <= 4) {
+ ([r1, g1, b1] = [0, x, chroma]);
+ } else if (hue1 >= 4 && hue1 <= 5) {
+ ([r1, g1, b1] = [x, 0, chroma]);
+ } else if (hue1 >= 5 && hue1 <= 6) {
+ ([r1, g1, b1] = [chroma, 0, x]);
+ }
+ let m = value - chroma;
+ let [r,g,b] = [r1+m, g1+m, b1+m];
+ // Change r,g,b values from [0,1] to [0,255]
+ return [255*r,255*g,255*b];
+function onColorPick(event) {
+ coords = getCursorPosition(canvas, event)
+ imageData = ctx.getImageData(coords[0],coords[1],1,1)
+ rgbObject = {
+ r: imageData.data[0],
+ g: imageData.data[1],
+ b: imageData.data[2]
+ }
+ console.log(`r: ${rgbObject.r} g: ${rgbObject.g} b: ${rgbObject.b}`);
+ data = JSON.stringify(rgbObject);
+ window.fetch("/ajax/ledcolor", {
+ method: "POST",
+ body: data,
+ headers: {
+ 'Content-Type': 'application/json; charset=utf-8',
+ },
+ }).then(response => {
+ console.log("sucess!: " + response)
+ }, error => {
+ console.log("error!: " + error)
+ })
+function getCursorPosition(canvas, event) {
+ const rect = canvas.getBoundingClientRect()
+ const x = event.clientX - rect.left
+ const y = event.clientY - rect.top
+ console.log("x: " + x + " y: " + y)
+ return [x,y]
+canvas.addEventListener('mousedown', onColorPick);
diff --git a/examples/wsgi_simpletest.py b/examples/wsgi_simpletest.py
index 9047552..738b206 100644
--- a/examples/wsgi_simpletest.py
+++ b/examples/wsgi_simpletest.py
@@ -8,7 +8,7 @@
from adafruit_esp32spi import adafruit_esp32spi
import adafruit_esp32spi.adafruit_esp32spi_wifimanager as wifimanager
-import adafruit_esp32spi.adafruit_esp32spi_wsgiserver as server
+import adafruit_wsgi.esp32spi_wsgiserver as server
from adafruit_wsgi.wsgi_app import WSGIApp
# Get wifi details and more from a secrets.py file
diff --git a/examples/wsgi_static_files_server.py b/examples/wsgi_static_files_server.py
new file mode 100755
index 0000000..eacd70b
--- /dev/null
+++ b/examples/wsgi_static_files_server.py
@@ -0,0 +1,246 @@
+# SPDX-FileCopyrightText: 2019 ladyada for Adafruit Industries
+# SPDX-License-Identifier: MIT
+import os
+import board
+import busio
+from digitalio import DigitalInOut
+import neopixel
+from adafruit_esp32spi import adafruit_esp32spi
+import adafruit_esp32spi.adafruit_esp32spi_wifimanager as wifimanager
+import adafruit_wsgi.esp32spi_wsgiserver as server
+# This example depends on the 'static' folder in the examples folder
+# being copied to the root of the circuitpython filesystem.
+# This is where our static assets like html, js, and css live.
+# Get wifi details and more from a secrets.py file
+ from secrets import secrets
+except ImportError:
+ print("WiFi secrets are kept in secrets.py, please add them there!")
+ raise
+ import json as json_module
+except ImportError:
+ import ujson as json_module
+print("ESP32 SPI simple web server test!")
+# If you are using a board with pre-defined ESP32 Pins:
+esp32_cs = DigitalInOut(board.ESP_CS)
+esp32_ready = DigitalInOut(board.ESP_BUSY)
+esp32_reset = DigitalInOut(board.ESP_RESET)
+# If you have an externally connected ESP32:
+# esp32_cs = DigitalInOut(board.D9)
+# esp32_ready = DigitalInOut(board.D10)
+# esp32_reset = DigitalInOut(board.D5)
+spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
+esp = adafruit_esp32spi.ESP_SPIcontrol(
+ spi, esp32_cs, esp32_ready, esp32_reset
+) # pylint: disable=line-too-long
+print("MAC addr:", [hex(i) for i in esp.MAC_address])
+print("MAC addr actual:", [hex(i) for i in esp.MAC_address_actual])
+# Use below for Most Boards
+status_light = neopixel.NeoPixel(
+ board.NEOPIXEL, 1, brightness=0.2
+) # Uncomment for Most Boards
+# Uncomment below for ItsyBitsy M4
+# import adafruit_dotstar as dotstar
+# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=1)
+## If you want to connect to wifi with secrets:
+wifi = wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
+## If you want to create a WIFI hotspot to connect to with secrets:
+# secrets = {"ssid": "My ESP32 AP!", "password": "supersecret"}
+# wifi = wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
+# wifi.create_ap()
+## To you want to create an un-protected WIFI hotspot to connect to with secrets:"
+# secrets = {"ssid": "My ESP32 AP!"}
+# wifi = wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
+# wifi.create_ap()
+class SimpleWSGIApplication:
+ """
+ An example of a simple WSGI Application that supports
+ basic route handling and static asset file serving for common file types
+ """
+ INDEX = "/index.html"
+ CHUNK_SIZE = 8912 # max number of bytes to read at once when reading files
+ def __init__(self, static_dir=None, debug=False):
+ self._debug = debug
+ self._listeners = {}
+ self._start_response = None
+ self._static = static_dir
+ if self._static:
+ self._static_files = ["/" + file for file in os.listdir(self._static)]
+ def __call__(self, environ, start_response):
+ """
+ Called whenever the server gets a request.
+ The environ dict has details about the request per wsgi specification.
+ Call start_response with the response status string and headers as a list of tuples.
+ Return a single item list with the item being your response data string.
+ """
+ if self._debug:
+ self._log_environ(environ)
+ self._start_response = start_response
+ status = ""
+ headers = []
+ resp_data = []
+ key = self._get_listener_key(
+ environ["REQUEST_METHOD"].lower(), environ["PATH_INFO"]
+ )
+ if key in self._listeners:
+ status, headers, resp_data = self._listeners[key](environ)
+ if environ["REQUEST_METHOD"].lower() == "get" and self._static:
+ path = environ["PATH_INFO"]
+ if path in self._static_files:
+ status, headers, resp_data = self.serve_file(
+ path, directory=self._static
+ )
+ elif path == "/" and self.INDEX in self._static_files:
+ status, headers, resp_data = self.serve_file(
+ self.INDEX, directory=self._static
+ )
+ self._start_response(status, headers)
+ return resp_data
+ def on(self, method, path, request_handler):
+ """
+ Register a Request Handler for a particular HTTP method and path.
+ request_handler will be called whenever a matching HTTP request is received.
+ request_handler should accept the following args:
+ (Dict environ)
+ request_handler should return a tuple in the shape of:
+ (status, header_list, data_iterable)
+ :param str method: the method of the HTTP request
+ :param str path: the path of the HTTP request
+ :param func request_handler: the function to call
+ """
+ self._listeners[self._get_listener_key(method, path)] = request_handler
+ def serve_file(self, file_path, directory=None):
+ status = "200 OK"
+ headers = [("Content-Type", self._get_content_type(file_path))]
+ full_path = file_path if not directory else directory + file_path
+ def resp_iter():
+ with open(full_path, "rb") as file:
+ while True:
+ chunk = file.read(self.CHUNK_SIZE)
+ if chunk:
+ yield chunk
+ else:
+ break
+ return (status, headers, resp_iter())
+ def _log_environ(self, environ): # pylint: disable=no-self-use
+ print("environ map:")
+ for name, value in environ.items():
+ print(name, value)
+ def _get_listener_key(self, method, path): # pylint: disable=no-self-use
+ return "{0}|{1}".format(method.lower(), path)
+ def _get_content_type(self, file): # pylint: disable=no-self-use
+ ext = file.split(".")[-1]
+ if ext in ("html", "htm"):
+ return "text/html"
+ if ext == "js":
+ return "application/javascript"
+ if ext == "css":
+ return "text/css"
+ if ext in ("jpg", "jpeg"):
+ return "image/jpeg"
+ if ext == "png":
+ return "image/png"
+ return "text/plain"
+# Our HTTP Request handlers
+def led_on(environ): # pylint: disable=unused-argument
+ print("led on!")
+ status_light.fill((0, 0, 100))
+ return web_app.serve_file("static/index.html")
+def led_off(environ): # pylint: disable=unused-argument
+ print("led off!")
+ status_light.fill(0)
+ return web_app.serve_file("static/index.html")
+def led_color(environ): # pylint: disable=unused-argument
+ json = json_module.loads(environ["wsgi.input"].getvalue())
+ print(json)
+ rgb_tuple = (json.get("r"), json.get("g"), json.get("b"))
+ status_light.fill(rgb_tuple)
+ return ("200 OK", [], [])
+# Here we create our application, setting the static directory location
+# and registering the above request_handlers for specific HTTP requests
+# we want to listen and respond to.
+static = "/static"
+ static_files = os.listdir(static)
+ if "index.html" not in static_files:
+ raise RuntimeError(
+ """
+ This example depends on an index.html, but it isn't present.
+ Please add it to the {0} directory""".format(
+ static
+ )
+ )
+except OSError as e:
+ raise RuntimeError(
+ """
+ This example depends on a static asset directory.
+ Please create one named {0} in the root of the device filesystem.""".format(
+ static
+ )
+ ) from e
+web_app = SimpleWSGIApplication(static_dir=static)
+web_app.on("GET", "/led_on", led_on)
+web_app.on("GET", "/led_off", led_off)
+web_app.on("POST", "/ajax/ledcolor", led_color)
+# Here we setup our server, passing in our web_app as the application
+wsgiServer = server.WSGIServer(80, application=web_app)
+print("open this IP in your browser: ", esp.pretty_ip(esp.ip_address))
+# Start the server
+while True:
+ # Our main loop where we have the server poll for incoming requests
+ try:
+ wsgiServer.update_poll()
+ # Could do any other background tasks here, like reading sensors
+ except OSError as e:
+ print("Failed to update server, restarting ESP32\n", e)
+ wifi.reset()
+ continue