forked from MCSZ/pyontutils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
__init__.py
1832 lines (1561 loc) · 75 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3.6
#!/usr/bin/env pypy3
__doc__ = f"""Generate NIF parcellation schemes from external resources.
Usage:
parcellation [options]
Options:
-f --fail fail loudly on common common validation checks
-j --jobs=NJOBS number of parallel jobs to run [default: 9]
-l --local only build files with local source copies
-s --stats generate report on current parcellations
"""
import os
import re
import csv
import glob
from pathlib import Path
from collections import defaultdict, Counter
from git import Repo
from lxml import etree
from rdflib import Graph, URIRef, Namespace
from ttlser import natsort
from pyontutils.core import Class, Source, resSource, ParcOnt, LabelsBase, Collector
from pyontutils.core import makeGraph, build, relative_resources
from pyontutils.utils import async_getter, rowParse, getSourceLine, subclasses
from pyontutils.utils import TermColors as tc
from pyontutils.config import devconfig, working_dir
from pyontutils.scigraph import Vocabulary
from pyontutils.namespaces import makePrefixes, interlex_namespace, nsExact
from pyontutils.namespaces import NIFRID, ilx, ilxtr, TEMP, FSLATS
from pyontutils.namespaces import PAXMUS, PAXRAT, paxmusver, paxratver, HCPMMP
from pyontutils.namespaces import NCBITaxon, UBERON, NIFTTL
from pyontutils.combinators import annotations
from pyontutils.process_fixed import ProcessPoolExecutor
from pyontutils.closed_namespaces import rdf, rdfs, owl, dc, dcterms, skos, prov
from IPython import embed
# module-level SciGraph vocabulary client shared by the builders below
# (e.g. swanson() uses sgv.findByTerm); cache=True memoizes lookups
sgv = Vocabulary(cache=True)
def swanson():
    """ not really a parcellation scheme
        NOTE: the defining information up here is now deprecated
        it is kept around to keep the code further down happy

        Parses swanson_aligned.txt into a hierarchy of brain regions and
        returns the populated makeGraph wrapper (callers read .g). """
    source = Path(devconfig.resources, 'swanson_aligned.txt').as_posix()
    ONT_PATH = 'http://ontology.neuinfo.org/NIF/ttl/generated/'
    filename = 'swanson_hierarchies'
    ontid = ONT_PATH + filename + '.ttl'
    PREFIXES = SwansonLabels.prefixes
    new_graph = makeGraph(filename, PREFIXES, writeloc='/tmp/')
    new_graph.add_ont(ontid,
                      'Swanson brain partomies',
                      'Swanson 2014 Partonomies',
                      'This file is automatically generated from ' + source + '.' + '**FIXME**',
                      'now')
    # FIXME citations should really go on the ... anatomy? scheme artifact
    definingCitation = 'Swanson, Larry W. Neuroanatomical Terminology: a lexicon of classical origins and historical foundations. Oxford University Press, USA, 2014.'
    definingCitationID = 'ISBN:9780195340624'
    new_graph.add_trip(ontid, 'NIFRID:definingCitation', definingCitation)
    new_graph.add_trip(ontid, 'NIFRID:definingCitationID', definingCitationID)

    with open(source, 'rt') as f:
        lines = [l.strip() for l in f.readlines()]

    # join header on page 794
    lines[635] += ' ' + lines.pop(636)
    #fix for capitalization since this header is reused
    fixed = ' or '.join([' ('.join([n.capitalize() for n in _.split(' (')]) for _ in lines[635].lower().split(' or ')]).replace('human','HUMAN')
    lines[635] = fixed

    # parse each line into (depth, area name, citation, next-syn marker);
    # depth is encoded in the source as runs of five dots per level
    data = []
    for l in lines:
        if not l.startswith('#'):
            level = l.count('.'*5)
            l = l.strip('.')
            if ' (' in l:
                if ') or' in l:
                    n1, l = l.split(') or')
                    area_name, citationP = n1.strip().split(' (')
                    citation = citationP.rstrip(')')
                    d = (level, area_name, citation, 'NEXT SYN')
                    data.append(d)
                    #print(tc.red(tc.bold(repr(d))))
                area_name, citationP = l.strip().split(' (')
                citation = citationP.rstrip(')')
            else:
                area_name = l
                citation = None
            d = (level, area_name, citation, None)
            #print(d)
            data.append(d)

    # look up candidate UBERON curies for each area name via SciGraph
    results = async_getter(sgv.findByTerm, [(d[1],) for d in data])
    #results = [None] * len(data)
    curies = [[r['curie'] for r in _ if 'curie' in r and 'UBERON' in r['curie']] if _ else [] for _ in results]
    output = [_[0] if _ else None for _ in curies]

    header = ['Depth', 'Name', 'Citation', 'NextSyn', 'Uberon']
    zoop = [header] + [r for r in zip(*zip(*data), output)] + \
           [(0, 'Appendix END None', None, None, None)]  # needed to add last appendix

    # TODO annotate the appendicies and the classes with these
    appendix_root_mapping = (1, 1, 1, 1, 30, 83, 69, 70, 74, 1)  # should generate?

    class SP(rowParse):
        # rowParse dispatches each column of each row of zoop to the
        # method named after the column header, then calls _row_post
        def __init__(self):
            self.nodes = defaultdict(dict)
            self._appendix = 0
            self.appendicies = {}
            self._last_at_level = {}
            self.names = defaultdict(set)
            self.children = defaultdict(set)
            self.parents = defaultdict(set)
            self.next_syn = False
            super().__init__(zoop)

        def Depth(self, value):
            if self.next_syn:
                self.synonym = self.next_syn
            else:
                self.synonym = False
            self.depth = value

        def Name(self, value):
            self.name = value

        def Citation(self, value):
            self.citation = value

        def NextSyn(self, value):
            if value:
                self.next_syn = self._rowind
            else:
                self.next_syn = False

        def Uberon(self, value):
            self.uberon = value

        def _row_post(self):
            # check if we are in the next appendix
            # may want to xref ids between appendicies as well...
            if self.depth == 0:
                if self.name.startswith('Appendix'):
                    if self._appendix:
                        # close out the previous appendix before starting a new one
                        self.appendicies[self._appendix]['children'] = dict(self.children)
                        self.appendicies[self._appendix]['parents'] = dict(self.parents)
                        self._last_at_level = {}
                        self.children = defaultdict(set)
                        self.parents = defaultdict(set)
                    _, num, apname = self.name.split(' ', 2)
                    if num == 'END':
                        return
                    self._appendix = int(num)
                    self.appendicies[self._appendix] = {
                        'name':apname.capitalize(),
                        'type':self.citation.capitalize() if self.citation else None}
                    return
                else:
                    if ' [' in self.name:
                        name, taxonB = self.name.split(' [')
                        self.name = name
                        self.appendicies[self._appendix]['taxon'] = taxonB.rstrip(']').capitalize()
                    else:  # top level is animalia
                        self.appendicies[self._appendix]['taxon'] = 'ANIMALIA'.capitalize()

                    self.name = self.name.capitalize()
                    self.citation = self.citation.capitalize()
            # nodes
            if self.synonym:
                self.nodes[self.synonym]['synonym'] = self.name
                self.nodes[self.synonym]['syn-cite'] = self.citation
                self.nodes[self.synonym]['syn-uberon'] = self.uberon
                return
            else:
                if self.citation:  # Transverse Longitudinal etc all @ lvl4
                    self.names[self.name + ' ' + self.citation].add(self._rowind)
                else:
                    self.name += str(self._appendix) + self.nodes[self._last_at_level[self.depth - 1]]['label']
                    #print(level, self.name)
                # can't return here because they are their own level
                # replace with actually doing something...
                self.nodes[self._rowind]['label'] = self.name
                self.nodes[self._rowind]['citation'] = self.citation
                self.nodes[self._rowind]['uberon'] = self.uberon
            # edges
            self._last_at_level[self.depth] = self._rowind
            # TODO will need something to deal with the Lateral/
            if self.depth > 0:
                try:
                    parent = self._last_at_level[self.depth - 1]
                except:
                    # NOTE(review): bare except dropping into an IPython
                    # shell is a debugging hook, not error handling
                    embed()
                self.children[parent].add(self._rowind)
                self.parents[self._rowind].add(parent)

        def _end(self):
            # deduplicate nodes that share name+citation, rewriting the
            # child/parent maps of every appendix to point at the survivor
            replace = {}
            for asdf in [sorted(n) for k,n in self.names.items() if len(n) > 1]:
                replace_with, to_replace = asdf[0], asdf[1:]
                for r in to_replace:
                    replace[r] = replace_with

            for r, rw in replace.items():
                #print(self.nodes[rw])
                o = self.nodes.pop(r)
                #print(o)

            for vals in self.appendicies.values():
                children = vals['children']
                parents = vals['parents']
                # need reversed so children are corrected before swap
                for r, rw in reversed(sorted(replace.items())):
                    if r in parents:
                        child = r
                        new_child = rw
                        parent = parents.pop(child)
                        parents[new_child] = parent
                        parent = list(parent)[0]
                        children[parent].remove(child)
                        children[parent].add(new_child)
                    if r in children:
                        parent = r
                        new_parent = rw
                        childs = children.pop(parent)
                        children[new_parent] = childs
                        for child in childs:
                            parents[child] = {new_parent}

            self.nodes = dict(self.nodes)

    sp = SP()
    tp = [_ for _ in sorted(['{: <50}'.format(n['label']) + n['uberon'] if n['uberon'] else n['label'] for n in sp.nodes.values()])]
    #print('\n'.join(tp))
    #print(sp.appendicies[1].keys())
    #print(sp.nodes[1].keys())
    nbase = PREFIXES['SWAN'] + '%s'
    json_ = {'nodes':[],'edges':[]}
    parent = ilxtr.swansonBrainRegionConcept
    # emit one class per node under the swanson brain region concept
    for node, anns in sp.nodes.items():
        nid = nbase % node
        new_graph.add_class(nid, parent, label=anns['label'])
        new_graph.add_trip(nid, 'NIFRID:definingCitation', anns['citation'])
        json_['nodes'].append({'lbl':anns['label'],'id':'SWA:' + str(node)})
        #if anns['uberon']:
            #new_graph.add_trip(nid, owl.equivalentClass, anns['uberon'])  # issues arrise here...

    # emit per-appendix partonomy properties and the part-of restrictions
    for appendix, data in sp.appendicies.items():
        aid = PREFIXES['SWAA'] + str(appendix)
        new_graph.add_class(aid, label=data['name'].capitalize())
        new_graph.add_trip(aid, 'ilxtr:hasTaxonRank', data['taxon'])  # FIXME appendix is the data artifact...
        children = data['children']
        ahp = 'swanr:hasPart' + str(appendix)
        apo = 'swanr:partOf' + str(appendix)
        new_graph.add_op(ahp, transitive=True)
        new_graph.add_op(apo, inverse=ahp, transitive=True)
        for parent, childs in children.items():  # FIXME does this give complete coverage?
            pid = nbase % parent
            for child in childs:
                cid = nbase % child
                new_graph.add_restriction(pid, ahp, cid)  # note hierarhcy inverts direction
                new_graph.add_restriction(cid, apo, pid)
                json_['edges'].append({'sub':'SWA:' + str(child),'pred':apo,'obj':'SWA:' + str(parent)})

    return new_graph
