-
Notifications
You must be signed in to change notification settings - Fork 105
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PoC shared priority queue screen rotation
- Loading branch information
Showing
15 changed files
with
525 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from data.schedule import Schedule | ||
|
||
class Data: | ||
|
||
def __init__(self, screen_queue): | ||
self._screen_queue = screen_queue | ||
|
||
self.schedule = Schedule() | ||
|
||
def request_next_screen(self, screen): | ||
self._screen_queue.put(screen) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from data.status import Status | ||
|
||
class Game: | ||
|
||
@staticmethod | ||
def from_schedule(game_data): | ||
game = Game(game_data) | ||
|
||
if game.update(True) == Status.SUCCESS: | ||
return game | ||
|
||
return None | ||
|
||
def __init__(self, data): | ||
self._data = data | ||
|
||
self.id = data["game_id"] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import datetime, statsapi, time | ||
|
||
from utils import logger as ScoreboardLogger | ||
|
||
from data.status import Status | ||
from data.game import Game | ||
|
||
class Schedule: | ||
|
||
REFRESH_RATE = 5 * 60 # minutes | ||
|
||
def __init__(self): | ||
# Contains a list of parsed game objects | ||
self.games = [] | ||
# Cached request from statsapi | ||
self._games = [] | ||
|
||
self.update() | ||
|
||
def update(self): | ||
self.last_update = time.time() | ||
|
||
ScoreboardLogger.log(f"Updating schedule for {datetime.datetime.today()}") | ||
|
||
try: | ||
self.__fetch_updated_schedule(datetime.datetime.today().strftime("%Y-%m-%d")) | ||
except Exception as exception: | ||
ScoreboardLogger.exception("Networking error while refreshing schedule!") | ||
ScoreboardLogger.exception(exception) | ||
|
||
return Status.FAIL | ||
|
||
return Status.SUCCESS | ||
|
||
def __fetch_updated_schedule(self, date): | ||
self._games = statsapi.schedule(date.strftime("%Y-%m-%d")) | ||
|
||
self.games = [Game.from_schedule(game) for game in self._games] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from enum import Enum | ||
|
||
|
||
class Status(Enum): | ||
SUCCESS = 2 | ||
DEFERRED = 1 | ||
FAIL = 0 | ||
|
||
|
||
def ok(status): | ||
return status in [Status.SUCCESS, Status.DEFERRED] | ||
|
||
def fail(status): | ||
return status in [Status.FAIL] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import sys | ||
|
||
from utils import args | ||
from driver.mode import DriverMode | ||
|
||
class DriverWrapper: | ||
def __init__(self): | ||
self.hardware_load_failed = False | ||
self.mode = None | ||
|
||
if args().emulated: | ||
self.set_mode(DriverMode.SOFTWARE_EMULATION) | ||
else: | ||
self.set_mode(DriverMode.HARDWARE) | ||
|
||
|
||
@property | ||
def __name__(self): | ||
return 'driver' | ||
|
||
def is_hardware(self): | ||
return self.mode == DriverMode.HARDWARE | ||
|
||
def is_emulated(self): | ||
return self.mode == DriverMode.SOFTWARE_EMULATION | ||
|
||
def set_mode(self, mode): | ||
self.mode = mode | ||
|
||
if self.is_hardware(): | ||
try: | ||
import rgbmatrix | ||
|
||
self.driver = rgbmatrix | ||
except ImportError: | ||
import RGBMatrixEmulator | ||
|
||
self.mode = DriverMode.SOFTWARE_EMULATION | ||
self.driver = RGBMatrixEmulator | ||
self.hardware_load_failed = True | ||
else: | ||
import RGBMatrixEmulator | ||
|
||
self.driver = RGBMatrixEmulator | ||
|
||
def __getattr__(self, name): | ||
return getattr(self.driver, name) | ||
|
||
|
||
sys.modules['driver'] = DriverWrapper() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from enum import Enum | ||
|
||
|
||
class DriverMode(Enum): | ||
HARDWARE = 0 | ||
SOFTWARE_EMULATION = 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import statsapi, sys, threading | ||
|
||
from queue import PriorityQueue | ||
|
||
from version import SCRIPT_NAME, SCRIPT_VERSION | ||
|
||
from utils import args, led_matrix_options | ||
from utils import logger as ScoreboardLogger | ||
|
||
from screens.screen_manager import ScreenManager | ||
|
||
from data import Data | ||
|
||
import driver | ||
from driver import RGBMatrix, __version__ | ||
|
||
|
||
statsapi_version = tuple(map(int, statsapi.__version__.split("."))) | ||
if statsapi_version < (1, 5, 1): | ||
ScoreboardLogger.error("We require MLB-StatsAPI 1.5.1 or higher. You may need to re-run install.sh") | ||
sys.exit(1) | ||
elif statsapi_version < (1, 6, 1): | ||
ScoreboardLogger.warning("We recommend MLB-StatsAPI 1.6.1 or higher. You may want to re-run install.sh") | ||
|
||
|
||
def main(matrix): | ||
#TODO: Configure matrix dimensions | ||
ScoreboardLogger.info(f"{SCRIPT_NAME} - v#{SCRIPT_VERSION} (32x32)") | ||
|
||
canvas = matrix.CreateFrameCanvas() | ||
screen_queue = PriorityQueue(10) | ||
|
||
render_thread = threading.Thread( | ||
target=ScreenManager.start, | ||
args=[matrix, canvas, screen_queue], | ||
name="render_thread", | ||
daemon=True | ||
) | ||
|
||
render_thread.start() | ||
|
||
while render_thread.is_alive(): | ||
pass | ||
|
||
|
||
if __name__ == "__main__": | ||
command_line_args = args() | ||
matrix_options = led_matrix_options(command_line_args) | ||
|
||
matrix = RGBMatrix(options=matrix_options) | ||
|
||
try: | ||
main(matrix) | ||
except: | ||
ScoreboardLogger.exception("Untrapped error in main!") | ||
sys.exit(1) | ||
finally: | ||
matrix.Clear() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from enum import Enum | ||
|
||
class Screen(Enum): | ||
CLOCK = 1 | ||
WEATHER = 2 | ||
|
||
from screens.screen_manager import ScreenManager |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from datetime import datetime as dt | ||
|
||
class ScreenBase(): | ||
|
||
def __init__(self, manager): | ||
self._manager = manager | ||
|
||
self.start_time = None | ||
self.duration = 0 | ||
|
||
def request_next_screen(self, screen): | ||
self._manager.queue.put(screen) | ||
|
||
def track_duration(fn): | ||
def wrapper(self, *args, **kwargs): | ||
if self.start_time is None: | ||
self.start_time = dt.now() | ||
|
||
fn(self, *args, **kwargs) | ||
|
||
self.duration = (dt.now() - self.start_time).total_seconds() * 1000 | ||
|
||
return wrapper | ||
|
||
def render(self): | ||
raise NotImplementedError("Subclasses must implement render()") | ||
|
||
@track_duration | ||
def _render(self): | ||
self.render() | ||
|
||
@property | ||
def matrix(self): | ||
return self._manager.matrix | ||
|
||
@property | ||
def canvas(self): | ||
return self._manager.canvas |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import os, time | ||
|
||
from driver import graphics | ||
|
||
from screens import Screen | ||
from screens.base import ScreenBase | ||
|
||
|
||
class ClockScreen(ScreenBase): | ||
|
||
MAX_DURATION_SECONDS = 3 | ||
|
||
def __init__(self, *args): | ||
ScreenBase.__init__(self, *args) | ||
|
||
def render(self): | ||
time_format_str = "%H:%M%p" | ||
time_text = time.strftime(time_format_str) | ||
|
||
font_paths = ["../assets/fonts/patched", "../submodules/matrix/fonts"] | ||
for font_path in font_paths: | ||
path = f"{font_path}/4x6.bdf" | ||
if os.path.isfile(path): | ||
font = graphics.Font() | ||
font.LoadFont(path) | ||
|
||
graphics.DrawText(self.canvas, font, 5, 5, (255, 255, 255), time_text) | ||
|
||
if self.duration > self.MAX_DURATION_SECONDS * 1000: | ||
self.request_next_screen(Screen.WEATHER) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from screens import Screen | ||
from screens.base import ScreenBase | ||
from screens.clock import ClockScreen | ||
from screens.weather import WeatherScreen | ||
|
||
class ScreenManager(): | ||
|
||
SCREENS = { | ||
Screen.CLOCK: ClockScreen, | ||
Screen.WEATHER: WeatherScreen | ||
} | ||
|
||
@classmethod | ||
def start(cls, matrix, canvas, queue): | ||
ScreenManager(matrix, canvas, queue).__start() | ||
|
||
def __init__(self, matrix, canvas, queue): | ||
self.matrix = matrix | ||
self.canvas = canvas | ||
self.queue = queue | ||
self.screen = WeatherScreen(self) | ||
self.priority = "normal" # TODO | ||
|
||
def __start(self): | ||
while True: | ||
if not self.queue.empty(): | ||
screen = self.queue.get() | ||
screen_class = self.SCREENS.get(screen, None) | ||
|
||
if issubclass(screen_class, ScreenBase): | ||
self.screen = screen_class(self) | ||
# TODO: This could be a transition of some kind | ||
self.canvas.Clear() | ||
|
||
self.screen._render() | ||
|
||
self.canvas = self.matrix.SwapOnVSync(self.canvas) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import os | ||
|
||
from driver import graphics | ||
|
||
from screens import Screen | ||
from screens.base import ScreenBase | ||
|
||
|
||
class WeatherScreen(ScreenBase): | ||
|
||
MAX_DURATION_SECONDS = 3 | ||
|
||
def __init__(self, *args): | ||
ScreenBase.__init__(self, *args) | ||
|
||
def render(self): | ||
weather_text = "It's weathery" | ||
|
||
font_paths = ["../assets/fonts/patched", "../submodules/matrix/fonts"] | ||
for font_path in font_paths: | ||
path = f"{font_path}/4x6.bdf" | ||
if os.path.isfile(path): | ||
font = graphics.Font() | ||
font.LoadFont(path) | ||
|
||
graphics.DrawText(self.canvas, font, 0, 10, (255, 255, 255), weather_text) | ||
|
||
if self.duration > self.MAX_DURATION_SECONDS * 1000: | ||
self.request_next_screen(Screen.CLOCK) |
Oops, something went wrong.