Skip to content

Commit

Permalink
build new direct answer parser
Browse files Browse the repository at this point in the history
  • Loading branch information
JKlueber committed Mar 23, 2024
1 parent cf8300e commit 2cf94c7
Showing 1 changed file with 272 additions and 0 deletions.
272 changes: 272 additions & 0 deletions archive_query_log/parsers/warc_direct_answers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
from functools import cache
from hashlib import sha256
from itertools import chain
from typing import Iterable, Iterator
from urllib.parse import urljoin
from uuid import uuid5

from click import echo
from elasticsearch_dsl import Search
from elasticsearch_dsl.function import RandomScore
from elasticsearch_dsl.query import FunctionScore, Term, RankFeature, Exists
# noinspection PyProtectedMember
from lxml.etree import _Element, tostring  # nosec: B410
from tqdm.auto import tqdm
from warc_s3 import WarcS3Store

from archive_query_log.config import Config
from archive_query_log.namespaces import NAMESPACE_WARC_DIRECT_ANSWER_PARSER, \
    NAMESPACE_RESULT
from archive_query_log.orm import Serp, InnerParser, InnerProviderId, \
    WarcDirectAnswerParserType, WarcDirectAnswerParser, WarcLocation, DirectAnswer, \
    Result, InnerSerp, DirectAnswerId, InnerDownloader
from archive_query_log.parsers.warc import open_warc
from archive_query_log.parsers.xml import parse_xml_tree, safe_xpath
from archive_query_log.utils.es import safe_iter_scan, update_action
from archive_query_log.utils.time import utc_now


def add_warc_direct_answer_parser(
        config: Config,
        provider_id: str | None,
        url_pattern_regex: str | None,
        priority: float | None,
        parser_type: WarcDirectAnswerParserType,
        xpath: str | None,
        big_box_xpath: str | None,
        small_box_xpath: str | None,
        right_box_xpath: str | None,
) -> None:
    """Validate a direct answer parser definition and save it.

    The parser ID is a UUIDv5 derived from the provider ID, the URL pattern
    and the priority (each replaced by an empty string when ``None``), so
    saving a definition with the same three values reuses the same ID.

    :param config: Configuration providing the Elasticsearch client.
    :param provider_id: Provider the parser is restricted to, if any.
    :param url_pattern_regex: Regular expression for capture URLs, if any.
    :param priority: Strictly positive priority, or ``None``.
    :param parser_type: Parser type; only ``"xpath"`` is accepted.
    :param xpath: XPath selecting the direct answer elements.
    :param big_box_xpath: XPath for the big box text, if any.
    :param small_box_xpath: XPath for the small box text, if any.
    :param right_box_xpath: XPath for the right box text, if any.
    :raises ValueError: If the priority is not strictly positive, the
        parser type is not ``"xpath"``, or no XPath is given.
    """
    # Validation, in the order: priority, parser type, XPath.
    if priority is not None and priority <= 0:
        raise ValueError("Priority must be strictly positive.")
    if parser_type != "xpath":
        raise ValueError(f"Invalid parser type: {parser_type}")
    if xpath is None:
        raise ValueError("No XPath given.")

    # Deterministic ID from the identifying fields.
    id_parts = (
        "" if provider_id is None else provider_id,
        "" if url_pattern_regex is None else url_pattern_regex,
        "" if priority is None else str(priority),
    )
    id_name = ":".join(id_parts)
    parser_id = str(uuid5(NAMESPACE_WARC_DIRECT_ANSWER_PARSER, id_name))

    # An empty provider ID is treated like a missing one.
    provider = None
    if provider_id:
        provider = InnerProviderId(id=provider_id)

    document = WarcDirectAnswerParser(
        id=parser_id,
        last_modified=utc_now(),
        provider=provider,
        url_pattern_regex=url_pattern_regex,
        priority=priority,
        parser_type=parser_type,
        xpath=xpath,
        big_box_xpath=big_box_xpath,
        small_box_xpath=small_box_xpath,
        right_box_xpath=right_box_xpath,
    )
    document.save(using=config.es.client)


def _first_stripped_text(element: _Element, xpath: str | None) -> str | None:
    """Return the first string ``xpath`` yields on ``element``, stripped.

    Returns ``None`` if no XPath is configured or it yields no strings.
    """
    if xpath is None:
        return None
    texts = safe_xpath(element, xpath, str)
    if len(texts) == 0:
        return None
    return texts[0].strip()


