diff --git a/tidalapi/request.py b/tidalapi/request.py index ff25469..4c6268a 100644 --- a/tidalapi/request.py +++ b/tidalapi/request.py @@ -50,7 +50,11 @@ class Requests(object): """A class for handling api requests to TIDAL.""" + user_agent: str + def __init__(self, session: "Session"): + # More Android User-Agents here: https://user-agents.net/browsers/android + self.user_agent = "Mozilla/5.0 (Linux; Android 12; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/91.0.4472.114 Safari/537.36" self.session = session self.config = session.config @@ -76,6 +80,10 @@ def basic_request( if not headers: headers = {} + + if "User-Agent" not in headers: + headers["User-Agent"] = self.user_agent + if self.session.token_type and self.session.access_token is not None: headers["authorization"] = ( self.session.token_type + " " + self.session.access_token diff --git a/tidalapi/session.py b/tidalapi/session.py index c55b590..ed3eb41 100644 --- a/tidalapi/session.py +++ b/tidalapi/session.py @@ -21,9 +21,11 @@ import base64 import concurrent.futures import datetime +import hashlib import json import locale import logging +import os import random import time import uuid @@ -43,7 +45,7 @@ cast, no_type_check, ) -from urllib.parse import urljoin +from urllib.parse import parse_qs, urlencode, urljoin, urlsplit import requests @@ -95,6 +97,8 @@ class Config: Additionally, num_videos will turn into num_tracks in playlists. """ + api_oauth2_token: str = "https://auth.tidal.com/v1/oauth2/token" + api_pkce_auth: str = "https://login.tidal.com/authorize" api_v1_location: str = "https://api.tidal.com/v1/" api_v2_location: str = "https://api.tidal.com/v2/" api_token: str @@ -105,6 +109,12 @@ class Config: quality: str video_quality: str video_url: str = "https://resources.tidal.com/videos/%s/%ix%i.mp4" + # Necessary for PKCE authorization only + client_unique_key: str + code_verifier: str + code_challenge: str + pkce_uri_redirect: str = "https://tidal.com/android/login/auth" + client_id_pkce: str @no_type_check def __init__( @@ -184,6 +194,23 @@ def __init__( self.client_id = "".join(self.client_id) self.client_secret = self.client_id self.client_id = self.api_token + # PKCE Authorization. We will keep the former `client_id` as a fallback / will only be used for non PCKE + # authorizations. + self.client_unique_key = format(random.getrandbits(64), "02x") + self.code_verifier = base64.urlsafe_b64encode(os.urandom(32))[:-1].decode( + "utf-8" + ) + self.code_challenge = base64.urlsafe_b64encode( + hashlib.sha256(self.code_verifier.encode("utf-8")).digest() + )[:-1].decode("utf-8") + self.client_id_pkce = base64.b64decode( + base64.b64decode(b"TmtKRVUxSmtjRXM=") + + base64.b64decode(b"NWFIRkZRbFJuVlE9PQ==") + ).decode("utf-8") + self.client_secret_pkce = base64.b64decode( + base64.b64decode(b"ZUdWMVVHMVpOMjVpY0ZvNVNVbGlURUZqVVQ=") + + base64.b64decode(b"a3pjMmhyWVRGV1RtaGxWVUZ4VGpaSlkzTjZhbFJIT0QwPQ==") + ).decode("utf-8") class Case(Enum): @@ -359,7 +386,7 @@ def load_oauth_session( :param refresh_token: (Optional) A refresh token that lets you get a new access token after it has expired :param expiry_time: (Optional) The datetime the access token will expire - :return: True if we believe the log in was successful, otherwise false. + :return: True if we believe the login was successful, otherwise false. """ self.token_type = token_type self.access_token = access_token @@ -431,6 +458,109 @@ def login_oauth_file(self, oauth_file: Path) -> bool: log.info("TIDAL Login KO") return False + def login_pkce(self, fn_print: Callable[[str], None] = print) -> None: + """Login handler for PKCE based authentication. This is the only way how to get + access to HiRes (Up to 24-bit, 192 kHz) FLAC files. + + This handler will ask you to follow a URL, process with the login in the browser + and copy & paste the URL of the redirected browser page. + + :param fn_print: A function which will be called to print the instructions, + defaults to `print()`. + :type fn_print: Callable, optional + :return: + """ + # Get login url + url_login: str = self.pkce_login_url() + + fn_print("READ CAREFULLY!") + fn_print("---------------") + fn_print( + "You need to open this link and login with your username and password. " + "Afterwards you will be redirected to an 'Oops' page. " + "To complete the login you must copy the URL from this 'Oops' page and paste it to the input field." + ) + fn_print(url_login) + + # Get redirect URL from user input. + url_redirect: str = input("Paste 'Ooops' page URL here and press :") + # Query for auth tokens + json: dict[str, Union[str, int]] = self.pkce_get_auth_token(url_redirect) + + # Parse and set tokens. + self.process_auth_token(json) + + # Swap the client_id and secret + #self.client_enable_hires() + + def client_enable_hires(self): + self.config.client_id = self.config.client_id_pkce + self.config.client_secret = self.config.client_secret_pkce + + def pkce_login_url(self) -> str: + """Returns the Login-URL to login via web browser. + + :return: The URL the user has to use for login. + :rtype: str + """ + params: request.Params = { + "response_type": "code", + "redirect_uri": self.config.pkce_uri_redirect, + "client_id": self.config.client_id_pkce, + "lang": "EN", + "appMode": "android", + "client_unique_key": self.config.client_unique_key, + "code_challenge": self.config.code_challenge, + "code_challenge_method": "S256", + "restrict_signup": "true", + } + + return self.config.api_pkce_auth + "?" + urlencode(params) + + def pkce_get_auth_token(self, url_redirect: str) -> dict[str, Union[str, int]]: + """Parses the redirect url to extract access and refresh tokens. + + :param url_redirect: URL of the 'Ooops' page, where the user was redirected to + after login. + :type url_redirect: str + :return: A parsed JSON object with access and refresh tokens and other + information. + :rtype: dict[str, str | int] + """ + # w_usr=WRITE_USR, r_usr=READ_USR_DATA, w_sub=WRITE_SUBSCRIPTION + scope_default: str = "r_usr+w_usr+w_sub" + + # Extract the code parameter from query string + if url_redirect and "https://" in url_redirect: + code: str = parse_qs(urlsplit(url_redirect).query)["code"][0] + else: + raise Exception("The provided redirect url looks wrong: " + url_redirect) + + # Set post data and call the API + data: request.Params = { + "code": code, + "client_id": self.config.client_id_pkce, + "grant_type": "authorization_code", + "redirect_uri": self.config.pkce_uri_redirect, + "scope": scope_default, + "code_verifier": self.config.code_verifier, + "client_unique_key": self.config.client_unique_key, + } + response = self.request_session.post(self.config.api_oauth2_token, data) + + # Check response + if not response.ok: + log.error("Login failed: %s", response.text) + response.raise_for_status() + + # Parse the JSON response. + try: + token: dict[str, Union[str, int]] = response.json() + except: + raise Exception("Wrong one-time authorization code", response) + + return token + def login_oauth_simple(self, function: Callable[[str], None] = print) -> None: """Login to TIDAL using a remote link. You can select what function you want to use to display the link. @@ -496,6 +626,16 @@ def _login_with_link(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]: def _process_link_login(self, json: JsonObj) -> None: json = self._wait_for_link_login(json) + self.process_auth_token(json) + + def process_auth_token(self, json: dict[str, Union[str, int]]) -> None: + """Parses the authorization response and sets the token values to the specific + variables for further usage. + + :param json: Parsed JSON response after login / authorization. + :type json: dict[str, str | int] + :return: None + """ self.access_token = json["access_token"] self.expiry_time = datetime.datetime.utcnow() + datetime.timedelta( seconds=json["expires_in"] @@ -512,7 +652,7 @@ def _wait_for_link_login(self, json: JsonObj) -> Any: expiry = float(json["expiresIn"]) interval = float(json["interval"]) device_code = json["deviceCode"] - url = "https://auth.tidal.com/v1/oauth2/token" + url = self.config.api_oauth2_token params = { "client_id": self.config.client_id, "client_secret": self.config.client_secret, @@ -541,7 +681,7 @@ def token_refresh(self, refresh_token: str) -> bool: :return: True if we believe the token was successfully refreshed, otherwise False """ - url = "https://auth.tidal.com/v1/oauth2/token" + url = self.config.api_oauth2_token params = { "grant_type": "refresh_token", "refresh_token": refresh_token,