Skip to content

Commit

Permalink
refactor: move rights/license and format/media type graph in seperate…
Browse files Browse the repository at this point in the history
… methods
  • Loading branch information
sarusarah committed Oct 3, 2023
1 parent ec6d3dc commit cf54498
Showing 1 changed file with 115 additions and 111 deletions.
226 changes: 115 additions & 111 deletions ckanext/dcatapchharvest/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,53 +799,8 @@ def graph_from_dataset(self, dataset_dict, dataset_ref): # noqa
('spatial', DCT.spatial, None, Literal),
]

if resource_dict.get('rights'):
rights_uri = dh.get_license_uri_by_name(
resource_dict.get('rights')
)
if rights_uri is not None:
rights_ref = URIRef(rights_uri)
g.add((rights_ref, RDF.type, DCT.RightsStatement))
g.add((distribution, DCT.rights, rights_ref))
if rights_uri is None:
rights_name = dh.get_license_name_by_uri(
resource_dict.get('rights')
)
if rights_name is not None:
resource_rights_ref = URIRef(
resource_dict.get('rights')
)
g.add((
resource_rights_ref,
RDF.type,
DCT.RightsStatement)
)
g.add((distribution, DCT.rights, resource_rights_ref))

if resource_dict.get('license'):
license_uri = dh.get_license_uri_by_name(
resource_dict.get('license')
)
if license_uri is not None:
license_ref = URIRef(license_uri)
g.add((license_ref, RDF.type, DCT.LicenseDocument))
g.add((distribution, DCT.license, license_ref))
if license_uri is None:
license_name = dh.get_license_name_by_uri(
resource_dict.get('license')
)
if license_name is not None:
resource_license_ref = URIRef(
resource_dict.get('license')
)
g.add((
resource_license_ref,
RDF.type,
DCT.LicenseDocument)
)
g.add(
(distribution, DCT.license, resource_license_ref)
)
self._rights_and_license_to_graph(resource_dict, distribution)
self._format_and_media_type_to_graph(resource_dict, distribution)

self._add_triples_from_dict(resource_dict, distribution, items)
self._add_multilang_value(distribution, DCT.title, 'display_name', resource_dict) # noqa
Expand Down Expand Up @@ -891,67 +846,6 @@ def graph_from_dataset(self, dataset_dict, dataset_ref): # noqa
g.add((doc, RDF.type, FOAF.Document))
g.add((distribution, FOAF.page, doc))

# Format and Media Type Case 1:
# Format: Set Format value if format matches EU vocabulary
format_uri = None
if resource_dict.get('format'):
for key, value in valid_formats.items():
if resource_dict.get('format') == key:
format_uri = URIRef(value)
g.add((
distribution,
DCT['format'],
format_uri
))
# Media Type: Set Format value if format matches EU vocabulary
# and media type is not set
if format_uri and resource_dict.get('media_type') is None:
g.add((
distribution,
DCT['media_type'],
format_uri
))

# Format and Media Type Case 2:
# Set Media Type and Formar value
# if format does not match eu vocabulary
# but media type matches iana vocabulary
media_type_uri = None
if format_uri is None and resource_dict.get('media_type'):
for key, value in valid_media_types.items():
if resource_dict.get('media_type') == key:
media_type_uri = URIRef(value)
g.add((
distribution,
DCT['format'],
media_type_uri
))
g.add((
distribution,
DCT['media_type'],
media_type_uri
))

# Format and Media Type Case 3:
# Set Media Type and Format value
# if format does not match eu vocabulary
# but format matches iana vocabulary
if format_uri is None and media_type_uri is None:
if resource_dict.get('format'):
for key, value in valid_media_types.items():
if resource_dict.get('format') == key:
media_type_uri_by_format = URIRef(value)
g.add((
distribution,
DCT['format'],
media_type_uri_by_format
))
g.add((
distribution,
DCT['media_type'],
media_type_uri_by_format
))

# Mime-Type
if resource_dict.get('mimetype'):
g.add((
Expand All @@ -966,14 +860,124 @@ def graph_from_dataset(self, dataset_dict, dataset_ref): # noqa
('modified', DCT.modified, None, Literal),
]

self._add_date_triples_from_dict(resource_dict, distribution,
items)

# ByteSize
if resource_dict.get('byte_size'):
g.add((distribution, DCAT.byteSize,
Literal(resource_dict['byte_size'])))

def _rights_and_license_to_graph(self, resource_dict, distribution):
g = self.g
if resource_dict.get('rights'):
rights_uri = dh.get_license_uri_by_name(
resource_dict.get('rights')
)
if rights_uri is not None:
rights_ref = URIRef(rights_uri)
g.add((rights_ref, RDF.type, DCT.RightsStatement))
g.add((distribution, DCT.rights, rights_ref))
if rights_uri is None:
rights_name = dh.get_license_name_by_uri(
resource_dict.get('rights')
)
if rights_name is not None:
resource_rights_ref = URIRef(
resource_dict.get('rights')
)
g.add((
resource_rights_ref,
RDF.type,
DCT.RightsStatement)
)
g.add((distribution, DCT.rights, resource_rights_ref))

if resource_dict.get('license'):
license_uri = dh.get_license_uri_by_name(
resource_dict.get('license')
)
if license_uri is not None:
license_ref = URIRef(license_uri)
g.add((license_ref, RDF.type, DCT.LicenseDocument))
g.add((distribution, DCT.license, license_ref))
if license_uri is None:
license_name = dh.get_license_name_by_uri(
resource_dict.get('license')
)
if license_name is not None:
resource_license_ref = URIRef(
resource_dict.get('license')
)
g.add((
resource_license_ref,
RDF.type,
DCT.LicenseDocument)
)
g.add(
(distribution, DCT.license, resource_license_ref)
)

def _format_and_media_type_to_graph(self, resource_dict, distribution): # noqa
g = self.g
# Format and Media Type Case 1:
# Format: Set Format value if format matches EU vocabulary
format_uri = None
if resource_dict.get('format'):
for key, value in valid_formats.items():
if resource_dict.get('format') == key:
format_uri = URIRef(value)
g.add((
distribution,
DCT['format'],
format_uri
))
# Media Type: Set Format value if format matches EU vocabulary
# and media type is not set
if format_uri and resource_dict.get('media_type') is None:
g.add((
distribution,
DCAT.mediaType,
format_uri
))

# Format and Media Type Case 2:
# Set Media Type and Formar value
# if format does not match eu vocabulary
# but media type matches iana vocabulary
media_type_uri = None
if format_uri is None and resource_dict.get('media_type'):
for key, value in valid_media_types.items():
if resource_dict.get('media_type') == key:
media_type_uri = URIRef(value)
g.add((
distribution,
DCT['format'],
media_type_uri
))
g.add((
distribution,
DCAT.mediaType,
media_type_uri
))

# Format and Media Type Case 3:
# Set Media Type and Format value
# if format does not match eu vocabulary
# but format matches iana vocabulary
if format_uri is None and media_type_uri is None:
if resource_dict.get('format'):
for key, value in valid_media_types.items():
if resource_dict.get('format') == key:
media_type_uri_by_format = URIRef(value)
g.add((
distribution,
DCT['format'],
media_type_uri_by_format
))
g.add((
distribution,
DCAT.mediaType,
media_type_uri_by_format
))

def graph_from_catalog(self, catalog_dict, catalog_ref):
g = self.g
g.add((catalog_ref, RDF.type, DCAT.Catalog))
Expand Down

0 comments on commit cf54498

Please sign in to comment.