def _content_digest(content: str) -> str:
    """Return a hex digest of ``content`` that is stable across processes."""
    return sha256(content.encode("utf-8")).hexdigest()


def _parse_warc_direct_answer(
        parser: WarcDirectAnswerParser,
        serp_id: str,
        capture_url: str,
        warc_store: WarcS3Store,
        warc_location: WarcLocation,
) -> list[DirectAnswer] | None:
    """Extract the direct answers of one SERP with the given parser.

    :param parser: Parser definition (URL pattern, type and XPaths).
    :param serp_id: ID of the SERP, used to derive direct answer IDs.
    :param capture_url: URL of the capture, matched against the URL pattern.
    :param warc_store: Store from which the WARC record is read.
    :param warc_location: Location of the WARC record in the store.
    :return: The parsed direct answers, ranked in document order, or
        ``None`` if the URL pattern does not match, the record cannot be
        parsed into a tree, or the parser's XPath selects no elements.
    :raises ValueError: If the parser type is unknown or the parser has
        no XPath.
    """
    # Check if URL matches pattern.
    if (parser.url_pattern is not None and
            not parser.url_pattern.match(capture_url)):
        return None

    # Only XPath parsers are supported.
    if parser.parser_type != "xpath":
        raise ValueError(f"Unknown parser type: {parser.parser_type}")
    if parser.xpath is None:
        raise ValueError("No XPath given.")

    # Parse direct answer.
    with open_warc(warc_store, warc_location) as record:
        tree = parse_xml_tree(record)
    if tree is None:
        return None

    elements = safe_xpath(tree, parser.xpath, _Element)
    if len(elements) == 0:
        return None

    direct_answers = []
    element: _Element
    for i, element in enumerate(elements):
        content: str = tostring(
            element,
            encoding=str,
            method="xml",
            pretty_print=False,
            with_tail=True,
        )
        # The ID must be reproducible between runs. The built-in ``hash()``
        # of a string is salted per process, so a content digest is used
        # instead.
        direct_answer_id_components = (
            serp_id,
            parser.id,
            _content_digest(content),
            str(i),
        )
        direct_answer_id = str(uuid5(
            NAMESPACE_RESULT,
            ":".join(direct_answer_id_components),
        ))
        direct_answers.append(DirectAnswer(
            id=direct_answer_id,
            rank=i,
            content=content,
            big_box=_first_stripped_text(element, parser.big_box_xpath),
            small_box=_first_stripped_text(element, parser.small_box_xpath),
            right_box=_first_stripped_text(element, parser.right_box_xpath),
        ))
    return direct_answers


@cache
def _warc_direct_answer_parsers(
        config: Config,
        provider_id: str,
) -> list[WarcDirectAnswerParser]:
    """Return the direct answer parsers applicable to a provider.

    Selects parsers that either have no provider set or belong to the
    given provider, ranked by their ``priority`` field. The result is
    memoized per ``(config, provider_id)`` by ``functools.cache``, so
    parsers added later are not seen until the cache is cleared.

    :param config: Configuration providing the Elasticsearch client.
    :param provider_id: ID of the provider whose parsers are requested.
    :return: The matching parsers as a list.
    """
    parsers: Iterable[WarcDirectAnswerParser] = (
        WarcDirectAnswerParser.search(using=config.es.client)
        .filter(
            ~Exists(field="provider.id") |
            Term(provider__id=provider_id)
        )
        .query(RankFeature(field="priority", saturation={}))
        # Without this flag the scan helper discards the ranking and
        # returns hits in index order, making the priority query a no-op.
        .params(preserve_order=True)
        .scan()
    )
    parsers = safe_iter_scan(parsers)
    return list(parsers)


