Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] 添加网易云音乐查询接口 #79

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ share/python-wheels/
*.egg
MANIFEST

.vscode/*

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
Expand Down
7 changes: 4 additions & 3 deletions api/cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

from mod import searchx

headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/"}

# 跟踪重定向


def follow_redirects(url, max_redirects=10):
for _ in range(max_redirects):
response = requests.head(url, allow_redirects=False)
Expand All @@ -26,12 +29,10 @@ def local_cover_search(title: str, artist: str, album: str):
result: list = searchx.search_all(title=title, artist=artist, album=album, timeout=30)
for item in result:
if cover_url := item.get('cover'):
res = requests.get(cover_url,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/"})
res = requests.get(cover_url, headers=headers)
if res.status_code == 200:
return res.content, 200, {"Content-Type": res.headers['Content-Type']}


@app.route('/cover', methods=['GET'])
@cache.cached(timeout=86400, key_prefix=make_cache_key)
@no_error(exceptions=AttributeError)
Expand Down
9 changes: 5 additions & 4 deletions api/lyrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from mod.auth.authentication import require_auth


def read_file_with_encoding(file_path:str, encodings:list[str]):
def read_file_with_encoding(file_path: str, encodings: list[str]):
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
Expand All @@ -40,7 +40,7 @@ def lyrics():
if path:
lrc_path = os.path.splitext(path)[0] + '.lrc'
if os.path.isfile(lrc_path):
file_content: str|None = read_file_with_encoding(lrc_path, ['utf-8', 'gbk'])
file_content: str | None = read_file_with_encoding(lrc_path, ['utf-8', 'gbk'])
if file_content is not None:
return lrc.standard(file_content)
try:
Expand Down Expand Up @@ -96,8 +96,9 @@ def lrc_json():
for i in lyrics_list:
if not i:
continue
i['lyrics'] = lrc.standard(i['lyrics'])
response.append(i)
if lyric := i.get('lyrics'):
i['lyrics'] = lrc.standard(lyric)
response.append(i)
_response = jsonify(response)
_response.headers['Content-Type'] = 'application/json; charset=utf-8'
return jsonify(response)
7 changes: 6 additions & 1 deletion api/tag.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os

from . import *
Expand All @@ -22,7 +23,11 @@ def set_tag():
"具体请查看<https://docs.lrc.cx/docs/deploy/auth>以启用API鉴权功能。")
return render_template_string(webui.error()), 421

music_data = request.json
try:
# v1.3.2 版本的音流, 传入的 content_type 为空, 会报: UnsupportedMediaType: 415 Unsupported Media Type
music_data = request.json
except:
music_data = json.loads(request.data)
audio_path = music_data.get("path")
if not audio_path:
return "Missing 'path' key in JSON.", 422
Expand Down
7 changes: 5 additions & 2 deletions mod/auth/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ def decrypt(self, data: str) -> str:
:param data: encrypted data
:return: json string
"""
aes = pyaes.AESModeOfOperationCTR(self.key.encode(encoding='utf-8'))
return aes.decrypt(bytes.fromhex(data)).decode(encoding='utf-8')
try:
aes = pyaes.AESModeOfOperationCTR(self.key.encode(encoding='utf-8'))
return aes.decrypt(bytes.fromhex(data)).decode(encoding='utf-8')
except:
return ""

def change_key(self):
self.key = self.gen_key()
Expand Down
4 changes: 2 additions & 2 deletions mod/searchx/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from concurrent import futures

from mod.searchx import api, kugou
from mod.searchx import api, kugou, netease


def search_all(title, artist, album, timeout=30):
funcs = [api, kugou]
funcs = [api, kugou, netease]
results = []

def request(task):
Expand Down
268 changes: 268 additions & 0 deletions mod/searchx/netease.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
import json
import aiohttp
import asyncio
import logging

from functools import lru_cache

import urllib

from mod import textcompare, tools
from mygo.devtools import no_error

from mod.ttscn import t2s

headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0',
'origin': 'https://music.163.com',
'referer': 'https://music.163.com',
}

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# type: 1-songs, 10-albums, 100-artists, 1000-playlists
COMMON_SEARCH_URL_WANGYI = 'https://music.163.com/api/search/get/web?csrf_token=hlpretag=&hlposttag=&s={}&type={}&offset={}&total=true&limit={}'
ALBUM_SEARCH_URL_WANGYI = 'https://music.163.com/api/album/{}?ext=true'
LYRIC_URL_WANGYI = 'https://music.163.com/api/song/lyric?id={}&lv=1&tv=1'
ARTIST_SEARCH_URL = 'http://music.163.com/api/v1/artist/{}'
ALBUMS_SEARCH_URL = "http://music.163.com/api/artist/albums/{}?offset=0&total=true&limit=300"
ALBUM_INFO_URL = "http://music.163.com/api/album/{}?ext=true"


def listify(obj):
if isinstance(obj, list):
return obj
else:
return [obj]


async def search_artist_blur(session: aiohttp.ClientSession, artist_blur, limit=1):
""" 由于没有选择交互的过程, 因此 artist_blur 如果输入的不准确, 可能会查询到错误的歌手 """
# logging.info('开始搜索: ' + artist_blur)

num = 0
if not artist_blur:
logging.info('Missing artist. Skipping match')
return None

url = COMMON_SEARCH_URL_WANGYI.format(
urllib.parse.quote(artist_blur.lower()), 100, 0, limit)
artists = []
try:
json_data_r = await session.get(url, headers=headers)
response = json.loads(await json_data_r.text())

artist_results = response['result']
num = int(artist_results['artistCount'])
lim = min(limit, num)
# logging.info('搜索到的歌手数量:' + str(lim))
for i in range(lim):
try:
artists = listify(artist_results['artists'])
except:
logging.error('Error retrieving artist search results.')
except:
logging.error('Error retrieving artist search results.')
if len(artists) > 0:
return artists[0]
return None


async def search_albums(session: aiohttp.ClientSession, artist_id):
url = ALBUMS_SEARCH_URL.format(artist_id)
json_data_r = await session.get(url, headers=headers)
response = json.loads(await json_data_r.text())
if response['code'] == 200:
return response['hotAlbums']
return None


def filter_and_get_album_id(album_list, album):
most_similar = None
highest_similarity = 0

for candidate_album in album_list:
if album == candidate_album['name']:
return candidate_album['id']
similarity = textcompare.association(album, candidate_album['name'])
if similarity > highest_similarity:
highest_similarity = similarity
most_similar = candidate_album
return most_similar['id'] if most_similar is not None else None


async def get_album_info_by_id(session: aiohttp.ClientSession, album_id):
url = ALBUM_INFO_URL.format(album_id)
json_data_r = await session.get(url, headers=headers)
response = json.loads(await json_data_r.text())
if response['code'] == 200:
return response['album']
return None


async def get_album_info(session, artist, album):
artist = t2s(artist)
album = t2s(album)
# 1. 根据 artist, 获取 artist_id
if blur_result := await search_artist_blur(session, artist_blur=artist):
artist_id = blur_result['id']
# 2. 根据 artist_id 查询所有专辑
if album_list := await search_albums(session, artist_id):
# 3. 根据 album, 过滤, 并获取到 album_id
if album_id := filter_and_get_album_id(album_list, album):
# 4. 根据 album_id, 查询 album_info
return await get_album_info_by_id(session, album_id)
return None


async def get_cover_url(session: aiohttp.ClientSession, album_id: int):
url = ALBUM_SEARCH_URL_WANGYI.format(album_id)
json_data_r = await session.get(url, headers=headers)
json_data = json.loads(await json_data_r.text())
if json_data.get('album', False) and json_data.get('album').get('picUrl', False):
return json_data['album']['picUrl']
return None


async def get_lyrics(session: aiohttp.ClientSession, track_id: int):
url = LYRIC_URL_WANGYI.format(track_id)
json_data_r = await session.get(url, headers=headers)
json_data = json.loads(await json_data_r.text())
if json_data.get('lrc', False) and json_data.get('lrc').get('lyric', False):
return json_data['lrc']['lyric']
return None


async def a_search(title='', artist='', album=''):
"""
查询封面:
三者都传:获取歌曲封面
不传歌曲标题:获取专辑封面 --- 传歌手/歌曲
只传歌手名:获取歌手图片
查询歌词:
title 不能为空
album, artist 这两个可以为空
"""
if not any((title, artist, album)):
return None

async with aiohttp.ClientSession() as session:
# 查询歌曲, 包括封面和歌词
if title:
return await search_track(session, title=title, artist=artist, album=album)
elif artist and album:
# 只查询专辑封面
return await search_album(session, artist, album)
elif artist:
# 查询艺术家封面
return await search_artist(session, artist)
return None


async def search_artist(session, artist):
# 1. 根据 artist, 获取 artist_id
if blur_result := await search_artist_blur(session, artist_blur=artist):
music_json_data: dict = {
"cover": blur_result['img1v1Url']
}
return listify(music_json_data)
return None


async def search_album(session, artist, album):
if album_info := await get_album_info(session, artist, album):
music_json_data: dict = {
"cover": album_info['picUrl']
}
return listify(music_json_data)
return None


async def search_track(session, title, artist, album):
result_list = []
limit = 10
search_str = ' '.join([item for item in [title, artist, album] if item])
url = COMMON_SEARCH_URL_WANGYI.format(search_str, 1, 0, 100)

response = await session.get(url, headers=headers)

if response.status != 200:
return None

song_info_t: str = await response.text()
song_info: dict = json.loads(song_info_t)
song_info: list[dict] = song_info["result"]["songs"]
if len(song_info) < 1:
return None
candidate_songs = []
for song_item in song_info:
# 有些歌, 查询的 title 可能在别名里, 例如周杰伦的 八度空间-"分裂/离开", 有两个名字.
song_names: list = song_item['alias']
song_names.append(song_item['name'])
artists = song_item.get("artists", None)
singer_name = " ".join([x['name'] for x in artists]) if artists is not None else ""
album_ = song_item.get("album", None)
album_name = album_['name'] if album is not None else ''
# 取所有名字中最高的相似度
title_conform_ratio = max([textcompare.association(title, name) for name in song_names])

artist_conform_ratio = textcompare.assoc_artists(artist, singer_name)
album_conform_ratio = textcompare.association(album, album_name)

ratio: float = (title_conform_ratio * (artist_conform_ratio + album_conform_ratio) / 2.0) ** 0.5

if ratio >= 0.2:
song_id = song_item['id']
album_id = album_[
'id'] if album is not None else None
singer_id = [x['id'] for x in artists][0]
candidate_songs.append(
{'ratio': ratio, "item": {
"artist": singer_name,
"album": album_name,
"title": title,
"artist_id": singer_id,
"album_id": album_id,
"trace_id": song_id
}})

candidate_songs.sort(
key=lambda x: x['ratio'], reverse=True)
if len(candidate_songs) < 1:
return None

candidate_songs = candidate_songs[:min(len(candidate_songs), limit)]

for candidate in candidate_songs:
track = candidate['item']
ratio = candidate['ratio']

cover_url = await get_cover_url(session, track['album_id'])
lyrics = await get_lyrics(session, track['trace_id'])

# 结构化JSON数据
music_json_data: dict = {
"title": track['title'],
"album": track['album'],
"artists": track['artist'],
"lyrics": lyrics,
"cover": cover_url,
"hash": tools.calculate_md5(
f"title:{track['title']};artists:{track['artist']};album:{track['album']}")
}

result_list.append(music_json_data)
return result_list


@lru_cache(maxsize=64)
@no_error(throw=logger.info,
exceptions=(aiohttp.ClientError, asyncio.TimeoutError, KeyError, IndexError, AttributeError))
def search(title='', artist='', album=''):
return asyncio.run(a_search(title=title, artist=artist, album=album))


if __name__ == "__main__":
print(search(album="十年"))