Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements PKCE Authorization to enable access to HiRess files. #221

Merged
merged 25 commits into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6f031a0
✨ First working PKCE login implement. HiRes downloads are possible now.
exislow Jan 19, 2024
60a3bad
✨ Set appropriate User-Agent. Fixes tamland/python-tidal#217
exislow Jan 22, 2024
d557286
✨ Implemented PKCE authorization to enable HiRes FLAC downloads. Fixe…
exislow Jan 23, 2024
69bf50c
♻️ Minor refactoring of urls.
exislow Jan 23, 2024
3b1d285
♻️ Code format.
exislow Jan 23, 2024
abf1643
🛠️ Reverted to actual login credentials for device linking.
exislow Jan 23, 2024
2a43d9a
✨ Changed from private to public methods.
exislow Jan 26, 2024
261f237
chore(deps-dev): bump pillow from 10.0.1 to 10.2.0
dependabot[bot] Jan 22, 2024
3d7d7b7
Use correct parse object
tehkillerbee Nov 29, 2023
e058766
Add OAuth file login load/save functionality.
tehkillerbee Jan 26, 2024
d227800
Add misc gitignores (json, csv)
tehkillerbee Jan 26, 2024
33ab53d
Added example script for transferring user favourites
tehkillerbee Jan 26, 2024
b09f396
Fixed header
tehkillerbee Jan 26, 2024
e211b5e
Update/cleanup readme, move example to subdir
tehkillerbee Jan 26, 2024
6173d12
Use oauth file login
tehkillerbee Jan 26, 2024
dab095b
Use same oauth file path as other examples
tehkillerbee Jan 26, 2024
34c05a3
Fix formatting
tehkillerbee Jan 26, 2024
b6c2aa2
Include request response on error. Print as warning.
tehkillerbee Jan 26, 2024
91226cb
Add path to both v1 and v2 api's
tehkillerbee Jan 26, 2024
1858fc1
Fix formatting
tehkillerbee Jan 26, 2024
472a018
✨ First working PKCE login implement. HiRes downloads are possible now.
exislow Jan 19, 2024
7b07328
✨ Implemented PKCE authorization to enable HiRes FLAC downloads. Fixe…
exislow Jan 23, 2024
c41d1f3
♻️ Code format.
exislow Jan 23, 2024
656833c
✨ Added PKCE secret and client id swap.
exislow Jan 27, 2024
8a5f54b
Merge branch 'master' into 188-support-for-hires
tehkillerbee Jan 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions tidalapi/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
class Requests(object):
"""A class for handling api requests to TIDAL."""

user_agent: str

def __init__(self, session: "Session"):
# More Android User-Agents here: https://user-agents.net/browsers/android
self.user_agent = "Mozilla/5.0 (Linux; Android 12; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/91.0.4472.114 Safari/537.36"
self.session = session
self.config = session.config