def _parse_serp_warc_direct_answer_action(
        config: Config,
        serp: Serp,
) -> Iterator[dict]:
    """Yield the bulk actions that result from parsing one SERP.

    Nothing is yielded if the SERP has no complete WARC location or is
    already marked as not to be parsed. Otherwise the applicable parsers
    are tried in order and the first one that returns direct answers wins:
    one ``Result`` document is yielded per direct answer, followed by an
    update of the SERP that references the direct answers and marks it as
    parsed. If no parser succeeds, only the "parsed" marker is written.

    :param config: Configuration providing the parsers and the WARC store.
    :param serp: The SERP whose WARC record should be parsed.
    :return: Iterator over bulk action dictionaries.
    """
    # Re-check if it can be parsed.
    if (serp.warc_location is None or
            serp.warc_location.file is None or
            serp.warc_location.offset is None or
            serp.warc_location.length is None):
        return

    # Re-check if parsing is necessary.
    if (serp.warc_direct_answer_parser is not None and
            serp.warc_direct_answer_parser.should_parse is not None and
            not serp.warc_direct_answer_parser.should_parse):
        return

    for parser in _warc_direct_answer_parsers(config, serp.provider.id):
        # Try to parse the direct answers.
        warc_direct_answers = _parse_warc_direct_answer(
            parser=parser,
            serp_id=serp.id,
            capture_url=serp.capture.url,
            warc_store=config.s3.warc_store,
            warc_location=serp.warc_location,
        )
        if warc_direct_answers is None:
            # Parsing was not successful, e.g., URL pattern did not match.
            continue
        for direct_answer in warc_direct_answers:
            yield Result(
                id=direct_answer.id,
                last_modified=utc_now(),
                archive=serp.archive,
                provider=serp.provider,
                capture=serp.capture,
                serp=InnerSerp(
                    id=serp.id,
                ).to_dict(),
                direct_answer=direct_answer,
                direct_answer_parser=InnerParser(
                    id=parser.id,
                    should_parse=False,
                    last_parsed=utc_now(),
                ).to_dict(),
                warc_before_serp_downloader=InnerDownloader(
                    should_download=True,
                ).to_dict(),
                warc_after_serp_downloader=InnerDownloader(
                    should_download=True,
                ).to_dict(),
            ).to_dict(include_meta=True)
        # The parser status must go into the same field that the re-check
        # above and the search filter in `parse_serps_warc_direct_answer`
        # read; otherwise a successfully parsed SERP is never marked as
        # done and is parsed again on every run.
        # NOTE(review): if the Serp mapping declares this field as
        # `warc_direct_answers_parser`, rename all usages in this module
        # together instead.
        yield update_action(
            serp,
            warc_direct_answers=[
                DirectAnswerId(
                    id=direct_answer.id,
                    rank=direct_answer.rank,
                )
                for direct_answer in warc_direct_answers
            ],
            warc_direct_answer_parser=InnerParser(
                id=parser.id,
                should_parse=False,
                last_parsed=utc_now(),
            ),
        )
        return
    # No parser produced direct answers: only record that parsing was tried.
    yield update_action(
        serp,
        warc_direct_answer_parser=InnerParser(
            should_parse=False,
            last_parsed=utc_now(),
        ),
    )
    return


def parse_serps_warc_direct_answer(config: Config) -> None:
    """Parse direct answers for all SERPs that still need parsing.

    Refreshes the SERP index, then selects SERPs that have a WARC location
    and are not marked with ``should_parse=False``. The selection is ranked
    by archive priority, provider priority and a random score. The actions
    generated for each SERP are sent to Elasticsearch in bulk; if nothing
    matches, a short message is printed instead.

    :param config: Configuration providing the Elasticsearch access.
    """
    Serp.index().refresh(using=config.es.client)

    needs_parsing = (
        Exists(field="warc_location") &
        ~Term(warc_direct_answer_parser__should_parse=False)
    )
    ranking = (
        RankFeature(field="archive.priority", saturation={}) |
        RankFeature(field="provider.priority", saturation={}) |
        FunctionScore(functions=[RandomScore()])
    )
    pending_search: Search = (
        Serp.search(using=config.es.client)
        .filter(needs_parsing)
        .query(ranking)
    )

    total = pending_search.count()
    if total <= 0:
        echo("No new/changed SERPs.")
        return

    pending_serps: Iterable[Serp] = safe_iter_scan(
        pending_search.params(preserve_order=True).scan()
    )
    # noinspection PyTypeChecker
    with_progress = tqdm(
        pending_serps,
        total=total,
        desc="Parsing WARC direct answer",
        unit="SERP",
    )
    config.es.bulk(chain.from_iterable(
        _parse_serp_warc_direct_answer_action(config, serp)
        for serp in with_progress
    ))

0 comments on commit 2cf94c7

Please sign in to comment.