diff --git a/kipoiseq/utils.py b/kipoiseq/utils.py
index 5610c37..5d97703 100644
--- a/kipoiseq/utils.py
+++ b/kipoiseq/utils.py
@@ -1,5 +1,7 @@
-import numpy as np
 from six import string_types
+import numpy as np
+import pyranges
+from kipoiseq.extractors import MultiSampleVCF, FastaStringExtractor


 # alphabets:
@@ -34,6 +36,84 @@ def parse_dtype(dtype):
         try:
             return eval(dtype)
         except Exception as e:
-            raise ValueError("Unable to parse dtype: {}. \nException: {}".format(dtype, e))
+            raise ValueError(
+                "Unable to parse dtype: {}. \nException: {}".format(dtype, e))
     else:
         return dtype
+
+
+def _get_chrom_annotation(source):
+    """Return the set of chromosome names annotated in `source`.
+
+    Supported sources are `FastaStringExtractor` (keys of `source.fasta`),
+    `MultiSampleVCF` (`source.seqnames`) and `pyranges.PyRanges`
+    (`source.Chromosome`); anything else raises `ValueError`.
+    """
+    # isinstance (rather than an exact type comparison) so that subclasses
+    # of the supported source types are accepted as well.
+    if isinstance(source, FastaStringExtractor):
+        return set(source.fasta.keys())
+    elif isinstance(source, MultiSampleVCF):
+        return set(source.seqnames)
+    elif isinstance(source, pyranges.PyRanges):
+        return set(source.Chromosome)
+    else:
+        raise ValueError('source `%s` is not valid because '
+                         'source type `%s` is not supported.'
+                         % (repr(source), type(source)))
+
+
+def compare_chrom_annotation(sources, strategy='some', core_chroms=None):
+    """Compares chromosome annotations from different sources.
+    Throws an exception if the annotations are not compatible.
+
+    # Arguments:
+        sources: list of different objects. vcf, fasta, pyranges are valid.
+        strategy: comparison strategy. `some` means a non-empty intersection
+            is expected, `all` means all chromosomes should be the same.
+        core_chroms: chromosomes that must exist. Accepted but not used by
+            the current implementation.
+
+    # Returns:
+        set of chromosomes common across the sources.
+
+    # Raises:
+        ValueError: fewer than two sources are given, a source type is
+            not supported, or `strategy` is neither `all` nor `some`.
+        AssertionError: the annotations are not compatible under `strategy`.
+
+    # Example:
+        ```python
+          >>> sources = [
+                MultiSampleVCF(...),
+                FastaStringExtractor(...),
+                pyranges,
+                pyranges,
+                MultiSampleVCF(...)
+              ]
+          >>> compare_chrom_annotation(sources, strategy='all')
+        ```
+    """
+    if not len(sources) > 1:
+        raise ValueError(
+            'At least two items should be given as sources to compare')
+
+    # Reject an unknown strategy up front instead of silently returning None.
+    if strategy not in ('all', 'some'):
+        raise ValueError(
+            'strategy `%s` is not valid; use `all` or `some`.' % (strategy,))
+
+    chroms = list(map(_get_chrom_annotation, sources))
+
+    # AssertionError is raised explicitly (not via `assert`) so the checks
+    # are not stripped when Python runs with -O.
+    if strategy == 'all':
+        if not all(chroms[0] == i for i in chroms):
+            raise AssertionError('chroms annotations are not all same.')
+        return chroms[0]
+
+    chrom_intersect = set.intersection(*chroms)
+    if not chrom_intersect:
+        raise AssertionError(
+            'there is not intersection between chromosomes.')
+    return chrom_intersect
diff --git a/setup.py b/setup.py
index 4779d5e..59ff134 100755
--- a/setup.py
+++ b/setup.py
@@ -26,6 +26,7 @@
     "jedi",
     "pytest>=3.3.1",
     "pytest-xdist",  # running tests in parallel
+    "pytest-mock",
     "pytest-pep8",  # see https://github.com/kipoi/kipoi/issues/91
     "pytest-cov",
     "coveralls",
@@ -36,7 +37,7 @@
     # "genomelake",
     "keras",
     "tensorflow",
-    "pybedtools"
+    "pybedtools",
 ]

 setup(
diff --git a/tests/data/test.vcf.gz b/tests/data/test.vcf.gz
index d96030e..e37d849 100644
Binary files a/tests/data/test.vcf.gz and b/tests/data/test.vcf.gz differ
diff --git a/tests/data/test.vcf.gz.tbi b/tests/data/test.vcf.gz.tbi
index 303734c..b385012 100644
Binary files a/tests/data/test.vcf.gz.tbi and b/tests/data/test.vcf.gz.tbi differ
diff --git a/tests/test_3_utils.py b/tests/test_3_utils.py
index 7520b77..201bd3e 100644
--- a/tests/test_3_utils.py
+++ b/tests/test_3_utils.py
@@ -1,6 +1,10 @@
+from conftest import vcf_file, sample_5kb_fasta_file, example_intervals_bed
 import pytest
 import numpy as np
-from kipoiseq.utils import parse_alphabet, parse_dtype
+import pyranges
+from kipoiseq.extractors import FastaStringExtractor, MultiSampleVCF
+from kipoiseq.utils import parse_alphabet, parse_dtype, \
+    compare_chrom_annotation


 def test_parse_alphabet():
@@ -17,3 +21,38 @@ def test_parse_type():
     assert parse_dtype('float') == float
     assert parse_dtype(float) == float
     assert parse_dtype("np.float32") == np.float32
+
+
+def test_compare_chrom_annotation():
+    sources = [
+        MultiSampleVCF(vcf_file),
+        FastaStringExtractor(sample_5kb_fasta_file),
+        pyranges.read_bed(example_intervals_bed)
+    ]
+
+    with pytest.raises(ValueError):
+        assert compare_chrom_annotation([])
+
+    with pytest.raises(ValueError):
+        assert compare_chrom_annotation([object()])
+
+    with pytest.raises(ValueError):
+        compare_chrom_annotation(sources, strategy='unknown')
+
+    assert compare_chrom_annotation(sources) == {'chr1'}
+    assert compare_chrom_annotation(sources, strategy='all') == {'chr1'}
+
+    with pytest.raises(AssertionError) as exception:
+        sources[1].fasta = {'chr1': '', 'chr2': '', 'chr3': ''}
+        compare_chrom_annotation(sources, strategy='all')
+
+    assert str(exception.value) == 'chroms annotations are not all same.'
+
+    assert compare_chrom_annotation(sources) == {'chr1'}
+
+    with pytest.raises(AssertionError) as exception:
+        sources[1].fasta = {'chr2': '', 'chr3': ''}
+        compare_chrom_annotation(sources)
+
+    assert str(exception.value) == \
+        'there is not intersection between chromosomes.'