#!/usr/bin/env python3
"""Script to perform simultaneous, resumable and hash-verified downloads from Internet Archive"""
import argparse
import datetime
import hashlib
import io
import logging
import multiprocessing
import multiprocessing.pool
import os
import pathlib
import platform
import re
import signal
import sys
import time
import typing
dark_grey = "\x1b[90;20m"
bold_grey = "\x1b[37;1m"
blue = "\x1b[94;20m"
green = "\x1b[92;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
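# The ANSI escape sequences above are used to colour console output; as an illustrative example
# (not part of the script's own output), a yellow line could be produced with:
#   print(yellow + "this text will appear yellow" + reset)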
python_major, python_minor = platform.python_version_tuple()[0:2]
if (int(python_major), int(python_minor)) < (3, 7):
print(
"Please use Python 3.7 or above (version currently installed is {})".format(
platform.python_version()
)
)
sys.exit()
try:
import internetarchive
import requests
import tqdm
except ModuleNotFoundError:
print(
"Error loading Internet Archive module or dependencies - ensure that the Internet Archive"
" Python Library has been installed:"
" https://archive.org/services/docs/api/internetarchive/installation.html"
)
sys.exit()
class MsgCounterHandler(logging.Handler):
"""Custom logging handler to count number of calls per log level"""
def __init__(self, *args, **kwargs) -> None:
super(MsgCounterHandler, self).__init__(*args, **kwargs)
self.count = {}
self.count["WARNING"] = 0
self.count["ERROR"] = 0
def emit(self, record) -> None:
levelname = record.levelname
if levelname not in self.count:
self.count[levelname] = 0
self.count[levelname] += 1
class ColorFormatter(logging.Formatter):
    """Custom logging formatter to colour console log messages according to their log level"""
msg_format = "%(asctime)s - %(levelname)s - %(message)s"
FORMATS = {
logging.DEBUG: dark_grey + msg_format + reset,
logging.INFO: dark_grey + msg_format + reset,
logging.WARNING: yellow + msg_format + reset,
logging.ERROR: red + msg_format + reset,
logging.CRITICAL: bold_red + msg_format + reset,
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt, "%Y-%m-%d %H:%M:%S")
return formatter.format(record)
class TermEscapeCodeFilter(logging.Filter):
"""A class to strip the escape codes from log messages destined for log files"""
def filter(self, record):
escape_re = re.compile(r"\x1b\[[0-9;]*m")
record.msg_without_colours = re.sub(escape_re, "", str(record.msg))
return True
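# As an illustrative example, a log message of "\x1b[33;20mdownload stalled\x1b[0m" is exposed to
# the file handlers below as "download stalled" via the 'msg_without_colours' attribute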
def prepare_logging(
datetime_string: str, folder_path: str, identifier: str, args: typing.Dict[str, typing.Any]
) -> typing.Tuple[logging.Logger, MsgCounterHandler]:
"""Prepare and return logging object to be used throughout script"""
# INFO events and above will be written to both the console and a log file
# DEBUG events and above will be written only to a (separate) log file
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
# 'Quiet' logger for when quiet flag used in functions
quiet = logging.getLogger("quiet")
quiet.setLevel(logging.ERROR)
log_file_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(msg_without_colours)s")
log_file_filter = TermEscapeCodeFilter()
debug_log = logging.FileHandler(
os.path.join(folder_path, "{}_{}_debug.log".format(datetime_string, identifier))
)
debug_log.setLevel(logging.DEBUG)
debug_log.setFormatter(log_file_formatter)
debug_log.addFilter(log_file_filter)
info_log = logging.FileHandler(
os.path.join(folder_path, "{}_{}_info.log".format(datetime_string, identifier))
)
info_log.setLevel(logging.INFO)
info_log.setFormatter(log_file_formatter)
info_log.addFilter(log_file_filter)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(ColorFormatter())
counter_handler = MsgCounterHandler()
log.addHandler(debug_log)
log.addHandler(info_log)
log.addHandler(console_handler)
log.addHandler(counter_handler)
# Log platform details and commandline arguments
platform_detail_requests = [
"python_version",
"system",
"machine",
"platform",
"version",
"mac_ver",
]
for platform_detail_request in platform_detail_requests:
try:
log.debug(
"{}: {}".format(
platform_detail_request, getattr(platform, platform_detail_request)()
)
)
except: # pylint: disable=W0702
pass
# Sanitise username and passwords if credentials flag is present
if "credentials" in args:
if args["credentials"] is not None:
args["credentials"] = ["***", "***"]
log.debug("commandline_args: {}".format(args))
return log, counter_handler
def check_argument_int_greater_than_one(value: str) -> int:
"""Confirm numeric values provided as command line arguments are >= 1"""
ivalue = int(value)
if ivalue <= 0:
raise argparse.ArgumentTypeError("{} is an invalid positive int value".format(value))
return ivalue
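# Sketch of typical argparse usage (the flag name shown here is hypothetical; the real flags are
# defined where the argument parser is built):
#   parser.add_argument("--threads", type=check_argument_int_greater_than_one, default=5)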
def bytes_filesize_to_readable_str(bytes_filesize: int) -> str:
"""Convert bytes integer to kilobyte/megabyte/gigabyte/terabyte equivalent string"""
if bytes_filesize < 1024:
        return "{} B".format(bytes_filesize)
num = float(bytes_filesize)
for unit in ["B", "KB", "MB", "GB"]:
if abs(num) < 1024.0:
return "{:.1f} {}".format(num, unit)
num /= 1024.0
return "{:.1f} {}".format(num, "TB")
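# Illustrative examples of the conversion above (not executed):
#   bytes_filesize_to_readable_str(512)      -> "512 B"
#   bytes_filesize_to_readable_str(2048)     -> "2.0 KB"
#   bytes_filesize_to_readable_str(10485760) -> "10.0 MB"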
def file_paths_in_folder(folder_path: str) -> typing.List[str]:
"""Return sorted list of paths of files at a directory (and its subdirectories)"""
log = logging.getLogger(__name__)
def walk_error(os_error: OSError) -> None:
"""Log any errors occurring during os.walk"""
log.warning(
"'%s' could not be accessed during folder scanning - any contents will not be"
" processed. Try running script as admin",
os_error.filename,
)
file_paths = []
for root, _, file_names in os.walk(folder_path, onerror=walk_error):
for name in file_names:
file_paths.append(os.path.join(root, name))
return sorted(file_paths)
def get_metadata_from_hashfile(
hash_file_path: str,
hash_flag: bool,
identifier_filter: typing.Optional[typing.List[str]] = None,
file_filters: typing.Optional[typing.List[str]] = None,
invert_file_filtering: bool = False,
) -> typing.Dict[str, str]:
"""Return dict of file paths and associated metadata parsed from IA hash metadata CSV"""
results = {} # type: typing.Dict[str, str]
with open(hash_file_path, "r", encoding="utf-8") as file_handler:
for line in file_handler:
identifier, file_path, size, md5, mtime = line.strip().split("|")
if file_filters is not None:
if not invert_file_filtering:
if not any(
substring.lower() in file_path.lower() for substring in file_filters
):
continue
else:
if any(substring.lower() in file_path.lower() for substring in file_filters):
continue
if identifier_filter is None or identifier in identifier_filter:
if hash_flag:
results[
os.path.join(identifier, os.path.normpath(file_path))
] = md5.lower().strip()
else:
results[
os.path.join(identifier, os.path.normpath(file_path))
] = size.lower().strip()
return results
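# The hash metadata file parsed above is expected to hold pipe-delimited lines; an illustrative
# (made-up) example line would be:
#   some_identifier|subfolder/file.zip|1048576|9e107d9d372bb6826bd81d3542a419d6|1577836800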
def get_metadata_from_files_in_folder(
folder_path: str,
hash_flag: bool,
relative_paths_from_ia_metadata: typing.Optional[typing.List[str]] = None,
) -> typing.Dict[str, str]:
"""Return dict of file paths and metadata of files at a directory (and its subdirectories)"""
log = logging.getLogger(__name__)
results = {} # type: typing.Dict[str, str]
if relative_paths_from_ia_metadata is not None:
file_paths = [
os.path.join(folder_path, relative_path)
for relative_path in relative_paths_from_ia_metadata
]
else:
file_paths = file_paths_in_folder(folder_path)
if hash_flag:
for file_path in tqdm.tqdm(file_paths):
if os.path.isfile(file_path): # We will alert on this elsewhere if the file isn't found
try:
md5 = md5_hash_file(file_path)
results[
os.path.normpath(os.path.relpath(file_path, folder_path))
] = md5.lower().strip()
except (PermissionError, OSError):
log.warning(
"PermissionError/OSError occurred when accessing file '%s' - try running "
"script as admin",
file_path,
)
else:
# Return file sizes if we're not checking hash values
for file_path in file_paths:
if os.path.isfile(file_path): # We will alert on this elsewhere if the file isn't found
try:
file_size = os.path.getsize(file_path)
results[os.path.normpath(os.path.relpath(file_path, folder_path))] = str(
file_size
)
except (PermissionError, OSError):
log.warning(
"PermissionError/OSError occurred when accessing file '%s' - try running "
"script as admin",
file_path,
)
return results
def md5_hash_file(filepath: str) -> str:
"""Return str containing lowercase MD5 hash value of file at a file path"""
block_size = 64 * 1024
md5 = hashlib.md5()
with open(filepath, "rb") as file_handler:
while True:
data = file_handler.read(block_size)
if not data:
break
md5.update(data)
return md5.hexdigest()
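# Example usage (hypothetical path; the hash value shown is illustrative only):
#   md5_hash_file("downloads/some_identifier/file.zip") -> "9e107d9d372bb6826bd81d3542a419d6"
# Reading in 64KB blocks keeps memory usage constant regardless of the size of the file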
def get_safe_path_name(path_name: str) -> str:
    """Return the provided path_name string with unsafe characters replaced by underscores"""
def safe_char(char):
if char in {"*", '"', "/", "\\", ":", "|", "?"}:
return "_"
else:
return char
return "".join(safe_char(char) for char in path_name).rstrip("_")
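# Illustrative example (hypothetical input):
#   get_safe_path_name('results: "final"?.txt') -> 'results_ _final__.txt'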
def hash_pool_initializer() -> None:
"""Ignore CTRL+C in the hash worker processes (workers are daemonic so will close when the
main process terminates)
"""
signal.signal(signal.SIGINT, signal.SIG_IGN)
def check_hash(file_path: str, md5_value_from_ia: str) -> typing.Tuple[str, str]:
"""Called as a separate process from the file_download function; returns results from an MD5
hash check of a file
"""
try:
md5_value_local = md5_hash_file(file_path)
except FileNotFoundError:
return (
"warning",
"'{}' file seems to have been deleted before hashing could complete".format(
os.path.basename(file_path)
),
)
except (PermissionError, OSError):
return (
"warning",
"PermissionError/OSError when attempting to hash '{}'".format(
os.path.basename(file_path)
),
)
if md5_value_local.lower().strip() == md5_value_from_ia.lower().strip():
return (
"debug",
"'{}' file hash ('{}') matches between local file and IA metadata".format(
os.path.basename(file_path), md5_value_local
),
)
return (
"warning",
"'{}' file hash does not match between local file ({}) and IA metadata ({})".format(
os.path.basename(file_path), md5_value_local, md5_value_from_ia
),
)
def log_update_callback(result: typing.List[typing.Tuple[str, str]]) -> None:
"""Function invoked when a hash operation completes; takes result of check_hash and adds to
log
"""
log = logging.getLogger(__name__)
log_level, log_message = result[0]
getattr(log, log_level)(log_message)
def does_file_have_416_issue(file_path: str) -> bool:
"""Check to see if a file has an embedded '416 status' error in its tail
Internet Archive servers can sometimes suddenly throw a 416 status ("Requested Range Not
Satisfiable") on resumable / split downloads. When this occurs, sometimes the partially
downloaded file will have content in its tail similar to:
<html><head><title>416 Requested Range Not Satisfiable</title></head>
<body><center><h1>416 Requested Range Not Satisfiable</h1></center>
<hr><center>nginx/1.18.0 (Ubuntu)</center></body></html>
In testing, can't just remove this tail and resume the download, as when diffing a completed
verified file against a partially downloaded '416' file, the file data deviates not at the tail
but much earlier in the file. So, this function checks to see if this issue has occurred, to
make a decision during download of whether the partially downloaded file needs to be removed
and started again
"""
    with open(file_path, "rb") as file_handler:
        # Check only the final 1KB of the file (or the whole file, if it is smaller than 1KB)
        file_handler.seek(-min(1024, os.path.getsize(file_path)), os.SEEK_END)
        if b"416 Requested Range Not Satisfiable" in file_handler.read():
            return True
    return False
def file_download(
download_details: typing.Tuple[
str,
str,
int,
str,
int,
str,
typing.Optional[multiprocessing.pool.Pool],
bool,
int,
typing.Optional[typing.Tuple[int, int]],
typing.Optional[int],
]
) -> None:
"""Called as separate threads from the download function; takes one of the files to be
downloaded from the download_queue and downloads, with subsequent (optional) MD5 hash
verification
"""
log = logging.getLogger(__name__)
(
identifier,
ia_file_name,
ia_file_size,
ia_md5,
ia_mtime,
output_folder,
hash_pool,
resume_flag,
split_count,
bytes_range,
chunk_number,
) = download_details
start_time = datetime.datetime.now()
file_size_split_limit = 10485760 # 10MB
dest_file_path = os.path.join(os.path.join(output_folder, identifier), ia_file_name)
dest_file_name = ia_file_name
expected_file_size = ia_file_size
# If our thread is part of a file split, update expectations on file paths/names and sizes
if chunk_number is not None:
dest_file_path += ".{}".format(chunk_number)
dest_file_name = os.path.basename(dest_file_path)
expected_file_size = bytes_range[1] - bytes_range[0] + 1
# If the destination file path exists already (i.e. file has already been (at least partially)
# downloaded), but the file size doesn't match expectations (i.e. download was incomplete),
# either re-download from scratch or attempt resume, depending on resume_flag argument
initial_file_size = 0
if os.path.isfile(dest_file_path):
if ia_file_size != -1: # -1 denotes that IA metadata does not contain size info
initial_file_size = os.path.getsize(dest_file_path)
if initial_file_size == expected_file_size:
log.debug(
"'{}' - will be skipped as file with expected file size already present at '{}'"
.format(dest_file_name, dest_file_path)
)
return
else:
if initial_file_size < expected_file_size:
if resume_flag:
log.info(
"'{}' - exists as downloaded file '{}' but file size indicates download"
" was not completed; will be resumed ({:.1%} remaining)".format(
dest_file_name,
dest_file_path,
1 - (initial_file_size / expected_file_size),
)
)
else:
log.info(
"'{}' - exists as downloaded file '{}' but file size indicates download"
" was not completed; will be redownloaded".format(
dest_file_name, dest_file_path
)
)
else:
log.warning(
"'{}' - exists as downloaded file '{}', but with a larger file size than"
" expected - was the file modified (either locally or on Internet Archive)"
" since it was downloaded?".format(dest_file_name, dest_file_path)
)
return
else:
log.info(
"'{}' - exists as downloaded file '{}' but file size metadata unavailable from IA"
" to confirm whether file size is as expected; will be redownloaded".format(
dest_file_name, dest_file_path
)
)
# If this thread is expected to create new threads for split file downloading, first need to
# check that the web server returns a 206 status code with a 'Range' request, indicating the
    # requested file can be split
if split_count > 1 and ia_file_size > file_size_split_limit:
response_list = internetarchive.download(
identifier,
files=[ia_file_name],
destdir=output_folder,
on_the_fly=True,
return_responses=True,
)
response = response_list[0] # type: requests.Response
request = response.request # type: requests.PreparedRequest
headers = request.headers
# We're just testing this connection, so don't need the whole byte range
headers["Range"] = "bytes={}-{}".format(0, 10)
new_response = requests.get(request.url, headers=headers, timeout=12, stream=True)
if new_response.status_code == 206:
log.debug(
"'{}' - returns a 206 status when requesting a Range - can therefore split download"
.format(ia_file_name)
)
elif new_response.status_code == 200:
log.debug(
"'{}' - returns a 200 status when requesting a Range - download will not be split"
.format(ia_file_name)
)
split_count = 1
else:
log.info(
"'{}' - unexpected status code {} returned when testing file splitting -"
" download will be attempted without splitting".format(
ia_file_name, new_response.status_code
)
)
split_count = 1
# Perform file download splitting
if split_count > 1 and ia_file_size > file_size_split_limit:
download_queue = []
chunk_sizes = {}
# Create byte ranges that will be used in each chunk thread, and create the download_queue
        # that the thread pool will take downloads from
for chunk_counter in range(split_count):
if chunk_counter == 0:
lower_bytes_range = 0
else:
lower_bytes_range = ((ia_file_size // split_count) * chunk_counter) + 1
if chunk_counter == split_count - 1: # For the last chunk, make sure we get everything
upper_bytes_range = ia_file_size - 1
else:
upper_bytes_range = (ia_file_size // split_count) * (chunk_counter + 1)
download_queue.append(
(
identifier,
ia_file_name,
ia_file_size,
ia_md5,
ia_mtime,
output_folder,
hash_pool,
resume_flag,
1, # split_count
(lower_bytes_range, upper_bytes_range),
chunk_counter,
)
)
chunk_sizes[chunk_counter] = upper_bytes_range - lower_bytes_range + 1
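        # As an illustrative example, a 100-byte file downloaded with a split_count of 2 would be
        # requested as byte ranges (0, 50) and (51, 99), i.e. chunk sizes of 51 and 49 bytes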
with multiprocessing.pool.ThreadPool(split_count) as download_pool:
# Chunksize 1 used to ensure downloads occur in filename order
log.info("'{}' - will be downloaded in {} parts".format(ia_file_name, split_count))
download_pool.map(file_download, download_queue, chunksize=1)
download_pool.close()
download_pool.join()
# When file chunk downloads have finished in above thread pool, check the chunks are the
# expected size
failed_indicator = False
for chunk_counter in range(split_count):
chunk_file_path = "{}.{}".format(dest_file_path, chunk_counter)
if not os.path.isfile(chunk_file_path):
log.warning(
"'{}' - chunk {} (sub-file '{}') cannot be found".format(
ia_file_name, chunk_counter, chunk_file_path
)
)
failed_indicator = True
elif os.path.getsize(chunk_file_path) != chunk_sizes[chunk_counter]:
log.warning(
"'{}' - chunk {} (sub-file '{}') is not the expected size (expected size {},"
" actual size {})".format(
ia_file_name,
chunk_counter,
chunk_file_path,
chunk_sizes[chunk_counter],
os.path.getsize(chunk_file_path),
)
)
failed_indicator = True
if failed_indicator:
log.warning(
"'{}' - error occurred with file chunks - file could not be reconstructed"
" and has therefore not been downloaded successfully".format(ia_file_name)
)
else:
# Merge the chunks into the final file and delete each chunk as we go
block_size = 4096 * 1024
with open(dest_file_path, "wb") as output_file_handler:
for chunk_counter in range(split_count):
chunk_file_path = "{}.{}".format(dest_file_path, chunk_counter)
with open(chunk_file_path, "rb") as input_file_handler:
while True:
data = input_file_handler.read(block_size)
if not data:
break
output_file_handler.write(data)
os.remove(chunk_file_path)
else:
        # In testing, downloads can occasionally time out with requests.exceptions.ConnectionError
# raised; catch and attempt download five times before giving up
connection_retry_counter = 0
size_retry_counter = 0
MAX_RETRIES = 5
connection_wait_timer = 600
size_wait_timer = 600
while True:
try:
if not resume_flag and chunk_number is None:
log.info(
"{}'{}'{} - beginning download".format(bold_grey, dest_file_name, blue)
)
while True:
try:
internetarchive.download(
identifier,
files=[ia_file_name],
destdir=output_folder,
on_the_fly=True,
)
break
except requests.exceptions.HTTPError as http_error:
status_code = http_error.response.status_code
if status_code == 403:
log.warning(
"'{}' - 403 Forbidden error occurred - an account login may be"
" required to access this file (account details can be passed"
" using the '-c' flag) - note that download may not be possible"
" even when logged in, if the file is within a restricted"
" access item (e.g. books in the lending program or 'stream"
" only' videos)".format(ia_file_name)
)
else:
log.warning(
"'{}' - {} error status returned when attempting download"
.format(ia_file_name, status_code)
)
return
except FileExistsError:
                        log.debug(
                            "FileExistsError for '{}' occurred - this seems to happen"
                            " occasionally on Windows and Ubuntu, but a retry seems to fix it"
.format(ia_file_name)
)
time.sleep(2)
else:
partial_file_size = 0
if os.path.isfile(dest_file_path):
if ia_file_size == -1 or not resume_flag:
# If we don't have size metadata from IA (i.e. if file_size == -1), then
# perform a full re-download. (Although we could run a hash check
# instead, in testing it seems that any IA file that lacks size metadata
# will also give different hash values per download - so would be
# wasting time to calc hash as there'll always be a mismatch requiring
# a full re-download)
log.info(
"{}'{}'{} - beginning re-download".format(
bold_grey, dest_file_name, blue
)
)
file_write_mode = "wb"
elif resume_flag:
log.info(
"{}'{}'{} - resuming download".format(
bold_grey, dest_file_name, blue
)
)
file_write_mode = "ab"
partial_file_size = os.path.getsize(dest_file_path)
else:
log.info(
"{}'{}'{} - beginning download".format(bold_grey, dest_file_name, blue)
)
file_write_mode = "wb"
pathlib.Path(os.path.dirname(dest_file_path)).mkdir(
parents=True, exist_ok=True
)
# If we're wanting to be able to resume file transfers, we will use the
# internetarchive.download function to just return the PreparedResponse object
# with which we can make a new Request
# (We are doing this as internetarchive.download will otherwise delete a
# partially-downloaded file if a ConnectionError occurs, meaning we would have
# nothing left to try and resume)
try:
response_list = internetarchive.download(
identifier,
files=[ia_file_name],
destdir=output_folder,
on_the_fly=True,
return_responses=True,
)
except requests.exceptions.HTTPError as http_error:
status_code = http_error.response.status_code
if status_code == 403:
log.warning(
"'{}' - 403 Forbidden error occurred - an account login may be"
" required to access this file (account details can be passed using"
" the '-c' flag) - note that download may not be possible even when"
" logged in, if the file is within a restricted access item (e.g."
" books in the lending program)".format(ia_file_name)
)
else:
log.warning(
"'{}' - {} error status returned".format(ia_file_name, status_code)
)
return
response = response_list[0] # type: requests.Response
request = response.request # type: requests.PreparedRequest
headers = request.headers
updated_bytes_range = None
if file_write_mode == "ab":
# If we don't have bytes_range, this download isn't a file chunk, so just
# download all the remaining file data
if bytes_range is None:
updated_bytes_range = (partial_file_size, ia_file_size - 1)
# Otherwise, this is a file chunk, so only download up to the final amount
# needed for this chunk
else:
lower_bytes_range = bytes_range[0] + partial_file_size
updated_bytes_range = (lower_bytes_range, bytes_range[1])
elif bytes_range is not None:
updated_bytes_range = bytes_range
# Set the bytes range if we're either resuming a download or downloading a file
# chunk
if updated_bytes_range is not None:
headers["Range"] = "bytes={}-{}".format(
updated_bytes_range[0], updated_bytes_range[1]
)
log.debug(
"'{}' - range to be requested (being downloaded as file"
" '{}') is {}-{}".format(
ia_file_name,
dest_file_name,
updated_bytes_range[0],
updated_bytes_range[1],
)
)
new_response = requests.get(
request.url, headers=headers, timeout=12, stream=True
)
log.debug(
"'{}' - {} status for request (being downloaded as file '{}')".format(
ia_file_name, new_response.status_code, dest_file_name
)
)
if new_response.status_code == 200 or new_response.status_code == 206:
file_download_write_block_size = 1000000
with open(dest_file_path, file_write_mode) as file_handler:
for download_chunk in new_response.iter_content(
chunk_size=file_download_write_block_size
):
if download_chunk:
file_handler.write(download_chunk)
try:
if (
ia_mtime != -1
): # -1 denotes that IA metadata does not contain mtime info
os.utime(dest_file_path, (0, ia_mtime))
except OSError:
# Probably file-like object, e.g. sys.stdout.
pass
elif new_response.status_code == 416:
if os.path.isfile(dest_file_path):
if does_file_have_416_issue(dest_file_path):
log.info(
"416 error message has been embedded in partially downloaded"
" file '{}', causing file corruption; the partially downloaded"
" file will be deleted".format(dest_file_name)
)
os.remove(dest_file_path)
if size_retry_counter < MAX_RETRIES:
log.info(
"416 status returned for request for IA file '{}' (being downloaded"
" as file '{}') - indicating that the IA server cannot proceed with"
" resumed download at this time - waiting {} minutes before"
" retrying (will retry {} more times)".format(
ia_file_name,
dest_file_name,
int(size_wait_timer / 60),
MAX_RETRIES - size_retry_counter,
)
)
time.sleep(size_wait_timer)
size_retry_counter += 1
size_wait_timer *= (
2 # Add some delay for each retry in case connection issue is
# ongoing
)
continue
log.warning(
"Persistent 416 statuses returned for IA file '{}' (being downloaded as"
" file '{}') - server may be having temporary issues; download not"
" completed".format(ia_file_name, dest_file_name)
)
return
else:
log.warning(
"Unexpected status code {} returned for IA file '{}' (being downloaded"
" as file '{}') - download not completed".format(
new_response.status_code, ia_file_name, dest_file_name
)
)
return
except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout):
if connection_retry_counter < MAX_RETRIES:
log.info(
"ConnectionError/ReadTimeout occurred for '{}', waiting {} minutes before"
" retrying (will retry {} more times)".format(
dest_file_name,
int(connection_wait_timer / 60),
MAX_RETRIES - connection_retry_counter,
)
)
time.sleep(connection_wait_timer)
connection_retry_counter += 1
connection_wait_timer *= (
2 # Add some delay for each retry in case connection issue is ongoing
)
else:
log.warning(
"'{}' - download timed out {} times; this file has not been downloaded"
" successfully".format(dest_file_name, MAX_RETRIES)
)
return
else:
downloaded_file_size = os.path.getsize(dest_file_path)
# In testing, have seen rare instances of the file not being fully downloaded
# despite the response object not reporting any more data to write.
# This appears associated with the server suddenly throwing a 416 status -
# this can be seen by the partially downloaded file having a tail with content
                # similar to:
# <html><head><title>416 Requested Range Not Satisfiable</title></head>
# <body><center><h1>416 Requested Range Not Satisfiable</h1></center>
# <hr><center>nginx/1.18.0 (Ubuntu)</center></body></html>
# In testing, can't just remove this tail and resume the download, as when diffing
# a completed verified file against a partially downloaded '416' file, the file
# data deviates not at the tail but much earlier in the file.
# So, let's delete the partially downloaded file in this situation and begin again
if ia_file_size != -1 and downloaded_file_size < expected_file_size:
if size_retry_counter < MAX_RETRIES:
log.info(
"File '{}' download concluded but file size is not as expected (file"
" size is {} bytes, expected {} bytes). {} - partially downloaded file"
" will be deleted. Waiting {} minutes before retrying (will retry {}"
" more times)".format(
dest_file_name,
downloaded_file_size,
expected_file_size,
"The server raised a 416 status error, causing file corruption"
if does_file_have_416_issue(dest_file_path)
else "In this situation the file is likely corrupt",
int(size_wait_timer / 60),
MAX_RETRIES - size_retry_counter,
)
)
os.remove(dest_file_path)
time.sleep(size_wait_timer)
size_retry_counter += 1
size_wait_timer *= (
2 # Add some delay for each retry in case connection issue is ongoing
)
else:
                        log.warning(
                            "Failed to increase downloaded file '{}' to expected file size (final"
                            " file size is {}, expected {}); this file has not been downloaded"
" successfully".format(
dest_file_name, downloaded_file_size, expected_file_size
)
)
return
# If no further errors, break from the True loop
else:
break
complete_time = datetime.datetime.now()
duration = complete_time - start_time
duration_in_minutes = duration.total_seconds() / 60
# Remove the data that was downloaded in previous sessions (initial_file_size) to get the
# amount of data downloaded in this session, for accurate stats on how long it took to download
downloaded_data_in_mb = ((expected_file_size - initial_file_size) / 1024) / 1024
log.info(
"{}'{}'{} - download completed in {}{}".format(
bold_grey,
dest_file_name,
green,
datetime.timedelta(seconds=round(int(duration.total_seconds()))),
" ({:.2f}MB per minute)".format(downloaded_data_in_mb / duration_in_minutes)
if expected_file_size > 1048576 # 1MB; seems inaccurate for files beneath this size
else "",
)
)
# If user has opted to verify downloads, add the task to the hash_pool
if chunk_number is None: # Only hash if we're in a thread that isn't downloading a file chunk
if hash_pool is not None:
# Don't hash the [identifier]_files.xml file, as this regularly gives false
# positives (see README Known Issues)
if dest_file_name != "{}_files.xml".format(identifier):
hash_pool.starmap_async(
check_hash, iterable=[(dest_file_path, ia_md5)], callback=log_update_callback
)
def download(
identifier: str,
output_folder: str,
hash_file: typing.Optional[io.TextIOWrapper],
thread_count: int,
resume_flag: bool,
verify_flag: bool,
split_count: int,
file_filters: typing.Optional[typing.List[str]],
invert_file_filtering: bool,
cache_parent_folder: str,
cache_refresh: bool,
) -> None:
"""Download files associated with an Internet Archive identifier"""
log = logging.getLogger(__name__)
    PROCESSES = max(multiprocessing.cpu_count() - 1, 1)  # Ensure at least one hash worker process
MAX_RETRIES = 5
# Create output folder if it doesn't already exist
pathlib.Path(output_folder).mkdir(parents=True, exist_ok=True)
log.info("'{}' contents will be downloaded to '{}'".format(identifier, output_folder))
# If user has set to verify, create a new multiprocessing.Pool whose reference will be passed
# to each download thread to allow for non-blocking hashing
hash_pool = None
if verify_flag:
hash_pool = multiprocessing.Pool(PROCESSES, initializer=hash_pool_initializer)
# See if the items exist in the cache
cache_folder = os.path.join(cache_parent_folder, identifier)
item = None
class CacheDict:
"""Using this simply to allow for a custom attribute (item_metadata) if we use cache"""
if not cache_refresh and os.path.isdir(cache_folder):
cache_files = sorted(
[
f.path
for f in os.scandir(cache_folder)
if f.is_file() and f.name.endswith("metadata.txt")
]
)
if len(cache_files) > 0:
cache_file = cache_files[-1] # Get the most recent cache file
# Get datetime from filename
datetime_str = "_".join(os.path.basename(cache_file).split("_", 2)[:2])
file_datetime = datetime.datetime.strptime(datetime_str, "%Y%m%d_%H%M%S")
now_datetime = datetime.datetime.now()
if now_datetime - datetime.timedelta(weeks=1) <= file_datetime <= now_datetime:
log.debug(
"Cached data from {} will be used for item '{}'".format(
datetime_str, identifier
)
)
item = CacheDict()
item.item_metadata = {}
item.item_metadata["files"] = []
                with open(cache_file, "r", encoding="utf-8") as file_handler:
try:
for line in file_handler:
_, file_path, size, md5, mtime = line.strip().split("|")
item_dict = {}
item_dict["name"] = file_path
item_dict["size"] = size
item_dict["md5"] = md5
item_dict["mtime"] = mtime
item.item_metadata["files"].append(item_dict)
except ValueError:
log.info(
"Cache file '{}' does not match expected format - cache data will"
" be redownloaded".format(cache_file)
)
item = None
if item is None:
connection_retry_counter = 0
connection_wait_timer = 600
while True:
try:
# Get Internet Archive metadata for the provided identifier
item = internetarchive.get_item(identifier)
if "item_last_updated" in item.item_metadata:
item_updated_time = datetime.datetime.fromtimestamp(
int(item.item_metadata["item_last_updated"])
)
if item_updated_time > (datetime.datetime.now() - datetime.timedelta(weeks=1)):
log.warning(
"Internet Archive item '{}' was updated within the last week (last"
" updated on {}) - verification/corruption issues may occur if"
" files are being updated by the uploader. If such errors occur"
" when resuming a download, recommend using the '--cacherefresh'"
" flag".format(
identifier, item_updated_time.strftime("%Y-%m-%d %H:%M:%S")
)
)