-
Notifications
You must be signed in to change notification settings - Fork 2
/
analyze_recordings.py
81 lines (67 loc) · 3.15 KB
/
analyze_recordings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
'''
Analyzes sound quality of all recordings of selected species.
'''
import logging
import multiprocessing.pool
import signal
import analysis
import fetcher
import progress
from recordings import Recording, SonogramAnalysis
from species import Species, SelectedSpecies
# Per-process Fetcher instance, created lazily inside _analyze() so each pool
# worker builds its own (a Fetcher is not shared across processes).
_sonogram_fetcher = None


def _analyze(recording):
    '''
    Analyzes the sonogram of a single recording; runs in a pool worker.

    :param recording: tuple of (recording_id, sonogram_url_small). A tuple is
        passed because SQLAlchemy objects are not fully process-safe: they can
        be pickled and unpickled, but they won't be bound to a Session anymore,
        which breaks the attribute refresh they attempt after a commit() from
        inside the main process.
    :return: tuple of (recording_id, sonogram_quality).
    :raises RuntimeError: wrapping any other exception, so that whatever
        travels back across the process boundary is guaranteed pickleable.
    '''
    recording_id, sonogram_url_small = recording
    try:
        # Create one fetcher per process; the module global persists across
        # tasks handled by the same worker.
        global _sonogram_fetcher  # pylint: disable=global-statement
        if not _sonogram_fetcher:
            _sonogram_fetcher = fetcher.Fetcher(cache_group='xc_sonograms_small', pool_size=1)
        sonogram = None
        try:
            sonogram = _sonogram_fetcher.fetch_cached(sonogram_url_small)
        except fetcher.FetchError:
            # Best effort: a fetch failure is logged and the recording gets
            # the sentinel quality below instead of aborting the whole run.
            logging.warning(f'Sonogram for recording {recording_id} could not be fetched', exc_info=True)
        # Sentinel quality for recordings whose sonogram is unavailable, so
        # they rank below every successfully analyzed recording.
        sonogram_quality = -999999
        if sonogram:
            sonogram_quality = analysis.sonogram_quality(recording_id, sonogram)
        return (recording_id, sonogram_quality)
    except Exception as ex:
        # Re-raise as something that's guaranteed to be pickleable; chain the
        # original exception so the full cause survives in tracebacks.
        logging.error('Exception during analysis', exc_info=True)
        raise RuntimeError(f'Exception during analysis: {ex}') from ex
def add_args(parser):
    '''Registers the command-line flags used by the sonogram analysis stage.'''
    flag_specs = [
        (('--reanalyze_recordings',),
         dict(action='store_true',
              help='Delete all sonogram analyses before starting')),
        (('--analysis_jobs',),
         dict(type=int, default=8,
              help='Number of parallel sonogram analysis jobs to run')),
    ]
    for names, options in flag_specs:
        parser.add_argument(*names, **options)
def main(args, session):
    '''
    Analyzes sound quality of all recordings of selected species, storing one
    SonogramAnalysis row per recording.

    :param args: parsed command-line arguments (see add_args).
    :param session: SQLAlchemy session used for the query, inserts and commits.
    '''
    if args.reanalyze_recordings:
        logging.info('Deleting all sonogram analyses')
        session.query(SonogramAnalysis).delete()
    logging.info('Fetching all recordings for selected species')
    # Only recordings of selected species that have a non-empty sonogram URL
    # and no existing analysis row.
    recordings = session.query(Recording)\
        .join(Species, Species.scientific_name == Recording.scientific_name)\
        .join(SelectedSpecies)\
        .filter(Recording.sonogram_url_small != None, # pylint: disable=singleton-comparison
                Recording.sonogram_url_small != '',
                ~Recording.sonogram_analysis.has())\
        .all()
    logging.info('Analyzing recordings')
    # Ignore SIGINT while the pool forks so workers inherit SIG_IGN, then
    # restore the handler so only the main process reacts to Ctrl+C.
    # https://stackoverflow.com/questions/11312525/catch-ctrlc-sigint-and-exit-multiprocesses-gracefully-in-python#35134329
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    with multiprocessing.pool.Pool(args.analysis_jobs) as pool:
        signal.signal(signal.SIGINT, original_sigint_handler)
        # imap preserves order and yields lazily; progress.percent wraps the
        # iterator to report completion against len(recordings). Tuples (not
        # ORM objects) are sent to workers — see the comment in _analyze.
        for recording_id, sonogram_quality in progress.percent(
                pool.imap(_analyze, [(r.recording_id, r.sonogram_url_small) for r in recordings]),
                len(recordings)):
            session.add(SonogramAnalysis(
                recording_id=recording_id,
                sonogram_quality=sonogram_quality))
            # Commit each result immediately so completed work survives an
            # interruption of this long-running job.
            session.commit()