#
# New impl
# helpers
class DupeRecord:
    """ Record accumulating information about a duplicated abbreviation:
        alternate abbreviations, the structures (labels) it denotes, the
        figures it appears in, and the artifact iris it came from. """

    def __init__(self, alt_abbrevs=tuple(), structures=tuple(), figures=None, artiris=tuple()):
        self.alt_abbrevs = alt_abbrevs
        self.structures = structures
        # BUG FIX: `figures` was accepted but never stored, so accessing
        # .figures on an instance raised AttributeError; store it like
        # the other fields (None means "no figure information").
        self.figures = figures
        self.artiris = artiris
# classes
class Artifact(Class):
    """ Parcellation artifacts are the defining information sources for
        parcellation labels and/or atlases in which those labels are used.
        They may include semantic and/or geometric information. """
    iri = ilxtr.parcellationArtifact
    class_label = 'Parcellation Artifact'
    # keyword arguments accepted by the Class constructor machinery;
    # None marks optional scalars, tuple() marks optional iterables
    _kwargs = dict(iri=None,
                   rdfs_label=None,
                   label=None,
                   synonyms=tuple(),
                   abbrevs=tuple(),
                   definition=None,
                   shortname=None,
                   date=None,
                   copyrighted=None,
                   version=None,
                   species=None,
                   devstage=None,
                   region=None,
                   source=None,
                   citation=None,
                   docUri=None,
                   comment=None,
                   definingCitations=tuple(),
                   hadDerivation=tuple(),
                   identifiers=tuple(),
                   )
    # maps kwarg names to the predicates used when serializing; NOTE(review):
    # it lists sourceUri while _kwargs has source/docUri — confirm mapping
    propertyMapping = dict(
        version=ilxtr.artifactVersion,  # FIXME
        date=dc.date,
        sourceUri=ilxtr.sourceUri,  # FIXME
        copyrighted=dcterms.dateCopyrighted,
        source=dc.source,  # use for links to
        hadDerivation=prov.hadDerivation,
        identifiers=dc.identifier,  # FIXME TODO
        # ilxr.atlasDate
        # ilxr.atlasVersion
    )
    propertyMapping = {**Class.propertyMapping, **propertyMapping}  # FIXME make this implicit
