diff --git a/ete4/gtdb_taxonomy/gtdbquery.py b/ete4/gtdb_taxonomy/gtdbquery.py index 6cf00d18..01c65dbd 100644 --- a/ete4/gtdb_taxonomy/gtdbquery.py +++ b/ete4/gtdb_taxonomy/gtdbquery.py @@ -146,13 +146,38 @@ def _translate_merged(self, all_taxids): # return taxid, spname, norm_score + def _get_id2rank(self, internal_taxids): + """Given a list of numeric ids (each one representing a taxa in GTDB), return a dictionary with their corresponding ranks. + Examples: + > gtdb.get_rank([2174, 205487, 610]) + {2174: 'family', 205487: 'order', 610: 'phylum'} + + Note: Numeric taxids are not recognized by the official GTDB taxonomy database, only for internal usage. + """ + ids = ','.join('"%s"' % v for v in set(internal_taxids) - {None, ''}) + result = self.db.execute('SELECT taxid, rank FROM species WHERE taxid IN (%s)' % ids) + return {tax: spname for tax, spname in result.fetchall()} + def get_rank(self, taxids): - """Return dictionary converting taxids to their GTDB taxonomy rank.""" + """Give a list of GTDB string taxids, return a dictionary with their corresponding ranks. + Examples: + + > gtdb.get_rank(['c__Thorarchaeia', 'RS_GCF_001477695.1']) + {'c__Thorarchaeia': 'class', 'RS_GCF_001477695.1': 'subspecies'} + """ + + taxid2rank = {} + name2ids = self._get_name_translator(taxids) + overlap_ids = name2ids.values() + taxids = [item for sublist in overlap_ids for item in sublist] ids = ','.join('"%s"' % v for v in set(taxids) - {None, ''}) result = self.db.execute('SELECT taxid, rank FROM species WHERE taxid IN (%s)' % ids) - return {tax: spname for tax, spname in result.fetchall()} + for tax, rank in result.fetchall(): + taxid2rank[list(self._get_taxid_translator([tax]).values())[0]] = rank + + return taxid2rank - def get_lineage_translator(self, taxids): + def _get_lineage_translator(self, taxids): """Given a valid taxid number, return its corresponding lineage track as a hierarchically sorted list of parent taxids. """ @@ -164,7 +189,6 @@ def get_lineage_translator(self, taxids): id2lineages = {} for tax, track in result.fetchall(): id2lineages[tax] = list(map(int, reversed(track.split(",")))) - return id2lineages def get_name_lineage(self, taxnames): @@ -172,15 +196,15 @@ def get_name_lineage(self, taxnames): hierarchically sorted list of parent taxnames. """ name_lineages = [] - name2taxid = self.get_name_translator(taxnames) + name2taxid = self._get_name_translator(taxnames) for key, value in name2taxid.items(): - lineage = self.get_lineage(value[0]) - names = self.get_taxid_translator(lineage) + lineage = self._get_lineage(value[0]) + names = self._get_taxid_translator(lineage) name_lineages.append({key:[names[taxid] for taxid in lineage]}) return name_lineages - def get_lineage(self, taxid): + def _get_lineage(self, taxid): """Given a valid taxid number, return its corresponding lineage track as a hierarchically sorted list of parent taxids. """ @@ -215,7 +239,7 @@ def get_common_names(self, taxids): id2name[tax] = common_name return id2name - def get_taxid_translator(self, taxids, try_synonyms=True): + def _get_taxid_translator(self, taxids, try_synonyms=True): """Given a list of taxids, returns a dictionary with their corresponding scientific names. """ @@ -245,7 +269,7 @@ def get_taxid_translator(self, taxids, try_synonyms=True): return id2name - def get_name_translator(self, names): + def _get_name_translator(self, names): """ Given a list of taxid scientific names, returns a dictionary translating them into their corresponding taxids. Exact name match is required for translation. @@ -276,11 +300,11 @@ def get_name_translator(self, names): #name2realname[oname] = sp return name2id - def translate_to_names(self, taxids): + def _translate_to_names(self, taxids): """ Given a list of taxid numbers, returns another list with their corresponding scientific names. """ - id2name = self.get_taxid_translator(taxids) + id2name = self._get_taxid_translator(taxids) names = [] for sp in taxids: names.append(id2name.get(sp, sp)) @@ -296,7 +320,7 @@ def get_descendant_taxa(self, parent, intermediate_nodes=False, rank_limit=None, taxid = int(parent) except ValueError: try: - taxid = self.get_name_translator([parent])[parent][0] + taxid = self._get_name_translator([parent])[parent][0] except KeyError: raise ValueError('%s not found!' %parent) @@ -322,7 +346,7 @@ def get_descendant_taxa(self, parent, intermediate_nodes=False, rank_limit=None, elif found == 1: return [taxid] if rank_limit or collapse_subspecies or return_tree: - descendants_spnames = self.get_taxid_translator(list(descendants.keys())) + descendants_spnames = self._get_taxid_translator(list(descendants.keys())) #tree = self.get_topology(list(descendants.keys()), intermediate_nodes=intermediate_nodes, collapse_subspecies=collapse_subspecies, rank_limit=rank_limit) tree = self.get_topology(list(descendants_spnames.values()), intermediate_nodes=intermediate_nodes, collapse_subspecies=collapse_subspecies, rank_limit=rank_limit) if return_tree: @@ -333,10 +357,10 @@ def get_descendant_taxa(self, parent, intermediate_nodes=False, rank_limit=None, return [n.name for n in tree] elif intermediate_nodes: - return self.translate_to_names([tid for tid, count in descendants.items()]) + return self._translate_to_names([tid for tid, count in descendants.items()]) else: - self.translate_to_names([tid for tid, count in descendants.items() if count == 1]) - return self.translate_to_names([tid for tid, count in descendants.items() if count == 1]) + self._translate_to_names([tid for tid, count in descendants.items() if count == 1]) + return self._translate_to_names([tid for tid, count in descendants.items() if count == 1]) def get_topology(self, taxnames, intermediate_nodes=False, rank_limit=None, collapse_subspecies=False, annotate=True): @@ -356,7 +380,7 @@ def get_topology(self, taxnames, intermediate_nodes=False, rank_limit=None, """ from .. import PhyloTree #taxids, merged_conversion = self._translate_merged(taxids) - tax2id = self.get_name_translator(taxnames) #{'f__Korarchaeaceae': [2174], 'o__Peptococcales': [205487], 'p__Huberarchaeota': [610]} + tax2id = self._get_name_translator(taxnames) #{'f__Korarchaeaceae': [2174], 'o__Peptococcales': [205487], 'p__Huberarchaeota': [610]} taxids = [i[0] for i in tax2id.values()] if len(taxids) == 1: @@ -376,7 +400,7 @@ def get_topology(self, taxnames, intermediate_nodes=False, rank_limit=None, # If root taxid is not found in postorder, must be a tip node subtree = [root_taxid] leaves = set([v for v, count in Counter(subtree).items() if count == 1]) - tax2name = self.get_taxid_translator(list(subtree)) + tax2name = self._get_taxid_translator(list(subtree)) name2tax ={spname:taxid for taxid,spname in tax2name.items()} nodes[root_taxid] = PhyloTree({'name': str(root_taxid)}) current_parent = nodes[root_taxid] @@ -394,15 +418,15 @@ def get_topology(self, taxnames, intermediate_nodes=False, rank_limit=None, taxids = set(map(int, taxids)) sp2track = {} elem2node = {} - id2lineage = self.get_lineage_translator(taxids) + id2lineage = self._get_lineage_translator(taxids) all_taxids = set() for lineage in id2lineage.values(): all_taxids.update(lineage) - id2rank = self.get_rank(all_taxids) + id2rank = self._get_id2rank(all_taxids) - tax2name = self.get_taxid_translator(taxids) + tax2name = self._get_taxid_translator(taxids) all_taxid_codes = set([_tax for _lin in list(id2lineage.values()) for _tax in _lin]) - extra_tax2name = self.get_taxid_translator(list(all_taxid_codes - set(tax2name.keys()))) + extra_tax2name = self._get_taxid_translator(list(all_taxid_codes - set(tax2name.keys()))) tax2name.update(extra_tax2name) name2tax ={spname:taxid for taxid,spname in tax2name.items()} @@ -452,12 +476,12 @@ def get_topology(self, taxnames, intermediate_nodes=False, rank_limit=None, n.detach() if annotate: - self.annotate_tree(tree) + self.annotate_tree(tree, ignore_unclassified=False) return tree - def annotate_tree(self, t, taxid_attr='name', - tax2name=None, tax2track=None, tax2rank=None): + def annotate_tree(self, t, taxid_attr='name', tax2name=None, + tax2track=None, tax2rank=None, ignore_unclassified=False): """Annotate a tree containing taxids as leaf names. It annotates by adding the properties 'taxid', 'sci_name', @@ -481,7 +505,7 @@ def annotate_tree(self, t, taxid_attr='name', try: # translate gtdb name -> id taxaname = getattr(n, taxid_attr, n.props.get(taxid_attr)) - tid = self.get_name_translator([taxaname])[taxaname][0] + tid = self._get_name_translator([taxaname])[taxaname][0] taxids.add(tid) except (KeyError, ValueError, AttributeError): pass @@ -490,18 +514,18 @@ def annotate_tree(self, t, taxid_attr='name', taxids, merged_conversion = self._translate_merged(taxids) if not tax2name or taxids - set(map(int, list(tax2name.keys()))): - tax2name = self.get_taxid_translator(taxids) + tax2name = self._get_taxid_translator(taxids) if not tax2track or taxids - set(map(int, list(tax2track.keys()))): - tax2track = self.get_lineage_translator(taxids) - + tax2track = self._get_lineage_translator(taxids) + all_taxid_codes = set([_tax for _lin in list(tax2track.values()) for _tax in _lin]) - extra_tax2name = self.get_taxid_translator(list(all_taxid_codes - set(tax2name.keys()))) + extra_tax2name = self._get_taxid_translator(list(all_taxid_codes - set(tax2name.keys()))) tax2name.update(extra_tax2name) tax2common_name = self.get_common_names(tax2name.keys()) if not tax2rank: - tax2rank = self.get_rank(list(tax2name.keys())) + tax2rank = self._get_id2rank(list(tax2name.keys())) name2tax ={spname:taxid for taxid,spname in tax2name.items()} n2leaves = t.get_cached_content() @@ -512,10 +536,8 @@ def annotate_tree(self, t, taxid_attr='name', else: node_taxid = None node.add_prop('taxid', node_taxid) - if node_taxid: - tmp_taxid = self.get_name_translator([node_taxid]).get(node_taxid, [None])[0] - + tmp_taxid = self._get_name_translator([node_taxid]).get(node_taxid, [None])[0] if node_taxid in merged_conversion: node_taxid = merged_conversion[node_taxid] @@ -539,16 +561,30 @@ def annotate_tree(self, t, taxid_attr='name', rank = 'Unknown', named_lineage = []) else: - lineage = self._common_lineage([lf.props.get('lineage') for lf in n2leaves[node]]) + + if ignore_unclassified: + vectors = [lf.props.get('lineage') for lf in n2leaves[node] if lf.props.get('lineage')] + else: + vectors = [lf.props.get('lineage') for lf in n2leaves[node]] + lineage = self._common_lineage(vectors) + + rank = tax2rank.get(lineage[-1], 'Unknown') + if lineage[-1]: - ancestor = self.get_taxid_translator([lineage[-1]])[lineage[-1]] + if rank != 'subspecies': + ancestor = self._get_taxid_translator([lineage[-1]])[lineage[-1]] + else: + ancestor = self._get_taxid_translator([lineage[-2]])[lineage[-2]] + lineage = lineage[:-1] # remove subspecies from lineage + rank = tax2rank.get(lineage[-1], 'Unknown') # update rank else: ancestor = None + node.add_props(sci_name = tax2name.get(ancestor, str(ancestor)), common_name = tax2common_name.get(lineage[-1], ''), taxid = ancestor, lineage = lineage, - rank = tax2rank.get(lineage[-1], 'Unknown'), + rank = rank, named_lineage = [tax2name.get(tax, str(tax)) for tax in lineage]) return tax2name, tax2track, tax2rank diff --git a/ete4/ncbi_taxonomy/ncbiquery.py b/ete4/ncbi_taxonomy/ncbiquery.py index 67ffe35a..b0434d6b 100644 --- a/ete4/ncbi_taxonomy/ncbiquery.py +++ b/ete4/ncbi_taxonomy/ncbiquery.py @@ -458,7 +458,7 @@ def get_topology(self, taxids, intermediate_nodes=False, rank_limit=None, return tree def annotate_tree(self, t, taxid_attr="name", tax2name=None, - tax2track=None, tax2rank=None): + tax2track=None, tax2rank=None, ignore_unclassified=False): """Annotate a tree containing taxids as leaf names. The annotation adds the properties: 'taxid', 'sci_name', @@ -521,14 +521,18 @@ def annotate_tree(self, t, taxid_attr="name", tax2name=None, rank = 'Unknown', named_lineage = []) else: - lineage = self._common_lineage([lf.props.get('lineage') for lf in n2leaves[n]]) + if ignore_unclassified: + vectors = [lf.props.get('lineage') for lf in n2leaves[n] if lf.props.get('lineage')] + else: + vectors = [lf.props.get('lineage') for lf in n2leaves[n]] + lineage = self._common_lineage(vectors) ancestor = lineage[-1] n.add_props(sci_name = tax2name.get(ancestor, str(ancestor)), - common_name = tax2common_name.get(ancestor, ''), - taxid = ancestor, - lineage = lineage, - rank = tax2rank.get(ancestor, 'Unknown'), - named_lineage = [tax2name.get(tax, str(tax)) for tax in lineage]) + common_name = tax2common_name.get(ancestor, ''), + taxid = ancestor, + lineage = lineage, + rank = tax2rank.get(ancestor, 'Unknown'), + named_lineage = [tax2name.get(tax, str(tax)) for tax in lineage]) return tax2name, tax2track, tax2rank diff --git a/ete4/phylo/phylotree.py b/ete4/phylo/phylotree.py index 8e3b3b97..233db198 100644 --- a/ete4/phylo/phylotree.py +++ b/ete4/phylo/phylotree.py @@ -642,7 +642,7 @@ def collapse_lineage_specific_expansions(self, species=None, return_copy=True): return prunned - def annotate_ncbi_taxa(self, taxid_attr='species', tax2name=None, tax2track=None, tax2rank=None, dbfile=None): + def annotate_ncbi_taxa(self, taxid_attr='species', tax2name=None, tax2track=None, tax2rank=None, dbfile=None, ignore_unclassified=False): """Add NCBI taxonomy annotation to all descendant nodes. Leaf nodes are expected to contain a feature (name, by default) encoding a valid taxid number. @@ -694,11 +694,11 @@ def annotate_ncbi_taxa(self, taxid_attr='species', tax2name=None, tax2track=None """ ncbi = NCBITaxa(dbfile=dbfile) - return ncbi.annotate_tree(self, taxid_attr=taxid_attr, tax2name=tax2name, tax2track=tax2track, tax2rank=tax2rank) + return ncbi.annotate_tree(self, taxid_attr=taxid_attr, tax2name=tax2name, tax2track=tax2track, tax2rank=tax2rank, ignore_unclassified=ignore_unclassified) - def annotate_gtdb_taxa(self, taxid_attr='species', tax2name=None, tax2track=None, tax2rank=None, dbfile=None): + def annotate_gtdb_taxa(self, taxid_attr='species', tax2name=None, tax2track=None, tax2rank=None, dbfile=None, ignore_unclassified=False): gtdb = GTDBTaxa(dbfile=dbfile) - return gtdb.annotate_tree(self, taxid_attr=taxid_attr, tax2name=tax2name, tax2track=tax2track, tax2rank=tax2rank) + return gtdb.annotate_tree(self, taxid_attr=taxid_attr, tax2name=tax2name, tax2track=tax2track, tax2rank=tax2rank, ignore_unclassified=ignore_unclassified) def ncbi_compare(self, autodetect_duplications=True, cached_content=None): if not cached_content: diff --git a/tests/test_gtdbquery.py b/tests/test_gtdbquery.py index 57f9649d..d0038787 100644 --- a/tests/test_gtdbquery.py +++ b/tests/test_gtdbquery.py @@ -132,8 +132,6 @@ def test_04tree_annotation(self): 'o__Korarchaeales', 'f__Korarchaeaceae', 'g__Korarchaeum', 's__Korarchaeum sp003344655', 'GB_GCA_003344655.1']) - - def test_gtdbquery(self): gtdb = GTDBTaxa(dbfile=DATABASE_PATH) @@ -156,6 +154,13 @@ def test_get_topology(self): self.assertEqual(sorted(tree.leaf_names()), ['f__Korarchaeaceae', 'o__Peptococcales', 'p__Huberarchaeota']) + tree = gtdb.get_topology(['p__Huberarchaeota', 'o__Peptococcales', 'f__Korarchaeaceae', 's__Korarchaeum','RS_GCF_006228565.1', 'GB_GCA_001515945.1'], + intermediate_nodes=True, collapse_subspecies=False, annotate=True) + + # normal case with collapse_subspecies + tree = gtdb.get_topology(['p__Huberarchaeota', 'o__Peptococcales', 'f__Korarchaeaceae', 's__Korarchaeum','RS_GCF_006228565.1', 'GB_GCA_001515945.1'], + intermediate_nodes=True, collapse_subspecies=True, annotate=True) + def test_name_lineages(self): gtdb = GTDBTaxa(dbfile=DATABASE_PATH) @@ -171,6 +176,110 @@ def test_name_lineages(self): self.assertEqual(out[0]['o__Peptococcales'], ['root', 'd__Bacteria', 'p__Firmicutes_B', 'c__Peptococcia', 'o__Peptococcales']) + def test_get_rank(self): + gtdb = GTDBTaxa(dbfile=DATABASE_PATH) + ranks = gtdb.get_rank(['c__Thorarchaeia', 'RS_GCF_001477695.1']) + #{'c__Thorarchaeia': 'class', 'RS_GCF_001477695.1': 'subspecies'} + self.assertEqual(ranks, {'c__Thorarchaeia': 'class', 'RS_GCF_001477695.1': 'subspecies'}) + + def test_ignore_unclassified(self): + # normal case + gtdb = GTDBTaxa(dbfile=DATABASE_PATH) + c__Thorarchaeia = "(((((((GB_GCA_002825535.1),(GB_GCA_003345545.1),(GB_GCA_002825465.1),(GB_GCA_004524565.1),(GB_GCA_004524595.1)),((GB_GCA_011364985.1),(GB_GCA_011365025.1),(GB_GCA_001563325.1)),((GB_GCA_004524445.1)),((GB_GCA_008080745.1)),((GB_GCA_003345595.1),(GB_GCA_003345555.1)),((GB_GCA_013388835.1)),((GB_GCA_003662765.1,GB_GCA_003662805.1)),((GB_GCA_011364905.1),(GB_GCA_001563335.1),(GB_GCA_001940705.1),(GB_GCA_004376265.1),(GB_GCA_002825515.1)),((GB_GCA_013138615.1)),((GB_GCA_004524435.1)))))));" + t = PhyloTree(c__Thorarchaeia) + _, _, _= t.annotate_gtdb_taxa(taxid_attr='name', ignore_unclassified=False) + + # case0 + t0 = t['GB_GCA_011364985.1'] + self.assertEqual(t0.props.get("taxid"), 'GB_GCA_011364985.1') + self.assertEqual(t0.props.get("sci_name"), 's__SMTZ1-83 sp011364985') + self.assertEqual(t0.props.get("rank"), 'subspecies') + + t0 = t0.up + self.assertEqual(t0.props.get("taxid"), 's__SMTZ1-83 sp011364985') + self.assertEqual(t0.props.get("sci_name"), 's__SMTZ1-83 sp011364985') + self.assertEqual(t0.props.get("rank"), 'species') + + # case1 + ids = ['GB_GCA_001563325.1', 'GB_GCA_011364985.1'] # soley species + t1 = t.common_ancestor(ids) + self.assertEqual(t1.props.get("taxid"), 'g__SMTZ1-83') + self.assertEqual(t1.props.get("sci_name"), 'g__SMTZ1-83') + self.assertEqual(t1.props.get("rank"), 'genus') + self.assertEqual(t1.props.get("named_lineage"), ['root', 'd__Archaea', 'p__Asgardarchaeota', 'c__Thorarchaeia', 'o__Thorarchaeales', 'f__Thorarchaeaceae', 'g__SMTZ1-83']) + + # case2 + ids = ['GB_GCA_003662765.1', 'GB_GCA_003662805.1'] # normal hierarchy + t2 = t.common_ancestor(ids) + self.assertEqual(t2.props.get("taxid"), 's__B65-G9 sp003662765') + self.assertEqual(t2.props.get("sci_name"), 's__B65-G9 sp003662765') + self.assertEqual(t2.props.get("rank"), 'species') + self.assertEqual(t2.props.get("named_lineage"), ['root', 'd__Archaea', 'p__Asgardarchaeota', 'c__Thorarchaeia', 'o__Thorarchaeales', 'f__Thorarchaeaceae', 'g__B65-G9', 's__B65-G9 sp003662765']) + + # case3 + ids = ['GB_GCA_002825535.1', 'GB_GCA_004524435.1'] + t3 = t.common_ancestor(ids) + self.assertEqual(t3.props.get("taxid"), 'f__Thorarchaeaceae') + self.assertEqual(t3.props.get("sci_name"), 'f__Thorarchaeaceae') + self.assertEqual(t3.props.get("rank"), 'family') + + # ignore unclassified is False + c__Thorarchaeia = '(((((((GB_GCA_002825535.1),(GB_GCA_003345545.1),(GB_GCA_002825465.1),(GB_GCA_004524565.1),(GB_GCA_004524595.1)),((GB_GCA_011364985.1),(GB_GCA_011365025.1),(GB_GCA_001563325.1)),((GB_GCA_004524445.1)),((GB_GCA_008080745.1)),((GB_GCA_003345595.1),(GB_GCA_003345555.1)),((GB_GCA_013388835.1)),((GB_GCA_003662765.1,GB_GCA_003662805.1)),((GB_GCA_011364905.1),(GB_GCA_001563335.1),(GB_GCA_001940705.1),(GB_GCA_004376265.1),(GB_GCA_002825515.1)),((GB_GCA_013138615.1)),((GB_GCA_004524435.1)))))));' + t = PhyloTree(c__Thorarchaeia) + t['GB_GCA_001563325.1'].name = 'sample1' + _, _, _= t.annotate_gtdb_taxa(taxid_attr='name', ignore_unclassified=False) + + # check nomarl leaf + self.assertEqual(t['GB_GCA_011364985.1'].props.get("taxid"), 'GB_GCA_011364985.1') + self.assertEqual(t['GB_GCA_011364985.1'].props.get("sci_name"), 's__SMTZ1-83 sp011364985') + + # check unclassified leaf + self.assertEqual(t['sample1'].props.get("taxid"), 'sample1') + self.assertEqual(t['sample1'].props.get("sci_name"), '') + self.assertEqual(t['sample1'].props.get("rank"), 'Unknown') + self.assertEqual(t['sample1'].props.get("named_lineage"), []) + + # check ancestor (should be empty) + ids = ['sample1', 'GB_GCA_011364985.1'] + t1 = t.common_ancestor(ids) + self.assertEqual(t1.props.get("taxid"), None) + self.assertEqual(t1.props.get("sci_name"), 'None') + self.assertEqual(t1.props.get("rank"), 'Unknown') + self.assertEqual(t1.props.get("named_lineage"), ['']) + + self.assertEqual(t.props.get("taxid"), None) + self.assertEqual(t.props.get("sci_name"), 'None') + self.assertEqual(t.props.get("rank"), 'Unknown') + self.assertEqual(t.props.get("named_lineage"), ['']) + + # ignore unclassified is True + c__Thorarchaeia = '(((((((GB_GCA_002825535.1),(GB_GCA_003345545.1),(GB_GCA_002825465.1),(GB_GCA_004524565.1),(GB_GCA_004524595.1)),((GB_GCA_011364985.1),(GB_GCA_011365025.1),(GB_GCA_001563325.1)),((GB_GCA_004524445.1)),((GB_GCA_008080745.1)),((GB_GCA_003345595.1),(GB_GCA_003345555.1)),((GB_GCA_013388835.1)),((GB_GCA_003662765.1,GB_GCA_003662805.1)),((GB_GCA_011364905.1),(GB_GCA_001563335.1),(GB_GCA_001940705.1),(GB_GCA_004376265.1),(GB_GCA_002825515.1)),((GB_GCA_013138615.1)),((GB_GCA_004524435.1)))))));' + t = PhyloTree(c__Thorarchaeia) + t['GB_GCA_001563325.1'].name = 'sample1' + _, _, _= t.annotate_gtdb_taxa(taxid_attr='name', ignore_unclassified=True) + + # check nomarl leaf + self.assertEqual(t['GB_GCA_011364985.1'].props.get("taxid"), 'GB_GCA_011364985.1') + self.assertEqual(t['GB_GCA_011364985.1'].props.get("sci_name"), 's__SMTZ1-83 sp011364985') + + # check unclassified leaf + self.assertEqual(t['sample1'].props.get("taxid"), 'sample1') + self.assertEqual(t['sample1'].props.get("sci_name"), '') + self.assertEqual(t['sample1'].props.get("rank"), 'Unknown') + self.assertEqual(t['sample1'].props.get("named_lineage"), []) + + # check ancestor (should be annotated) + ids = ['sample1', 'GB_GCA_011364985.1'] + t1 = t.common_ancestor(ids) + self.assertEqual(t1.props.get("taxid"), 'g__SMTZ1-83') + self.assertEqual(t1.props.get("sci_name"), 'g__SMTZ1-83') + self.assertEqual(t1.props.get("rank"), 'genus') + self.assertEqual(t1.props.get("named_lineage"), ['root', 'd__Archaea', 'p__Asgardarchaeota', 'c__Thorarchaeia', 'o__Thorarchaeales', 'f__Thorarchaeaceae', 'g__SMTZ1-83']) + + self.assertEqual(t.props.get("taxid"), 'f__Thorarchaeaceae') + self.assertEqual(t.props.get("sci_name"), 'f__Thorarchaeaceae') + self.assertEqual(t.props.get("rank"), 'family') + if __name__ == '__main__': unittest.main() diff --git a/tests/test_ncbiquery.py b/tests/test_ncbiquery.py index f65be15e..49178360 100644 --- a/tests/test_ncbiquery.py +++ b/tests/test_ncbiquery.py @@ -163,9 +163,34 @@ def test_get_topology(): def test_merged_id(): ncbi = NCBITaxa(dbfile=DATABASE_PATH) - t1 = ncbi.get_lineage(649756) assert t1 == [1, 131567, 2, 1783272, 1239, 186801, 3085636, 186803, 207244, 649756] - - t2 = ncbi.get_lineage('649756') + t2 = ncbi.get_lineage("649756") assert t2 == [1, 131567, 2, 1783272, 1239, 186801, 3085636, 186803, 207244, 649756] + + +def test_ignore_unclassified(): + # normal case + tree = PhyloTree('((9606, 9598), 10090);') + tree.annotate_ncbi_taxa(taxid_attr='name', ignore_unclassified=False) + + assert tree.common_ancestor(['9606', '9598']).props.get("sci_name") == 'Homininae' + assert tree.common_ancestor(['9606', '10090']).props.get("sci_name") == 'Euarchontoglires' + + # empty case + tree = PhyloTree('((9606, sample1), 10090);') + tree.annotate_ncbi_taxa(taxid_attr='name', ignore_unclassified=False) + + assert tree.common_ancestor(['9606', 'sample1']).props.get("sci_name") == '' + assert tree.common_ancestor(['9606', 'sample1']).props.get("rank") == 'Unknown' + assert tree.common_ancestor(['9606', '10090']).props.get("sci_name") == '' + + # ignore unclassified + tree = PhyloTree('((9606, sample1), 10090);') + tree.annotate_ncbi_taxa(taxid_attr='name', ignore_unclassified=True) + + assert tree.common_ancestor(['9606', 'sample1']).props.get("sci_name") == 'Homo sapiens' + assert tree.common_ancestor(['9606', 'sample1']).props.get("rank") == 'species' + assert tree.common_ancestor(['9606', '10090']).props.get("sci_name") == 'Euarchontoglires' + +