Skip to content

Commit

Permalink
feat/refactoring (#228)
Browse files Browse the repository at this point in the history
* feat!(rules) : BREAKING CHANGE rules name change

* Remove dikdik.get_path in favor of jsonpath

* Remove dikdik.get_path in favor of jsonpath (phase2)

* Fixing typing

* Fixing example

* fixing test to reflect code changes

* magic mocking

* Fixing test and pd initialization

* example reflecting current cli

* Bump to 0.8.0

* #207 bump nix-action
  • Loading branch information
mbovo authored Jan 3, 2025
1 parent ecbc611 commit 52844e2
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 207 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/nix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: cachix/install-nix-action@v29
- uses: cachix/install-nix-action@v30
with:
nix_path: nixpkgs=channel:nixos-unstable
- uses: DeterminateSystems/magic-nix-cache-action@v8
Expand Down
72 changes: 18 additions & 54 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pdh inc ls -e -o raw
`PDH` support custom scripting applied to your incidents list. These `rules` are in fact any type of executable you can run on your machine.

```bash
pdh inc apply INCID001 -s /path/to/my/script.py -s /path/to/binary
pdh inc ls -e --rules-path ./rules/ --rules
```

The `apply` subcommand will call the listed executable/script passing along a json to stdin with the incident information. The called script can apply any type of checks/sideffects and output another json to stout to answer the call.
Expand All @@ -202,21 +202,30 @@ An example rule can be written in python with the following lines

```python
#!/usr/bin/env python3
from pdh import rules, Filter
from pdh.rules import rule

@rules.rule
def main(input):
return {i["id"]: i["summary"] for i in input}
@rule
def main(alerts, pagerduty, Filters, Transformations):

# From the given input extract only incidents with the word "EC2" in title
filtered = Filters.apply(alerts, filters=[
Filters.not_regexp("service.summary", ".*My Service.*"),
Filters.regexp("title", ".*EC2.*")
])

# # auto acknowledge all previously filtered incidents
pagerduty.incidents.ack(filtered)

return filtered

if __name__ == "__main__":
main()
main() # type: ignore
```

This is the simplest rule you can write, reading the input and simply output a new dictionary with the entries. It will output something like:

```bash

pdh inc apply Q1LNI5LNM7RZ2C Q1C5KG41H0SZAM -s ./a.py
pdh inc ls -e --rules-path ./rules/ --rules
┏━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ script ┃ Q1LNI5LNM7RZ2C ┃ Q1C5KG41H0SZAM ┃
┡━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
Expand All @@ -228,51 +237,7 @@ The default output is `table` with one line for each script run and with one col

### Rules: more examples

```python
#!/usr/bin/env python3

# Needed imports
from pdh import rules, Filter

# This annotation make the main() method parse stdin as json into the parameter called input
# All returned values are converted to json and printed to stdout
@rules.rule
def main(input):

# Initialize PagerDuty's APIs
api = rules.api()

# From the given input extract only incidents with the word cassandra in title
incs = Filter.objects(input, filters=[Filter.regexp("title", ".*EC2.*")])

# ackwnoledge all previously filtered incidents
api.ack(incs)

# resolve all previously filtered incidents
api.resolve(incs)

# snooze all previously filtered incidents for 1h
api.snooze(incs, duration=3600)

# Chain a given rule, i.e call that rule with the output of this one
# chain-loading supports only a single binary, not directories
c = rules.chain(incs, "rules/test_chaining.sh")

# Execute an external program and get the output/err/return code
p: rules.ShellResponse = rules.exec('kubectl get nodes -o name')
if p.rc > 0:
nodes = p.stdout.split("\n")

# if you return a dict will be rendered with each item as a column in a table
# Othrwise will be converted as string
return {i["id"]: i["summary"] for i in incs}


if __name__ == "__main__":
main()


```
see [rules](./rules) for more

## Requirements

Expand All @@ -290,7 +255,6 @@ task setup

This will create a python virtualenv and install `pre-commit` and `poetry` in your system if you lack them.


## License

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ignore = [

[tool.poetry]
name = "pdh"
version = "0.7.0"
version = "0.8.0"
description = "Pagerduty CLI for Humans"
authors = ["Manuel Bovo <[email protected]>"]
license = "GPL-3.0-or-later"
Expand Down
42 changes: 19 additions & 23 deletions rules/examples.py → rules/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,38 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Needed imports
from pdh import rules, Filters
from pdh.rules import rule

# This annotation make the main() method parsing stdin as a json and returning a json to stdout
# Available arguments are
# - alerts: the input data from pdh command line (a list of incidents in json format)
# - pagerduty: an instance of the PagerDuty class, exposes the PagerDuty APIs
# - Filters: useful filter functions
# - Transformations: useful transformation functions
@rule
def main(alerts, pagerduty, Filters, Transformations):

# This annotation make the main() method parse stdin as json into the parameter called input
# All returned values are converted to json and printed to stdout
@rules.rule
def main(input):
# From the given input extract only incidents with the word "EC2" in title
filtered = Filters.apply(alerts, filters=[
Filters.regexp("service.summary", ".*Graph.*")
])

# Initialize PagerDuty's APIs
api = rules.api()

# From the given input extract only incidents with the word cassandra in title
incs = Filters.apply(input, filters=[Filters.regexp("title", ".*EC2.*")])

# ackwnoledge all previously filtered incidents
api.ack(incs)
# # acknowledge all previously filtered incidents
#pagerduty.incidents.ack(filtered)

# # resolve all previously filtered incidents
# api.resolve(incs)
# pagerduty.incidents.resolve(filtered)

# # snooze all previously filtered incidents for 1h
# api.snooze(incs, duration=3600)

# # Chain a given rule, i.e call that rule with the output of this one
# # chain-loading supports only a single binary, not directories
# c = rules.chain(incs, "rules/test_chaining.sh")
# pagerduty.incidents.snooze(filtered, duration=3600)

# # Execute an external program and get the output/err/return code
# p: rules.ShellResponse = rules.exec('kubectl get nodes -o name')
# if p.rc > 0:
# nodes = p.stdout.split("\n")

# if you return a list of dicts, it will be rendered with each item as a row in a table
# Othrwise will be converted as string
return incs
return filtered


if __name__ == "__main__":
main()
main() # type: ignore
5 changes: 2 additions & 3 deletions src/pdh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
#
from . import transformations as _Transformations
from .filters import Filter as _Filters
from .pd import Users as _Users, Incidents as _Incidents
from .pd import PagerDuty as _PagerDuty

# Exposing Internal things
Transformations = _Transformations
Filters = _Filters
Users = _Users
Incidents = _Incidents
PagerDuty = _PagerDuty
2 changes: 1 addition & 1 deletion src/pdh/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def list_teams(cfg: Config, mine: bool = True, output='table', fields=None) -> b
try:
pd = PagerDuty(cfg)
if mine:
teams = dict(pd.me())['teams']
teams = dict(pd.me)['teams'] if 'teams' in pd.me else []
else:
teams = pd.teams.list()

Expand Down
53 changes: 41 additions & 12 deletions src/pdh/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
import re
from typing import Callable, Iterator, List, Dict, Any

import jsonpath_ng

class Filter(object):
"""
Filter is a collection of methods to functions to filter out items from an iterator,
useful when used in conjunction with filter()
Filter is a collection of methods to filter out items from an iterator based on a set of conditions.
"""

@staticmethod
Expand All @@ -36,7 +37,9 @@ def le(field: str, value: int) -> Callable[[dict], bool]:
Callable[[dict], bool]: A function that takes a dictionary and returns True if the value of the specified field is less than or equal to the given value, otherwise False.
"""
def f(item: dict) -> bool:
if item[field] <= value:
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
if len(values) > 0 and values[0] <= value:
return True
return False

Expand All @@ -55,7 +58,9 @@ def ge(field: str, value: int)-> Callable[[dict], bool]:
Callable[[dict], bool]: A function that takes a dictionary and returns True if the value of the specified field is greater than or equal to the given value, otherwise False.
"""
def f(item: dict) -> bool:
if item[field] >= value:
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
if len(values)>0 and values[0] >= value:
return True
return False

Expand All @@ -74,7 +79,10 @@ def lt(field: str, value: int) -> Callable[[dict], bool]:
Callable[[dict], bool]: A function that takes a dictionary and returns True if the specified field's value is less than the given value, otherwise False.
"""
def f(item: dict) -> bool:
if item[field] < value:
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
val = values[0] if len(values) > 0 else value
if val < value:
return True
return False

Expand All @@ -93,7 +101,10 @@ def gt(field: str, value: int) -> Callable[[dict], bool]:
Callable[[dict], bool]: A function that takes a dictionary and returns True if the value of the specified field is greater than the given value, otherwise False.
"""
def f(item: dict) -> bool:
if item[field] > value:
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
val = values[0] if len(values) > 0 else value
if val > value:
return True
return False

Expand All @@ -114,7 +125,10 @@ def inList(field: str, listOfValues: List[str]) -> Callable[[dict], bool]:
value of the specified field is in the list of values, otherwise False.
"""
def f(item: dict) -> bool:
if item[field] in listOfValues:
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
val = values[0] if len(values) > 0 else ""
if val in listOfValues:
return True
return False

Expand All @@ -133,7 +147,10 @@ def inStr(field: str, value: str) -> Callable[[dict], bool]:
Callable[[dict], bool]: A function that takes a dictionary as input and returns True if the value is found in the specified field, otherwise False.
"""
def f(item: dict) -> bool:
if value.lower() in item[field].lower():
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
val = values[0] if len(values) > 0 else ""
if value.lower() in val.lower():
return True
return False

Expand All @@ -152,7 +169,10 @@ def ieq(field: str, value: str) -> Callable[[dict], bool]:
Callable[[dict], bool]: A function that takes a dictionary and returns True if the value of the specified field matches the given value (case-insensitive), otherwise False.
"""
def f(item: dict) -> bool:
if item[field].lower() == value.lower():
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
val = values[0] if len(values) > 0 else ""
if val.lower() == value.lower():
return True
return False

Expand All @@ -172,7 +192,10 @@ def eq(field: str, value: Any) -> Callable[[dict], bool]:
"""

def f(item: dict) -> bool:
if item[field] == value:
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
val = values[0] if len(values) > 0 else None
if val == value:
return True
return False

Expand All @@ -195,7 +218,10 @@ def regexp(field: str, regexp) -> Callable[[dict], bool]:
regexp = re.compile(regexp)

def f(item: dict) -> bool:
if regexp.search(item[field]):
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
val = values[0] if len(values) > 0 else ""
if regexp.search(val):
return True
return False

Expand All @@ -215,7 +241,10 @@ def not_regexp(field: str, regexp) -> Callable[[dict], bool]:
regexp = re.compile(regexp)

def f(item: dict) -> bool:
if regexp.search(item[field]):
expr = jsonpath_ng.parse(field)
values = [match.value for match in expr.find(item)]
val = values[0] if len(values) > 0 else ""
if regexp.search(val):
return False
return True

Expand Down
Loading

0 comments on commit 52844e2

Please sign in to comment.