class Terminology(Artifact):
    """ A source for parcellation information that applies to one
        or more spatial sources, but does not itself contain the
        spatial definitions. For example Allen MBA. """
    iri = ilxtr.parcellationTerminology
    class_label = 'Parcellation terminology'
    #class_definition = ('An artifact that only contains semantic information, '
                        #'not geometric information, about a parcellation.')
class CoordinateSystem(Artifact):
    """ An artifact that defines the geometric coordinates used by
        one or more parcellations. """
    iri = ilxtr.parcellationCoordinateSystem
    class_label = 'Parcellation coordinate system'
class Delineation(Artifact):
    """ An artifact that defines the spatial boundaries or landmarks for a parcellation.
        Delineations must be explicitly spatial and are distinct from delineation criteria
        which may provide a non-spatial definition for regions. """
    iri = ilxtr.parcellationDelineation
    class_label = 'Parcellation delineation'
    # TODO registrationCriteria => processive, usually matching delineationCriteria where practical
    # TODO delineationCriteria => definitional
class Atlas(Artifact):
    """ An artifact that contains information about the terminology,
        delineation, and coordinate system for a parcellation. These
        are usually physical atlases where it is not possibly to uniquely
        identify any of the component parts, but only all the parts taken
        together (e.g. via ISBN). """
    iri = ilxtr.parcellationAtlas
    class_label = 'Parcellation atlas'
    # hasPart Delineation, hasPart CoordinateSystem, hasPart Terminology
    # alternately hasPart DelineationCriteria and/or RegistrationCriteria
    # TODO links to identifying atlas pictures
class LabelRoot(Class):
    """ Parcellation labels are strings characthers sometimes associated
        with a unique identifier, such as an index number or an iri.

        Base class for labels from a common source that should live in one file """
    # (the second sentence above was previously a stray string literal —
    # a no-op statement — following the docstring; merged here)
    # use this to define the common superclass for a set of labels
    iri = ilxtr.parcellationLabel
    class_label = 'Parcellation Label'
    _kwargs = dict(iri=None,
                   label=None,
                   comment=None,
                   shortname=None,  # used to construct the rdfs:label
                   definingArtifacts=tuple(),  # leave blank if defined for the parent class
                   definingArtifactsS=tuple(),
                   )

    def __init__(self, *args, **kwargs):
        # deduplicate the defining-artifact iterables before handing off
        # to the Class constructor machinery
        for it_name in ('definingArtifacts', 'definingArtifactsS'):  # TODO abstract to type
            if it_name in kwargs:
                kwargs[it_name] = tuple(set(kwargs[it_name]))
        super().__init__(*args, **kwargs)
class Label(Class):
    """ A single parcellation label drawn from one or more artifacts.

        Note: allen calls these Structures (which is too narrow because
        of ventricles etc). """
    _kwargs = dict(labelRoot=None,
                   label=None,  # this will become the skos:prefLabel
                   altLabel=None,
                   synonyms=tuple(),
                   abbrevs=tuple(),
                   definingArtifacts=tuple(),  # leave blank if defined for the parent class, needed for paxinos
                   definingCitations=tuple(),
                   iri=None,  # use when a class already exists and we need to know its identifier
                   )

    def __init__(self,
                 usedInArtifacts=tuple(),  # leave blank if 1:1 map between labelRoot and use artifacts NOTE even MBA requires validate on this
                 **kwargs):
        super().__init__(**kwargs)
        self.usedInArtifacts = list(usedInArtifacts)

    def usedInArtifact(self, artifact):
        """ Record an additional artifact this label is used in. """
        self.usedInArtifacts.append(artifact)

    @property
    def rdfs_label(self):
        # guard-clause form: handle the not-yet-initialized cases first
        if not hasattr(self, 'label'):
            return 'class not initialized but here __init__ you can have this helpful string :)'
        if not hasattr(self, 'labelRoot'):
            return self.label + ' (WARNING YOUR LABELS HAVE NO ROOT!)'
        return self.label + ' (' + self.labelRoot.shortname + ')'

    @property
    def rdfs_subClassOf(self):
        return self.labelRoot.iri
class RegionRoot(Class):
    """ Parcellation regions are 'anatomical entities' that correspond to some
        part of a real biological system and are equivalent to an intersection
        between a parcellation label and a specific version of an atlas that
        defines or uses that label and that provides a definitive
        (0, 1, or probabilistic) way to determine whether a particular sample
        corresponds to any given region.

        Centroid regions (anatomical entities)

        species specific labels
        species generic labels (no underlying species specific mapping)

        Symbols ->
        semantic labels -> semantic anatomical region -> point (aka unbounded connected spatial volume defined by some 'centroid' or canonical member)
        parcellation labels -> probabalistic anatomical parcellation region -> probablistically bounded connected spatial volume
                            -> anatomical parcellation region -> bounded connected spatial volume (as long as the 3d volume is topoligically equivalent to a sphere, unconnected planes of section are fine)
    """
    # (the second half of the docstring above was previously a stray
    # string literal — a no-op statement — following it; merged here)
    iri = ilxtr.parcellationRegion
    class_label = 'Parcellation Region'
    _kwargs = dict(iri=None,
                   label=None,
                   comment=None,
                   shortname=None,  # used to construct the rdfs:label
                   atlas=None,  # : Atlas
                   labelRoot=None)  # : LabelRoot
class Region(Class):
    """ A parcellation region: the intersection of a label and the atlas
        of its region root.
        NOTE(review): iri here is the same as RegionRoot.iri
        (ilxtr.parcellationRegion) — confirm that is intentional. """
    iri = ilxtr.parcellationRegion

    def __init__(self,
                 regionRoot,
                 label):
        # NOTE(review): Class.__init__ is not called here — confirm the
        # base machinery does not rely on it for Region instances
        self.atlas = regionRoot.atlas
        self.label = label.label
#
# ontologies
class Artifacts(Collector):
    """ Namespace collecting all concrete parcellation artifact instances
        (referenced elsewhere as e.g. Artifacts.PaxRat6). """
    # Collector gathers instances of this type defined in the class body
    collects = Artifact

    class PaxMouseAt(Atlas):
        """ Any atlas artifact with Paxinos as an author for the adult mouse.
            (docstring previously said 'rat' — copy-paste from PaxRatAt) """
        iri = ilx['paxinos/uris/mouse']  # ilxtr.paxinosMouseAtlas
        class_label = 'Paxinos Mouse Atlas'

    # generic (unversioned) mouse atlas record shared by the editions below
    PaxMouseAtlas = Atlas(iri=PaxMouseAt.iri,
                          species=NCBITaxon['10090'],
                          devstage=UBERON['0000113'],  # TODO this is 'Mature' which may not match... RnorDv:0000015 >10 weeks...
                          region=UBERON['0000955'],
                          )

    PaxMouse2 = PaxMouseAt(iri=paxmusver['2'],  # ilxtr.paxm2,
                           label='The Mouse Brain in Stereotaxic Coordinates 2nd Edition',
                           synonyms=('Paxinos Mouse 2nd',),
                           abbrevs=tuple(),
                           shortname='PAXMOUSE2',  # TODO upper for atlas lower for label?
                           copyrighted='2001',
                           version='2nd Edition',  # FIXME ??? delux edition??? what is this
                           citation='???????',)

    PaxMouse3 = PaxMouseAt(iri=paxmusver['3'],  # ilxtr.paxm3,
                           label='The Mouse Brain in Stereotaxic Coordinates 3rd Edition',
                           synonyms=('Paxinos Mouse 3rd',),
                           abbrevs=tuple(),
                           shortname='PAXMOUSE3',  # TODO upper for atlas lower for label?
                           copyrighted='2008',
                           version='3rd Edition',
                           citation='???????',)

    PaxMouse4 = PaxMouseAt(iri=paxmusver['4'],  # ilxtr.paxm4,
                           label='The Mouse Brain in Stereotaxic Coordinates 4th Edition',
                           synonyms=('Paxinos Mouse 4th',),
                           abbrevs=tuple(),
                           shortname='PAXMOUSE4',  # TODO upper for atlas lower for label?
                           copyrighted='2012',
                           version='4th Edition',
                           citation='???????',)

    class PaxRatAt(Atlas):
        """ Any atlas artifact with Paxinos as an author for the adult rat. """
        iri = ilx['paxinos/uris/rat']  # ilxtr.paxinosRatAtlas
        class_label = 'Paxinos Rat Atlas'

    # generic (unversioned) rat atlas record shared by the editions below
    PaxRatAtlas = Atlas(iri=PaxRatAt.iri,
                        species=NCBITaxon['10116'],
                        devstage=UBERON['0000113'],  # TODO this is 'Mature' which may not match... RnorDv:0000015 >10 weeks...
                        region=UBERON['0000955'],
                        citation=('Paxinos, George, Charles RR Watson, and Piers C. Emson. '
                                  '"AChE-stained horizontal sections of the rat brain '
                                  'in stereotaxic coordinates." Journal of neuroscience '
                                  'methods 3, no. 2 (1980): 129-149.'),)

    PaxRat4 = PaxRatAt(iri=ilx['paxinos/uris/rat/versions/4'],  # ilxtr.paxr4,
                       label='The Rat Brain in Stereotaxic Coordinates 4th Edition',
                       synonyms=('Paxinos Rat 4th',),
                       abbrevs=tuple(),
                       shortname='PAXRAT4',  # TODO upper for atlas lower for label?
                       copyrighted='1998',
                       version='4th Edition',)

    PaxRat6 = PaxRatAt(iri=ilx['paxinos/uris/rat/versions/6'],  # ilxtr.paxr6,
                       label='The Rat Brain in Stereotaxic Coordinates 6th Edition',
                       synonyms=('Paxinos Rat 6th',),
                       abbrevs=tuple(),
                       shortname='PAXRAT6',  # TODO upper for atlas lower for label?
                       copyrighted='2007',
                       version='6th Edition',)

    PaxRat7 = PaxRatAt(iri=ilx['paxinos/uris/rat/versions/7'],  # ilxtr.paxr7,
                       label='The Rat Brain in Stereotaxic Coordinates 7th Edition',
                       synonyms=('Paxinos Rat 7th',
                                 'Paxinso and Watson\'s The Rat Brain in Stereotaxic Coordinates 7th Edition',  # branding >_<
                                ),
                       abbrevs=tuple(),
                       shortname='PAXRAT7',  # TODO upper for atlas lower for label?
                       copyrighted='2014',
                       version='7th Edition',)

    # NOTE(review): this class attribute shadows (within the class body)
    # the HCPMMP namespace imported at module level — confirm intentional
    HCPMMP = Terminology(iri=ilx['hcp/uris/mmp/versions/1.0'],  # ilxtr.hcpmmpv1,
                         rdfs_label='Human Connectome Project Multi-Modal human cortical parcellation',
                         shortname='HCPMMP',
                         date='2016-07-20',
                         version='1.0',
                         synonyms=('Human Connectome Project Multi-Modal Parcellation',
                                   'HCP Multi-Modal Parcellation',
                                   'Human Connectome Project Multi-Modal Parcellation version 1.0'),
                         abbrevs=('HCP_MMP', 'HCP-MMP1.0', 'HCP MMP 1.0'),
                         citation='https://doi.org/10.1038/nature18933',
                         species=NCBITaxon['9606'],
                         region=UBERON['0000955'],
                         devstage=UBERON['0000113'],
                         )

    SwansonAppendix = Terminology(iri=ilx['swanson/uris/neuroanatomical-terminology/versions/1'],  # ilxtr.hcpmmpv1,
                                  rdfs_label='Swanson Neuroanatomical Terminology',
                                  shortname='swannt',  # 2014?
                                  #date='',
                                  #version='1.0',
                                  synonyms=('Swanson 2014 Appendicies',),
                                  #abbrevs=(),
                                  citation=('Swanson, Larry W. Neuroanatomical Terminology: '
                                            'a lexicon of classical origins and historical foundations. '
                                            'Oxford University Press, USA, 2014.'),
                                  identifiers=('ISBN:9780195340624',),
                                  species=NCBITaxon['40674'],  # taxon
                                  region=UBERON['0001016'],
                                  comment=('Each appendix probably needs its own artifact entry because '
                                           'the taxon rank and devstage are determined by appendix not NT.'),
                                  #devstage=UBERON['0000113'],  # FIXME multiple...
                                  )
class parcArts(ParcOnt):
    """ Ontology file for artifacts that define labels or
        geometry for parcellation schemes. """

    # setup
    path = 'ttl/generated/'
    filename = 'parcellation-artifacts'
    name = 'Parcellation Artifacts'
    #shortname = 'parcarts'
    prefixes = {**makePrefixes('NCBITaxon', 'UBERON', 'skos'), **ParcOnt.prefixes,
                'FSLATS':str(FSLATS),
                'paxmusver':str(paxmusver),
                'paxratver':str(paxratver),
    }

    def __call__(self):
        return super().__call__()

    @property
    def _artifacts(self):
        # yields artifact instances from every Collector subclass NOT
        # defined in the installed nifstd_tools.parcellation module —
        # i.e. only collectors from the executing module (__main__)
        for collector in subclasses(Collector):
            if collector.__module__ != 'nifstd_tools.parcellation':  # just run __main__
                yield from collector.arts()

    def _triples(self):
        from nifstd_tools.parcellation import Artifact
        yield from Artifact.class_triples()
        # OH LOOK PYTHON IS BEING AN AWFUL LANGUAGE AGAIN
        for art_type in subclasses(Artifact):  # this is ok because all subclasses are in this file...
            # do not comment this out it is what makes the
            # upper classes in the artifacts hierarchy
            yield from art_type.class_triples()
        for artifact in self._artifacts:
            yield from artifact
class parcCore(ParcOnt):
    """ Core OWL2 entities needed for parcellations """

    # setup
    path = 'ttl/generated/'
    filename = 'parcellation-core'
    name = 'Parcellation Core'
    #shortname = 'parcore'  # huehuehue
    prefixes = {**makePrefixes('skos'), **ParcOnt.prefixes}
    imports = NIFTTL['nif_backend.ttl'], parcArts

    # stuff
    parents = LabelRoot, RegionRoot

    def _triples(self):
        # emit the class-level triples for the two root classes that all
        # label/region files build on
        for parent in self.parents:
            yield from parent.class_triples()
class RegionsBase(ParcOnt):
    """ An ontology file containing parcellation regions from the
        intersection of an atlas artifact and a set of labels. """
    # TODO find a way to allow these to serialize into one file
    # name-mangled to _RegionsBase__pythonOnly; parcBridge skips classes
    # carrying such a flag when building its imports
    __pythonOnly = True  # FIXME for now perevent export
    imports = parcCore,
    atlas = None  # subclasses set an Atlas artifact
    labelRoot = None  # subclasses set a LabelRoot

    def __init__(self):
        self.regionRoot = RegionRoot(atlas=self.atlas,
                                     labelRoot=self.labelRoot)
class parcBridge(ParcOnt):
    """ Main bridge for importing the various files that
        make up the parcellation ontology. """

    # setup
    path = 'ttl/bridge/'
    filename = 'parcellation-bridge'
    name = 'Parcellation Bridge'
    # resolve each LabelsBase subclass through globals() so that when this
    # file runs as __main__ the module-level class object is preferred
    # over the __main__ duplicate; classes flagged __pythonOnly are skipped
    imports = ((g[subclass.__name__]
                if subclass.__name__ in g and subclass.__module__ == 'nifstd_tools.parcellation'  # parcellation is insurance for name reuse
                else subclass)
               for g in (globals(),)
               for subclass in subclasses(LabelsBase)  # XXX wow, well apparently __main__.Class != module.Class
               if not hasattr(subclass, f'_{subclass.__name__}__pythonOnly'))

    @property
    def __imports(self):
        # NOTE(review): appears to be an unused alternate implementation;
        # name-mangled to _parcBridge__imports so it does not collide with
        # the `imports` attribute above — confirm before removing
        for subclass in subclasses(LabelsBase):
            if not hasattr(subclass, f'_{subclass.__name__}__pythonOnly'):
                yield subclass()
#
# Sources (input files)
class LocalSource(Source):
    """ A Source whose data lives in this very file; provenance is derived
        from the git blame of the defining class's own source lines. """
    _data = tuple()

    def __new__(cls):
        line = getSourceLine(cls)
        cls.iri_head = URIRef(cls.iri_prefix_hd + Path(__file__).name)
        cls._this_file = Path(__file__).absolute()
        repobase = working_dir
        cls.repo = Repo(repobase)
        cls.prov()  # have to call prov here ourselves since Source only calls prov if _data is not defined
        if cls.artifact is None:  # for prov...
            # stub artifact so provenance code has something to talk to
            class art:
                iri = cls.iri
                def addPair(self, *args, **kwargs):
                    pass

            cls.artifact = art()

        self = super().__new__(cls)
        return self

    @classmethod
    def prov(cls):
        from inspect import getsourcelines
        #source_lines = getSourceLine

        def get_commit_data(start, end):
            # `git blame --line-porcelain` emits a fixed-size record per
            # line (rl = 13 lines per record here); slice out the fields
            records = cls.repo.git.blame('--line-porcelain',
                                         f'-L {start},{end}',
                                         cls._this_file.as_posix()).split('\n')
            rl = 13
            filenames = [l.split(' ', 1)[-1].strip() for l in records[rl - 2::rl]]
            linenos = [(hexsha, int(nowL), int(thenL)) for r in records[::rl]
                       for hexsha, nowL, thenL, *n in (r.split(' '),)]
            author_times = [int(epoch) for r in records[3::rl] for _, epoch in (r.split(' '),)]
            lines = [r.strip('\t') for r in records[12::rl]]
            # pick the most recently touched line's commit as "the" commit
            index, time = max(enumerate(author_times), key=lambda iv: iv[1])
            commit, then, now = linenos[index]
            filepath = filenames[index]
            # there are some hefty assumptions that go into this
            # that other lines have not been deleted from or added to the code block
            # between commits, or essentially that the code in the block is the
            # same length and has only been shifted by the distance defined by the
            # single commit that that has the maximum timestamp, so beware that
            # this can and will break which is why I use start and end instead of
            # just start like I do with the rest of the lines where I know for sure.
            # This can probably be improved with pickaxe or similar.
            shift = then - now
            then_start = start + shift
            then_end = end + shift
            return filepath, commit, then_start, then_end

        source_lines, start = getsourcelines(cls)
        end = start + len(source_lines)
        filepath, most_recent_block_commit, then_start, then_end = get_commit_data(start, end)
        cls.iri = URIRef(cls.iri_prefix_working_dir.format(file_commit=most_recent_block_commit)
                         + f'{filepath}#L{then_start}-L{then_end}')
##
# Instances
##
# Source instances TODO put everything under one class as we do for Artifacts?
class SwansonAppendix(resSource):
    """ Resource-backed source for the aligned Swanson appendix text.
        (distinct from the Artifacts.SwansonAppendix Terminology instance,
        which it points to via `artifact`) """
    sourceFile = relative_resources('swanson_aligned.txt')
    artifact = Artifacts.SwansonAppendix
class SwansonLabels(ParcOnt):  # FIXME not labels...
    """ Ontology file for the Swanson 2014 partonomies; triples are
        derived from the deprecated swanson() builder above. """
    filename = 'swanson'
    name = 'Swanson 2014 partonomies'
    shortname = 'swannt'
    imports = parcCore,
    prefixes = {**makePrefixes('NIFRID', 'ilxtr', 'prov'),
                'swanr':interlex_namespace('swanson/uris/readable/'),
                'SWAN':interlex_namespace('swanson/uris/neuroanatomical-terminology/terms/'),
                'SWAA':interlex_namespace('swanson/uris/neuroanatomical-terminology/appendix/'),}
    sources = SwansonAppendix,
    namespace = prefixes['SWAN']
    root = LabelRoot(iri=nsExact(namespace),  # FIXME this is not really a label in the strict sense
                     label='Swanson label root',
                     shortname=shortname,
                     definingArtifacts=(s.artifact.iri for s in sources),)

    def _triples(self):
        # re-emit the triples from the legacy swanson() graph, rerooting
        # subClassOf onto self.root and splitting labels into
        # rdfs:label (qualified) + skos:prefLabel (raw)
        for s, p, o in swanson().g:
            #if p != rdf.type and o != owl.Ontology:
            if s != URIRef('http://ontology.neuinfo.org/NIF/ttl/generated/swanson_hierarchies.ttl'):
                if p == rdfs.subClassOf and o == ilxtr.swansonBrainRegionConcept:
                    yield s, p, self.root.iri
                elif p == rdfs.label:
                    yield s, p, Label(label=o, labelRoot=self.root).rdfs_label
                    yield s, skos.prefLabel, o
                else:
                    yield s, p, o
class PaxSr_6(resSource):
    """ Abbreviation -> structure-name records for Paxinos rat 6th ed. """
    sourceFile = relative_resources('paxinos09names.txt')
    artifact = Artifacts.PaxRat6

    @classmethod
    def loadData(cls):
        """ Read the names file, dropping comment-only lines and any
            trailing '#' comment, and split each remaining line into
            its (structure, abbrev) pair. """
        with open(cls.source, 'rt') as fh:
            cleaned = [line.rsplit('#')[0].strip()
                       for line in fh.readlines()
                       if not line.startswith('#')]
        return [entry.rsplit(' ', 1) for entry in cleaned]

    @classmethod
    def processData(cls):
        """ Index structure names by abbreviation, keeping the raw
            (abbrev, structure) records as well. """
        records = []
        by_abbrev = {}
        for structure, abbrev in cls.raw:
            records.append((abbrev, structure))
            try:
                by_abbrev[abbrev][0].append(structure)
            except KeyError:
                by_abbrev[abbrev] = ([structure], ())
        return records, by_abbrev

    @classmethod
    def validate(cls, structRecs, out):
        """ Print the most common abbreviations and structures, then
            assert that every abbreviation maps to exactly one structure. """
        for column in (0, 1):
            print(Counter(rec[column] for rec in structRecs).most_common()[:5])
        flattened = [s for sl, _ in out.values() for s in sl]
        assert len(structRecs) == len(flattened), 'There are non-unique abbreviations'
        errata = {}
        return out, errata
