5
5
"""
6
6
import logging
7
7
import urllib
8
- from typing import Union , Dict , Tuple , List , Any , Optional , Iterable
8
+ from typing import Union , Dict , Tuple , List , Any , Optional , Iterable , Iterator
9
9
10
10
from dataclasses import dataclass
11
11
19
19
from schema_automator .importers .import_engine import ImportEngine
20
20
import schema_automator .metamodels .cadsr as cadsr
21
21
22
+ ID_LABEL_PAIR = Tuple [str , str ]
22
23
23
24
TMAP = {
24
25
"DATE" : "date" ,
38
39
"Floating-point" : "float" ,
39
40
}
40
41
42
def extract_concepts(concepts: "List[cadsr.Concept]") -> "Tuple[ID_LABEL_PAIR, List[str]]":
    """
    Split a list of caDSR concepts into one main concept and the remaining ones.

    Only concepts whose ``evsSource`` is ``"NCI_CONCEPT_CODE"`` are considered;
    each is identified by the CURIE ``NCIT:<conceptCode>`` (code stripped of
    surrounding whitespace).

    The main concept is the one whose ``primaryIndicator`` is ``"Yes"``. If no
    concept is flagged as primary, the first NCI concept is used instead and a
    warning is logged.

    :param concepts: concepts attached to a caDSR element
    :return: a tuple of ``(main, rest)`` where ``main`` is an ``(id, label)``
        pair and ``rest`` is the list of ids of all other NCI concepts
    :raises ValueError: if ``concepts`` is empty, if more than one concept is
        flagged as primary, or if none of the concepts is an NCI concept
    """
    if not concepts:
        raise ValueError("No concepts")
    main = None
    # Keep (id, label) pairs for the non-primary concepts so that the fallback
    # below can promote one of them to `main` without losing its label.
    others = []
    for concept in concepts:
        if concept.evsSource != "NCI_CONCEPT_CODE":
            continue
        curie = f"NCIT:{concept.conceptCode.strip()}"
        pair = (curie, concept.longName)
        if concept.primaryIndicator == "Yes":
            if main is not None:
                raise ValueError(f"Multiple primary for: {concepts}")
            main = pair
        else:
            others.append(pair)
    if main is None:
        if not others:
            # Previously this surfaced as an opaque IndexError on rest[0].
            raise ValueError(f"No NCI concepts in: {concepts}")
        logging.warning(f"No primary, using arbitrary from {[c for c, _ in others]}")
        # Promote the full pair (not just the id string), so callers that
        # index main[0] get the CURIE rather than its first character.
        main, others = others[0], others[1:]
    return main, [curie for curie, _ in others]
63
+
41
64
@dataclass
42
65
class CADSRImportEngine (ImportEngine ):
43
66
"""
@@ -94,19 +117,30 @@ def convert(self, paths: Iterable[str], id: str=None, name: str=None, **kwargs)
94
117
title = cde .preferredName ,
95
118
description = cde .preferredDefinition ,
96
119
aliases = [cde .longName ],
120
+ conforms_to = f"cadsr:DataElement" ,
97
121
source = source ,
98
122
)
123
+ # each data element belongs to a concept
124
+ # (may be reused across classes?)
99
125
slots [slot .name ] = slot
100
126
concept = cde .DataElementConcept
101
- concept_name = urllib .parse .quote (camelcase (f"{ ctxt } { concept .preferredName } " ))
102
- parent_concept_name = urllib .parse .quote (camelcase (concept .longName ))
127
+ # a concept is linked to a class
128
+ objectClass = concept .ObjectClass
129
+ # NCIT concepts describing the class
130
+ mainConcept , mappings = extract_concepts (objectClass .Concepts )
131
+ class_name = objectClass .longName
132
+ concept_name = urllib .parse .quote (camelcase (f"{ ctxt } { class_name } " ))
133
+ parent_concept_name = urllib .parse .quote (class_name )
103
134
if parent_concept_name not in classes :
104
135
parent_cls = ClassDefinition (
105
136
name = parent_concept_name ,
106
- title = concept .preferredName ,
107
- description = concept .preferredDefinition ,
137
+ title = objectClass .preferredName ,
138
+ description = objectClass .preferredDefinition ,
108
139
#aliases=[concept.longName],
109
- class_uri = f"cadsr:{ concept .publicId } " ,
140
+ class_uri = f"cadsr:{ objectClass .publicId } " ,
141
+ exact_mappings = [mainConcept [0 ]],
142
+ broad_mappings = mappings ,
143
+ conforms_to = f"cadsr:ObjectClass" ,
110
144
)
111
145
classes [parent_concept_name ] = parent_cls
112
146
if concept_name not in classes :
@@ -117,14 +151,23 @@ def convert(self, paths: Iterable[str], id: str=None, name: str=None, **kwargs)
117
151
aliases = [concept .longName ],
118
152
class_uri = f"cadsr:{ concept .publicId } " ,
119
153
is_a = parent_concept_name ,
154
+ conforms_to = f"cadsr:DataElementConcept" ,
120
155
)
121
156
classes [concept_name ] = cls
122
157
else :
123
158
cls = classes [concept_name ]
124
159
cls .slots .append (slot .name )
125
- objectClass = concept .ObjectClass
126
- # TODO
160
+ # In theory the ObjectClass should link to a general class of utility in NCIT.
161
+ # In practice the actual concept may not be so useful. E.g. in 2724331
162
+ # "Agent Adverse Event Attribution Name" the DataConcept is
163
+ # Agent (C1708) defined as "An active power or cause (as principle,
164
+ # substance, physical or biological factor, etc.) that produces a specific effect."
165
+ # which is very upper-ontological
166
+ #for ocConcept in objectClass.Concepts:
167
+ # if ocConcept.evsSource == "NCI_CONCEPT_CODE":
168
+ # cls.is_a = f"NCIT:{ocConcept.conceptCode}"
127
169
valueDomain = cde .ValueDomain
170
+ # TODO
128
171
conceptualDomain = valueDomain .ConceptualDomain
129
172
pvs = valueDomain .PermissibleValues
130
173
if pvs :
@@ -140,7 +183,7 @@ def convert(self, paths: Iterable[str], id: str=None, name: str=None, **kwargs)
140
183
rng = enum_name
141
184
for pv in pvs :
142
185
# url encode the value to escape symbols like <, >, etc.
143
- pv_value = urllib .parse .quote (pv .value )
186
+ pv_value = urllib .parse .quote (pv .value ). replace ( "%20" , " " )
144
187
tgt_pv = PermissibleValue (
145
188
text = pv_value ,
146
189
title = pv .value ,
@@ -151,9 +194,10 @@ def convert(self, paths: Iterable[str], id: str=None, name: str=None, **kwargs)
151
194
tgt_pv .title = vm .preferredName
152
195
if not tgt_pv .description :
153
196
tgt_pv .description = vm .preferredDefinition
154
- for c in vm .Concepts :
155
- code = c .conceptCode .strip ()
156
- tgt_pv .meaning = f"NCIT:{ code } "
197
+ if vm .Concepts :
198
+ mainConcept , mappings = extract_concepts (vm .Concepts )
199
+ tgt_pv .meaning = mainConcept [0 ]
200
+ tgt_pv .broad_mappings = mappings
157
201
else :
158
202
datatype = valueDomain .dataType
159
203
rng = TMAP .get (datatype , "string" )
@@ -179,6 +223,56 @@ def convert(self, paths: Iterable[str], id: str=None, name: str=None, **kwargs)
179
223
schema .enums = enums
180
224
return schema
181
225
226
def as_rows(self, paths: Iterable[str], **kwargs) -> Iterator[Dict]:
    """
    Load caDSR data element JSON files and yield them as flat row dicts.

    Each file is parsed into a ``cadsr.DataElementContainer``; the rows for
    its ``DataElement`` (and everything nested under it) are produced by
    ``_obj_as_rows``, with the file path used as the parent id of the
    top-level row.

    :param paths: paths of the JSON files to load
    :param kwargs: accepted for interface compatibility; not used here
    :return: iterator over row dicts, in file order
    """
    for path in paths:
        logging.info(f"Loading {path}")
        # JSON is UTF-8 by specification; do not depend on the platform locale.
        with open(path, encoding="utf-8") as file:
            container: cadsr.DataElementContainer = json_loader.load(
                file, target_class=cadsr.DataElementContainer
            )
        # Yield only after the file is closed, so the handle is not held open
        # for as long as the consumer takes to iterate (or abandon) the rows.
        yield from self._obj_as_rows(container.DataElement, path)
234
+
235
def _obj_as_rows(self, e: Union[cadsr.DataElement, cadsr.DataElementConcept, cadsr.Concept, cadsr.Property, cadsr.ObjectClass, cadsr.ConceptualDomain,
                                cadsr.ValueDomain, cadsr.PermissibleValue, cadsr.ValueMeaning], parent_id: str) -> Iterator[Dict]:
    """
    Yield a row dict for ``e`` followed by the rows of its nested objects.

    The row for ``e`` is yielded first; rows for nested objects follow
    depth-first, each carrying the id of ``e`` as its ``parentId``.

    :param e: the caDSR object to flatten
    :param parent_id: value stored under ``parentId`` in the row for ``e``
    :return: iterator over row dicts
    """
    # The set of columns depends on the kind of object.
    if isinstance(e, cadsr.Concept):
        row = dict(id=e.conceptCode, context=e.evsSource, longName=e.longName)
    elif isinstance(e, cadsr.CDEPermissibleValue):
        # NOTE(review): this branch tests CDEPermissibleValue while the
        # Concepts check below tests PermissibleValue - confirm which class
        # name(s) the cadsr metamodel actually defines.
        row = dict(id=e.publicId, value=e.value, valueDescription=e.valueDescription)
    else:
        row = dict(
            id=e.publicId,
            preferredName=e.preferredName,
            context=e.context,
            longName=e.longName,
        )
    row.update(parentId=parent_id, type=type(e).class_name)
    # Capture the id before handing the dict to the consumer.
    own_id = row["id"]
    yield row

    def nested():
        # Attributes are read lazily, in the order the rows are emitted.
        if isinstance(e, cadsr.DataElement):
            yield e.DataElementConcept
            yield e.ValueDomain
        elif isinstance(e, cadsr.DataElementConcept):
            yield e.ObjectClass
            yield e.Property
            yield e.ConceptualDomain
        elif isinstance(e, cadsr.ValueDomain):
            # Recurses into each value's ValueMeaning; the permissible value
            # object itself is not passed on.
            for permissible in e.PermissibleValues:
                yield permissible.ValueMeaning
        if isinstance(e, (cadsr.ObjectClass, cadsr.Property, cadsr.PermissibleValue)):
            yield from e.Concepts

    for child in nested():
        yield from self._obj_as_rows(child, own_id)
273
+
274
+
275
+
182
276
183
277
184
278
0 commit comments