-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqlab.py
349 lines (303 loc) · 12.2 KB
/
qlab.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
import threading
import time
import json
from flask import Flask, render_template, request, jsonify, send_from_directory # type: ignore
from flask_cors import CORS # type: ignore
from pythonosc import udp_client, dispatcher, osc_server # type: ignore
from PIL import ImageGrab, Image # type: ignore
import os
import platform
import base64
import logging
import socket
import subprocess
# Raise the threshold of the 'werkzeug' logger (the HTTP server layer Flask runs on)
# to ERROR so that lower-severity records from it are not emitted.
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
# Flask app setup: the single application object every route in this file is registered on.
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# get local ip
def get_local_ip():
    """Return this machine's outbound IPv4 address as a dotted string.

    A UDP socket is connected to a fixed address (which does not have to be
    reachable) and the local address the OS bound for it is read back.  If
    anything goes wrong during that probe, "127.0.0.1" is returned instead.
    The socket is always closed before returning.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            # The target doesn't have to be reachable.
            probe.connect(("10.254.254.254", 1))
            return probe.getsockname()[0]
        except Exception:
            return "127.0.0.1"
# If platform is not macOS, abort and print an error message.
# The rest of this file shells out to `osascript` and `screencapture`, so it
# cannot work elsewhere without modification.
if platform.system() != "Darwin":
    print("Error: This script is only compatible with macOS.\nIt can be modified to work on other platforms, just remove the macOS-specific parts such as AppleScript, etc.")
    # Raise SystemExit directly instead of calling exit(): exit() is a helper
    # injected by the `site` module for interactive sessions and is not
    # guaranteed to exist (e.g. under `python -S` or in frozen builds).
    raise SystemExit(1)
LOCAL_IP = get_local_ip()
# If local ip != 127.0.0.1, use it as the default QLab IP
if LOCAL_IP != "127.0.0.1":
    print(f"Detected local IP: {LOCAL_IP}")
    QLAB_IP = LOCAL_IP
else:
    # get_local_ip() falls back to 127.0.0.1 when its probe fails; treat that
    # as "no usable address" and abort with an error message.
    print("Error: Could not detect local IP address. Please try again in 5 seconds, or set QLAB_IP manually.")
    # Raise SystemExit directly instead of calling exit(): exit() is a helper
    # injected by the `site` module for interactive sessions and is not
    # guaranteed to exist (e.g. under `python -S` or in frozen builds).
    raise SystemExit(1)
# OSC client setup: one UDP client, created once at import time, is used by
# every route that sends a message to QLab.
QLAB_IP = LOCAL_IP # Default QLab IP (repeats the assignment already made in the IP check above)
QLAB_PORT = 53000 # Default QLab OSC port
OSC_LISTEN_PORT = 53001 # Port to listen for responses from QLab
osc_client = udp_client.SimpleUDPClient(QLAB_IP, QLAB_PORT)
# OSC dispatcher setup: maps incoming OSC addresses to handler functions; it is
# passed to the OSC server created in start_osc_server().
osc_dispatcher = dispatcher.Dispatcher()
# Global variables (module-level state shared between the Flask routes, the OSC
# handler and the background threads).
# Devices offered to the index template; only the single QLab address is listed.
detected_devices = [{"id": QLAB_IP, "name": "Main QLab"}]
# Replaced by handle_workspace_response() with the "data" field of QLab's reply.
workspaces = []
# Set by the /fetch_workspaces route from the "device_id" form field.
selected_device = None
# Set by the /select_workspace route from the "workspace_id" form field.
selected_workspace = None
# Cue details reported by /cue_info. fetch_current_cue_periodically() overwrites
# these placeholders with None when it starts and then with values read from QLab.
selected_cue_number = "N/A"
selected_cue_name = "N/A"
active_cue_number = "N/A"
active_cue_name = "N/A"
# Handle workspace response
def handle_workspace_response(_, *args):
    """OSC handler that stores the workspace list from a reply.

    The first positional OSC argument is parsed as JSON.  When the parsed
    value contains a "data" entry, that entry becomes the new global
    ``workspaces``.  In every other case (no arguments, invalid JSON, no
    "data" entry, or any error while reading it) ``workspaces`` is reset to
    an empty list; errors are printed rather than raised.

    Args:
        _: The OSC address the message arrived on (unused).
        *args: OSC message arguments; only the first one is read.
    """
    global workspaces
    if args:
        try:
            payload = json.loads(args[0])
            if "data" in payload:
                workspaces = payload["data"]
                return
        except Exception as err:
            print(f"Error parsing workspace response: {err}")
    # Reached when there was nothing usable in the reply.
    workspaces = []
# Register OSC handlers: messages arriving on the "/reply/workspaces" address
# are passed to handle_workspace_response.
osc_dispatcher.map("/reply/workspaces", handle_workspace_response)
# Start OSC server
def start_osc_server():
    """Start the OSC reply listener in a background daemon thread.

    Binds a threading UDP OSC server to all interfaces on ``OSC_LISTEN_PORT``,
    wires it to the module-level ``osc_dispatcher`` and runs its
    ``serve_forever`` loop on a daemon thread.  Returns nothing.
    """
    listen_address = ("0.0.0.0", OSC_LISTEN_PORT)
    udp_server = osc_server.ThreadingOSCUDPServer(listen_address, osc_dispatcher)
    threading.Thread(target=udp_server.serve_forever, daemon=True).start()
# Start the listener at import time so it is running before any route sends a request to QLab.
start_osc_server()
# Flask routes
@app.route("/")
def index():
    """Render the main page.

    Passes the device list, the currently known workspaces and the currently
    selected workspace to the "index.html" template.
    """
    template_context = {
        "devices": detected_devices,
        "workspaces": workspaces,
        "selected_workspace": selected_workspace,
    }
    return render_template("index.html", **template_context)
@app.route("/fetch_workspaces", methods=["POST"])
def fetch_workspaces():
    """Ask QLab for its workspace list.

    Stores the "device_id" form field in the global ``selected_device`` and,
    when it is non-empty, sends an OSC "/workspaces" message.  The reply is
    not awaited here.

    Returns:
        A JSON success body, a JSON error with status 400 when no device id
        was supplied, or a JSON error with status 500 when sending fails.
    """
    global selected_device
    selected_device = request.form.get("device_id")
    if not selected_device:
        return jsonify({"status": "error", "message": "No device selected"}), 400
    try:
        osc_client.send_message("/workspaces", [])
        return jsonify({"status": "success", "message": "Fetching workspaces"})
    except Exception as err:
        print(f"Error fetching workspaces: {err}")
        return jsonify({"status": "error", "message": "Failed to fetch workspaces"}), 500
@app.route("/current_workspaces", methods=["GET"])
def current_workspaces():
    """Return the workspace list most recently stored in ``workspaces``.

    Returns:
        ``{"workspaces": [...]}`` when the list is non-empty; otherwise a
        message is printed and a JSON error with status 400 is returned.
    """
    if not workspaces:
        print("No workspaces available")
        return jsonify({"status": "error", "message": "No workspaces available"}), 400
    return jsonify({"workspaces": workspaces})
@app.route("/select_workspace", methods=["POST"])
def select_workspace():
    """Select a workspace and tell QLab to connect to it.

    Stores the "workspace_id" form field in the global ``selected_workspace``
    and, when it is non-empty, sends an OSC "/workspace/connect" message
    carrying that id.

    Returns:
        A JSON success body echoing the workspace id, a JSON error with
        status 400 when no id was supplied, or a JSON error with status 500
        when sending fails.
    """
    global selected_workspace
    selected_workspace = request.form.get("workspace_id")
    if not selected_workspace:
        return jsonify({"status": "error", "message": "No workspace selected"}), 400
    try:
        osc_client.send_message("/workspace/connect", [selected_workspace])
        success_body = {
            "status": "success",
            "workspace": selected_workspace,
            "message": "Workspace connected successfully",
        }
        return jsonify(success_body)
    except Exception as err:
        print(f"Error connecting to workspace: {err}")
        return jsonify({"status": "error", "message": "Failed to connect to workspace"}), 500
@app.route("/button_action", methods=["POST"])
def button_action():
    """Translate a transport button press into an OSC message.

    Reads the "data-action" form field, looks up the matching OSC address
    under the currently selected workspace and sends it with no arguments.

    Returns:
        A JSON success body naming the action; a JSON error with status 400
        when no workspace is selected or the action is not recognised; a JSON
        error with status 500 when anything raises.
    """
    try:
        requested = request.form.get("data-action")
        if not selected_workspace:
            return jsonify({"status": "error", "message": "No workspace selected"}), 400

        # Action name -> OSC address for the selected workspace.
        addresses = {
            "go": f"/workspace/{selected_workspace}/go",
            "next": f"/workspace/{selected_workspace}/select/next",
            "previous": f"/workspace/{selected_workspace}/select/previous",
            "panic": f"/workspace/{selected_workspace}/panic",
            "stop": f"/workspace/{selected_workspace}/hardStop",
            "pause": f"/workspace/{selected_workspace}/pause",
            "resume": f"/workspace/{selected_workspace}/resume",
        }
        address = addresses.get(requested)
        if not address:
            return jsonify({"status": "error", "message": "Unknown action"}), 400

        osc_client.send_message(address, [])
        return jsonify({"status": "success", "action": requested})
    except Exception as err:
        print(f"Error: {err}")
        return jsonify({"status": "error", "message": "Something went wrong"}), 500
# Periodically fetch current and selected cues
def fetch_current_cue_periodically():
    """Poll QLab forever for the selected and active cue details.

    Intended to run on a background thread; it never returns.  Every cycle it
    runs four AppleScript queries against QLab's front workspace (number and
    display name of the first selected cue, number and display name of the
    last active cue) and stores the answers in the module-level globals
    ``selected_cue_number``, ``selected_cue_name``, ``active_cue_number`` and
    ``active_cue_name``.  A global keeps its previous value whenever the
    query answers "N/A".

    If ``osascript`` exits with an error, the remaining queries of that cycle
    are skipped, the error output is printed, and polling continues.
    """
    global selected_cue_number, selected_cue_name, active_cue_number, active_cue_name
    # Initialize these to None so we know when they haven't been set yet.
    selected_cue_number = None
    selected_cue_name = None
    active_cue_number = None
    active_cue_name = None

    # AppleScript expressions that pick the cue each query looks at.
    selected_cue = "first item of (selected as list)"
    active_cue = "last item of (active cues as list)"

    while True:
        try:
            # The selected-cue number is read with `q number`, the same
            # property the active-cue query uses (it previously asked for
            # `name`, which is not the cue number).
            selected_cue_number = _read_cue_field(selected_cue, "q number", selected_cue_number)
            selected_cue_name = _read_cue_field(selected_cue, "q display name", selected_cue_name)
            active_cue_number = _read_cue_field(active_cue, "q number", active_cue_number)
            active_cue_name = _read_cue_field(active_cue, "q display name", active_cue_name)
        except subprocess.CalledProcessError as e:
            print(f"Error: {e.stderr.strip()}")
        # Wait before checking again; this also throttles retries after a failure.
        time.sleep(0.25)


def _build_cue_script(cue_selector, cue_property):
    """Return AppleScript source that reads ``cue_property`` of the cue chosen by ``cue_selector``.

    The generated script targets QLab's front workspace and itself returns
    the text "N/A" when the lookup raises an AppleScript error.
    """
    return f"""
    tell application id "com.figure53.QLab.4"
        tell front workspace
            try
                set theCue to {cue_selector}
                return {cue_property} of theCue
            on error
                return "N/A"
            end try
        end tell
    end tell
    """


def _read_cue_field(cue_selector, cue_property, previous):
    """Run one cue query through ``osascript`` and return its stripped output.

    Returns ``previous`` unchanged when the script answers "N/A".

    Raises:
        subprocess.CalledProcessError: if ``osascript`` exits with a non-zero status.
    """
    completed = subprocess.run(
        ["osascript", "-e", _build_cue_script(cue_selector, cue_property)],
        capture_output=True,
        text=True,
        check=True,
    )
    value = completed.stdout.strip()
    return previous if value == "N/A" else value
# Start the periodic cue polling in a separate thread. It is a daemon thread,
# so it does not keep the process alive once the main thread exits.
fetch_current_cue_thread = threading.Thread(target=fetch_current_cue_periodically, daemon=True)
fetch_current_cue_thread.start()
# Ensure the "static" directory exists (relative to the current working
# directory). exist_ok=True makes this a single call with no gap between
# checking for the directory and creating it.
os.makedirs('static', exist_ok=True)
def capture_screenshot():
    """Continuously capture QLab's first window into ``current_screenshot``.

    Intended to run on a background thread; it never returns.  Each cycle:

    1. asks AppleScript for the id of QLab's window 1,
    2. runs ``screencapture`` on that window, writing static/screenshot.png,
    3. resizes the image to 720x480 and saves it back to the same path,
    4. stores the file's base64 text in the global ``current_screenshot``.

    A failure in any step (for example no QLab window being available) is
    printed and the cycle is retried after the usual delay, so one bad
    capture does not end the thread; ``current_screenshot`` keeps its
    previous value in that case.
    """
    global current_screenshot
    screenshot_path = "static/screenshot.png"
    while True:
        try:
            # Look up the window id first so a failed lookup is detected
            # instead of handing screencapture an empty "-l" argument.
            window_id = subprocess.run(
                ["osascript", "-e", 'tell app "QLab" to id of window 1'],
                capture_output=True,
                text=True,
                check=True,
            ).stdout.strip()
            # -l<id> captures that window only; -x is passed through unchanged.
            subprocess.run(
                ["screencapture", f"-l{window_id}", "-x", screenshot_path],
                check=True,
            )
            # Resize to 720x480; the context manager releases the source file handle.
            with Image.open(screenshot_path) as captured:
                resized = captured.resize((720, 480))
            resized.save(screenshot_path)
            # Publish the resized file's base64 text.
            with open(screenshot_path, "rb") as img_file:
                current_screenshot = base64.b64encode(img_file.read()).decode("utf-8")
        except Exception as e:
            # Thread-level boundary: report and keep looping rather than
            # letting the capture thread die silently.
            print(f"Error capturing screenshot: {e}")
        time.sleep(0.25)
# Latest screenshot as base64 text; stays empty until capture_screenshot()
# stores its first result. Read by the /screenshot route.
current_screenshot = ""
# Start the screenshot capture thread. It is a daemon thread, so it does not
# keep the process alive once the main thread exits.
screenshot_thread = threading.Thread(target=capture_screenshot, daemon=True)
screenshot_thread.start()
@app.route("/screenshot", methods=["GET"])
def get_screenshot():
    """Return the most recent screenshot as base64 text.

    Returns:
        ``{"screenshot": <base64 string>}`` when one has been captured;
        otherwise a JSON error with status 400.
    """
    if not current_screenshot:
        return jsonify({"status": "error", "message": "No screenshot available"}), 400
    return jsonify({"screenshot": current_screenshot})
# Cue information endpoint
@app.route("/cue_info", methods=["GET"])
def cue_info():
    """Return the current values of the four cue globals as JSON.

    The values are whatever the module-level ``selected_cue_number``,
    ``selected_cue_name``, ``active_cue_number`` and ``active_cue_name``
    hold at the time of the request.
    """
    snapshot = {
        "selected_cue_number": selected_cue_number,
        "selected_cue_name": selected_cue_name,
        "active_cue_number": active_cue_number,
        "active_cue_name": active_cue_name,
    }
    return jsonify(snapshot)
# Run the web server only when this file is executed as a script.
if __name__ == "__main__":
    # Listens on all interfaces, port 5000, with debug mode and the reloader off.
    app.run(
        host="0.0.0.0",
        port=5000,
        debug=False,
        threaded=True,
        use_reloader=False,
    )