# obdDash.py - reads OBD sensor data and sends it to the node server over Socket.IO.
import socketio
import random
import time
import json
import obd
import obdUtils
import time
import re
# Identifiers passed to obdUtils.createLogMessage() when an error is reported.
SENSOR_TYPE = 'OBD'
ERROR = 'ERR'
RETRY_INTERVAL = 1 # Delay in seconds between attempts to connect to the node server
# Delay in seconds between telemetry samples. This can be decreased, but 0.3 s
# seems to be the fastest rate the Pi can display on the web dashboard; with a
# shorter delay the web page updates begin to lag.
delay = 0.3
idleTime = 0.0 # Accumulated seconds during which the reported speed was below 0.1 mph
currentDtcCodes = [] # Trouble codes tracked across calls to emitDtcCodes()
dtcCodesChanged = False # Flag emitDtcCodes() checks before emitting 'dtcData'; cleared after each emit
numTries = 1 # NOTE(review): not referenced anywhere in the visible code
def emitDtcCodes():
    """Poll the vehicle's trouble codes and emit them when the set changes.

    Queries GET_DTC on the module-level ``connection``, reconciles the result
    with the module-level ``currentDtcCodes`` list (new codes are appended,
    codes that are no longer reported are dropped) and, only if something
    actually changed, emits the full list on the 'dtcData' event through the
    module-level ``sio`` client.

    Returns:
        None. ``currentDtcCodes`` is updated in place so other references to
        the list observe the change.
    """
    global dtcCodesChanged
    response = connection.query(obd.commands.GET_DTC)
    dtcCodes = response.value
    if dtcCodes is None:
        # No answer from the vehicle (python-OBD gives a null response a value
        # of None). That says nothing about which codes are active, so keep
        # the current state rather than clearing it.
        return

    for code in dtcCodes: #add new codes if they don't exist
        if code not in currentDtcCodes:
            currentDtcCodes.append(code)
            dtcCodesChanged = True

    # Remove any codes that were resolved. Build the surviving list first and
    # assign it back through a slice so the list is never mutated while it is
    # being iterated.
    stillActive = [code for code in currentDtcCodes if code in dtcCodes]
    if len(stillActive) != len(currentDtcCodes):
        currentDtcCodes[:] = stillActive
        dtcCodesChanged = True

    if dtcCodesChanged:
        sio.emit('dtcData', currentDtcCodes)
        dtcCodesChanged = False
def emitTelemetry():
    """Sample the OBD sensors forever and stream each sample to the node server.

    Every cycle queries SPEED, RPM, THROTTLE_POS and RUN_TIME on the
    module-level ``connection``, emits them (plus the accumulated
    ``idleTime``) as a JSON string on the 'data' event, calls
    ``emitDtcCodes()`` and then waits ``delay`` seconds.  ``idleTime`` grows
    by ``delay`` for every sample whose speed is below 0.1 mph.

    A failure inside a cycle is logged locally and forwarded on the 'log'
    event; the loop then waits ``delay`` seconds like any other cycle, so a
    persistently failing sensor cannot turn this into a busy loop.

    This function never returns normally.  It only exits by raising, which
    happens when forwarding the error log itself fails (for example because
    the server link is down); that exception propagates to the caller.
    """
    global idleTime
    while True:
        try:
            # Each query sends the command and parses the response.
            speedResponse = connection.query(obd.commands.SPEED)
            speed = str(speedResponse.value.to("mph").magnitude)

            rpmResponse = connection.query(obd.commands.RPM)
            rpm = rpmResponse.value.magnitude

            throttleResponse = connection.query(obd.commands.THROTTLE_POS)
            throttle = str(throttleResponse.value.magnitude)

            runTimeResponse = connection.query(obd.commands.RUN_TIME)
            runTime = str(runTimeResponse.value)

            if float(speed) < .1:
                idleTime += delay

            data = {'speed': speed, 'rpm': rpm, 'throttle': throttle, 'runTime': runTime, 'idleTime': idleTime}
            sio.emit('data', json.dumps(data))
            emitDtcCodes()
        except Exception as ex: #logs any errors
            errorLog = obdUtils.createLogMessage(ERROR, SENSOR_TYPE, type(ex).__name__, ex.args)
            print(errorLog)
            sio.emit('log', json.dumps(errorLog)) #will only work if exception is unrelated to node server connection
        # Pace the loop on the success path and the error path alike.
        time.sleep(delay)
connection = obd.OBD()
# Printed once, right after the OBD object is created, instead of on every
# server retry.
# NOTE(review): this message is unconditional; consider gating it on
# connection.is_connected() -- confirm against the python-OBD documentation.
print("OBD connection established!")


def connect():
    """Socket.IO 'connect' event handler: report the server link on stdout."""
    print("Connected to node server!")


while True: #loop until a connection is made with the server instead of immediately exiting
    # A fresh client is used for every attempt. The handler is registered
    # before connect() is called; registering it after the loop (as a
    # decorator at the bottom of the file) meant it was never reached.
    # NOTE(review): if the previous client is still trying to reconnect in the
    # background, it is simply abandoned here -- confirm whether it should be
    # disconnected explicitly.
    sio = socketio.Client()
    sio.on('connect', connect)
    try:
        sio.connect('http://localhost:3000')
        emitTelemetry()
        break
    except Exception as ex:
        errorLog = obdUtils.createLogMessage(ERROR, SENSOR_TYPE, type(ex).__name__, ex.args)
        print(errorLog)
        try:
            # Best effort only: when the failure is the server connection
            # itself this emit cannot succeed, and it must not abort the
            # retry loop. The error has already been printed above.
            sio.emit('log', json.dumps(errorLog))
        except Exception as logEx:
            print("Could not forward log to node server:", type(logEx).__name__)
        time.sleep(RETRY_INTERVAL)