diff --git a/lib/TWCManager/Vehicle/TeslaAPI.py b/lib/TWCManager/Vehicle/TeslaAPI.py
index a5d95dbf..98669956 100644
--- a/lib/TWCManager/Vehicle/TeslaAPI.py
+++ b/lib/TWCManager/Vehicle/TeslaAPI.py
@@ -1,16 +1,26 @@
+import base64
+import hashlib
+import os
+import re
+import requests
+import time
+from urllib.parse import parse_qs
+
class TeslaAPI:
import json
- import re
- import requests
- import time
+ authURL = "https://auth.tesla.com/oauth2/v3/authorize"
+ browserUA = "Mozilla/5.0 (Linux; Android 10; Pixel 3 Build/QQ2A.200305.002; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/85.0.4183.81 Mobile Safari/537.36"
+ callbackURL = "https://auth.tesla.com/void/callback"
carApiLastErrorTime = 0
carApiBearerToken = ""
carApiRefreshToken = ""
carApiTokenExpireTime = time.time()
carApiLastStartOrStopChargeTime = 0
carApiLastChargeLimitApplyTime = 0
+ clientID = "81527cff06843c8634fdc09e8ac0abefb46ac849f38fe1e431c2ef2106796384"
+ clientSecret = "c7257eb71a564034f9419ee651c7d0e5f7aa6bfbd18bafb5c5c033b093bb2fa3"
lastChargeLimitApplied = 0
lastChargeCheck = 0
chargeUpdateInterval = 1800
@@ -18,7 +28,11 @@ class TeslaAPI:
config = None
debugLevel = 0
master = None
+ maxLoginRetries = 10
minChargeLevel = -1
+ refreshURL = "https://owner-api.teslamotors.com/oauth/token"
+ teslaUA = "TeslaApp/3.10.9-433/adff2e065/android/10"
+ verifier = ""
# Transient errors are ones that usually disappear if we retry the car API
# command a minute or less later.
@@ -54,10 +68,180 @@ def addVehicle(self, json):
self.carApiVehicles.append(CarApiVehicle(json, self, self.config))
return True
+ def apiLogin(self, email, password):
+
+ headers = {
+ "User-Agent": self.browserUA,
+ "x-tesla-user-agent": self.teslaUA,
+ "X-Requested-With": "com.teslamotors.tesla",
+ }
+
+ for attempt in range(self.maxLoginRetries):
+
+ self.verifier = base64.urlsafe_b64encode(os.urandom(86)).rstrip(b"=")
+ challenge = base64.urlsafe_b64encode(hashlib.sha256(self.verifier).digest()).rstrip(b"=")
+ state = base64.urlsafe_b64encode(os.urandom(16)).rstrip(b"=").decode("utf-8")
+
+ params = (
+ ("client_id", "ownerapi"),
+ ("code_challenge", challenge),
+ ("code_challenge_method", "S256"),
+ ("redirect_uri", self.callbackURL),
+ ("response_type", "code"),
+ ("scope", "openid email offline_access"),
+ ("state", state),
+ )
+
+ session = requests.Session()
+ resp = session.get(self.authURL, headers=headers, params=params)
+
+ if resp.ok and "
" in resp.text:
+ self.master.debugLog(
+ 6,
+ "TeslaAPI",
+ "Tesla Auth form fetch success, attempt: " + str(attempt),
+ )
+ break
+ else:
+ self.master.debugLog(
+ 6,
+ "TeslaAPI",
+ "Tesla auth form fetch failed, attempt: " + str(attempt),
+ )
+
+ time.sleep(3)
+ else:
+ self.master.debugLog(
+ 2,
+ "TeslaAPI",
+ "Wasn't able to find authentication form after " + str(attempt) + " attempts",
+ )
+ return 0
+
+ csrf = re.search(r'name="_csrf".+value="([^"]+)"', resp.text).group(1)
+ transaction_id = re.search(r'name="transaction_id".+value="([^"]+)"', resp.text).group(1)
+
+ data = {
+ "_csrf": csrf,
+ "_phase": "authenticate",
+ "_process": "1",
+ "transaction_id": transaction_id,
+ "cancel": "",
+ "identity": email,
+ "credential": password,
+ }
+
+ for attempt in range(self.maxLoginRetries):
+ resp = session.post(
+ self.authURL, headers=headers, params=params, data=data, allow_redirects=False
+ )
+ if resp.ok and (resp.status_code == 302 or "" in resp.text):
+ self.master.debugLog(
+ 2,
+ "TeslaAPI",
+ "Posted auth form successfully after " + str(attempt) + " attempts",
+ )
+ break
+ time.sleep(3)
+ else:
+ self.master.debugLog(
+ 2,
+ "TeslaAPI",
+ "Wasn't able to post authentication form after " + str(attempt) + " attempts",
+ )
+
+ code = parse_qs(resp.headers["location"])[self.callbackURL + "?code"]
+
+ headers = {"user-agent": self.browserUA,
+ "x-tesla-user-agent": self.teslaUA}
+ payload = {
+ "grant_type": "authorization_code",
+ "client_id": "ownerapi",
+ "code_verifier": self.verifier.decode("utf-8"),
+ "code": code,
+ "redirect_uri": self.callbackURL,
+ }
+
+ resp = session.post("https://auth.tesla.com/oauth2/v3/token", headers=headers, json=payload)
+ access_token = resp.json()["access_token"]
+
+ headers["authorization"] = "bearer " + access_token
+ payload = {
+ "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
+ "client_id": self.clientID,
+ }
+ resp = session.post("https://owner-api.teslamotors.com/oauth/token", headers=headers, json=payload)
+ try:
+ self.setCarApiBearerToken(resp.json()["access_token"])
+ self.setCarApiRefreshToken(resp.json()["refresh_token"])
+ self.setCarApiTokenExpireTime(time.time() + resp.json()["expires_in"])
+ self.master.queue_background_task({"cmd": "saveSettings"})
+
+ except KeyError:
+ self.master.debugLog(
+ 2,
+ "TeslaAPI",
+ "ERROR: Can't access Tesla car via API. Please log in again via web interface.",
+ )
+ self.updateCarApiLastErrorTime()
+ # Instead of just setting carApiLastErrorTime, erase tokens to
+ # prevent further authorization attempts until user enters password
+ # on web interface. I feel this is safer than trying to log in every
+ # ten minutes with a bad token because Tesla might decide to block
+ # remote access to your car after too many authorization errors.
+ self.setCarApiBearerToken("")
+ self.setCarApiRefreshToken("")
+ self.master.queue_background_task({"cmd": "saveSettings"})
+
+ def apiRefresh(self):
+ # Refresh tokens expire in 45
+ # days when first issued, so we'll get a new token every 15 days.
+ headers = {
+ "accept": "application/json",
+ "Content-Type": "application/json",
+ }
+ data = {
+ "client_id": self.clientID,
+ "client_secret": self.clientSecret,
+ "grant_type": "refresh_token",
+ "refresh_token": self.getCarApiRefreshToken(),
+ }
+ req = None
+ try:
+ req = requests.post(self.refreshURL, headers=headers, json=data)
+ self.master.debugLog(2, "TeslaAPI", "Car API request" + str(req))
+ apiResponseDict = self.json.loads(req.text)
+ except:
+ pass
+
+ try:
+ self.master.debugLog(
+ 4, "TeslaAPI", "Car API auth response" + str(apiResponseDict)
+ )
+ self.setCarApiBearerToken(apiResponseDict["access_token"])
+ self.setCarApiRefreshToken(apiResponseDict["refresh_token"])
+ self.setCarApiTokenExpireTime(now + apiResponseDict["expires_in"])
+
+ except KeyError:
+ self.master.debugLog(
+ 2,
+ "TeslaAPI",
+ "ERROR: Can't access Tesla car via API. Please log in again via web interface.",
+ )
+ self.updateCarApiLastErrorTime()
+ # Instead of just setting carApiLastErrorTime, erase tokens to
+ # prevent further authorization attempts until user enters password
+ # on web interface. I feel this is safer than trying to log in every
+ # ten minutes with a bad token because Tesla might decide to block
+ # remote access to your car after too many authorization errors.
+ self.setCarApiBearerToken("")
+ self.setCarApiRefreshToken("")
+ self.master.queue_background_task({"cmd": "saveSettings"})
+
def car_api_available(
self, email=None, password=None, charge=None, applyLimit=None
):
- now = self.time.time()
+ now = time.time()
apiResponseDict = {}
if self.getCarApiRetryRemaining():
@@ -87,88 +271,28 @@ def car_api_available(
"Entering car_api_available - next step is to query Tesla API",
)
- # Tesla car API info comes from https://timdorr.docs.apiary.io/
+ # Authentiate to Tesla API
if (
self.getCarApiBearerToken() == ""
or self.getCarApiTokenExpireTime() - now < 30 * 24 * 60 * 60
):
- req = None
- client_id = (
- "81527cff06843c8634fdc09e8ac0abefb46ac849f38fe1e431c2ef2106796384"
- )
- client_secret = (
- "c7257eb71a564034f9419ee651c7d0e5f7aa6bfbd18bafb5c5c033b093bb2fa3"
- )
- url = "https://owner-api.teslamotors.com/oauth/token"
- headers = None
- data = None
-
- # If we don't have a bearer token or our refresh token will expire in
- # under 30 days, get a new bearer token. Refresh tokens expire in 45
- # days when first issued, so we'll get a new token every 15 days.
if self.getCarApiRefreshToken() != "":
headers = {
"accept": "application/json",
"Content-Type": "application/json",
}
data = {
- "client_id": client_id,
- "client_secret": client_secret,
+ "client_id": self.clientID,
+ "client_secret": self.clientSecret,
"grant_type": "refresh_token",
"refresh_token": self.getCarApiRefreshToken(),
}
self.master.debugLog(8, "TeslaAPI", "Attempting token refresh")
+ self.apiRefresh()
elif email != None and password != None:
- headers = {
- "accept": "application/json",
- "Content-Type": "application/json",
- }
- data = {
- "client_id": client_id,
- "client_secret": client_secret,
- "grant_type": "password",
- "email": email,
- "password": password,
- }
self.master.debugLog(8, "TeslaAPI", "Attempting password auth")
-
- if headers and data:
- try:
- req = self.requests.post(url, headers=headers, json=data)
- self.master.debugLog(2, "TeslaAPI", "Car API request" + str(req))
- # Example response:
- # b'{"access_token":"4720d5f980c9969b0ca77ab39399b9103adb63ee832014fe299684201929380","token_type":"bearer","expires_in":3888000,"refresh_token":"110dd4455437ed351649391a3425b411755a213aa815171a2c6bfea8cc1253ae","created_at":1525232970}'
-
- apiResponseDict = self.json.loads(req.text)
- except:
- pass
- else:
- self.master.debugLog(2, "TeslaAPI", "Car API request is empty")
-
- try:
- self.master.debugLog(
- 4, "TeslaAPI", "Car API auth response" + str(apiResponseDict)
- )
- self.setCarApiBearerToken(apiResponseDict["access_token"])
- self.setCarApiRefreshToken(apiResponseDict["refresh_token"])
- self.setCarApiTokenExpireTime(now + apiResponseDict["expires_in"])
- except KeyError:
- self.master.debugLog(
- 2,
- "TeslaAPI",
- "ERROR: Can't access Tesla car via API. Please log in again via web interface.",
- )
- self.updateCarApiLastErrorTime()
- # Instead of just setting carApiLastErrorTime, erase tokens to
- # prevent further authorization attempts until user enters password
- # on web interface. I feel this is safer than trying to log in every
- # ten minutes with a bad token because Tesla might decide to block
- # remote access to your car after too many authorization errors.
- self.setCarApiBearerToken("")
- self.setCarApiRefreshToken("")
-
- self.master.queue_background_task({"cmd": "saveSettings"})
+ self.apiLogin(email, password)
if self.getCarApiBearerToken() != "":
if self.getVehicleCount() < 1:
@@ -178,7 +302,7 @@ def car_api_available(
"Authorization": "Bearer " + self.getCarApiBearerToken(),
}
try:
- req = self.requests.get(url, headers=headers)
+ req = requests.get(url, headers=headers)
self.master.debugLog(
8, "TeslaAPI", "Car API cmd vehicles " + str(req)
)
@@ -280,7 +404,7 @@ def car_api_available(
"Authorization": "Bearer " + self.getCarApiBearerToken(),
}
try:
- req = self.requests.post(url, headers=headers)
+ req = requests.post(url, headers=headers)
self.master.debugLog(
8, "TeslaAPI", "Car API cmd wake_up" + str(req)
)
@@ -359,7 +483,7 @@ def car_api_available(
# fast and only a reboot of the Raspberry resultet in
# possible reconnect to the API (even the Tesla App
# couldn't connect anymore).
- self.time.sleep(5)
+ time.sleep(5)
if now - vehicle.firstWakeAttemptTime <= 31 * 60:
# A car in offline state is presumably not connected
# wirelessly so our wake_up command will not reach
@@ -499,7 +623,7 @@ def car_api_available(
# quickly after we send wake_up. I haven't seen a problem sending a
# command immediately, but it seems safest to sleep 5 seconds after
# waking before sending a command.
- self.time.sleep(5)
+ time.sleep(5)
return True
@@ -547,7 +671,7 @@ def car_api_charge(self, charge):
# Do not call this function directly. Call by using background thread:
# queue_background_task({'cmd':'charge', 'charge':})
- now = self.time.time()
+ now = time.time()
apiResponseDict = {}
if not charge:
# Whenever we are going to tell vehicles to stop charging, set
@@ -632,7 +756,7 @@ def car_api_charge(self, charge):
# {'response': {'result': False, 'reason': 'could_not_wake_buses'}}
# Waiting 2 seconds seems to consistently avoid the error, but let's
# wait 5 seconds in case of hardware differences between cars.
- self.time.sleep(5)
+ time.sleep(5)
if charge:
self.applyChargeLimit(self.lastChargeLimitApplied, checkArrival=True)
@@ -647,7 +771,7 @@ def car_api_charge(self, charge):
# Retry up to 3 times on certain errors.
for _ in range(0, 3):
try:
- req = self.requests.post(url, headers=headers)
+ req = requests.post(url, headers=headers)
self.master.debugLog(
8,
"TeslaAPI",
@@ -704,7 +828,7 @@ def car_api_charge(self, charge):
+ error
+ "' when trying to start charging. Try again in 1 minute.",
)
- self.time.sleep(60)
+ time.sleep(60)
foundKnownError = True
break
if foundKnownError:
@@ -752,7 +876,7 @@ def car_api_charge(self, charge):
# If all retries fail, we'll try again in a
# minute because we set
# carApiLastStartOrStopChargeTime = now earlier.
- self.time.sleep(5)
+ time.sleep(5)
continue
else:
# Start or stop charge failed with an error I
@@ -809,7 +933,7 @@ def applyChargeLimit(self, limit, checkArrival=False, checkDeparture=False):
)
return "error"
- now = self.time.time()
+ now = time.time()
if (
not checkArrival
and not checkDeparture
@@ -953,7 +1077,7 @@ def applyChargeLimit(self, limit, checkArrival=False, checkDeparture=False):
# the vehicle sometimes refuses the start command because it's
# "fully charged" under the old limit, but then continues to say
# charging was stopped once the new limit is in place.
- self.time.sleep(5)
+ time.sleep(5)
if checkArrival:
self.updateChargeAtHome()
@@ -985,7 +1109,7 @@ def getCarApiRetryRemaining(self, vehicleLast=0):
return 0
else:
backoff = self.getCarApiErrorRetryMins() * 60
- lasterrortime = self.time.time() - lastError
+ lasterrortime = time.time() - lastError
if lasterrortime >= backoff:
return 0
else:
@@ -1041,7 +1165,7 @@ def setCarApiTokenExpireTime(self, value):
return True
def updateCarApiLastErrorTime(self):
- timestamp = self.time.time()
+ timestamp = time.time()
self.master.debugLog(
8,
"TeslaAPI",
@@ -1054,14 +1178,14 @@ def updateCarApiLastErrorTime(self):
return True
def updateLastStartOrStopChargeTime(self):
- self.carApiLastStartOrStopChargeTime = self.time.time()
+ self.carApiLastStartOrStopChargeTime = time.time()
return True
def updateChargeAtHome(self):
for car in self.carApiVehicles:
if car.atHome:
car.update_charge()
- self.lastChargeCheck = self.time.time()
+ self.lastChargeCheck = time.time()
@property
def numCarsAtHome(self):
@@ -1069,7 +1193,7 @@ def numCarsAtHome(self):
@property
def minBatteryLevelAtHome(self):
- if self.time.time() - self.lastChargeCheck > self.chargeUpdateInterval:
+ if time.time() - self.lastChargeCheck > self.chargeUpdateInterval:
self.master.queue_background_task({"cmd":"checkCharge"})
return min(
[car.batteryLevel for car in self.carApiVehicles if car.atHome],
@@ -1131,7 +1255,7 @@ def ready(self):
if (
self.firstWakeAttemptTime == 0
- and self.time.time() - self.lastAPIAccessTime < 2 * 60
+ and time.time() - self.lastAPIAccessTime < 2 * 60
):
# If it's been less than 2 minutes since we successfully woke this car, it
# should still be awake. No need to check. It returns to sleep state about
@@ -1171,7 +1295,7 @@ def get_car_api(self, url, checkReady=True, provesOnline=True):
# Retry up to 3 times on certain errors.
for _ in range(0, 3):
try:
- req = self.requests.get(url, headers=headers)
+ req = requests.get(url, headers=headers)
self.carapi.master.debugLog(
8, "TeslaVehic", "Car API cmd " + url + " " + str(req)
)
@@ -1205,7 +1329,7 @@ def get_car_api(self, url, checkReady=True, provesOnline=True):
+ error
+ "' when trying to get status. Try again in 1 minute.",
)
- self.time.sleep(60)
+ time.sleep(60)
foundKnownError = True
break
if foundKnownError:
@@ -1221,7 +1345,7 @@ def get_car_api(self, url, checkReady=True, provesOnline=True):
):
# Retry after 5 seconds. See notes in car_api_charge where
# 'could_not_wake_buses' is handled.
- self.time.sleep(5)
+ time.sleep(5)
continue
except (KeyError, TypeError):
# This catches cases like trying to access
@@ -1234,11 +1358,11 @@ def get_car_api(self, url, checkReady=True, provesOnline=True):
+ self.name
+ ". Will try again later.",
)
- self.lastErrorTime = self.time.time()
+ self.lastErrorTime = time.time()
return (False, None)
if provesOnline:
- self.lastAPIAccessTime = self.time.time()
+ self.lastAPIAccessTime = time.time()
return (True, response)
@@ -1247,7 +1371,7 @@ def update_location(self, cacheTime=60):
url = "https://owner-api.teslamotors.com/api/1/vehicles/"
url = url + str(self.ID) + "/data_request/drive_state"
- now = self.time.time()
+ now = time.time()
if now - self.lastDriveStatusTime < cacheTime:
return True
@@ -1266,7 +1390,7 @@ def update_charge(self):
url = "https://owner-api.teslamotors.com/api/1/vehicles/"
url = url + str(self.ID) + "/data_request/charge_state"
- now = self.time.time()
+ now = time.time()
if now - self.lastChargeStatusTime < 60:
return True
@@ -1274,7 +1398,7 @@ def update_charge(self):
(result, response) = self.get_car_api(url)
if result:
- self.lastChargeStatusTime = self.time.time()
+ self.lastChargeStatusTime = time.time()
self.chargeLimit = response["charge_limit_soc"]
self.batteryLevel = response["battery_level"]
self.timeToFullCharge = response["time_to_full_charge"]
@@ -1285,7 +1409,7 @@ def apply_charge_limit(self, limit):
if self.stopTryingToApplyLimit:
return True
- now = self.time.time()
+ now = time.time()
if (
now - self.lastLimitAttemptTime <= 300
@@ -1309,7 +1433,7 @@ def apply_charge_limit(self, limit):
for _ in range(0, 3):
try:
- req = self.requests.post(url, headers=headers, json=body)
+ req = requests.post(url, headers=headers, json=body)
self.carapi.master.debugLog(
8, "TeslaVehic", "Car API cmd set_charge_limit " + str(req)
)
@@ -1334,7 +1458,7 @@ def apply_charge_limit(self, limit):
self.lastAPIAccessTime = now
return True
elif reason == "could_not_wake_buses":
- self.time.sleep(5)
+ time.sleep(5)
continue
elif apiResponseDict["response"] == None:
if "error" in apiResponseDict:
@@ -1353,7 +1477,7 @@ def apply_charge_limit(self, limit):
+ error
+ "' when trying to set charge limit. Try again in 1 minute.",
)
- self.time.sleep(60)
+ time.sleep(60)
foundKnownError = True
break
if foundKnownError: