forked from JayChen35/spotify-to-mp3-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
spotify_to_mp3.py
212 lines (188 loc) · 8.79 KB
/
spotify_to_mp3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# Downloads a Spotify playlist into a folder of MP3 tracks
# Jason Chen, 21 June 2020
import os
import spotipy
import spotipy.oauth2 as oauth2
import yt_dlp
from youtube_search import YoutubeSearch
import multiprocessing
# **************PLEASE READ THE README.md FOR USE INSTRUCTIONS**************
def write_tracks(text_file: str, tracks: dict):
    """Write the tracks of a playlist to ``text_file``, one track per line.

    Each line has the form ``name,artist,spotify_url``. Commas inside the
    track or artist name are replaced by spaces so that splitting the line on
    "," later yields the fields in their expected positions.

    Args:
        text_file: Path of the file to create or overwrite.
        tracks: One page of results: a dict with an ``items`` list and a
            ``next`` entry. While ``next`` is truthy, the following page is
            fetched through the module-level ``spotify`` client.
    """
    def clean(field: str) -> str:
        # The comma is the field delimiter; also collapse the gap it leaves.
        return " ".join(field.replace(",", " ").split())

    def lookup(obj, *path):
        # Best-effort nested lookup used only to build log messages.
        for key in path:
            try:
                obj = obj[key]
            except (KeyError, IndexError, TypeError):
                return "<unknown>"
        return obj

    with open(text_file, 'w+', encoding='utf-8') as file_out:
        while True:
            for item in tracks['items']:
                # Items either wrap the track under a 'track' key or are the
                # track object itself.
                track = item['track'] if 'track' in item else item
                if not track:
                    # Nothing to write when the entry carries no track data.
                    print("Skipping a playlist entry without track data.")
                    continue
                try:
                    track_url = track['external_urls']['spotify']
                    track_name = track['name']
                    track_artist = track['artists'][0]['name']
                except (KeyError, IndexError, TypeError):
                    # The failed lookup may be the name or artist itself, so
                    # the message must not index the track directly again.
                    print(u'Skipping track {0} by {1} (local only?)'.format(
                        lookup(track, 'name'), lookup(track, 'artists', 0, 'name')))
                    continue
                csv_line = ",".join((clean(track_name), clean(track_artist), track_url)) + "\n"
                try:
                    file_out.write(csv_line)
                except UnicodeEncodeError:  # Most likely caused by non-English song names
                    print("Track named {} failed due to an encoding error. This is "
                          "most likely due to this song having a non-English name.".format(track_name))
            # Results are paginated; keep going while another page is advertised.
            if tracks['next']:
                tracks = spotify.next(tracks)
            else:
                break
def write_playlist(username: str, playlist_id: str):
    """Fetch a playlist and write its tracks to ``<playlist name>.txt``.

    The playlist is requested through the module-level ``spotify`` client and
    its tracks are written with ``write_tracks``. The playlist name is made
    safe for use as a file or folder name first, because callers use the
    returned value for exactly that.

    Args:
        username: Spotify user name passed to the API call.
        playlist_id: ID or URI of the playlist.

    Returns:
        The sanitized playlist name (the text file is ``<name>.txt``).
    """
    results = spotify.user_playlist(username, playlist_id, fields='tracks,next,name')

    # Characters that cannot appear in a file name on common filesystems.
    # The original passed an unused ``ok=`` argument to str.format, which did
    # no sanitizing at all.
    illegal = '<>:"/\\|?*'
    playlist_name = "".join(
        "_" if (ch in illegal or ord(ch) < 32) else ch
        for ch in results['name']
    )
    # Trailing dots/spaces are not valid at the end of Windows file names.
    playlist_name = playlist_name.strip().rstrip(" .") or "playlist"

    text_file = u'{0}.txt'.format(playlist_name)
    print(u'Writing {0} tracks to {1}.'.format(results['tracks']['total'], text_file))
    write_tracks(text_file, results['tracks'])
    return playlist_name
