From 474a3797f684c74a8d1d22c1a5af206cc7496893 Mon Sep 17 00:00:00 2001 From: AlessioNar Date: Sun, 24 Nov 2024 11:00:29 +0100 Subject: [PATCH] Added validation, improved parsing, added parsing of conclusions, increased explainability in self.parse() function. Added tests --- op_cellar/parsers/akomantoso.py | 194 ++++++++++++++++++++++++++++--- tests/parsers/test_akomantoso.py | 18 ++- 2 files changed, 193 insertions(+), 19 deletions(-) diff --git a/op_cellar/parsers/akomantoso.py b/op_cellar/parsers/akomantoso.py index b3d0d41..7b31131 100644 --- a/op_cellar/parsers/akomantoso.py +++ b/op_cellar/parsers/akomantoso.py @@ -1,6 +1,9 @@ from .parser import Parser import re from lxml import etree +import os + + class AkomaNtosoParser(Parser): """ @@ -87,6 +90,16 @@ def get_root(self, file: str): tree = etree.parse(f) self.root = tree.getroot() return self.root + + def get_meta(self): + meta_data = { + "meta_identification" : self.get_meta_identification(), + "meta_proprietary" : self.get_meta_proprietary(), + "meta_references" : self.get_meta_references() + } + + self.meta = meta_data + def get_meta_identification(self): """ @@ -105,12 +118,12 @@ def get_meta_identification(self): if identification is None: return None - frbr_data = { + meta_identification = { 'work': self._get_frbr_work(identification), 'expression': self._get_frbr_expression(identification), 'manifestation': self._get_frbr_manifestation(identification) } - return frbr_data + return meta_identification def _get_frbr_work(self, identification): """ @@ -214,11 +227,12 @@ def get_meta_references(self): if references is None: return None - return { + meta_references = { 'eId': references.get('eId'), 'href': references.get('href'), 'showAs': references.get('showAs') } + return meta_references def get_meta_proprietary(self): """ @@ -242,7 +256,7 @@ def get_meta_proprietary(self): if document_ref is None: return None - return { + meta_proprietary = { 'file': document_ref.get('FILE'), 'coll': 
document_ref.find('fmx:COLL', namespaces=self.namespaces).text, 'year': document_ref.find('fmx:YEAR', namespaces=self.namespaces).text, @@ -250,6 +264,8 @@ def get_meta_proprietary(self): 'no_seq': proprietary.find('fmx:NO.SEQ', namespaces=self.namespaces).text # Add other elements as needed } + + return meta_proprietary def get_preface(self) -> None: """ @@ -290,7 +306,7 @@ def get_preamble(self): 'citations': self.get_preamble_citations(), 'recitals': self.get_preamble_recitals() } - return preamble_data + self.preamble = preamble_data def get_preamble_formula(self): """ @@ -440,7 +456,6 @@ def get_chapters(self) -> None: 'chapter_heading': ''.join(chapter_heading.itertext()).strip() if chapter_heading is not None else None }) - return None def get_articles(self) -> None: """ @@ -533,21 +548,166 @@ def get_text_by_eId(self, node): } elements.append(element) return elements + + def get_conclusions(self): + """ + Extracts conclusions information from the document. - def parse(self, file: str) -> list[dict]: + Returns + ------- + None """ - Parses an Akoma Ntoso file to extract provisions as individual sentences. - + conclusions_section = self.root.find('.//akn:conclusions', namespaces=self.namespaces) + if conclusions_section is None: + return None + + # Find the container with signatures + container = conclusions_section.find('.//akn:container[@name="signature"]', namespaces=self.namespaces) + if container is None: + return None + + # Extract date from the first akn:date element + date_element = container.find('.//akn:date', namespaces=self.namespaces) + signature_date = date_element.text if date_element is not None else None + + # Extract all signatures + signatures = [] + for p in container.findall('akn:p', namespaces=self.namespaces): + # For each akn:p paragraph, find all akn:signature tags + paragraph_signatures = [] + for signature in p.findall('akn:signature', namespaces=self.namespaces): + # Collect text within the akn:signature, including nested elements + signature_text = ''.join(signature.itertext()).strip() + paragraph_signatures.append(signature_text) + + # Add the paragraph's signatures as a group + if paragraph_signatures: + signatures.append(paragraph_signatures) + + # Store parsed conclusions data + self.conclusions = { + 'date': signature_date, + 'signatures': signatures + } + + def load_schema(self): + """ + Loads the XSD schema for XML validation using an absolute path. + """ + try: + # Resolve the absolute path to the XSD file + base_dir = os.path.dirname(os.path.abspath(__file__)) + schema_path = os.path.join(base_dir, 'assets', 'akomantoso30.xsd') + + # Parse the schema + with open(schema_path, 'rb') as f: + schema_doc = etree.parse(f) + self.schema = etree.XMLSchema(schema_doc) + print("Schema loaded successfully.") + except Exception as e: + print(f"Error loading schema: {e}") + + def validate(self, file: str) -> bool: + """ + Validates an XML file against the loaded XSD schema. + Args: - file (str): The path to the Akoma Ntoso XML file. - + file (str): Path to the XML file to validate. + Returns: - list[dict]: List of extracted provisions with CELEX ID, sentence text, and eId. + bool: True if the XML file is valid, False otherwise. + """ + if not getattr(self, 'schema', None): + print("No schema loaded. Please load an XSD schema first.") + return False + + try: + with open(file, 'r', encoding='utf-8') as f: + xml_doc = etree.parse(f) + self.schema.assertValid(xml_doc) + print(f"{file} is valid.") + return True + except etree.DocumentInvalid as e: + print(f"{file} is invalid. 
Validation errors: {e}") + return False + except Exception as e: + print(f"An error occurred during validation: {e}") + return False + + def parse(self, file: str) -> list[dict]: """ - self.get_root(file) - self.get_body() - self.get_chapters() - self.get_articles() - self.get_preface() + Parses an Akoma Ntoso file to extract provisions as individual sentences. + This method sequentially calls various parsing functions to extract metadata, + preface, preamble, body, chapters, articles, and conclusions from the XML file. + It logs errors encountered during parsing and provides debug information about + the structure of the document. + Args: + file (str): The path to the Akoma Ntoso XML file. + + Returns: + None: nothing is returned despite the list[dict] annotation; parsed data is + stored on the instance (e.g. self.meta, self.preamble, self.conclusions). + """ + debug_info = {} + + try: + self.load_schema() + self.validate(file) + except Exception as e: + print(f'Invalid Akoma Ntoso file: parsing may not work or work only partially: {e}') + + try: + self.get_root(file) + print("Root element loaded successfully.") + except Exception as e: + print(f"Error in get_root: {e}") + + try: + self.get_meta() + debug_info['meta'] = self.meta if hasattr(self, 'meta') else "Meta not parsed." + print("Meta parsed successfully.") + except Exception as e: + print(f"Error in get_meta: {e}") + + try: + self.get_preface() + debug_info['preface'] = self.preface if hasattr(self, 'preface') else 0 + print(f"Preface parsed successfully. Preface: {debug_info['preface']}") + except Exception as e: + print(f"Error in get_preface: {e}") + + try: + self.get_preamble() + debug_info['preamble'] = len(self.preamble['recitals']) if hasattr(self, 'preamble') and 'recitals' in self.preamble else 0 + print(f"Preamble parsed successfully. 
Number of recitals: {debug_info['preamble']}") + except Exception as e: + print(f"Error in get_preamble: {e}") + + try: + self.get_body() + print("Body parsed successfully.") + except Exception as e: + print(f"Error in get_body: {e}") + + try: + self.get_chapters() + debug_info['chapters'] = len(self.chapters) if hasattr(self, 'chapters') else 0 + print(f"Chapters parsed successfully. Number of chapters: {debug_info['chapters']}") + except Exception as e: + print(f"Error in get_chapters: {e}") + + try: + self.get_articles() + debug_info['articles'] = len(self.articles) if hasattr(self, 'articles') else 0 + print(f"Articles parsed successfully. Number of articles: {debug_info['articles']}") + except Exception as e: + print(f"Error in get_articles: {e}") + + try: + self.get_conclusions() + debug_info['conclusions'] = self.conclusions if hasattr(self, 'conclusions') else "Conclusions not parsed." + print(f"Conclusions parsed successfully. Conclusions: {debug_info['conclusions']}") + except Exception as e: + print(f"Error in get_conclusions: {e}") \ No newline at end of file diff --git a/tests/parsers/test_akomantoso.py b/tests/parsers/test_akomantoso.py index 9d50b6f..b530ec2 100644 --- a/tests/parsers/test_akomantoso.py +++ b/tests/parsers/test_akomantoso.py @@ -54,8 +54,8 @@ def test_get_preface(self): def test_get_preamble(self): """Test retrieval of preamble data from the XML file.""" - preamble_data = self.parser.get_preamble() - self.assertIsNotNone(preamble_data, "Preamble data not found") + self.parser.get_preamble() + self.assertIsNotNone(self.parser.preamble, "Preamble data not found") def test_get_preamble_formula(self): """Test extraction of formula text within the preamble.""" @@ -122,6 +122,20 @@ def test_get_articles(self): self.parser.get_articles() self.assertEqual(len(self.parser.articles), 31, "Incorrect number of articles extracted") + + def test_get_conclusions(self): + # Expected output + expected_conclusions = { + 'date': '23 July 2014', + 'signatures': 
[ + ["Done at Brussels, 23 July 2014."], + ['For the European Parliament', 'The President', 'M. Schulz'], + ['For the Council', 'The President', 'S. Gozi'] + ] + } + # Test get_conclusions method + self.parser.get_conclusions() + self.assertEqual(self.parser.conclusions, expected_conclusions, "Parsed conclusions do not match expected output") if __name__ == '__main__': unittest.main()