Skip to content

Commit

Permalink
Fix #205 and #183
Browse files Browse the repository at this point in the history
Changes:
- Mikado now will not hang if a subprocess dies, it will immediately exit.
- Ensured that Mikado runs are fully reproducible using a random seed (#183)
- Solved a bug that crashed Mikado prepare in the presence of incorrect transcripts
- Removed the cause for locked interprocess-exchange databases in Mikado pick. Switched to WAL and increased the timeout limit.
  • Loading branch information
lucventurini authored Aug 5, 2019
1 parent fe435c6 commit 68d3c60
Show file tree
Hide file tree
Showing 22 changed files with 444 additions and 105 deletions.
9 changes: 9 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,14 @@ script:
- cd ..;
- python -c "import Mikado; Mikado.test(label='fast')";
- python setup.py test --addopts " --cov Mikado -m '(slow or not slow) and not triage'";
# Check that the seed is set properly
- mikado pick --procs 2 --seed 20 --fasta Mikado/tests/chr5.fas.gz --json-conf Mikado/tests/check_seed.yaml -od 20a Mikado/tests/check_seed.gtf
- mikado pick --procs 2 --seed 20 --fasta Mikado/tests/chr5.fas.gz --json-conf Mikado/tests/check_seed.yaml -od 20b Mikado/tests/check_seed.gtf
- mikado pick --procs 2 --seed 20 --fasta Mikado/tests/chr5.fas.gz --json-conf Mikado/tests/check_seed.yaml -od 20c Mikado/tests/check_seed.gtf
- mikado pick --procs 2 --seed 20 --fasta Mikado/tests/chr5.fas.gz --json-conf Mikado/tests/check_seed.yaml -od 20d Mikado/tests/check_seed.gtf
- mikado pick --procs 2 --seed 10 --fasta Mikado/tests/chr5.fas.gz --json-conf Mikado/tests/check_seed.yaml -od 10 Mikado/tests/check_seed.gtf
- if [[ $(diff -q 20a 20b) || $(diff -q 20a 20c) || $(diff -q 20a 20d) ]]; then exit 1; fi
- if [[ ! $(diff -q 20a 10) ]]; then exit 1; fi

after_success:
- codecov
4 changes: 4 additions & 0 deletions Mikado/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ def main(call_args=None):
logger = create_default_logger("main")
logger.error("Mikado crashed, cause:")
logger.exception(exc)
import multiprocessing as mp
for child in mp.active_children():
child.terminate()

sys.exit(1)

if __name__ == '__main__':
Expand Down
18 changes: 13 additions & 5 deletions Mikado/configuration/configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ..utilities import merge_dictionaries
from ..utilities.log_utils import create_default_logger
import sys
import random
import numpy


__author__ = "Luca Venturini"
Expand Down Expand Up @@ -607,10 +607,14 @@ def check_json(json_conf, simple=False, external_dict=None, logger=None):

seed = json_conf.get("seed", None)
if seed is None:
seed = random.randint(0, sys.maxsize)
seed = numpy.random.randint(0, 2**32 - 1)
logger.info("Random seed: {}", seed)
json_conf["seed"] = seed
random.seed(seed)

if seed is not None:
numpy.random.seed(seed % (2 ** 32 - 1))
else:
numpy.random.seed(None)

return json_conf

Expand Down Expand Up @@ -655,8 +659,12 @@ def to_json(string, simple=False, logger=None):

seed = json_dict.get("seed", None)
if seed is None:
seed = random.randint(0, sys.maxsize)
seed = numpy.random.randint(0, 2 ** 32 - 1)
logger.info("Random seed: {}", seed)
random.seed(seed)

if seed is not None:
numpy.random.seed(seed % (2 ** 32 - 1))
else:
numpy.random.seed(None)

return json_dict
14 changes: 9 additions & 5 deletions Mikado/loci/abstractlocus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import abc
import itertools
import numpy as np
import logging
import random
from sys import maxsize
import networkx
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
Expand Down Expand Up @@ -401,8 +401,7 @@ def find_cliques(self, graph: networkx.Graph) -> (networkx.Graph, list):

return find_cliques(graph, self.logger)

@classmethod
def choose_best(cls, transcripts: dict) -> str:
def choose_best(self, transcripts: dict) -> str:
"""
:param transcripts: the dictionary of transcripts of the instance
:type transcripts: dict
Expand All @@ -413,10 +412,15 @@ def choose_best(cls, transcripts: dict) -> str:
"""

# Choose one transcript randomly between those that have the maximum score
if len(transcripts) == 1:
return list(transcripts.keys())[0]
np.random.seed(self.json_conf["seed"])
max_score = max(transcripts.values(),
key=operator.attrgetter("score")).score
return random.choice(
[transc for transc in transcripts if transcripts[transc].score == max_score])
valid = sorted([transc for transc in transcripts if transcripts[transc].score == max_score])
chosen = valid[numpy.random.choice(len(valid))]
self.logger.debug("Chosen {chosen} out of {}".format(", ".join(valid), chosen=chosen))
return chosen

# ###### Class instance methods #######

Expand Down
4 changes: 0 additions & 4 deletions Mikado/loci/monosublocusholder.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,6 @@ def define_loci(self, purge=False, excluded=None):
selected_transcript = self.transcripts[selected_tid]
to_remove.add(selected_tid)
to_remove.update(set(graph.neighbors(selected_tid)))
# for clique in cliques:
# if selected_tid in clique:
# to_remove.update(clique)

if purge is False or selected_transcript.score > 0:
new_locus = Locus(selected_transcript, logger=self.logger, json_conf=self.json_conf,
use_transcript_scores=self._use_transcript_scores)
Expand Down
8 changes: 0 additions & 8 deletions Mikado/loci/sublocus.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,6 @@ def define_monosubloci(self, purge=False, excluded=None):
",".join(set(transcript_graph.neighbors(selected_tid)))
))
to_remove.update(set(transcript_graph.neighbors(selected_tid)))
# for tid in transcript_graph.neighbors(selected_tid)
# for clique in cliques:
# if selected_tid in clique:
# self.logger.debug("Removing as intersecting {0}: {1}".format(
# selected_tid,
# ",".join(list(clique))
# ))
# to_remove.update(clique)
if purge is False or selected_transcript.score > 0:
new_locus = Monosublocus(selected_transcript,
logger=self.logger,
Expand Down
8 changes: 0 additions & 8 deletions Mikado/loci/superlocus.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,14 +481,6 @@ def _load_introns(self, data_dict):
ver_introns = dict(((junc.junction_start, junc.junction_end), junc.strand)
for junc in ver_introns)

# ver_introns = set((junc.junction_start, junc.junction_end) for junc in
# self.junction_baked(self.session).params(
# chrom=self.chrom,
# strand=self.strand,
# junctionStart=self.start,
# junctionEnd=self.end
# ))

self.logger.debug("Found %d verifiable introns for %s",
len(ver_introns), self.id)

Expand Down
4 changes: 2 additions & 2 deletions Mikado/parsers/bed12.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""


import random
import numpy
import os
from Bio import Seq
import Bio.SeqRecord
Expand Down Expand Up @@ -1032,7 +1032,7 @@ def __init__(self, handle,
if isinstance(fasta_index, dict):
# check that this is a bona fide dictionary ...
assert isinstance(
fasta_index[random.sample(fasta_index.keys(), 1)],
fasta_index[numpy.random.choice(fasta_index.keys(), 1)],
Bio.SeqRecord.SeqRecord)
elif fasta_index is not None:
if isinstance(fasta_index, str):
Expand Down
10 changes: 7 additions & 3 deletions Mikado/picking/loci_processer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from multiprocessing import Process
from multiprocessing.managers import AutoProxy
import logging
import numpy
from itertools import product
import logging.handlers as logging_handlers
import functools
Expand Down Expand Up @@ -639,6 +640,7 @@ def __init__(self,

# current_counter, gene_counter, current_chrom = shared_values
super(LociProcesser, self).__init__()

self.logging_queue = logging_queue
self.__identifier = identifier # Property directly unsettable
self.name = "LociProcesser-{0}".format(self.identifier)
Expand All @@ -649,7 +651,6 @@ def __init__(self,
self.logger.setLevel(self.json_conf["log_settings"]["log_level"])
self.logger.propagate = False
self._tempdir = tempdir

self.__data_dict = data_dict
self.locus_queue = locus_queue
# self.lock = lock
Expand Down Expand Up @@ -743,8 +744,11 @@ def __close_handles(self):
self.handler.close()

for group in self._handles:
[_.flush() for _ in group if hasattr(_, "flush") and _.closed is False]
[_.close() for _ in group if hasattr(_, "close") and _.closed is False]
try:
[_.flush() for _ in group if hasattr(_, "flush") and _.closed is False]
[_.close() for _ in group if hasattr(_, "close") and _.closed is False]
except ValueError:
pass # This is for when we are terminating Mikado due to a crash.
if self.engine is not None:
self.engine.dispose()

Expand Down
21 changes: 17 additions & 4 deletions Mikado/picking/picker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import pickle
import warnings
import pyfaidx
import numpy
logging.captureWarnings(True)
warnings.simplefilter("always")
import sqlite3
Expand Down Expand Up @@ -98,6 +99,10 @@ def __init__(self, json_conf, commandline=""):

# self.setup_logger()
self.logger.info("Random seed: %s", self.json_conf["seed"])
if self.json_conf["seed"] is not None:
numpy.random.seed((self.json_conf["seed"]) % (2 ** 32 - 1))
else:
numpy.random.seed(None)
self.logger.debug("Multiprocessing method: %s",
self.json_conf["multiprocessing_method"])

Expand Down Expand Up @@ -824,15 +829,18 @@ def add_to_index(conn: sqlite3.Connection,
transcripts = json.dumps([_.as_dict() for _ in transcripts])
cursor.execute("INSERT INTO transcripts VALUES (?, ?)", (counter, transcripts))
conn.commit()

return

@staticmethod
def _create_temporary_store(tempdirectory):

conn = sqlite3.connect(os.path.join(tempdirectory, "temp_store.db"))
conn = sqlite3.connect(os.path.join(tempdirectory, "temp_store.db"),
isolation_level="DEFERRED",
timeout=60,
check_same_thread=False # Necessary for SQLite3 to function in multiprocessing
)
cursor = conn.cursor()
# cursor.execute("PRAGMA journal_mode=wal")
cursor.execute("PRAGMA journal_mode=wal")
cursor.execute("CREATE TABLE transcripts (counter integer, json blob)")
cursor.execute("CREATE INDEX tid_idx on transcripts(counter)")

Expand Down Expand Up @@ -905,7 +913,11 @@ def __submit_multi_threading(self, data_dict):
invalid = True
elif row.is_transcript is True:
if current_transcript is not None and invalid is False:
self.__test_sortedness(row, current_transcript)
try:
self.__test_sortedness(row, current_transcript)
except UnsortedInput:
[_.terminate() for _ in working_processes]
raise
if current_locus is not None and Superlocus.in_locus(
current_locus, current_transcript,
flank=self.json_conf["pick"]["clustering"]["flank"]) is True:
Expand Down Expand Up @@ -1206,6 +1218,7 @@ def __call__(self):
except UnsortedInput as _:
self.logger.error(
"The input files were not properly sorted! Please run prepare and retry.")

sys.exit(1)

# list(map(job.get() for job in jobs if job is not None))
Expand Down
8 changes: 7 additions & 1 deletion Mikado/preparation/annotation_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import json
import sqlite3
import os
# from ..parsers.bed12 import BED12
import numpy
from ..transcripts import Transcript
from operator import itemgetter

Expand All @@ -28,9 +28,15 @@ def __init__(self,
min_length=0,
max_intron=3*10**5,
log_level="WARNING",
seed=None,
strip_cds=False):

super().__init__()
if seed is not None:
numpy.random.seed(seed % (2 ** 32 - 1))
else:
numpy.random.seed(None)

self.submission_queue = submission_queue
self.min_length = min_length
self.max_intron = max_intron
Expand Down
Loading

0 comments on commit 68d3c60

Please sign in to comment.