PA Parser [SDESK-7427] #2851

Merged: 4 commits, Mar 10, 2025
Changes from 2 commits
1 change: 1 addition & 0 deletions superdesk/io/feed_parsers/__init__.py
@@ -354,3 +354,4 @@ class EmailFeedParser(FeedParser, metaclass=ABCMeta):
from superdesk.io.feed_parsers.ap_media import APMediaFeedParser # NOQA
from superdesk.io.feed_parsers.dpa_newsml import DPAFeedParser # NOQA
from superdesk.io.feed_parsers.afp_newsml_1_2_new import AFPNewsMLFeedParser # NOQA
from superdesk.io.feed_parsers.pa_parser import PAParser # NOQA
174 changes: 174 additions & 0 deletions superdesk/io/feed_parsers/pa_parser.py
@@ -0,0 +1,174 @@
from superdesk.io.feed_parsers.nitf import NITFFeedParser
from superdesk.io.registry import register_feed_parser
from superdesk.etree import etree
from superdesk.utc import utc
from datetime import datetime


class PAParser(NITFFeedParser):
"""
Feed Parser for PA (Press Association) XML files.
"""

NAME = "pa_parser"
label = "PA Parser"

def can_parse(self, xml):
"""
Check if the XML can be parsed by this parser.
"""
return xml.tag == "document" and xml.find("nitf") is not None

def parse(self, xml, provider=None):
"""
Parse the XML and return a single dictionary representing the news item.
"""
item = {}
nitf = xml.find("nitf")
if nitf is not None:
self.parse_head(nitf, item)
self.parse_body(nitf, item)
self.parse_resource(xml, item)
return item

def parse_head(self, nitf, item):
"""
Parse the head section of the NITF document.
"""
head = nitf.find("head")
if head is not None:
title = head.find("title")
if title is not None and title.text:
item["headline"] = title.text

def parse_body(self, nitf, item):
"""
Parse the body section of the NITF document.
"""
body = nitf.find("body")
if body is not None:
self.parse_body_head(body, item)
self.parse_body_content(body, item)

def parse_body_head(self, body, item):
"""
Parse the body.head section of the NITF document.
"""
body_head = body.find("body.head")
if body_head is not None:
self.parse_hedline(body_head, item)
self.parse_byline(body_head, item)

def parse_hedline(self, body_head, item):
"""
Parse the hedline section of the NITF document.
"""
hedline = body_head.find("hedline")
if hedline is not None:
hl1 = hedline.find("hl1")
if hl1 is not None and hl1.text:
item["headline"] = hl1.text

def parse_byline(self, body_head, item):
"""
Parse the byline section of the NITF document.
"""
byline = body_head.find("byline")
if byline is not None:
bytag = byline.find("bytag")
if bytag is not None and bytag.text:
item["byline"] = bytag.text

def parse_body_content(self, body, item):
"""
Parse the body.content section of the NITF document and clean up HTML content.
"""
body_content = body.find("body.content")
if body_content is not None:
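            # Serialize body.content, re-parse it with the HTML parser, and strip the
            # wrapper tags so only the inner markup (e.g. the <p> elements) ends up in body_html.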
body_html = etree.tostring(body_content, encoding="unicode", method="html")
parser = etree.HTMLParser()
tree = etree.fromstring(body_html, parser)
etree.strip_tags(tree, "section", "body", "span", "body.content")
cleaned_html = "".join(etree.tostring(child, encoding="unicode", method="html") for child in tree)
item["body_html"] = cleaned_html

def parse_resource(self, xml, item):
"""
Parse the Resource section of the XML document.
"""
resource = xml.find(".//xn:Resource", namespaces={"xn": "http://www.xmlnews.org/namespaces/meta#"})
if resource is not None:
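            # The PA document ID arrives in a vendorData entry prefixed with
            # "PRESSUK_:Document ID="; it is used as the item GUID.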
for vendor_data in resource.findall(
".//xn:vendorData", namespaces={"xn": "http://www.xmlnews.org/namespaces/meta#"}
):
if vendor_data.text and "PRESSUK_:Document ID=" in vendor_data.text:
document_id = vendor_data.text.split("PRESSUK_:Document ID=")[-1].strip()
if document_id:
item["guid"] = document_id
break
self.parse_versioncreated(resource, item)
self.parse_firstcreated(resource, item)
self.parse_abstract(resource, item)
self.parse_usageterms(resource, item)
self.parse_word_count(resource, item)
self.parse_keywords(resource, item)

def parse_versioncreated(self, resource, item):
"""
Parse the versioncreated timestamp from the Resource section.
"""
publication_time = resource.find(
"xn:publicationTime", namespaces={"xn": "http://www.xmlnews.org/namespaces/meta#"}
)
if publication_time is not None and publication_time.text:
item["versioncreated"] = datetime.strptime(publication_time.text, "%Y-%m-%dT%H:%M:%S+00:00").replace(
tzinfo=utc
)

def parse_firstcreated(self, resource, item):
"""
Parse the firstcreated timestamp from the Resource section.
"""
received_time = resource.find("xn:receivedTime", namespaces={"xn": "http://www.xmlnews.org/namespaces/meta#"})
if received_time is not None and received_time.text:
item["firstcreated"] = datetime.strptime(received_time.text, "%Y-%m-%dT%H:%M:%S+00:00").replace(tzinfo=utc)

def parse_abstract(self, resource, item):
"""
Parse the abstract from the Resource section.
"""
description = resource.find("xn:description", namespaces={"xn": "http://www.xmlnews.org/namespaces/meta#"})
if description is not None and description.text:
item["abstract"] = description.text

def parse_usageterms(self, resource, item):
"""
Parse the usage terms (copyright) from the Resource section.
"""
copyright = resource.find("xn:copyright", namespaces={"xn": "http://www.xmlnews.org/namespaces/meta#"})
if copyright is not None and copyright.text:
item["usageterms"] = copyright.text

def parse_word_count(self, resource, item):
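        """
        Parse the word count from the vendorData entries of the Resource section.
        """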
for vendor_data in resource.findall("{http://www.xmlnews.org/namespaces/meta#}vendorData"):
if vendor_data.text and "PRESSUK_:Word Count=" in vendor_data.text:
word_count = vendor_data.text.split("PRESSUK_:Word Count=")[-1].strip()
if word_count:
try:
item["word_count"] = int(word_count)
except ValueError:
pass

def parse_keywords(self, resource, item):
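        """
        Parse the keywords from the vendorData entries of the Resource section.
        """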
keywords = []
for vendor_data in resource.findall("{http://www.xmlnews.org/namespaces/meta#}vendorData"):
if vendor_data.text and "PRESSUK_:Keyword=" in vendor_data.text:
keyword = vendor_data.text.split("PRESSUK_:Keyword=")[-1].strip()
if keyword:
keywords.append(keyword)
if keywords:
item["keywords"] = keywords


register_feed_parser(PAParser.NAME, PAParser())
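
For reference, a minimal standalone sketch of the input shape the parser expects and how it is invoked. The element layout mirrors the lookups the code above performs; the sample values are taken from this PR's test expectations, except the Document ID and the provider name, which are made up for illustration.

# Illustrative only: a minimal PA-style document assembled from the lookups the
# parser performs. The Document ID ("abc123") and provider name are invented.
from superdesk.etree import etree
from superdesk.io.feed_parsers.pa_parser import PAParser

SAMPLE = b"""<document>
  <nitf>
    <head><title>fallback title</title></head>
    <body>
      <body.head>
        <hedline><hl1>MORE THAN FOUR IN FIVE SCHOOL LEADERS ABUSED BY PARENTS IN PAST YEAR - SURVEY</hl1></hedline>
        <byline><bytag>By Eleanor Busby, PA Education Correspondent</bytag></byline>
      </body.head>
      <body.content><p>The majority of school leaders have reported being abused by parents ...</p></body.content>
    </body>
  </nitf>
  <xn:Resource xmlns:xn="http://www.xmlnews.org/namespaces/meta#">
    <xn:publicationTime>2025-03-04T00:01:00+00:00</xn:publicationTime>
    <xn:receivedTime>2025-03-03T02:45:45+00:00</xn:receivedTime>
    <xn:copyright>Press Association 2025</xn:copyright>
    <xn:vendorData>PRESSUK_:Document ID=abc123</xn:vendorData>
    <xn:vendorData>PRESSUK_:Word Count=749</xn:vendorData>
    <xn:vendorData>PRESSUK_:Keyword=EDUCATION</xn:vendorData>
  </xn:Resource>
</document>"""

root = etree.fromstring(SAMPLE)
parser = PAParser()
assert parser.can_parse(root)  # root is <document> with a <nitf> child
item = parser.parse(root, provider={"name": "PA"})
print(item["headline"], item["guid"], item["word_count"], item["keywords"])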
105 changes: 105 additions & 0 deletions tests/io/feed_parsers/pa_parser_test.py
@@ -0,0 +1,105 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-
#
# This file is part of Superdesk.
#
# Copyright 2024, Sourcefabric z.u. and contributors.
#
# For the full copyright and license information, please see the
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

import os
from superdesk.tests import TestCase
from superdesk.io.feed_parsers.pa_parser import PAParser
from superdesk.etree import etree


class PAParserTestCase(TestCase):
"""
Test case for the PA Parser.
"""

def setUp(self):
super().setUp()
self.dirname = os.path.dirname(os.path.realpath(__file__))
self.fixture = os.path.normpath(os.path.join(self.dirname, "../fixtures/pa_parser.xml"))
self.provider = {"name": "Test"}
with open(self.fixture, "rb") as f:
xml = etree.parse(f)
self.item = PAParser().parse(xml.getroot(), self.provider)

def test_headline(self):
"""
Test if the headline is correctly parsed.
"""
self.assertEqual(
self.item.get("headline"),
"MORE THAN FOUR IN FIVE SCHOOL LEADERS ABUSED BY PARENTS IN PAST YEAR - SURVEY",
)

def test_byline(self):
"""
Test if the byline is correctly parsed.
"""
self.assertEqual(
self.item.get("byline"),
"By Eleanor Busby, PA Education Correspondent",
)

def test_versioncreated(self):
"""
Test if the versioncreated timestamp is correctly parsed.
"""
self.assertEqual(
self.item.get("versioncreated").isoformat(),
"2025-03-04T00:01:00+00:00",
)

def test_firstcreated(self):
"""
Test if the firstcreated timestamp is correctly parsed.
"""
self.assertEqual(
self.item.get("firstcreated").isoformat(),
"2025-03-03T02:45:45+00:00",
)

def test_abstract(self):
"""
Test if the abstract is correctly parsed.
"""
self.assertEqual(
self.item.get("abstract"),
"The majority of school leaders have reported being abused by parents in the past year, a survey has suggested.",
)

def test_keywords(self):
"""
Test if the keywords are correctly parsed.
"""
self.assertIn("EDUCATION", self.item.get("keywords"))

def test_word_count(self):
"""
Test if the word count is correctly parsed.
"""
self.assertEqual(self.item.get("word_count"), 749)

def test_body_html(self):
"""
Test if the body HTML is correctly parsed.
"""
body_html = self.item.get("body_html")
self.assertTrue(body_html.startswith("<p>The majority of school leaders have reported being abused by parents"))
self.assertIn("<p>More than two in five school leaders (42%)", body_html)
self.assertIn("<p>One in 10 school leaders said they had suffered physical violence", body_html)

def test_usageterms(self):
"""
Test if the usage terms (copyright) are correctly parsed.
"""
self.assertEqual(
self.item.get("usageterms"),
"Press Association 2025",
)