-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbasic.py
154 lines (127 loc) · 5.44 KB
/
basic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import time
import board
import busio
import neopixel
from adafruit_esp32spi.adafruit_esp32spi_wifimanager import (
ESPSPI_WiFiManager,
adafruit_esp32spi,
)
from adafruit_datetime import timezone, timedelta
from digitalio import DigitalInOut
import rtc
from _secrets import secrets
# URL fetched (and its body printed) by BasicApp.network_test() to check that
# the WiFi connection is working.
WIFI_TEST_URL = "http://wifitest.adafruit.com/testwifi/index.html"
class BasicApp:
    """Provides the common app setup and functionality.

    Bundles boot-status display, a minimal interval scheduler ("cron"),
    WiFi bring-up over the ESP32 SPI co-processor, time/timezone lookup and
    a software brightness helper for packed RGB colors.
    """

    def __init__(self):
        # Brightness factor applied by color(); 1.0 leaves colors unchanged.
        self.brightness = 1.00
        self.board = board
        # Maps method name -> interval in seconds; consumed by cron_run().
        self.cron_jobs = {}
        # Maps method name -> time.time() value recorded when it last ran.
        self._cron_last_ran = {}
        # Optional label object with a ``text`` attribute; None means no
        # boot status is shown.
        self.status_label = None

    def set_boot_status(self, msg: str):
        """Set the on-boot status, if it's still being displayed.

        Spaces in ``msg`` are replaced with newlines before the text is
        assigned to ``self.status_label.text``.
        """
        # Compare against None explicitly: a label object may define
        # __len__ and therefore be falsy while still being a valid label.
        if self.status_label is not None:
            self.status_label.text = msg.replace(" ", "\n")

    def cron_run(self):
        """Check for and run due cron jobs.

        Cron jobs are defined in ``self.cron_jobs`` as a dict of method
        names to intervals (in seconds). A job runs when it has never run
        before, or when strictly more than its interval has elapsed since
        its last recorded run. Exceptions raised by a job propagate to the
        caller.
        """
        now = time.time()
        # Iterate over a snapshot of the keys so that a job may add or
        # remove entries in self.cron_jobs without breaking the loop.
        for job in list(self.cron_jobs):
            if job not in self.cron_jobs:
                # Removed by a job that ran earlier in this pass.
                continue
            interval = self.cron_jobs[job]
            last_ran = self._cron_last_ran.get(job)
            if last_ran is None or last_ran + interval < now:
                print(f"Running scheduled job: {job}")
                getattr(self, job)()
                self._cron_last_ran[job] = now

    def network_setup(self) -> "ESPSPI_WiFiManager":
        """Activate the WiFi network.

        Creates the ESP32 SPI control object and a WiFi manager, stores the
        manager on ``self.network``, runs network_test() and returns the
        manager.
        """
        self.set_boot_status("WiFi Setup")
        esp32_cs = DigitalInOut(board.ESP_CS)
        esp32_ready = DigitalInOut(board.ESP_BUSY)
        esp32_reset = DigitalInOut(board.ESP_RESET)
        spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
        esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
        self.network = ESPSPI_WiFiManager(
            esp,
            secrets,
            status_pixel=neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2),
            debug=True,
        )
        self.network_test()
        # Return the manager so the result matches the declared return type.
        return self.network

    def network_test(self):
        """Test that the network is working by fetching and printing data."""
        self.set_boot_status("WiFi Test")
        print("Fetching text from", WIFI_TEST_URL)
        r = self.network.get(WIFI_TEST_URL)
        try:
            print("-" * 40)
            print(r.text)
            print("-" * 40)
        finally:
            # Always release the response, even if reading the body fails.
            r.close()
        print("Done!")

    def parse_time(self, timestring: str, is_dst=-1):
        """Given a string of the format YYYY-MM-DDTHH:MM:SS.SS-HH:MM (and
        optionally a DST flag), convert to and return an equivalent
        time.struct_time (strptime() isn't available here). Calling function
        can use time.mktime() on result if epoch seconds is needed instead.
        Time string is assumed local time; UTC offset is ignored (a trailing
        "+HH:MM", "-HH:MM" or "Z" is dropped). If seconds value includes a
        decimal fraction it's ignored.

        The weekday and day-of-year fields of the result are set to -1.
        """
        date_time = timestring.split("T")  # Separate into date and time
        year_month_day = date_time[0].split("-")  # Separate date into Y/M/D
        # Strip the UTC offset / "Z" designator, then separate into H/M/S.
        clock = date_time[1].split("+")[0].split("-")[0].rstrip("Z")
        hour_minute_second = clock.split(":")
        return time.struct_time(
            (
                int(year_month_day[0]),
                int(year_month_day[1]),
                int(year_month_day[2]),
                int(hour_minute_second[0]),
                int(hour_minute_second[1]),
                int(hour_minute_second[2].split(".")[0]),  # drop fraction
                -1,
                -1,
                is_dst,
            )
        )

    def lookup_timezone(self, name: str, set_rtc=False):
        """Look up a time zone via the WorldTimeAPI public server;
        no account required. Pass in time zone string
        (http://worldtimeapi.org/api/timezone for list)
        or None to use IP geolocation. If set_rtc is true, the RTC is also
        set from the local time in the response. Returns a timezone object
        whose offset is the zone's raw UTC offset plus the DST offset when
        DST is in effect. This may throw an exception while fetching the
        data - it is NOT CAUGHT HERE, should be handled in the calling code
        because different behaviors may be needed in different situations
        (e.g. reschedule for later).
        """
        if name:
            time_url = f"http://worldtimeapi.org/api/timezone/{name}"
        else:
            # No zone given: let the service geolocate by IP address.
            time_url = "http://worldtimeapi.org/api/ip"
        print(f"Fetching time from {time_url}...")
        response = self.network.get(time_url, headers={"Accept": "application/json"})
        try:
            time_data: dict = response.json()
        finally:
            # Release the response, matching what network_test() does.
            response.close()

        # This is here because we sync the clock via the same API as lookup TZ offsets.
        if set_rtc:
            time_struct = self.parse_time(
                timestring=time_data["datetime"], is_dst=time_data["dst"]
            )
            old_time = time.time()
            rtc.RTC().datetime = time_struct
            print(
                "RTC updated from Internet: {0}, change was: {1}s".format(
                    time.localtime(), old_time - time.time()
                )
            )

        delta = timedelta(seconds=time_data["raw_offset"])
        if time_data["dst"]:
            delta += timedelta(seconds=time_data["dst_offset"])
        # timedelta normalises negative values to days=-1 plus positive
        # seconds, so delta.seconds alone misreports zones west of UTC.
        offset_seconds = delta.days * 86400 + delta.seconds
        print("{:20s} {:d}".format(time_data["timezone"], offset_seconds))
        return timezone(offset=delta, name=time_data["timezone"])

    def _scale_channel(self, channel: int) -> int:
        """Scale one 8-bit channel by self.brightness, clamped to 0..255."""
        scaled = int(round(channel * self.brightness, 0))
        return max(0, min(255, scaled))

    def color(self, color_in_hex: int) -> int:
        """Apply brightness to a color.

        This is the math you do when the board doesn't support a brightness
        setting. ``color_in_hex`` is a packed 0xRRGGBB value; bits above the
        low 24 are ignored. Each channel is multiplied by
        ``self.brightness``, rounded, and clamped to 0..255 so one channel
        can never overflow into its neighbour. Returns the packed 0xRRGGBB
        result.
        """
        r = self._scale_channel((color_in_hex >> 16) & 0xFF)
        g = self._scale_channel((color_in_hex >> 8) & 0xFF)
        b = self._scale_channel(color_in_hex & 0xFF)
        return r << 16 | g << 8 | b