Skip to content

Commit

Permalink
Enable login to china again
Browse files Browse the repository at this point in the history
  • Loading branch information
rikroe committed Nov 24, 2021
1 parent 5652c8a commit a8aead1
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 118 deletions.
285 changes: 178 additions & 107 deletions bimmer_connected/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,29 @@
import urllib
from threading import Lock
from typing import Any, Callable, Dict, List

import jwt
import requests
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
from requests.auth import HTTPBasicAuth
from requests.exceptions import HTTPError
from requests.models import Response

from bimmer_connected.const import (
AUTH_CHINA_LOGIN_URL,
AUTH_CHINA_PUBLIC_KEY_URL,
AUTH_URL,
OAUTH_CONFIG_URL,
VEHICLES_URL,
X_USER_AGENT
)
from bimmer_connected.country_selector import (
Regions,
get_server_url,
get_ocp_apim_key,
get_server_url
)
from bimmer_connected.vehicle import ConnectedDriveVehicle, CarBrand
from bimmer_connected.const import AUTH_URL, OAUTH_CONFIG_URL, VEHICLES_URL, X_USER_AGENT
from bimmer_connected.vehicle import CarBrand, ConnectedDriveVehicle

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -74,125 +85,185 @@ def __init__(self, username: str, password: str, region: Regions, log_responses:
def _get_oauth_token(self) -> None:
"""Get a new auth token from the server."""
with self._lock:
if self._token_expiration is not None and datetime.datetime.now() < self._token_expiration:
if self._token_expiration is not None and datetime.datetime.utcnow() < self._token_expiration:
_LOGGER.debug('Old token is still valid. Not getting a new one.')
return

try:
# We need a session for cross-request cookies
oauth_session = requests.Session()
# oauth_settings = get_gcdm_oauth_authorization(self._region)
r_oauth_settings = oauth_session.get(
OAUTH_CONFIG_URL.format(server=self.server_url),
headers={
"ocp-apim-subscription-key": get_ocp_apim_key(self._region),
"x-user-agent": X_USER_AGENT.format("bmw"),
}
)
r_oauth_settings.raise_for_status()
oauth_settings = r_oauth_settings.json()
if self.region in [Regions.REST_OF_WORLD, Regions.NORTH_AMERICA]:
token_data = self._login_row_na()
if self.region == Regions.CHINA:
token_data = self._login_china()

# My BMW login flow
_LOGGER.debug("Authenticating against GCDM with MyBMW flow.")

# Setting up PKCS data
verifier_bytes = os.urandom(64)
code_verifier = base64.urlsafe_b64encode(verifier_bytes).rstrip(b'=')

challenge_bytes = hashlib.sha256(code_verifier).digest()
code_challenge = base64.urlsafe_b64encode(challenge_bytes).rstrip(b'=')
self._oauth_token = token_data["access_token"]
self._token_expiration = token_data["expires_at"]
_LOGGER.debug(
"got new token %s with expiration date %s",
self._oauth_token,
self._token_expiration,
)

state_bytes = os.urandom(16)
state = base64.urlsafe_b64encode(state_bytes).rstrip(b'=')
except Exception as ex: # pylint: disable=broad-except
_LOGGER.exception(ex)
raise ex

authenticate_url = AUTH_URL.format(gcdm_base_url=oauth_settings["gcdmBaseUrl"])
authenticate_headers = {
"Content-Type": "application/x-www-form-urlencoded",
def _login_row_na(self):
"""Login to Rest of World and North America."""
try:
# We need a session for cross-request cookies
oauth_session = requests.Session()
# oauth_settings = get_gcdm_oauth_authorization(self._region)
r_oauth_settings = oauth_session.get(
OAUTH_CONFIG_URL.format(server=self.server_url),
headers={
"ocp-apim-subscription-key": get_ocp_apim_key(self._region),
"x-user-agent": X_USER_AGENT.format("bmw"),
}
)
r_oauth_settings.raise_for_status()
oauth_settings = r_oauth_settings.json()

# My BMW login flow
_LOGGER.debug("Authenticating against GCDM with MyBMW flow.")

# Setting up PKCS data
verifier_bytes = os.urandom(64)
code_verifier = base64.urlsafe_b64encode(verifier_bytes).rstrip(b'=')

challenge_bytes = hashlib.sha256(code_verifier).digest()
code_challenge = base64.urlsafe_b64encode(challenge_bytes).rstrip(b'=')

state_bytes = os.urandom(16)
state = base64.urlsafe_b64encode(state_bytes).rstrip(b'=')

authenticate_url = AUTH_URL.format(gcdm_base_url=oauth_settings["gcdmBaseUrl"])
authenticate_headers = {
"Content-Type": "application/x-www-form-urlencoded",
}

# we really need all of these parameters
oauth_base_values = {
"client_id": oauth_settings["clientId"],
"response_type": "code",
"redirect_uri": oauth_settings["returnUrl"],
"state": state,
"nonce": "login_nonce",
"scope": " ".join(oauth_settings["scopes"]),
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}

authenticate_data = urllib.parse.urlencode(
dict(
oauth_base_values,
**{
"grant_type": "authorization_code",
"username": self._username,
"password": self._password,
}
)
)
response = oauth_session.post(
authenticate_url,
headers=authenticate_headers,
data=authenticate_data,
)
response.raise_for_status()
authorization = dict(urllib.parse.parse_qsl(response.json()["redirect_to"]))["authorization"]

# we really need all of these parameters
oauth_base_values = {
"client_id": oauth_settings["clientId"],
"response_type": "code",
"redirect_uri": oauth_settings["returnUrl"],
"state": state,
"nonce": "login_nonce",
"scope": " ".join(oauth_settings["scopes"]),
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
code_data = urllib.parse.urlencode(
dict(oauth_base_values, **{"authorization": authorization})
)
response = oauth_session.post(
authenticate_url, headers=authenticate_headers, data=code_data, allow_redirects=False
)
response.raise_for_status()
code = dict(urllib.parse.parse_qsl(response.next.path_url.split('?')[1]))["code"]

token_url = oauth_settings["tokenEndpoint"]

# My BMW login flow
token_headers = {
"Content-Type": "application/x-www-form-urlencoded",
# "Authorization": oauth_settings["token"]["Authorization"],
}
token_values = {
"code": code,
"code_verifier": code_verifier,
"redirect_uri": oauth_settings["returnUrl"],
"grant_type": "authorization_code",
}

token_data = urllib.parse.urlencode(token_values)
response = oauth_session.post(
token_url,
headers=token_headers,
data=token_data,
auth=HTTPBasicAuth(oauth_settings["clientId"], oauth_settings["clientSecret"])
)
response.raise_for_status()
response_json = response.json()

authenticate_data = urllib.parse.urlencode(
dict(
oauth_base_values,
**{
"grant_type": "authorization_code",
"username": self._username,
"password": self._password,
}
)
)
response = oauth_session.post(
authenticate_url,
headers=authenticate_headers,
data=authenticate_data,
)
response.raise_for_status()
authorization = dict(urllib.parse.parse_qsl(response.json()["redirect_to"]))["authorization"]
expiration_time = int(response_json["expires_in"])
expires_at = datetime.datetime.utcnow() + datetime.timedelta(
seconds=expiration_time
)

code_data = urllib.parse.urlencode(
dict(oauth_base_values, **{"authorization": authorization})
)
response = oauth_session.post(
authenticate_url, headers=authenticate_headers, data=code_data, allow_redirects=False
)
response.raise_for_status()
code = dict(urllib.parse.parse_qsl(response.next.path_url.split('?')[1]))["code"]
return {
"access_token": response_json["access_token"],
"expires_at": expires_at
}
except HTTPError as ex:
try:
err = response.json()
_LOGGER.error("Authentication failed (%s): %s", err["error"], err["error_description"])
except Exception: # pylint: disable=broad-except
_LOGGER.error("Authentication failed: %s", response.text)
raise ex

token_url = oauth_settings["tokenEndpoint"]
def _login_china(self):
try:
login_header = {'x-user-agent': X_USER_AGENT.format("bmw")}

# My BMW login flow
token_headers = {
"Content-Type": "application/x-www-form-urlencoded",
# "Authorization": oauth_settings["token"]["Authorization"],
}
token_values = {
"code": code,
"code_verifier": code_verifier,
"redirect_uri": oauth_settings["returnUrl"],
"grant_type": "authorization_code",
}
response = requests.request(
"GET",
AUTH_CHINA_PUBLIC_KEY_URL.format(server=self.server_url),
headers=login_header,
)
response.raise_for_status()
pem_public_key = response.json()["data"]["value"]

public_key = RSA.import_key(pem_public_key)
cipher_rsa = PKCS1_v1_5.new(public_key)
encrypted = cipher_rsa.encrypt(self._password.encode())
pw_encrypted = base64.b64encode(encrypted).decode('UTF-8')

response = requests.request(
"POST",
AUTH_CHINA_LOGIN_URL.format(server=self.server_url),
headers=login_header,
json={"mobile": self._username, "password": pw_encrypted}
)
response.raise_for_status()
response_json = response.json()["data"]

token_data = urllib.parse.urlencode(token_values)
response = oauth_session.post(
token_url,
headers=token_headers,
data=token_data,
auth=HTTPBasicAuth(oauth_settings["clientId"], oauth_settings["clientSecret"])
)
response.raise_for_status()
response_json = response.json()
decoded_token = jwt.decode(
response_json["access_token"],
algorithms=["HS256"],
options={"verify_signature": False}
)

self._oauth_token = response_json["access_token"]
expiration_time = int(response_json["expires_in"])
self._token_expiration = datetime.datetime.now() + datetime.timedelta(
seconds=expiration_time
)
_LOGGER.debug(
"got new token %s with expiration date %s",
self._oauth_token,
self._token_expiration,
)
except HTTPError as ex:
try:
err = response.json()
_LOGGER.error("Authentication failed (%s): %s", err["error"], err["error_description"])
except Exception: # pylint: disable=broad-except
_LOGGER.error("Authentication failed: %s", response.text)
raise ex
except Exception as ex: # pylint: disable=broad-except
_LOGGER.exception(ex)
raise ex
return {
"access_token": response_json["access_token"],
"expires_at": datetime.datetime.utcfromtimestamp(decoded_token["exp"])
}
except HTTPError as ex:
try:
err = response.json()
_LOGGER.error("Authentication failed (%s): %s", err["error"], err["error_description"])
except Exception: # pylint: disable=broad-except
_LOGGER.error("Authentication failed: %s", response.text)
raise ex

def request_header(self, brand: CarBrand = None) -> Dict[str, str]:
"""Generate a header for HTTP requests to the server."""
Expand Down
4 changes: 4 additions & 0 deletions bimmer_connected/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
X_USER_AGENT = 'android(v1.07_20200330);{};1.7.0(11152)'

BASE_URL = 'https://{server}'

AUTH_CHINA_PUBLIC_KEY_URL = BASE_URL + '/eadrax-coas/v1/cop/publickey'
AUTH_CHINA_LOGIN_URL = BASE_URL + '/eadrax-coas/v1/login/pwd'

OAUTH_CONFIG_URL = BASE_URL + '/eadrax-ucs/v1/presentation/oauth/config'

VEHICLES_URL = BASE_URL + '/eadrax-vcs/v1/vehicles'
Expand Down
8 changes: 2 additions & 6 deletions bimmer_connected/country_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class Regions(Enum):
"""Regions of the world with separate servers."""
NORTH_AMERICA = 0
# CHINA = 1
CHINA = 1
REST_OF_WORLD = 2


Expand All @@ -23,7 +23,7 @@ class Regions(Enum):
_SERVER_URLS_EADRAX = {
Regions.NORTH_AMERICA: "cocoapi.bmwgroup.us",
Regions.REST_OF_WORLD: "cocoapi.bmwgroup.com",
# Regions.CHINA: None,
Regions.CHINA: "myprofile.bmw.com.cn",
}

_OCP_APIM_KEYS = {
Expand Down Expand Up @@ -85,10 +85,6 @@ def get_region_from_name(name: str) -> Regions:
This function is not case-sensitive.
"""
if name.lower() == 'china':
raise NotImplementedError(
"Support for region 'china' is currently not available. Please use `bimmer_connected<0.8.0"
)
for region in Regions:
if name.lower() == region.name.lower():
return region
Expand Down
2 changes: 1 addition & 1 deletion bimmer_connected/vehicle_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class FuelIndicator(SerializableBaseClass): # pylint: disable=too-few-public-me
This class provides a nicer API than parsing the JSON format directly.
"""

def __init__(self, fuel_indicator_dict: dict):
def __init__(self, fuel_indicator_dict: List):
self.remaining_range_fuel: int = None
self.remaining_range_electric: int = None
self.remaining_range_combined: int = None
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
requests
typing>=3,<4;python_version<"3.5"
pycryptodome==3.11.0
7 changes: 7 additions & 0 deletions test/responses/auth/auth_cn_login_error.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"data": null,
"code": 699121,
"error": true,
"msgType": "toast",
"description": "系统异常,请稍后再试!"
}
12 changes: 12 additions & 0 deletions test/responses/auth/auth_cn_login_pwd.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"data": {
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJqdGkiOiJEVU1NWSQxJEEkMTYzNzcwNzkxNjc4MiIsIm5iZiI6MTYzNzcwNzkxNiwiZXhwIjoxNjM3NzExMjE2LCJpYXQiOjE2Mzc3MDc5MTZ9.hpi-P97W68g7avGwu9dcBRapIsaG4F8MwOdPHe6PuTA",
"token_type": "Bearer",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJqdGkiOiJEVU1NWSQxJFIkMTYzNzcwNTc5NTA3NSIsIm5iZiI6MTYzNzcwNTc5NSwiZXhwIjoxNjQ1NDgwODk1LCJpYXQiOjE2Mzc3MDU3OTV9.dGVpbpfrJOo895jiU6Rk16ESYz80klfJbIX9M4KD1hQ",
"usid": "DUMMY",
"cid": "DUMMY"
},
"code": 200,
"error": false,
"description": "ok"
}
Loading

0 comments on commit a8aead1

Please sign in to comment.