diff --git a/.gitignore b/.gitignore index b383fa4..e10c8a3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ ### Project structure ### data/* -outputs/* .vscode +out/* ### Contentious file types (can be removed at user discretion) ### diff --git a/README.md b/README.md index 3190723..d1d130a 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,37 @@ Once pre-commits are activated, whenever you commit to this repository a series **NOTE:** Pre-commit hooks execute Python, so it expects a working Python build. ## Usage -Details to follow... +[theyworkforyou.com](https://www.theyworkforyou.com) + +By default, parliamentary content from the previous day (and anything so far on the current day) will be reviewed. However, a number of flags are available for use from the command line. The main time filtering behaviours can be summarised as follows: + +- previous day (default) e.g. + +``` bash +$ python scripts/theyworkforyou.py +``` + +- specify day with optional end date (`-d` or `--end`) e.g. + +``` bash +$ python scripts/theyworkforyou.py -d 2024-05-20 +``` + +- range from start date (`-s` or `--start`) to end date e.g. + +``` bash +$ python scripts/theyworkforyou.py -s 2024-05-20 -d 2024-05-24 +``` + +- look behind from end date (optional) by a specified window of days (inclusive; `-n` or `--window`) e.g. + +``` bash +$ python scripts/theyworkforyou.py -d 2024-05-24 -n 3 +``` + +Additionally, the `-w` or `--weekly` flag can be used to generate a report for the previous week e.g. a Wednesday to a Wednesday. The `-f` or `--form` flag can also be applied to specify a preferred date format (other than the default of %Y-%m-%d). + +This demo is tested on the Gemma (1st version) model. It assumes that [Ollama](https://www.ollama.com) has been installed locally and the required model has been downloaded before using ParliAI. ### Workflow Details to follow... diff --git a/data/.gitkeep b/out/.gitkeep similarity index 100% rename from data/.gitkeep rename to out/.gitkeep diff --git a/scripts/theyworkforyou.py b/scripts/theyworkforyou.py new file mode 100644 index 0000000..9265f13 --- /dev/null +++ b/scripts/theyworkforyou.py @@ -0,0 +1,215 @@ +"""Script for extracting parliamentary content from TheyWorkForYou.""" + +import argparse +import datetime as dt +import os + +import tqdm + +from parliai_public import dates +from parliai_public.readers import Debates, WrittenAnswers + + +def create_reader( + reader_class: type[Debates] | type[WrittenAnswers], + toml: None | str = None, + date_list: None | list[dt.date] = None, + llm_name: None | str = None, +): + """ + Create an instance of a reader class. + + Parameters + ---------- + reader_class : type[Debates] | type[WrittenAnswers] + Class to instantiate. + toml : str, optional + Path to TOML configuration file. If not specified, the + default for the class is used. + date_list : list[dt.date], optional + List of dates to cover. If not specified, the default + for the reader class is used. + llm_name : str, optional + Name of model (only locally-installed Ollama-based LLMs + in this demo). 'gemma' by default. + + Returns + ------- + reader : Debates | WrittenAnswers + An instantiated reader. + """ + + reader = reader_class.from_toml(toml) + if date_list: + reader.dates = date_list + reader.llm_name = "gemma" if llm_name is None else llm_name + + return reader + + +def make_summary( + reader: Debates | WrittenAnswers, + header: str, + save: bool = True, +) -> str: + """ + Collect and summarise the latest entries in Parliament. 
+ + Users have a choice for how they would like to define "latest": + + 1. Providing a specific date. + 2. Defining a reporting period with start and end dates. + 3. Specifying a date and a number of days to look back over + (inclusive of the provided end date). + 4. Providing nothing will have the reader only look at yesterday. + + Parameters + ---------- + reader : Debates | WrittenAnswers + Reader to use in analysis. + header : str + Section header for the reader. + save : bool, default=True + Whether to save the collected and analysed transcripts. + + Returns + ------- + summary : str + Stylised summary of entries in Markdown syntax. + """ + + entries = reader.retrieve_latest_entries() + sections = [] + content = "" + + if entries: + width = max(map(len, entries)) + for entry in (pbar := tqdm.tqdm(entries)): + pbar.set_description(f"Processing {entry.ljust(width)}") + page = reader.read(entry) + if page: + analysed = reader.analyse(page) + rendering = reader.render(analysed) + sections.append(rendering) + if save: + reader.save(analysed) + + content = "\n\n".join(sections) + + if content == "": + content = "No relevant content found for this period." + + summary = "\n\n".join((header, content)) + + return summary + + +def main(): + """Summarise the latest communications in Parliament.""" + + parser = argparse.ArgumentParser() + parser.add_argument( + "-s", + "--start", + type=str, + required=False, + help="start of reporting period (default format YYYY-MM-DD)", + ) + parser.add_argument( + "-d", + "--end", + type=str, + required=False, + help="end of reporting period (default format YYYY-MM-DD)", + ) + parser.add_argument( + "-n", + "--window", + type=int, + required=False, + help="length of reporting period (inclusive of `end`)", + ) + parser.add_argument( + "-f", + "--form", + type=str, + default="%Y-%m-%d", + help="date string format using directive notation (default %Y-%m-%d)", + ) + parser.add_argument( + "--debates-toml", + type=str, + required=False, + help="path to debates TOML configuration file", + ) + parser.add_argument( + "--written-toml", + type=str, + required=False, + help="path to written answers TOML configuration file", + ) + parser.add_argument( + "-w", + "--weekly", + required=False, + action="store_true", + help="trigger a weekly report from today", + ) + parser.add_argument( + "--no-save", + required=False, + action="store_true", + help="do not save data from collected pages", + ) + args = vars(parser.parse_args()) + + start = args.get("start") + end = args.get("end") + window = args.get("window") + form = args["form"] + save = not args["no_save"] + + if args.get("weekly"): + start, end, window = None, None, 8 + + date_list = None + if start or end or window: + date_list = dates.list_dates(start, end, window, form) + + debates = create_reader( + reader_class=Debates, + toml=args.get("debates_toml"), + date_list=date_list, + ) + written = create_reader( + reader_class=WrittenAnswers, + toml=args.get("written_toml"), + date_list=date_list, + ) + + # TODO: refactor to single LLM instantiation + debates.instantiate_llm() + written.instantiate_llm() + + debates.make_outdir() + written.outdir = debates.outdir + + summary = "\n\n".join( + ( + debates.make_header(urls=debates.urls + written.urls), + make_summary(debates, "# Debates", save), + make_summary( + written, "# Written answers (UK Parliament only)", save + ), + ) + ) + + print("Saving summary...") + with open(os.path.join(debates.outdir, "summary.md"), "w") as f: + f.write(summary) + + print("Done! 
✅") + + +if __name__ == "__main__": + main() diff --git a/src/parliai_public/_config/debates.toml b/src/parliai_public/_config/debates.toml index 5e01f2f..ebf06c2 100644 --- a/src/parliai_public/_config/debates.toml +++ b/src/parliai_public/_config/debates.toml @@ -24,4 +24,4 @@ Now extract all relevant content from the following text: outdir = "out/theyworkforyou" -llm_name = "gemma:2b" +llm_name = "gemma" diff --git a/src/parliai_public/_config/wrans.toml b/src/parliai_public/_config/wrans.toml index 3e60ffe..23b52bb 100644 --- a/src/parliai_public/_config/wrans.toml +++ b/src/parliai_public/_config/wrans.toml @@ -16,4 +16,4 @@ Now extract all relevant content from the following text: outdir = "out/theyworkforyou" -llm_name = "gemma:2b" +llm_name = "gemma" diff --git a/src/parliai_public/dates.py b/src/parliai_public/dates.py new file mode 100644 index 0000000..e761693 --- /dev/null +++ b/src/parliai_public/dates.py @@ -0,0 +1,126 @@ +"""Functions for handling dates for our reader classes.""" + +import datetime as dt +import warnings + + +def list_dates( + start: None | str | dt.date | dt.datetime = None, + end: None | str | dt.date | dt.datetime = None, + window: None | int = None, + form: str = "%Y-%m-%d", +) -> list[dt.date]: + """ + Create a continuous list of dates. + + Currently, we support three ways of defining your list: + + 1. End-points: start and end dates + 2. Look behind: optional end date and a window + 3. Single date: optional end date + + We do not allow for looking ahead, but that may be introduced in a + future release. + + Parameters + ---------- + start : str | dt.date | dt.datetime, optional + Start of the period. If not specified, this is ignored. + end : str | dt.date | dt.datetime, optional + End of the period. If not specified, this is taken as today. + window : int, optional + Number of days to look back from `end`. If `start` is specified, + this is ignored. + form : str, default="%Y-%m-%d" + Format of any date strings. + + Returns + ------- + dates : list[dt.date] + List of dates. + """ + start = _format_date(start, form) + end = _format_date(end, form) or dt.date.today() + + _check_date_parameters(start, end, window) + + window = window or 1 + if isinstance(start, dt.date): + window = (end - start).days + 1 + + return [end - dt.timedelta(days=x) for x in range(window)][::-1] + + +def _format_date( + date: None | str | dt.date | dt.datetime, form: str = "%Y-%m-%d" +) -> None | dt.date: + """ + Format a date-like object into a proper `dt.date`. + + Dates and `None` pass straight through. Meanwhile, date(time) + strings are converted into datetime objects and then datetime + objects are turned into dates. + + Parameters + ---------- + date : None | str | dt.date | dt.datetime + Date-like object to be converted. + form : str, default="%Y-%m-%d" + Format of date string. + + Returns + ------- + date : None | dt.date + Formatted date object, or a passed-through `None`. + """ + if isinstance(date, str): + date = dt.datetime.strptime(date, form) + if isinstance(date, dt.datetime): + date = date.date() + + return date + + +def _check_date_parameters( + start: None | dt.date, end: dt.date, window: None | int +) -> None: + """ + Check the provided date-forming parameters are valid. + + Valid combinations are start and end points, an end and a window, or + just an end. The checks mostly check for logical consistency - such + as not having dates in the future. + + Parameters + ---------- + start : None | dt.date + Start of period. + end : dt.date + End of period. 
+ window : None | int + Length of period. + + Warns + ----- + UserWarning + If a start and window are provided, we warn the user that the + window will be ignored. + + Raises + ------ + ValueError + If either start or end are in the future, or if start is later + than end. + """ + if start and window: + message = "Ignoring window as start and end dates specified." + warnings.warn(message, UserWarning) + + if end > dt.date.today(): + raise ValueError("End date must not be in the future.") + + if isinstance(start, dt.date): + if start > dt.date.today(): + raise ValueError("Start date must not be in the future.") + if start > end: + raise ValueError("Start date must not be after end date.") diff --git a/src/parliai_public/readers/__init__.py b/src/parliai_public/readers/__init__.py new file mode 100644 index 0000000..dad4aef --- /dev/null +++ b/src/parliai_public/readers/__init__.py @@ -0,0 +1,5 @@ +"""Reader classes for pulling down and analysing activity.""" + +from .theyworkforyou import Debates, WrittenAnswers + +__all__ = ["Debates", "WrittenAnswers"] diff --git a/src/parliai_public/readers/base.py b/src/parliai_public/readers/base.py new file mode 100644 index 0000000..67dad2d --- /dev/null +++ b/src/parliai_public/readers/base.py @@ -0,0 +1,554 @@ +"""Base class for other readers to inherit from.""" + +import abc +import datetime as dt +import json +import os +import re +from importlib import resources +from typing import Iterable +from urllib.parse import urlparse + +import requests +import toml +from bs4 import BeautifulSoup +from langchain.docstore.document import Document +from langchain.prompts import PromptTemplate +from langchain.text_splitter import RecursiveCharacterTextSplitter +from langchain_community.chat_models import ChatOllama + +from parliai_public import dates + + +class BaseReader(metaclass=abc.ABCMeta): + """ + A base class for readers to inherit. + + This class is not to be used in practice except for inheritance. + + To make your own reader class, you can inherit from this base class + and implement the following methods: + + - `retrieve_latest_entries`: gather the URLs of the latest pages to + be read, analysed, and rendered by the class + - `_read_metadata` (static): extract whatever metadata you might + need from the HTML soup of a web page and its URL + - `_read_contents` (static): extract the core (text) content from + the HTML soup of a web page + - `render`: create a Markdown string to summarise the relevant + content on a web page + + Parameters + ---------- + urls : list[str] + List of URLs from which to gather content. + terms : Iterable[str], optional + Key terms to filter content on. By default, we look for any + mention of `Office for National Statistics` or `ONS`. + dates : list[dt.date], optional + List of dates from which to pull entries. The `parliai.dates` + module may be of help. If not specified, only yesterday is used. + outdir : str, default="out" + Location of a directory in which to write outputs. + prompt : str, optional + System prompt provided to the LLM. If not specified, this is + read from the default configuration file. + llm_name : str, optional + Full name of the LLM (or version) to be accessed. Must be one + available in Ollama and previously downloaded locally. + llm : ChatOllama, optional + Chat model wrapper. 
+ """ + + _default_config: str = "base.toml" + _source: None | str = None + + def __init__( + self, + urls: list[str], + terms: None | Iterable[str] = None, + dates: None | list[dt.date] = None, + outdir: str = "out", + prompt: None | str = None, + llm_name: None | str = None, + llm: None | ChatOllama = None, + ) -> None: + self.urls = urls + self.terms = ( + terms + or toml.load("src/parliai_public/_config/base.toml")["keywords"] + ) + self.dates = dates or [dt.date.today() - dt.timedelta(days=1)] + self.outdir = outdir + + config = self._load_config() + self.prompt = prompt or config["prompt"] + self.llm_name = llm_name or config["llm_name"] + self.llm = llm + + @classmethod + def _load_config(cls, path: None | str = None) -> dict: + """ + Load a configuration file from disk. + + If no path is supplied, the default is used for the class. + + Parameters + ---------- + path : str, optional + Path to configuration file. If `None`, the default is used. + + Returns + ------- + config : dict + Dictionary containing configuration details. + """ + + if isinstance(path, str): + return toml.load(path) + + where = resources.files("parliai_public._config") + with resources.as_file(where.joinpath(cls._default_config)) as c: + config = toml.load(c) + + return config + + @classmethod + def from_toml(cls, path: None | str = None) -> "BaseReader": + """ + Create an instance of the class from a configuration TOML file. + + A complete configuration file will include all parameters listed + in the doc-string of this class. + + Parameters + ---------- + path : str, optional + Path to configuration file. If `None`, the default is used. + + Returns + ------- + reader : BaseReader + reader instance. + """ + + config = cls._load_config(path) + + start = config.pop("start", None) + end = config.pop("end", None) + window = config.pop("window", None) + form = config.pop("form", "%Y-%m-%d") + + config["dates"] = None + if start or end or window: + config["dates"] = dates.list_dates(start, end, window, form) + + return cls(**config) + + def check_contains_terms(self, string: str) -> bool: + """ + Check whether a string contains any of the search terms. + + If you have not specified any search terms, this function + returns `True`. + + This function determines a term is contained in the search + string using a regular expression. Using the standard `in` + operator on two strings would lead to false positives. For + instance, it would say the term "dog" is in the phrase + "dogmatism is the greatest of mental obstacles to human + happiness," which is not our intention. + + Instead, we flag a term as being present if it appears at either + end of the string or in the middle with certain surrounding + characters: + + - The term may be preceded by whitespace, square brackets or + parentheses. + - The term may be followed by whitespace, brackets, or a small + selection of punctuation, including things like commas and + full stops. + + Parameters + ---------- + string : str + String to be checked. + + Returns + ------- + contains : bool + Whether the string contains any search terms. + """ + + terms = self.terms + if not terms: + return True + + string = string.lower() + for term in map(str.lower, terms): + match = re.search( + rf"(^|(?<=[\('\[\s])){term}(?=[\)\]\s!?.,:;'-]|$)", string + ) + if match: + return True + + return False + + def make_outdir(self) -> None: + """ + Create the output directory for a run. + + Attributes + ---------- + outdir : str + Updated output directory, defined by the runtime parameters. 
+ """ + + start, end = min(self.dates), max(self.dates) + period = ".".join(map(dt.date.isoformat, [start, end])) + name = ".".join((period, self.llm_name)) + + outdir = os.path.join(self.outdir, name) + outdir = self._tag_outdir(outdir) + + os.makedirs(outdir) + self.outdir = outdir + + def _tag_outdir(self, outdir: str) -> str: + """ + Determine a unique version for the output directory and tag it. + + If the output directory already exists, then we add a number tag + to the end of the directory name. This number is incremental. + + Parameters + ---------- + outdir : str + Output directory path. + + Returns + ------- + outdir : str + Potentially updated directory path. + """ + + if not os.path.exists(outdir): + return outdir + + tag = 1 + while os.path.exists(updated := ".".join((outdir, str(tag)))): + tag += 1 + + return updated + + @abc.abstractmethod + def retrieve_latest_entries(self) -> list[str]: + """ + Replace with method for getting the latest entries to analyse. + + Returns + ------- + entries : list[str] + List of web pages from which to draw down relevant + information. + """ + + def get(self, url: str, check: bool = True) -> None | BeautifulSoup: + """ + Retrieve the HTML soup for a web page. + + Parameters + ---------- + url : str + Link to the web page. + check : bool, default=True + Whether to check the page for any relevant terms. Default is + to do so. + + Returns + ------- + soup : None | bs4.BeautifulSoup + HTML soup of the web page if the page contains any relevant + terms. Otherwise, `None`. + """ + + page = requests.get(url) + soup = BeautifulSoup(page.content, "html.parser") + if (not check) or ( + check and self.check_contains_terms(soup.get_text()) + ): + return soup + + def read(self, url: str) -> None | dict: + """ + Read a web page, and return its contents if it is relevant. + + Parameters + ---------- + url : str + Link to the web page to read. + + Returns + ------- + page : None | dict + If the web page is relevant, return a dictionary format of + the page text and metadata. Otherwise, `None`. + """ + + soup = self.get(url) + page = None + if soup is not None: + metadata = self._read_metadata(url, soup) + contents = self._read_contents(soup) + page = {**metadata, **contents} + + return page + + @abc.abstractmethod + def _read_metadata(self, url: str, soup: BeautifulSoup) -> dict: + """ + Replace with method to read metadata from an entry. + + Parameters + ---------- + url : str + URL of the entry. + soup : bs4.BeautifulSoup + HTML soup of the entry. + + Returns + ------- + metadata : dict + Dictionary containing the relevant metadata. + """ + + @abc.abstractmethod + def _read_contents(self, soup: BeautifulSoup) -> dict: + """ + Replace with method to read text content from some HTML soup. + + Parameters + ---------- + soup : bs4.BeautifulSoup + HTML soup of a webpage. + + Returns + ------- + text : dict + Dictionary containing any of the relevant contents on the + webpage in plain-text format. + """ + + def instantiate_llm(self) -> None: + """Instantiate LLM object per user specification.""" + + self.llm = ChatOllama(model=self.llm_name, temperature=0) + + return None + + def analyse(self, transcript: dict) -> dict: + """ + Send some text to the LLM for analysis (and receive a response). + + Parameters + ---------- + transcript : dict + Web page transcript with a `text` entry to be analysed. + + Returns + ------- + transcript : dict + Updated transcript with the LLM response. 
+ """ + + chunks = self._split_text_into_chunks(transcript["text"]) + + responses = [] + for chunk in chunks: + if self.check_contains_terms(chunk.page_content): + response = self._analyse_chunk(chunk) + responses.append(response) + + transcript["response"] = "\n\n".join(responses) + + return transcript + + def clean_response(self, response: str): + """ + Remove 'Sure....:' preamble if gemma model used. + + Parameters + ---------- + response : str + Raw response from LLM. + + Returns + ------- + response : str + Cleaned response. + """ + + response = re.sub(r"^Sure(.*?\:)\s*", "", response) + + return response + + @staticmethod + def _split_text_into_chunks( + text: str, + sep: str = ". ", + size: int = 4000, + overlap: int = 1000, + ) -> list[Document]: + r""" + Split a debate into chunks to be processed by the LLM. + + Some of the speeches within a single debate can get very large, + making them intractable for the LLM. + + Parameters + ---------- + text : str + Text to be split. + sep : str + Separator to define natural chunks. Defaults to `. `. + size : int + Chunk size to aim for. Defaults to 20,000 tokens. + overlap : int + Overlap between chunks. Defaults to 4,000 tokens. + + Returns + ------- + chunks : list[Document] + Chunked-up text for processing. + """ + + splitter = RecursiveCharacterTextSplitter( + separators=sep, + chunk_size=size, + chunk_overlap=overlap, + length_function=len, + keep_separator=False, + is_separator_regex=False, + ) + + return splitter.create_documents([text]) + + def _analyse_chunk(self, chunk: Document) -> str: + """ + Extract the relevant content from a chunk using LLM. + + Parameters + ---------- + chunk : langchain.docstore.document.Document + Document with the chunk contents to be processed. + + Returns + ------- + response : str + LLM response, lightly formatted. + """ + + prompt_template = PromptTemplate( + input_variables=["keywords", "text"], template=self.prompt + ) + prompt = prompt_template.format( + keywords=self.terms, text=chunk.page_content + ) + + llm = self.llm + response = llm.invoke(prompt).content.strip() + if self.llm_name == "gemma": + response = self.clean_response(response) + + return response + + def save(self, page: dict) -> None: + """ + Save an HTML entry to a more compact JSON format. + + We use the metadata to create a file path for the JSON data. The + file itself is called `{content["idx"]}.json` and it is saved at + `self.outdir` under the `{content["cat"]}` directory if the + entry has a category. Otherwise, it is saved in `self.outdir`. + + Parameters + ---------- + page : dict + Dictionary containing the contents and metadata of the + entry. + """ + + cat, idx = page.get("cat"), page.get("idx") + + root = os.path.join(self.outdir, "data") + where = root if cat is None else os.path.join(root, cat) + os.makedirs(where, exist_ok=True) + + with open(os.path.join(where, f"{idx}.json"), "w") as f: + json.dump(page, f, indent=4) + + @abc.abstractmethod + def render(self, transcript: dict) -> str: + """ + Replace with a method to render an entry in Markdown. + + Parameters + ---------- + transcript : dict + Dictionary containing the metadata and contents of the web + page to be rendered. This dictionary also includes the LLM + response(s) for that page. + + Returns + ------- + rendering : str + A rendering of the page and its metadata in Markdown format. + """ + + def make_header(self, urls: list[str] = None) -> str: + """ + Make the header for a summary report. 
+ + Parameters + ---------- + urls : list[str], optional + List of URLs to report in summary. If not specified, which + is the expected user behaviour, the URLs used by the reader + will be used. + + Returns + ------- + header : str + Markdown string with details of the reporting date, period + covered, and source of materials. + """ + + form = "%a, %d %b %Y" + today = dt.date.today().strftime(form) + + dates = self.dates + if len(dates) == 1: + period = dates[-1].strftime(form) + else: + start = min(dates).strftime(form) + end = max(dates).strftime(form) + period = f"{start} to {end}" + + urls = urls or self.urls + source = f"Based on information from {self._source}:\n" + links = [] + for url in urls: + parsed = urlparse(url) + link = url.replace(f"{parsed.scheme}://", "", 1) + links.append(f"- [{link}]({url})") + + header = "\n".join( + ( + f"Publication date: {today}", + f"Period covered: {period}", + f"Search terms: {self.terms}", + "\n".join((source, *links)), + ) + ) + + return header diff --git a/src/parliai_public/readers/theyworkforyou.py b/src/parliai_public/readers/theyworkforyou.py new file mode 100644 index 0000000..547469a --- /dev/null +++ b/src/parliai_public/readers/theyworkforyou.py @@ -0,0 +1,637 @@ +"""Tools to summarise ONS activity in Parliament via TheyWorkForYou.""" + +import datetime as dt +import re +import warnings +from typing import Iterable + +from bs4 import BeautifulSoup +from bs4.element import NavigableString, Tag +from langchain_community.chat_models import ChatOllama + +from .base import BaseReader + + +class Debates(BaseReader): + """ + Class to summarise ONS activity in parliamentary debate. + + All of the content from which we extract relevant activity comes + from the [TheyWorkForYou](https://theyworkforyou.com) organisation's + website. + + Parameters + ---------- + urls : list[str] + List of URLs from which to gather content. These must be + top-level TheyWorkForYou links for bulletins such as + `https://theyworkforyou.com/debates`. + terms : Iterable[str], optional + Key terms to filter content on. By default, we look for any + mention of `Office for National Statistics` or `ONS`. + dates : list[dt.date], optional + List of dates from which to pull entries. The `parliai.dates` + module may be of help. If not specified, only yesterday is used. + outdir : str, default="out" + Location of a directory in which to write outputs. + prompt : str, optional + System prompt provided to the LLM. If not specified, this is + read from the default configuration file. + llm_name : str, optional + Full name of the LLM (or version) to be accessed. Must be one + available to `langchain_google_vertexai.ChatVertexAI`. If not + specified, the reader uses `gemini-1.0-pro-001`. + """ + + _default_config = "debates.toml" + _speech_prefix = "debate-speech__" + _source = ( + "transcripts taken from " + "[TheyWorkForYou](https://www.theyworkforyou.com/)" + ) + + def _list_latest_pages(self) -> list[str]: + """ + List all URLs associated with the days required. + + Returns + ------- + pages : list[str] + List of parliamentary URLs in time scope. + """ + pages: list[str] = [] + for url in self.urls: + pages.extend(f"{url}/?d={date.isoformat()}" for date in self.dates) + + return pages + + def _remove_multi_link_statements( + self, latest_pages: list[str] + ) -> list[str]: + """Remove all .mh links. + + Note that these linked pages filter to departmental + pages. These individual statements are already listed + in the daily pages. 
This function mitigates that + potential duplication. + + Parameters + ---------- + latest_pages : list[str] + List of all current URLs, including .mh pages. + + Returns + ------- + latest_pages : list[str] + Updated list of URLs. + """ + suffix = ".mh" + latest_pages = [ + page for page in latest_pages if not page.endswith(suffix) + ] + return latest_pages + + def retrieve_latest_entries(self) -> list[str]: + """ + Pull down all the individual parliamentary entry pages. + + Returns + ------- + entries : list[str] + List of individual parliamentary entry URLs. + """ + + latest_pages = self._list_latest_pages() + + entries = [] + for url in latest_pages: + soup = self.get(url, check=False) + if soup is not None: + links = soup.find_all( + "a", attrs={"class": "business-list__title"} + ) + for link in links: + entries.append( + f"https://theyworkforyou.com{link.get('href')}" + ) + + # remove .mh multi-statement references + entries = self._remove_multi_link_statements(entries) + + return entries + + def _read_metadata(self, url: str, soup: BeautifulSoup) -> dict: + """ + Extract the title, date, and storage metadata for a debate. + + In particular, we extract the following as strings: + + - `cat`: category of parliamentary debate. One of `lords`, + `debates`, `whall`, `wms`, `wrans`. URL. + - `idx`: index of the debate entry. URL. + - `title`: plain-text title of the debate. Soup. + - `date`: date of the debate in `YYYY-MM-DD` format. URL. + + Parameters + ---------- + url : str + URL of the entry. + soup : bs4.BeautifulSoup + HTML soup of the entry. + + Returns + ------- + metadata : dict + Dictionary containing the debate metadata. + """ + + *_, cat, idx = url.replace("?id=", "").split("/") + + block = soup.find("title").get_text() + title = re.search(r"^.*(?=:\s*\d{1,2} \w{3} \d{4})", block).group() + date = re.search(r"(?<=(\=))\d{4}-\d{2}-\d{2}(?=[\w\.])", url).group() + + metadata = dict(cat=cat, idx=idx, title=title, date=date, url=url) + + return metadata + + def _read_contents(self, soup: BeautifulSoup) -> dict: + """ + Extract the text from HTML soup in a compact format. + + We convert the transcript into blocks like so: + + ``` + { + "speeches": [ + { + "name": "Sir Henry Wilde", + "position": "Permanent Under-Secretary for Health", + "text": "The ONS provided daily, robust statistics to + support leaders and health services to plan + during the pandemic." + }, + { + "name": "Lord Jackson of Richmond", + "position": "Lord Speaker for Education", + "text": "The Office for National Statistics would welcome + a more transparent sharing of statistics and data + about our children's attainment nationally." + } + ] + } + ``` + + Parameters + ---------- + soup : bs4.BeautifulSoup + HTML soup of a webpage. + + Returns + ------- + text : dict + Dictionary with a single entry (`text`) containing a + transcript of the debate in plain-text format. + """ + + raw_speeches = soup.find_all( + "div", attrs={"class": f"{self._speech_prefix}speaker-and-content"} + ) + + speeches = map(self._process_speech, raw_speeches) + + return {"speeches": list(speeches)} + + def _process_speech(self, speech: BeautifulSoup) -> dict: + """ + Process a speech block by extracting its details and contents. + + This function returns a compact dictionary form of the speech + and its details. If the speech cannot be attributed to someone, + the dictionary will be `None` for the speaker details. + + Parameters + ---------- + speech : bs4.BeautifulSoup + HTML soup of the speech block. 
+ + Returns + ------- + processed : dict + Dictionary containing the speech components: speaker name, + speaker position, speaker URL, and the text of the speech. + """ + + name, position, url = self._extract_speaker_details(speech) + text = self._extract_speech_text(speech) + + return {"name": name, "position": position, "url": url, "text": text} + + def _extract_speaker_details( + self, speech: BeautifulSoup + ) -> tuple[None | str, None | str, None | str]: + """ + Get the name, position, and URL of the speaker. + + Parameters + ---------- + speech : bs4.BeautifulSoup + HTML soup of the speech block. + + Returns + ------- + name : None | str + Speaker name if the speech can be attributed. + position : None | str + Position of the attributed speaker as it appears on TWFY. + url : None | str + URL on TWFY of the attributed speaker. + """ + + prefix = self._speech_prefix + speaker = speech.find("h2", attrs={"class": f"{prefix}speaker"}) + + name, position, url = None, None, None + if isinstance(speaker, Tag): + name_block = speaker.find( + "strong", attrs={"class": f"{prefix}speaker__name"} + ) + position_block = speaker.find( + "small", attrs={"class": f"{prefix}speaker__position"} + ) + name, position = map( + self._get_detail_text, (name_block, position_block) + ) + + href_block = speaker.find( + lambda tag: tag.name == "a" and "href" in tag.attrs + ) + url = ( + f"https://theyworkforyou.com{href_block['href']}" + if href_block + else None + ) + + return name, position, url + + @staticmethod + def _get_detail_text(detail: None | Tag | NavigableString) -> None | str: + """ + Try to get the text of a speaker detail. + + The usual behaviour for this function (getting the text of a + detail) should only fail when the detail is actually `None` and + was not found in `_extract_speaker_details()`. In this scenario, + we catch the `AttributeError` and return the detail as it was, + i.e. as `None`. + + Parameters + ---------- + detail : None | bs4.Tag + The detail from which to extract text. If all is well, this + is a `bs4.Tag` instance. If not, it should be `None`. + + Returns + ------- + detail : None | str + Text from the detail or `None`. + """ + + try: + return detail.get_text() + except AttributeError: + pass + + def _extract_speech_text(self, speech: BeautifulSoup) -> str: + """Get the text of a speech back.""" + + text = speech.find( + "div", attrs={"class": f"{self._speech_prefix}content"} + ) + + return text.get_text().strip() + + def analyse(self, page: dict) -> dict: + """ + Analyse all relevant speeches on a page. + + Parameters + ---------- + page : dict + Dictionary format of a debate transcript. + + Returns + ------- + page : dict + Debate transcript with LLM responses attached. + """ + + for speech in page["speeches"]: + if self.check_contains_terms(speech["text"]): + speech = super().analyse(speech) + + return page + + def parliament_label(self, url: str) -> str: + """Label debates with parliament name. + + Parameters + ---------- + url : str + URL of debate content. + + Returns + ------- + parliament_tag : str + Name of parliament/chamber in which debate occurred. 
+ """ + + parli_labels = { + "debates": "House of Commons", + "lords": "House of Lords", + "whall": "Westminster Hall", + "wms": "UK Ministerial statement", + "senedd": "Senedd / Welsh Parliament", + "sp": "Scottish Parliament", + "ni": "Northern Ireland Assembly", + } + + tag = re.search(r"(?<=theyworkforyou.com\/)\w+(?=\/\?id\=)", url) + if tag is None: + return "Unclassified" + + return parli_labels[tag.group()] + + def render(self, transcript: dict) -> str: + """ + Convert an entry's transcript into Markdown for publishing. + + Parameters + ---------- + transcript : dict + Dictionary containing all the details of the entry. + + Returns + ------- + summary : str + Stylised summary of the entry in Markdown syntax. + """ + + label = self.parliament_label(transcript["url"]) + + title = f"## {label}: [{transcript['title']}]({transcript['url']})" + processed = [] + for speech in transcript["speeches"]: + if speech["name"] and "response" in speech: + speaker = ( + f"### [{speech['name']}]({speech['url']})" + f" ({speech['position']})" + ) + processed.append("\n\n".join((speaker, speech["response"]))) + + return "\n\n".join((title, *processed)) + + +class WrittenAnswers(Debates): + """ + Class to summarise ONS activity in written answers from Parliament. + + Like its parent class, this reader extracts relevant activity + from TheyWorkForYou. + + Parameters + ---------- + urls : list[str] + List of URLs from which to gather content. Currently, only + `https://theyworkforyou.com/wrans` is supported. + terms : Iterable[str], optional + Key terms to filter content on. By default, we look for any + mention of `Office for National Statistics` or `ONS`. + dates : list[dt.date], optional + List of dates from which to pull entries. The `parliai.dates` + module may be of help. If not specified, only yesterday is used. + outdir : str, default="out" + Location of a directory in which to write outputs. + prompt : str, optional + System prompt provided to the LLM. If not specified, this is + read from the default configuration file. + llm_name : str, optional + Full name of the LLM (or version) to be accessed. Must be one + available in Ollama and previously downloaded locally. + llm : ChatOllama, optional + Chat model wrapper. + + Raises + ------ + ValueError + If `urls` contains an unsupported URL. + """ + + _default_config = "wrans.toml" + _supported_urls = ["https://www.theyworkforyou.com/wrans"] + + def __init__( + self, + urls: list[str], + terms: None | Iterable[str] = None, + dates: None | list[dt.date] = None, + outdir: str = "out", + prompt: None | str = None, + llm_name: None | str = None, + llm: None | ChatOllama = None, + ) -> None: + if not isinstance(urls, list) or not set(urls).issubset( + self._supported_urls + ): + supported = ", ".join(self._supported_urls) + warnings.warn( + "URLs must be a list of supported endpoints.\n" + f"Currently, the only acceptable URLs are: {supported}", + UserWarning, + ) + + super().__init__( + urls, + terms, + dates, + outdir, + prompt, + llm_name, + llm, + ) + + def _read_metadata(self, url: str, soup: BeautifulSoup) -> dict: + """ + Extract all metadata on a written answer to Parliament. + + These metadata comprise the following: + + - question title + - ID of the entry + - date of question + - intended recipient (e.g. Cabinet Office, DfE, etc.) + - date of answer + + We do not collect the category since they are all written + answers with category `wrans`. + + Parameters + ---------- + url : str + URL of the entry. 
+ soup : bs4.BeautifulSoup + HTML soup of the entry. + + Returns + ------- + metadata : dict + Dictionary containing the entry's metadat listed above. + """ + + metadata = super()._read_metadata(url, soup) + + recipient, on = self._read_metadata_from_lead(soup) + metadata = dict(**metadata, recipient=recipient, answered=on) + + return metadata + + @staticmethod + def _read_metadata_from_lead(soup: BeautifulSoup) -> tuple[str, str]: + """ + Extract the date of answer and recipient from a lead block. + + Parameters + ---------- + soup : bs4.BeautifulSoup + HTML soup of the entry containing the `lead` block. + + Returns + ------- + recipient : str + Name of the intended recipient of the question. + on : str + Date question was answered in YYYY-MM-DD format. + """ + + lead = soup.find("p", attrs={"class": "lead"}).get_text().strip() + + recipient = re.search(r"^.*(?= written question)", lead).group() + + on = re.search(r"(?<=on)\s+\d{1,2} \w+ \d{4}", lead).group().strip() + on = dt.datetime.strptime(on, "%d %B %Y").date().isoformat() + + return recipient, on + + def _read_contents(self, soup: BeautifulSoup) -> dict: + """ + Extract the text of the written answer. + + Parameters + ---------- + soup : bs4.BeautifulSoup + HTML soup of the entry. + + Returns + ------- + text : dict + Dictionary with one entry (`answer`) containing the + plain-text response to the question. + """ + + contents = super()._read_contents(soup) + *questions, answer = contents["speeches"] + + return {"questions": questions, "answer": answer} + + def analyse(self, page: dict) -> dict: + """ + Analyse the answer to a written question and answer entry. + + If the answer does not contain any search terms, there is no + need to invoke the LLM. + + Parameters + ---------- + page : dict + Dictionary format of a written answer transcript. + + Returns + ------- + page : dict + Debate transcript with LLM responses attached. + """ + + if self.check_contains_terms(page["answer"]["text"]): + page["answer"] = super(Debates, self).analyse(page["answer"]) + + return page + + def render(self, transcript: dict) -> str: + """ + Convert an entry's transcript into Markdown for publishing. + + Parameters + ---------- + transcript : dict + Dictionary containing all the details of an entry. + + Returns + ------- + summary : str + Stylised summary of the entry in Markdown syntax. + """ + + title = f"## [{transcript['title']}]({transcript['url']})" + + questions = [] + for question in transcript["questions"]: + question_title = ( + "### Asked by " + f"[{question['name']}]({question['url']}) " + f"({question['position']})" + ) + question_text = question["text"].strip() + questions.append("\n\n".join((question_title, question_text))) + + addressed = f"Addressed to: {transcript['recipient']}." + asked = f"Asked on: {transcript['date']}." + answered = f"Answered on: {transcript['answered']}." + metadata = " ".join((addressed, asked, answered)) + + answer = self._render_answer(transcript["answer"]) + + summary = "\n\n".join((title, *questions, metadata, answer)) + + return summary + + @staticmethod + def _render_answer(answer: dict) -> str: + """ + Process a plain-text answer into something for a summary. + + If the answer mentions any search terms, we send it to the LLM + for extraction. Otherwise, we say it makes no mention. + + Parameters + ---------- + answer : dict + Dictionary format for an answer. + + Returns + ------- + processed : str + A stylised answer block for adding to a Markdown summary. 
+ """ + + title = ( + f"### Answered by [{answer['name']}]({answer['url']})" + f" ({answer['position']})" + ) + + response = answer.get( + "response", "Answer does not mention any search terms." + ) + + processed = "\n\n".join((title, response)) + + return processed