Expand All @@ -76,6 +80,10 @@ def basic_request(

if not headers:
headers = {}

if "User-Agent" not in headers:
headers["User-Agent"] = self.user_agent

if self.session.token_type and self.session.access_token is not None:
headers["authorization"] = (
self.session.token_type + " " + self.session.access_token
Expand Down
137 changes: 133 additions & 4 deletions tidalapi/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import base64
import concurrent.futures
import datetime
import hashlib
import logging
import os
import random
import time
import uuid
Expand All @@ -40,7 +42,7 @@
cast,
no_type_check,
)
from urllib.parse import urljoin
from urllib.parse import parse_qs, urlencode, urljoin, urlsplit

import requests

Expand Down Expand Up @@ -93,6 +95,8 @@ class Config:
"""

api_location: str = "https://api.tidal.com/v1/"
api_oauth2_token: str = "https://auth.tidal.com/v1/oauth2/token"
api_pkce_auth: str = "https://login.tidal.com/authorize"
api_token: str
client_id: str
client_secret: str
Expand All @@ -101,6 +105,12 @@ class Config:
quality: str
video_quality: str
video_url: str = "https://resources.tidal.com/videos/%s/%ix%i.mp4"
# Necessary for PKCE authorization only
client_unique_key: str
code_verifier: str
code_challenge: str
pkce_uri_redirect: str = "https://tidal.com/android/login/auth"
client_id_pkce: str

@no_type_check
def __init__(
Expand Down Expand Up @@ -180,6 +190,19 @@ def __init__(
self.client_id = "".join(self.client_id)
self.client_secret = self.client_id
tehkillerbee marked this conversation as resolved.
Show resolved Hide resolved
self.client_id = self.api_token
# PKCE Authorization. We will keep the former `client_id` as a fallback / will only be used for non PCKE
# authorizations.
self.client_unique_key = format(random.getrandbits(64), "02x")
self.code_verifier = base64.urlsafe_b64encode(os.urandom(32))[:-1].decode(
"utf-8"
)
self.code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(self.code_verifier.encode("utf-8")).digest()
)[:-1].decode("utf-8")
self.client_id_pkce = base64.b64decode(
base64.b64decode(b"TmtKRVUxSmtjRXM=")
+ base64.b64decode(b"NWFIRkZRbFJuVlE9PQ==")
).decode("utf-8")


class Case(Enum):
Expand Down Expand Up @@ -355,7 +378,7 @@ def load_oauth_session(
:param refresh_token: (Optional) A refresh token that lets you get a new access
token after it has expired
:param expiry_time: (Optional) The datetime the access token will expire
:return: True if we believe the log in was successful, otherwise false.
:return: True if we believe the login was successful, otherwise false.
"""
self.token_type = token_type
self.access_token = access_token
Expand Down Expand Up @@ -399,6 +422,102 @@ def login(self, username: str, password: str) -> bool:
self.user = user.User(self, user_id=body["userId"]).factory()
return True

def login_pkce(self, fn_print: Callable[[str], None] = print) -> None:
"""Login handler for PKCE based authentication. This is the only way how to get
access to HiRes (Up to 24-bit, 192 kHz) FLAC files.

This handler will ask you to follow a URL, process with the login in the browser
and copy & paste the URL of the redirected browser page.

:param fn_print: A function which will be called to print the instructions,
defaults to `print()`.
:type fn_print: Callable, optional
:return:
"""
# Get login url
url_login: str = self._pkce_login_url()

fn_print("READ CAREFULLY!")
fn_print("---------------")
fn_print(
"You need to open this link and login with your username and password. "
"Afterwards you will be redirected to an 'Oops' page. "
"To complete the login you must copy the URL from this 'Oops' page and paste it to the input field."
)
fn_print(url_login)

# Get redirect URL from user input.
url_redirect: str = input("Paste 'Ooops' page URL here and press <ENTER>:")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to get this url programmatically rather than require the copy paste?

If not, can we then make other called functions non-private?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't realise that my pending comment needed to actually be set as a review, also, did not necessarily mean to review, rather just leave a comment.
I actually mean to submit this days ago. Apologies for taking so long to realise my mistake

# Query for auth tokens
json: dict[str, Union[str, int]] = self._pkce_get_auth_token(url_redirect)

# Parse and set tokens.
self._process_auth_token(json)

def _pkce_login_url(self) -> str:
"""Returns the Login-URL to login via web browser.

:return: The URL the user has to use for login.
:rtype: str
"""
params: request.Params = {
"response_type": "code",
"redirect_uri": self.config.pkce_uri_redirect,
"client_id": self.config.client_id_pkce,
"lang": "EN",
"appMode": "android",
"client_unique_key": self.config.client_unique_key,
"code_challenge": self.config.code_challenge,
"code_challenge_method": "S256",
"restrict_signup": "true",
}

return self.config.api_pkce_auth + "?" + urlencode(params)

def _pkce_get_auth_token(self, url_redirect: str) -> dict[str, Union[str, int]]:
"""Parses the redirect url to extract access and refresh tokens.

:param url_redirect: URL of the 'Ooops' page, where the user was redirected to
after login.
:type url_redirect: str
:return: A parsed JSON object with access and refresh tokens and other
information.
:rtype: dict[str, str | int]
"""
# w_usr=WRITE_USR, r_usr=READ_USR_DATA, w_sub=WRITE_SUBSCRIPTION
scope_default: str = "r_usr+w_usr+w_sub"

# Extract the code parameter from query string
if url_redirect and "https://" in url_redirect:
code: str = parse_qs(urlsplit(url_redirect).query)["code"][0]
else:
raise Exception("The provided redirect url looks wrong: " + url_redirect)

# Set post data and call the API
data: request.Params = {
"code": code,
"client_id": self.config.client_id_pkce,
"grant_type": "authorization_code",
"redirect_uri": self.config.pkce_uri_redirect,
"scope": scope_default,
"code_verifier": self.config.code_verifier,
"client_unique_key": self.config.client_unique_key,
}
response = self.request_session.post(self.config.api_oauth2_token, data)

# Check response
if not response.ok:
log.error("Login failed: %s", response.text)
response.raise_for_status()

# Parse the JSON response.
try:
token: dict[str, Union[str, int]] = response.json()
except:
raise Exception("Wrong one-time authorization code", response)

return token

def login_oauth_simple(self, function: Callable[[str], None] = print) -> None:
"""Login to TIDAL using a remote link. You can select what function you want to
use to display the link.
Expand Down Expand Up @@ -439,6 +558,16 @@ def _login_with_link(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]:

def _process_link_login(self, json: JsonObj) -> None:
json = self._wait_for_link_login(json)
self._process_auth_token(json)

def _process_auth_token(self, json: dict[str, Union[str, int]]) -> None:
"""Parses the authorization response and sets the token values to the specific
variables for further usage.

:param json: Parsed JSON response after login / authorization.
:type json: dict[str, str | int]
:return: None
"""
self.access_token = json["access_token"]
self.expiry_time = datetime.datetime.utcnow() + datetime.timedelta(
seconds=json["expires_in"]
Expand All @@ -455,7 +584,7 @@ def _wait_for_link_login(self, json: JsonObj) -> Any:
expiry = float(json["expiresIn"])
interval = float(json["interval"])
device_code = json["deviceCode"]
url = "https://auth.tidal.com/v1/oauth2/token"
url = self.config.api_oauth2_token
params = {
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
Expand Down Expand Up @@ -484,7 +613,7 @@ def token_refresh(self, refresh_token: str) -> bool:
:return: True if we believe the token was successfully refreshed, otherwise
False
"""
url = "https://auth.tidal.com/v1/oauth2/token"
url = self.config.api_oauth2_token
params = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
Expand Down