def find_and_download_songs(reference_file: str):
    """Search YouTube for every track in ``reference_file`` and save it as MP3.

    Each line of the reference file is expected to look like
    ``name,artist,...``. Blank lines and lines without at least two
    comma-separated fields are skipped. A track whose search yields no result
    after ``TOTAL_ATTEMPTS`` tries, or whose download fails, is reported and
    skipped so that the remaining tracks are still processed.

    Args:
        reference_file: Path of the comma-delimited track list.
    """
    TOTAL_ATTEMPTS = 10
    with open(reference_file, "r", encoding='utf-8') as file:
        for line in file:
            stripped = line.strip()
            fields = stripped.split(",")
            if len(fields) < 2:
                # Indexing fields[1] would raise on blank or malformed lines.
                if stripped:
                    print("Skipping malformed line: {}".format(stripped))
                continue
            name, artist = fields[0], fields[1]
            text_to_search = artist + " - " + name

            best_url = None
            attempts_left = TOTAL_ATTEMPTS
            while attempts_left > 0:
                try:
                    results_list = YoutubeSearch(text_to_search, max_results=1).to_dict()
                    best_url = "https://www.youtube.com{}".format(results_list[0]['url_suffix'])
                    break
                except IndexError:
                    # An empty result list; retry the search.
                    attempts_left -= 1
                    print("No valid URLs found for {}, trying again ({} attempts left).".format(
                        text_to_search, attempts_left))
            if best_url is None:
                print("No valid URLs found for {}, skipping track.".format(text_to_search))
                continue

            # Run yt-dlp to fetch the link's audio and convert it to MP3
            print("Initiating download for {}.".format(text_to_search))
            ydl_opts = {
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }],
            }
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    ydl.download([best_url])
            except yt_dlp.utils.DownloadError as error:
                # One unavailable video must not abort the rest of the list.
                print("Download failed for {} ({}), skipping track.".format(
                    text_to_search, error))
# Multiprocessed implementation of find_and_download_songs
# This method is responsible for managing and distributing the multi-core workload
def multicore_find_and_download_songs(reference_file: str, cpu_count: int):
    """Split the reference file across worker processes and download in parallel.

    The non-blank lines of ``reference_file`` are divided into contiguous
    segments whose sizes differ by at most one (e.g. 5 songs on 4 workers
    gives [2, 1, 1, 1]). Each segment is handed to ``multicore_handler`` in
    its own process; the call returns once every process has finished.

    Args:
        reference_file: Path of the comma-delimited track list.
        cpu_count: Requested number of worker processes. Values below 1 are
            treated as 1, and no more workers are started than there are
            songs, so no process is created for an empty segment.
    """
    # Extract songs from the reference file (blank lines are not songs)
    with open(reference_file, "r", encoding='utf-8') as file:
        lines = [line for line in file if line.strip()]

    number_of_songs = len(lines)
    if number_of_songs == 0:
        return

    worker_count = max(1, min(cpu_count, number_of_songs))
    # extra_songs is the remainder: the first `extra_songs` workers take one more.
    songs_per_worker, extra_songs = divmod(number_of_songs, worker_count)

    # Slice the song list into one contiguous segment per worker
    file_segments = []
    index = 0
    for worker in range(worker_count):
        size = songs_per_worker + (1 if worker < extra_songs else 0)
        file_segments.append(lines[index:index + size])
        index += size

    # Prepare all of the separate processes before starting them, each with
    # its own shorter list of songs
    processes = [
        multiprocessing.Process(target=multicore_handler, args=(segment, segment_index))
        for segment_index, segment in enumerate(file_segments)
    ]

    # Start the processes
    for p in processes:
        p.start()

    # Wait for the processes to complete and exit as a group
    for p in processes:
        p.join()
# Just a wrapper around the original find_and_download_songs method to ensure future compatibility
# Preserves the same functionality just allows for several shorter lists to be used and cleaned up
def multicore_handler(reference_list: list, segment_index: int):
    """Download one segment of songs inside a worker process.

    The segment is written to a temporary reference file named
    ``<segment_index>.txt`` in the current directory so that
    ``find_and_download_songs`` can be reused unchanged. The file is removed
    afterwards, even when the download function raises.

    Args:
        reference_list: Lines of the reference file assigned to this worker.
        segment_index: Index of the segment; used for the temporary file name.
    """
    # Create reference filename based off of the process id (segment_index)
    reference_filename = "{}.txt".format(segment_index)

    # Write the reference_list to a new "reference_file" to enable compatibility
    with open(reference_filename, 'w+', encoding='utf-8') as file_out:
        file_out.writelines(reference_list)

    try:
        # Call the original find_and_download method
        find_and_download_songs(reference_filename)
    finally:
        # Clean up the extra list that was generated, also on failure
        if os.path.exists(reference_filename):
            os.remove(reference_filename)
# This is the prompt that handles the multicore queries
# An effort has been made to create an easily automated interface
def enable_multicore(autoenable=False, maxcores=None, buffercores=1):
    """Decide how many worker processes to use.

    Args:
        autoenable: When true, no prompt is shown. ``maxcores`` is used if it
            fits, otherwise every available core is used.
        maxcores: Fixed number of cores to use when ``autoenable`` is true.
        buffercores: Number of cores to leave unused (default 1).

    Returns:
        The number of cores to use, always at least 1. A return value of 1
        means single-core operation.
    """
    too_many_message = "Too many cores requested, single core operation fallback"
    invalid_message = "Invalid core count requested, single core operation fallback"

    # Cores available after reserving the buffer; never report fewer than one
    # so callers cannot receive 0 or a negative worker count.
    native_cpu_count = max(1, multiprocessing.cpu_count() - buffercores)

    if autoenable:
        if maxcores:
            if maxcores < 1:
                print(invalid_message)
                return 1
            if maxcores <= native_cpu_count:
                return maxcores
            print(too_many_message)
            return 1
        # Honour buffercores here too instead of a hard-coded "minus one".
        return native_cpu_count

    multicore_query = input("Enable multiprocessing (Y or N): ")
    if multicore_query.strip().lower() not in ("y", "yes"):
        return 1

    try:
        core_count_query = int(input("Max core count (0 for allcores): "))
    except ValueError:
        # Non-numeric answer: fall back instead of crashing.
        print(invalid_message)
        return 1

    if core_count_query == 0:
        return native_cpu_count
    if core_count_query < 0:
        print(invalid_message)
        return 1
    if core_count_query <= native_cpu_count:
        return core_count_query
    print(too_many_message)
    return 1
if __name__ == "__main__":
    # Parameters
    print("Please read README.md for use instructions.")
    client_id = input("Client ID: ").strip()
    client_secret = input("Client secret: ").strip()
    username = input("Spotify username: ").strip()
    playlist_uri = input("Playlist URI/Link: ").strip()

    # Reduce a pasted web link to the bare playlist ID.
    link_prefix = "https://open.spotify.com/playlist/"
    if link_prefix in playlist_uri:
        playlist_uri = playlist_uri.replace(link_prefix, "")
    # A pasted link may carry a query string (e.g. "?si=...") after the ID;
    # it is not part of the ID and must not be sent to the API.
    playlist_uri = playlist_uri.split("?", 1)[0]

    multicore_support = enable_multicore(autoenable=False, maxcores=None, buffercores=1)

    # `spotify` has to stay a module-level name: write_playlist and
    # write_tracks read it as a global.
    auth_manager = oauth2.SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
    spotify = spotipy.Spotify(auth_manager=auth_manager)

    playlist_name = write_playlist(username, playlist_uri)
    reference_file = "{}.txt".format(playlist_name)

    # Create the playlist folder and move the reference file into it.
    # os.replace overwrites a reference file left over from an earlier run,
    # where os.rename would fail on Windows.
    os.makedirs(playlist_name, exist_ok=True)
    os.replace(reference_file, os.path.join(playlist_name, reference_file))
    os.chdir(playlist_name)

    # Enable multicore support
    if multicore_support > 1:
        multicore_find_and_download_songs(reference_file, multicore_support)
    else:
        find_and_download_songs(reference_file)
    print("Operation complete.")