class PaxSrAr(resSource):
    """ Parser for Paxinos 'List of Structures' / 'List of Abbreviations'
        index dumps; subclasses bind sourceFile and artifact. """
    artifact = None

    @classmethod
    def parseData(cls):
        """ Split the raw text into the structures (sr) and abbreviations
            (ar) sections and parse each into records. """
        a, b = cls.raw.split('List of Structures')
        if not a:  # asdf.split(asdf)
            los, loa = b.split('List of Abbreviations')
        else:
            los = b
            _, loa = a.split('List of Abbreviations')

        # structures section: "<structure name> <abbrev>" per line,
        # ';' marks comments
        sr = []
        for l in los.split('\n'):
            if l and not l[0] == ';':
                if ';' in l:
                    l, *comment = l.split(';')
                    l = l.strip()
                    print(l, comment)
                #asdf = l.rsplit(' ', 1)
                #print(asdf)
                struct, abbrev = l.rsplit(' ', 1)
                sr.append((abbrev, struct))

        # abbreviations section: "<abbrev> <structure name...> <figures...>"
        ar = []
        for l in loa.split('\n'):
            if l and not l[0] == ';':
                if ';' in l:
                    l, *comment = l.split(';')
                    l = l.strip()
                    print(l, comment)
                #asdf = l.rsplit(' ', 1)
                #print(asdf)
                abbrev, rest = l.split(' ', 1)
                parts = rest.split(' ')
                #print(parts)
                # walk backwards to find where figure numbers end and the
                # structure name begins; i escapes the loop and is used below
                for i, pr in enumerate(parts[::-1]):
                    #print(i, pr)
                    z = pr[0].isdigit()
                    if not z or i > 0 and z and pr[-1] != ',':
                        break

                struct = ' '.join(parts[:-i])
                # figure refs: 'a-b' ranges become int tuples, lettered or
                # comma'd refs become strings, plain numbers become ints
                figs = tuple(tuple(int(_) for _ in p.split('-'))
                             if '-' in p
                             else (tuple(f'{nl[:-1]}{l}'
                                         for nl, *ls in p.split(',')
                                         for l in (nl[-1], *ls))
                                   if ',' in p or p[-1].isalpha()
                                   else int(p))
                             for p in (_.rstrip(',') for _ in parts[-i:]))
                figs = tuple(f for f in figs if f)  # zero marks abbrevs in index that are not in figures
                #print(struct)
                ar.append((abbrev, struct, figs))
        return sr, ar

    @classmethod
    def processData(cls):
        """ Merge the two sections into out: abbrev -> ([structures], figs);
            'layer 1' precomposed entries are split off into achild/schild. """
        sr, ar = cls.parseData()
        out = {}
        achild = {}
        for a, s, f in ar:
            if ', layer 1' in s or s.endswith(' layer 1'):  # DTT1 ends in ' layer 1' without a comma
                achild[a[:-1]] = a
                continue  # remove the precomposed, we will deal with them systematically
            if a not in out:
                out[a] = ([s], f)
            else:
                if s not in out[a][0]:
                    print(f'Found new label from ar for {a}:\n{s}\n{out[a][0]}')
                    out[a][0].append(s)

        schild = {}
        for a, s in sr:
            if ', layer 1' in s or s.endswith(' layer 1'):
                schild[a[:-1]] = a
                continue  # remove the precomposed, we will deal with them systematically
            if a not in out:
                out[a] = ([s], tuple())
            else:
                if s not in out[a][0]:
                    print(f'Found new label from sr for {a}:\n{s}\n{out[a][0]}')
                    out[a][0].append(s)
                    #raise TypeError(f'Mismatched labels on {a}: {s} {out[a][0]}')

        return sr, ar, out, achild, schild

    @classmethod
    def validate(cls, sr, ar, out, achild, schild):
        """ Report discrepancies between the two sections and sanity-check
            the merged output; returns (out, errata). """
        def missing(a, b):
            am = a - b
            bm = b - a
            return am, bm
        sabs = set(_[0] for _ in sr)
        aabs = set(_[0] for _ in ar)
        ssts = set(_[1] for _ in sr)
        asts = set(_[1] for _ in ar)
        ar2 = set(_[:2] for _ in ar)
        aam, sam = missing(aabs, sabs)
        asm, ssm = missing(asts, ssts)
        ar2m, sr2m = missing(ar2, set(sr))
        print('OK to skip')
        print(sorted(aam))
        print('Need to be created')
        print(sorted(sam))
        print()
        print(sorted(asm))
        print()
        print(sorted(ssm))
        print()
        #print(sorted(ar2m))
        #print()
        #print(sorted(sr2m))
        #print()
        assert all(s in achild for s in schild), f'somehow the kids dont match {achild} {schild}\n' + str(sorted(set(a) - set(s) | set(s) - set(a)
                                                                                                                for a, s in ((tuple(sorted(achild.items())),
                                                                                                                              tuple(sorted(schild.items()))),)))
        for k, (structs, figs) in out.items():
            for struct in structs:
                # NOTE(review): non-raw regex strings with \d emit
                # DeprecationWarning on newer Pythons
                assert not re.match('\d+-\d+', struct) and not re.match('\d+$', struct), f'bad struct {struct} in {k}'

        errata = {'nodes with layers':achild}
        return out, errata
class PaxSrAr_4(PaxSrAr):
    """ Paxinos rat 4th edition index source. """
    sourceFile = relative_resources('pax-4th-ed-indexes.txt')
    artifact = Artifacts.PaxRat4
class PaxSrAr_6(PaxSrAr):
    """ Paxinos rat 6th edition index source. """
    sourceFile = relative_resources('pax-6th-ed-indexes.txt')
    artifact = Artifacts.PaxRat6
class PaxMSrAr_2(PaxSrAr):
    """ Paxinos mouse 2nd edition index source. """
    sourceFile = relative_resources('paxm-2nd-ed-indexes.txt')
    artifact = Artifacts.PaxMouse2
class PaxMSrAr_3(PaxSrAr):
    """ Paxinos mouse 3rd edition index source. """
    sourceFile = relative_resources('paxm-3rd-ed-indexes.txt')
    artifact = Artifacts.PaxMouse3
class PaxTree_6(Source):
source = '~/ni/dev/nifstd/paxinos/tree.txt'
artifact = Artifacts.PaxRat6
@classmethod
def loadData(cls):
with open(os.path.expanduser(cls.source), 'rt') as f:
return [l for l in f.read().split('\n') if l]
@classmethod
def processData(cls):
out = {}
recs = []
parent_stack = [None]
old_depth = 0
layers = {}
for l in cls.raw:
depth, abbrev, _, name = l.split(' ', 3)
depth = len(depth)
if old_depth < depth: # don't change
parent = parent_stack[-1]
parent_stack.append(abbrev)
old_depth = depth
elif old_depth == depth:
if len(parent_stack) - 1 > depth:
parent_stack.pop()
parent = parent_stack[-1]
parent_stack.append(abbrev)
elif old_depth > depth: # bump back
for _ in range(old_depth - depth + 1):
parent_stack.pop()
parent = parent_stack[-1]
parent_stack.append(abbrev)
old_depth = depth
struct = None if name == '-------' else name
o = (depth, abbrev, struct, parent)
if '-' in abbrev: