diff --git a/.github/workflows/nix.yml b/.github/workflows/nix.yml index d447ed4..22b9625 100644 --- a/.github/workflows/nix.yml +++ b/.github/workflows/nix.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: cachix/install-nix-action@v29 + - uses: cachix/install-nix-action@v30 with: nix_path: nixpkgs=channel:nixos-unstable - uses: DeterminateSystems/magic-nix-cache-action@v8 diff --git a/README.md b/README.md index b5953d7..74b82d1 100644 --- a/README.md +++ b/README.md @@ -189,7 +189,7 @@ pdh inc ls -e -o raw `PDH` support custom scripting applied to your incidents list. These `rules` are in fact any type of executable you can run on your machine. ```bash -pdh inc apply INCID001 -s /path/to/my/script.py -s /path/to/binary +pdh inc ls -e --rules-path ./rules/ --rules ``` The `apply` subcommand will call the listed executable/script passing along a json to stdin with the incident information. The called script can apply any type of checks/sideffects and output another json to stout to answer the call. @@ -202,21 +202,30 @@ An example rule can be written in python with the following lines ```python #!/usr/bin/env python3 -from pdh import rules, Filter +from pdh.rules import rule -@rules.rule -def main(input): - return {i["id"]: i["summary"] for i in input} +@rule +def main(alerts, pagerduty, Filters, Transformations): + + # From the given input extract only incidents with the word "EC2" in title + filtered = Filters.apply(alerts, filters=[ + Filters.not_regexp("service.summary", ".*My Service.*"), + Filters.regexp("title", ".*EC2.*") + ]) + + # # auto acknowledge all previously filtered incidents + pagerduty.incidents.ack(filtered) + + return filtered if __name__ == "__main__": - main() + main() # type: ignore ``` -This is the simplest rule you can write, reading the input and simply output a new dictionary with the entries. It will output something like: ```bash - pdh inc apply Q1LNI5LNM7RZ2C Q1C5KG41H0SZAM -s ./a.py + pdh inc ls -e --rules-path ./rules/ --rules ┏━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ script ┃ Q1LNI5LNM7RZ2C ┃ Q1C5KG41H0SZAM ┃ ┡━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ @@ -228,51 +237,7 @@ The default output is `table` with one line for each script run and with one col ### Rules: more examples -```python -#!/usr/bin/env python3 - -# Needed imports -from pdh import rules, Filter - -# This annotation make the main() method parse stdin as json into the parameter called input -# All returned values are converted to json and printed to stdout -@rules.rule -def main(input): - - # Initialize PagerDuty's APIs - api = rules.api() - - # From the given input extract only incidents with the word cassandra in title - incs = Filter.objects(input, filters=[Filter.regexp("title", ".*EC2.*")]) - - # ackwnoledge all previously filtered incidents - api.ack(incs) - - # resolve all previously filtered incidents - api.resolve(incs) - - # snooze all previously filtered incidents for 1h - api.snooze(incs, duration=3600) - - # Chain a given rule, i.e call that rule with the output of this one - # chain-loading supports only a single binary, not directories - c = rules.chain(incs, "rules/test_chaining.sh") - - # Execute an external program and get the output/err/return code - p: rules.ShellResponse = rules.exec('kubectl get nodes -o name') - if p.rc > 0: - nodes = p.stdout.split("\n") - - # if you return a dict will be rendered with each item as a column in a table - # Othrwise will be converted as string - return {i["id"]: i["summary"] for i in incs} - - -if __name__ == "__main__": - main() - - -``` +see [rules](./rules) for more ## Requirements @@ -290,7 +255,6 @@ task setup This will create a python virtualenv and install `pre-commit` and `poetry` in your system if you lack them. - ## License This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. diff --git a/pyproject.toml b/pyproject.toml index 165dc6e..26cfcb0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ ignore = [ [tool.poetry] name = "pdh" -version = "0.7.0" +version = "0.8.0" description = "Pagerduty CLI for Humans" authors = ["Manuel Bovo "] license = "GPL-3.0-or-later" diff --git a/rules/examples.py b/rules/example.py similarity index 56% rename from rules/examples.py rename to rules/example.py index 9633c9d..8f56a9b 100755 --- a/rules/examples.py +++ b/rules/example.py @@ -16,42 +16,38 @@ # along with this program. If not, see . # # Needed imports -from pdh import rules, Filters +from pdh.rules import rule +# This annotation make the main() method parsing stdin as a json and returning a json to stdout +# Available arguments are +# - alerts: the input data from pdh command line (a list of incidents in json format) +# - pagerduty: an instance of the PagerDuty class, exposes the PagerDuty APIs +# - Filters: useful filter functions +# - Transformations: useful transformation functions +@rule +def main(alerts, pagerduty, Filters, Transformations): -# This annotation make the main() method parse stdin as json into the parameter called input -# All returned values are converted to json and printed to stdout -@rules.rule -def main(input): + # From the given input extract only incidents with the word "EC2" in title + filtered = Filters.apply(alerts, filters=[ + Filters.regexp("service.summary", ".*Graph.*") + ]) - # Initialize PagerDuty's APIs - api = rules.api() - - # From the given input extract only incidents with the word cassandra in title - incs = Filters.apply(input, filters=[Filters.regexp("title", ".*EC2.*")]) - - # ackwnoledge all previously filtered incidents - api.ack(incs) + # # acknowledge all previously filtered incidents + #pagerduty.incidents.ack(filtered) # # resolve all previously filtered incidents - # api.resolve(incs) + # pagerduty.incidents.resolve(filtered) # # snooze all previously filtered incidents for 1h - # api.snooze(incs, duration=3600) - - # # Chain a given rule, i.e call that rule with the output of this one - # # chain-loading supports only a single binary, not directories - # c = rules.chain(incs, "rules/test_chaining.sh") + # pagerduty.incidents.snooze(filtered, duration=3600) # # Execute an external program and get the output/err/return code # p: rules.ShellResponse = rules.exec('kubectl get nodes -o name') # if p.rc > 0: # nodes = p.stdout.split("\n") - # if you return a list of dicts, it will be rendered with each item as a row in a table - # Othrwise will be converted as string - return incs + return filtered if __name__ == "__main__": - main() + main() # type: ignore diff --git a/src/pdh/__init__.py b/src/pdh/__init__.py index cd49646..182ab92 100644 --- a/src/pdh/__init__.py +++ b/src/pdh/__init__.py @@ -16,10 +16,9 @@ # from . import transformations as _Transformations from .filters import Filter as _Filters -from .pd import Users as _Users, Incidents as _Incidents +from .pd import PagerDuty as _PagerDuty # Exposing Internal things Transformations = _Transformations Filters = _Filters -Users = _Users -Incidents = _Incidents +PagerDuty = _PagerDuty diff --git a/src/pdh/core.py b/src/pdh/core.py index 74cef99..41a403d 100644 --- a/src/pdh/core.py +++ b/src/pdh/core.py @@ -87,7 +87,7 @@ def list_teams(cfg: Config, mine: bool = True, output='table', fields=None) -> b try: pd = PagerDuty(cfg) if mine: - teams = dict(pd.me())['teams'] + teams = dict(pd.me)['teams'] if 'teams' in pd.me else [] else: teams = pd.teams.list() diff --git a/src/pdh/filters.py b/src/pdh/filters.py index 6221465..5baa035 100644 --- a/src/pdh/filters.py +++ b/src/pdh/filters.py @@ -17,10 +17,11 @@ import re from typing import Callable, Iterator, List, Dict, Any +import jsonpath_ng + class Filter(object): """ - Filter is a collection of methods to functions to filter out items from an iterator, - useful when used in conjunction with filter() + Filter is a collection of methods to filter out items from an iterator based on a set of conditions. """ @staticmethod @@ -36,7 +37,9 @@ def le(field: str, value: int) -> Callable[[dict], bool]: Callable[[dict], bool]: A function that takes a dictionary and returns True if the value of the specified field is less than or equal to the given value, otherwise False. """ def f(item: dict) -> bool: - if item[field] <= value: + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + if len(values) > 0 and values[0] <= value: return True return False @@ -55,7 +58,9 @@ def ge(field: str, value: int)-> Callable[[dict], bool]: Callable[[dict], bool]: A function that takes a dictionary and returns True if the value of the specified field is greater than or equal to the given value, otherwise False. """ def f(item: dict) -> bool: - if item[field] >= value: + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + if len(values)>0 and values[0] >= value: return True return False @@ -74,7 +79,10 @@ def lt(field: str, value: int) -> Callable[[dict], bool]: Callable[[dict], bool]: A function that takes a dictionary and returns True if the specified field's value is less than the given value, otherwise False. """ def f(item: dict) -> bool: - if item[field] < value: + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + val = values[0] if len(values) > 0 else value + if val < value: return True return False @@ -93,7 +101,10 @@ def gt(field: str, value: int) -> Callable[[dict], bool]: Callable[[dict], bool]: A function that takes a dictionary and returns True if the value of the specified field is greater than the given value, otherwise False. """ def f(item: dict) -> bool: - if item[field] > value: + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + val = values[0] if len(values) > 0 else value + if val > value: return True return False @@ -114,7 +125,10 @@ def inList(field: str, listOfValues: List[str]) -> Callable[[dict], bool]: value of the specified field is in the list of values, otherwise False. """ def f(item: dict) -> bool: - if item[field] in listOfValues: + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + val = values[0] if len(values) > 0 else "" + if val in listOfValues: return True return False @@ -133,7 +147,10 @@ def inStr(field: str, value: str) -> Callable[[dict], bool]: Callable[[dict], bool]: A function that takes a dictionary as input and returns True if the value is found in the specified field, otherwise False. """ def f(item: dict) -> bool: - if value.lower() in item[field].lower(): + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + val = values[0] if len(values) > 0 else "" + if value.lower() in val.lower(): return True return False @@ -152,7 +169,10 @@ def ieq(field: str, value: str) -> Callable[[dict], bool]: Callable[[dict], bool]: A function that takes a dictionary and returns True if the value of the specified field matches the given value (case-insensitive), otherwise False. """ def f(item: dict) -> bool: - if item[field].lower() == value.lower(): + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + val = values[0] if len(values) > 0 else "" + if val.lower() == value.lower(): return True return False @@ -172,7 +192,10 @@ def eq(field: str, value: Any) -> Callable[[dict], bool]: """ def f(item: dict) -> bool: - if item[field] == value: + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + val = values[0] if len(values) > 0 else None + if val == value: return True return False @@ -195,7 +218,10 @@ def regexp(field: str, regexp) -> Callable[[dict], bool]: regexp = re.compile(regexp) def f(item: dict) -> bool: - if regexp.search(item[field]): + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + val = values[0] if len(values) > 0 else "" + if regexp.search(val): return True return False @@ -215,7 +241,10 @@ def not_regexp(field: str, regexp) -> Callable[[dict], bool]: regexp = re.compile(regexp) def f(item: dict) -> bool: - if regexp.search(item[field]): + expr = jsonpath_ng.parse(field) + values = [match.value for match in expr.find(item)] + val = values[0] if len(values) > 0 else "" + if regexp.search(val): return False return True diff --git a/src/pdh/main.py b/src/pdh/main.py index 12b9199..136abf4 100644 --- a/src/pdh/main.py +++ b/src/pdh/main.py @@ -120,41 +120,6 @@ def snooze(ctx, incidentids, duration): def reassign(ctx, incident, user): PDH.reassign(ctx.obj, incident, user) - -@inc.command(help="Apply scripts with sideeffects to given incident") -@click.pass_context -@click.option("-p", "--path", required=False, default=None, help="Subdirectory with scripts to run") -@click.option("-s", "--script", required=False, default=None, multiple=True, help="Single script to run") -@click.argument("incident", nargs=-1) -@click.option( "-o", "--output", "output", help="output format", required=False, type=click.Choice(VALID_OUTPUTS), default="table") -def apply(ctx, incident, path, output, script): - pd = PagerDuty(ctx.obj) - incs = pd.incidents.list() - if incident: - incs = Filters.apply(incs, [Filters.inList("id", incident)]) - - # load the given parameters - scripts = script - # or cycle on every executable found in the given path - if path is not None: - scripts = [] - for root, _, filenames in os.walk(os.path.expanduser(os.path.expandvars(path))): - scripts = [os.path.join(root, fname) for fname in filenames if os.access(os.path.join(root, fname), os.X_OK)] - - ret = pd.incidents.apply(incs, scripts) - for rule in ret: - print("[green]Applied rule:[/green]", rule["script"]) - if "error" in rule: - print("[red]Error:[/red]", rule["error"]) - else: - if type(rule["output"]) is not str: - print_items(rule["output"], output) - else: - print(rule["output"]) - - pass - - @inc.command(help="List incidents", name="ls") @click.pass_context @click.option("-e", "--everything", help="List all incidents not only assigned to me", is_flag=True, default=False) @@ -167,7 +132,7 @@ def apply(ctx, incident, path, output, script): @click.option("-l", "--low", is_flag=True, default=False, help="List only LOW priority incidents") @click.option("-w", "--watch", is_flag=True, default=False, help="Continuously print the list") @click.option("-t", "--timeout", default=5, help="Watch every x seconds (work only if -w is flagged)") -@click.option("--apply", is_flag=True, default=False, help="apply rules from a path (see --rules--path") +@click.option("--rules", is_flag=True, default=False, help="apply rules from a path (see --rules--path") @click.option("--rules-path", required=False, default="~/.config/pdh_rules", help="Apply all executable find in this path") @click.option("-R", "--regexp", default="", help="regexp to filter incidents") @click.option("-o","--output","output",help="output format",required=False,type=click.Choice(VALID_OUTPUTS),default="table") @@ -179,7 +144,7 @@ def apply(ctx, incident, path, output, script): @click.option("--sort", "sort_by", required=False, help="Sort by field name", default=None) @click.option("--reverse", "reverse_sort", required=False, help="Reverse the sort", is_flag=True, default=False) @click.option("-T", "--teams", "teams", required=False, help="Filter only incidents assigned to this team IDs", default=None) -def inc_list(ctx, everything, user, new, ack, output, snooze, resolve, high, low, watch, timeout, regexp, apply, rules_path, fields, alerts, alert_fields, service_re, excluded_service_re, sort_by, reverse_sort, teams): +def inc_list(ctx, everything, user, new, ack, output, snooze, resolve, high, low, watch, timeout, regexp, rules, rules_path, fields, alerts, alert_fields, service_re, excluded_service_re, sort_by, reverse_sort, teams): pd = PagerDuty(ctx.obj) @@ -222,15 +187,40 @@ def inc_list(ctx, everything, user, new, ack, output, snooze, resolve, high, low if type(teams) is str: if teams == "mine": - teams = [ t["id"] for t in dict(pd.me())["teams"] ] + teamNames = dict(pd.me)["teams"] if "teams" in dict(pd.me) else [] + teams = [ t["id"] for t in teamNames if "id" in t ] else: teams = teams.lower().strip().split(",") if not everything and not userid: userid = pd.cfg["uid"] while True: + incs = pd.incidents.list(userid, statuses=status, urgencies=urgencies, teams=teams) + if rules: + scripts = [] + ppath = os.path.expanduser(os.path.expandvars(rules_path)) + for root, _, filenames in os.walk(ppath): + for filename in filenames: + fullpath = os.path.join(root, filename) + if os.access(fullpath, os.X_OK): + scripts.append(fullpath) + + if len(scripts) == 0: + print(f"[yellow]No rules found in {ppath}[/yellow]") + + def printFunc(name: str): + print("[green]Applied rule:[/green]", name) + def errFunc(error:str): + print("[red]Error:[/red]", error) + + ret = pd.incidents.apply(incs, scripts, printFunc, errFunc) + if type(ret) is not str: + incs = list(ret) + else: + print(ret) + incs = Filters.apply(incs, filters=[Filters.regexp("title", filter_re)]) if service_re: @@ -318,27 +308,6 @@ def plain_print_f(i): if output not in ["yaml", "json"]: for i in ids: print(f"Mark {i} as [green]RESOLVED[/green]") - if apply: - scripts = [] - ppath = os.path.expanduser(os.path.expandvars(rules_path)) - for root, _, filenames in os.walk(ppath): - for filename in filenames: - fullpath = os.path.join(root, filename) - if os.access(fullpath, os.X_OK): - scripts.append(fullpath) - - if len(scripts) == 0: - print(f"[yellow]No rules found in {ppath}[/yellow]") - ret = pd.incidents.apply(incs, scripts) - for rule in ret: - print("[green]Applied rule:[/green]", rule["script"]) - if "error" in rule: - print("[red]Error:[/red]", rule["error"]) - else: - if type(rule["output"]) is not str: - print_items(rule["output"], output) - else: - print(rule["output"]) if not watch: break diff --git a/src/pdh/output.py b/src/pdh/output.py index 5e95dec..85d4298 100644 --- a/src/pdh/output.py +++ b/src/pdh/output.py @@ -73,7 +73,7 @@ def table(self, **kwargs) -> None: t.add_column(k) i = 0 for u in items: - args = [repr(v) if isinstance(v, dict) else v for k, v in u.items() if k not in skip_columns] + args = [repr(v) if not isinstance(v, str) else v for k, v in u.items() if k not in skip_columns] if i % 2: t.add_row(*args, style=odd_color) diff --git a/src/pdh/pd.py b/src/pdh/pd.py index 533d488..2bde716 100644 --- a/src/pdh/pd.py +++ b/src/pdh/pd.py @@ -15,7 +15,7 @@ # along with this program. If not, see . # import subprocess -from typing import Any, Dict, Iterator, List +from typing import Any, Dict, Iterator, List, Callable from rich import print from pdpyras import APISession, PDClientError import json @@ -40,8 +40,26 @@ def __init__(self, *args: object) -> None: def ttl_hash(seconds=30): return round(time.time() / seconds) +class RuleExecutionError(Exception): + pass + +class RuleInvalidOutput(TypeError): + pass + class PagerDuty(object): + # Expose the constants to the outside (rules) + + INCIDENT_STATUS_TRIGGERED = STATUS_TRIGGERED + INCIDENT_STATUS_ACK = STATUS_ACK + INCIDENT_STATUS_RESOLVED = STATUS_RESOLVED + + INCIDENT_URGENCY_HIGH = URGENCY_HIGH + INCIDENT_URGENCY_LOW = URGENCY_LOW + + DEFAULT_STATUSES = DEFAULT_STATUSES + DEFAULT_URGENCIES = DEFAULT_URGENCIES + def __init__(self, cfg: Config) -> None: super().__init__() @@ -53,14 +71,13 @@ def __init__(self, cfg: Config) -> None: self.incidents = Incidents(self.cfg, self.session) self.teams = Teams(self.cfg, self.session) try: - self.__me: List | Dict = self.session.rget("/users/me") + self.abilities: List | Dict = self.session.rget("/abilities") except PDClientError as e: raise UnauthorizedException(str(e)) - - def me(self) -> List[Any] | Dict[Any, Any]: - """Retrieve the user information for the configured API key""" - return self.__me - + try: + self.me: List[Any] | Dict[Any, Any] = self.session.rget("/users/me") + except PDClientError: + self.me = {} class Incidents(object): @@ -139,12 +156,19 @@ def reassign(self, incs, uids: List[str]) -> None: except Exception as e: print(str(e)) - def apply(self, incs, paths: List[str]) -> List: - rets = [] - for script in paths: - output = self.apply_single(incs, script) - rets.append({"script": script} | output) - return rets + def apply(self, incs: List[Any] | Dict[Any, Any] | Iterator[Any], paths: List[str], printFunc, errFunc: Callable) -> List[Any] | Dict[Any, Any] | Iterator[Any]: + try: + output = incs # initial input + for script in paths: + # chain the output of the previous script to the input of the next + output = self.apply_single(output, script) + printFunc(script) + return output + except RuleExecutionError as e: + errFunc(str(e)) + except RuleInvalidOutput as e: + errFunc(str(e)) + return incs def apply_single(self, incs, script: str) -> Dict: process = subprocess.Popen(script, text=True, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, stdin=subprocess.PIPE) @@ -154,11 +178,9 @@ def apply_single(self, incs, script: str) -> Dict: if process.returncode == 0: output = json.loads(stdout) if type(output) not in [dict, list, tuple]: - output = {"output": str(output)} - else: - output = {"output": output} + raise RuleInvalidOutput(f"invalid rule output it must be a json object, found: {type(output)}") else: - output = {"error": stderr} + raise RuleExecutionError(f"Error executing rule: {stderr}") return output diff --git a/src/pdh/rules.py b/src/pdh/rules.py index 35c67eb..d3dd7dc 100644 --- a/src/pdh/rules.py +++ b/src/pdh/rules.py @@ -21,7 +21,7 @@ from typing import Union from collections import namedtuple -from pdh import config, Incidents +from pdh import config, PagerDuty, Filters, Transformations ShellResponse = namedtuple("ShellResponse", "stdout stderr rc") @@ -34,7 +34,7 @@ def rule(func): """ Decorate a function transforming it into a Rule. - The decorated function must have at least one parameters: `input` in which all the + The decorated function must have at least one parameters: `alerts` in which all the input data will be placed as dictionary. This will be the raw input data directly from PagerDuty APIs ready to use Each value returned by the decorated function will be used as output. @@ -46,7 +46,10 @@ def rule(func): @functools.wraps(func) def wrapper(*args, **kwargs): input = __load_data_from_stdin() - kwargs["input"] = input + kwargs["alerts"] = input + kwargs["pagerduty"] = client() + kwargs["Filters"] = Filters + kwargs["Transformations"] = Transformations ret = func(*args, **kwargs) print(json.dumps(ret)) return ret @@ -72,7 +75,7 @@ def exec(cmd: Union[str, list]) -> ShellResponse: return ShellResponse(out, err, rc) -def chain(incs: list, path: str, pd: Incidents | None = None): +def chain(incs: list, path: str, pd: PagerDuty | None = None): """ Chain loading another rule with the given list of incidents Parameters: @@ -84,9 +87,9 @@ def chain(incs: list, path: str, pd: Incidents | None = None): """ if pd is None: - pd = api() + pd = client() - ret = pd.apply_single(incs, path) + ret = pd.incidents.apply_single(incs, path) if "output" in ret: return ret["output"] if "stderr" in ret: @@ -94,7 +97,7 @@ def chain(incs: list, path: str, pd: Incidents | None = None): return None -def api(config_file: str = "~/.config/pdh.yaml") -> Incidents: +def client(config_file: str = "~/.config/pdh.yaml") -> PagerDuty: """ Initialize the Pagerduty APIs in a more easy way @@ -103,4 +106,4 @@ def api(config_file: str = "~/.config/pdh.yaml") -> Incidents: Returns: Incidents (object): the api object capable of doing things """ - return Incidents(config.load_and_validate(config_file)) + return PagerDuty(config.load_and_validate(config_file)) diff --git a/src/pdh/transformations.py b/src/pdh/transformations.py index 757503c..eb1f80b 100644 --- a/src/pdh/transformations.py +++ b/src/pdh/transformations.py @@ -16,6 +16,7 @@ # from typing import Any, Callable, Dict, Iterator, List from datetime import datetime, timezone +import jsonpath_ng from rich.pretty import pretty_repr from dikdik import Dict as DikDik @@ -84,7 +85,14 @@ def extract_change(path: str, change_map: Dict[str, str] | None = None, default: def f(i: dict) -> Any: try: # recursively return inner fields if they exist in the form of "field.subfield" - ret = DikDik.get_path(i, path) + expr = jsonpath_ng.parse(path) + matches = [match.value for match in expr.find(i)] + if not matches: + if default is not None: + return default + raise KeyError(f"Path '{path}' not found in the dictionary.") + ret = matches[0] + #ret = DikDik.get_path(i, path) if change_map and ret in change_map.keys(): return change_map[ret] return ret @@ -105,7 +113,10 @@ def extract_date(field_name: str, format: str = "%Y-%m-%dT%H:%M:%SZ", tz: timezo - Callable[[Dict], Any]: A function that takes a dictionary and returns a human-readable relative time string. """ def f(i: dict) -> str: - val = DikDik.get_path(i, field_name) + expr = jsonpath_ng.parse(field_name) + val = [match.value for match in expr.find(i)][0] + + #val = DikDik.get_path(i, field_name) duration = datetime.now(tz) - datetime.strptime(val, format) date = {} date["d"], remaining = divmod(duration.total_seconds(), 86_400) @@ -137,7 +148,9 @@ def extract_decorate(field_name: str, color_map: dict | None = None, default_col - Callable: A function that takes a dictionary and returns the transformed field value as a string. """ def f(i: dict) -> str: - item = DikDik.get_path(i, field_name) + expr = jsonpath_ng.parse(field_name) + item = [match.value for match in expr.find(i)][0] + #item = DikDik.get_path(i, field_name) if not item: return "" @@ -179,12 +192,17 @@ def f(i: dict) -> str: def extract_alerts(field_name, alert_fields: list[str] = ["id", "summary", "created_at", "status"]): def f(i: dict) -> str: - alerts = DikDik.get_path(i, field_name) + expr = jsonpath_ng.parse(field_name) + alerts = [match.value for match in expr.find(i)][0] + # alerts = DikDik.get_path(i, field_name) ret = dict() for alert in alerts: alert_obj = dict() for field in alert_fields: - DikDik.set_path(alert_obj, field, DikDik.get_path(alert, field)) + + subval = alert[field] if field in alert else None + if subval is not None: + DikDik.set_path(alert_obj, field, subval) ret[alert["id"]] = alert_obj return pretty_repr(ret) diff --git a/tests/test_rules.py b/tests/test_rules.py index c4d4d73..53af415 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -37,20 +37,19 @@ def test_load_data_from_stdin(): with patch("sys.stdin", io.StringIO(json.dumps(test_data))): assert __load_data_from_stdin() == test_data -@pytest.fixture -def dummy_rule(): + +def test_rule_decorator(mock_config_load): + @rule - def test_rule(input): - return {"processed": True, "input": input} - return test_rule + def dummy_rule(alerts, pagerduty, Filters, Transformations): + return {"processed": True, "alerts": alerts} -def test_rule_decorator(dummy_rule): test_input = {"key": "value"} with patch("pdh.rules.__load_data_from_stdin", return_value=test_input): with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout: - result = dummy_rule() - assert json.loads(mock_stdout.getvalue()) == {"processed": True, "input": test_input} - assert result == {"processed": True, "input": test_input} + result = dummy_rule() # typing: ignore + assert json.loads(mock_stdout.getvalue()) == {"processed": True, "alerts": test_input} + assert result == {"processed": True, "alerts": test_input} # Mock classes and functions @@ -75,14 +74,9 @@ def mock_api(): @pytest.fixture def mock_config_load(): - with patch("pdh.rules.config.load_and_validate", return_value={"some": "config"}) as mock: + with patch("pdh.rules.config.load_and_validate", return_value={"apikey": "y_NbAkKc66ryYTWUXYEu", "email": "user@pagerduty.com", "uid": "PXCT22H"}) as mock: yield mock -def test_chain_with_provided_pd(mock_incidents): - incs = ["incident1", "incident2"] - path = "path_with_output" - result = chain(incs, path, pd=mock_incidents) - assert result == ["success"] @pytest.mark.skip("Not working without a valid config") def test_chain_without_provided_pd(mock_api): @@ -90,9 +84,3 @@ def test_chain_without_provided_pd(mock_api): path = "path_with_output" result = chain(incs, path) assert result == ["success"] - -def test_chain_with_stderr(mock_incidents): - incs = ["incident1", "incident2"] - path = "path_with_stderr" - result = chain(incs, path, pd=mock_incidents) - assert result == ["error"] diff --git a/tests/test_transformations.py b/tests/test_transformations.py index 230df83..4d0f81c 100644 --- a/tests/test_transformations.py +++ b/tests/test_transformations.py @@ -100,7 +100,7 @@ def test_extract_field_change() -> None: def test_extract_path(): - t: dict = {"newfield": Transformations.extract("dictfield.inside", "-x-")} + t: dict = {"newfield": Transformations.extract("$.dictfield.inside", "-x-")} result = Transformations.apply(ilist, t) assert len(result) == 3 assert result[0]["newfield"] == "-x-"