diff --git a/CHANGELOG.md b/CHANGELOG.md index ef536e2..4d29056 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,34 @@ # Changelog +## 1.5.9 (2024-12-=30) + +* [New feature] Added support for the discovery-only scan level for attributes. + +## 1.5.8 (2024-12-27) + +* [New feature] Added support for the discovery-only scan level for seeds. + +## 1.5.7 (2024-12-24) + +* [New feature] Added support for updating attributes. + +## 1.5.6 (2024-12-23) + +* [New feature] Added operations for adding, updating, deleting, and retrieving seeds. +* [New feature] Added support for provide username and password using environment + variables -- `PRAETORIAN_CLI_USERNAME`, `PRAETORIAN_CLI_PASSWORD`. + +## 1.5.5 (2024-12-13) + +* [New feature] Added version check. The CLI now prompts the user to upgrade if a newer + version is available on PyPI. +* [Bug fix] Pagination when searching for `#seed` is fixed. + +## 1.5.4 (2024-11-22) + +* [New feature] Add the option to specify the capabilities to run when adding scan + jobs for assets. + ## 1.5.3 (2024-11-14) * [Bug] Fixed an error with the `list` and `search` commands. diff --git a/praetorian_cli/handlers/add.py b/praetorian_cli/handlers/add.py index 0f5cd02..c5724ff 100644 --- a/praetorian_cli/handlers/add.py +++ b/praetorian_cli/handlers/add.py @@ -5,7 +5,7 @@ from praetorian_cli.handlers.chariot import chariot from praetorian_cli.handlers.cli_decorators import cli_handler from praetorian_cli.handlers.utils import AssetPriorities, error -from praetorian_cli.sdk.model.globals import AddRisk +from praetorian_cli.sdk.model.globals import AddRisk, Seed, CAPABILITIES @chariot.group() @@ -152,19 +152,22 @@ def risk(sdk, name, asset, status, comment): @add.command() @cli_handler @click.option('-k', '--key', required=True, help='Key of an existing asset or attribute') -def job(sdk, key): +@click.option('-c', '--capability', 'capabilities', multiple=True, type=click.Choice(CAPABILITIES), + help='Capabilities to run (can be specified multiple times)') +def job(sdk, key, capabilities): """ Schedule scan jobs for an asset or an attribute This command schedules the relevant discovery and vulnerability scans for the specified asset or attribute. Make sure to quote the key, since it - contain the "#" sign. + contains the "#" sign. \b Example usages: - praetorian chariot add job --key "#asset#example.com#1.2.3.4" + - praetorian chariot add job --key "#asset#example.com#1.2.3.4" -c subdomain -c portscan - praetorian chariot add job --key "#attribute#ssh#22#asset#api.www.example.com#1.2.3.4" """ - sdk.jobs.add(key) + sdk.jobs.add(key, capabilities) @add.command() @@ -183,3 +186,22 @@ def attribute(sdk, key, name, value): - praetorian chariot add attribute --key "#asset#www.example.com#www.example.com" --name id --value "arn:aws:route53::1654874321:hostedzone/Z0000000EJBHGTFTGH3" """ sdk.attributes.add(key, name, value) + + +@add.command() +@cli_handler +@click.option('-d', '--dns', required=True, help='The DNS of the asset') +@click.option('-s', '--status', type=click.Choice([s.value for s in Seed]), + default=Seed.PENDING.value, help='The status of the seed', show_default=True) +def seed(sdk, dns, status): + """ Add a seed + + Add a seed to the Chariot database. This command requires DNS of the seed to be + specified. When status is not specified, the seed is added as PENDING. + + \b + Example usages: + - praetorian chariot add seed --dns example.com + - praetorian chariot add seed --dns example.com --status A + """ + sdk.seeds.add(dns, status) diff --git a/praetorian_cli/handlers/cli_decorators.py b/praetorian_cli/handlers/cli_decorators.py index eab9cf0..01aa62d 100644 --- a/praetorian_cli/handlers/cli_decorators.py +++ b/praetorian_cli/handlers/cli_decorators.py @@ -1,7 +1,10 @@ import traceback from functools import wraps +from importlib.metadata import version import click +import requests +from packaging.version import Version from praetorian_cli.handlers.chariot import chariot from praetorian_cli.handlers.utils import error @@ -9,7 +12,7 @@ def handle_error(func): @wraps(func) - def handler(*args, **kwargs): + def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: @@ -17,12 +20,34 @@ def handler(*args, **kwargs): if chariot.is_debug: click.echo(traceback.format_exc()) - return handler + return wrapper + + +def upgrade_check(func): + @wraps(func) + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + try: + response = requests.get('https://pypi.org/pypi/praetorian-cli/json') + pypi = sorted([Version(v) for v in list(response.json()['releases'].keys())])[-1] + local = Version(version('praetorian-cli')) + if pypi > local: + click.echo(f'A new version of praetorian-cli is available: {pypi}', err=True) + click.echo(f'You are currently running {local}.', err=True) + click.echo('To upgrade, run "pip install --upgrade praetorian-cli".', err=True) + except: + # Silently fail if we can't check for updates + # This preserves the main functionality even if update checks fail + pass + return result + + return wrapper def cli_handler(func): func = click.pass_obj(func) func = handle_error(func) + func = upgrade_check(func) return func diff --git a/praetorian_cli/handlers/configure.py b/praetorian_cli/handlers/configure.py index 829d3ed..e87fc14 100644 --- a/praetorian_cli/handlers/configure.py +++ b/praetorian_cli/handlers/configure.py @@ -4,10 +4,11 @@ @click.command() -@click.option('--email', help='Email you used to register for Chariot', required=True, - prompt='Enter your email') -@click.option('--password', help='Password you used to register for Chariot', required=True, - prompt='Enter your password', hide_input=True) +@click.option('--email', help='Email you used to register for Chariot', default='', + prompt='Enter your email (Type ENTER if this is set in the PRAETORIAN_CLI_USERNAME environment variable, or if using SSO)') +@click.option('--password', help='Password you used to register for Chariot', default='', + prompt='Enter your password (Type ENTER if this is set in the PRAETORIAN_CLI_PASSWORD environment variable, or if using SSO)', + hide_input=True) @click.option('--profile-name', help='Profile name.', required=True, prompt='Enter the profile name to configure', default=DEFAULT_PROFILE, show_default=True) @click.option('--url', help='URL to the backend API. Default provided.', required=True, diff --git a/praetorian_cli/handlers/delete.py b/praetorian_cli/handlers/delete.py index a2b6dd6..c052591 100644 --- a/praetorian_cli/handlers/delete.py +++ b/praetorian_cli/handlers/delete.py @@ -77,6 +77,23 @@ def webhook(chariot): click.echo('No webhook previously exists.') +@delete.command() +@click.argument('key', required=True) +@cli_handler +def seed(chariot, key): + """ Delete a seed + + \b + Arguments: + - KEY: the key of an existing seed + + \b + Example usage: + - praetorian chariot delete seed "#seed#domain#example.com" + """ + chariot.seeds.delete(key) + + # Special command for deleting your account and all related information. @chariot.command() @cli_handler diff --git a/praetorian_cli/handlers/get.py b/praetorian_cli/handlers/get.py index 82e7ed4..4425be9 100644 --- a/praetorian_cli/handlers/get.py +++ b/praetorian_cli/handlers/get.py @@ -176,3 +176,22 @@ def webhook(chariot): click.echo(chariot.webhook.get_url()) else: click.echo('No existing webhook.') + + +@get.command() +@cli_handler +@click.argument('key', required=True) +@click.option('-d', '--details', is_flag=True, help='Further retrieve the attributes and associated risks of the asset') +def seed(chariot, key, details): + """ Get seed details + + \b + Argument: + - KEY: the key of an existing seed + + \b + Example usages: + - praetorian chariot get seed "#seed#domain#example.com" + - praetorian chariot get seed "#seed#ip#1.1.1.0/24" --details + """ + print_json(chariot.seeds.get(key, details)) diff --git a/praetorian_cli/handlers/list.py b/praetorian_cli/handlers/list.py index c5d0223..484bece 100644 --- a/praetorian_cli/handlers/list.py +++ b/praetorian_cli/handlers/list.py @@ -2,7 +2,7 @@ from praetorian_cli.handlers.chariot import chariot from praetorian_cli.handlers.cli_decorators import list_params -from praetorian_cli.handlers.utils import render_offset, render_list_results, pagination_size +from praetorian_cli.handlers.utils import render_offset, render_list_results, pagination_size, error @chariot.group() @@ -16,7 +16,7 @@ def list(): def assets(chariot, filter, details, offset, page): """ List assets - Retrieve and display a list of assets. + Retrieve and display a list of assets. \b Example usages: @@ -50,8 +50,8 @@ def risks(chariot, filter, details, offset, page): def accounts(chariot, filter, details, offset, page): """ List accounts - Retrieve and display a list of your collaborators, as well as the accounts that - you are authorized to access. + Retrieve and display a list of your collaborators, as well as the accounts that + you are authorized to access. \b Example usages: @@ -68,7 +68,7 @@ def accounts(chariot, filter, details, offset, page): def integrations(chariot, filter, details, offset, page): """ List integrations - Retrieve and display a list of integration connections. + Retrieve and display a list of integration connections. \b Example usages: @@ -157,3 +157,77 @@ def attributes(chariot, filter, key, details, offset, page): - praetorian chariot list attributes --page all """ render_list_results(chariot.attributes.list(filter, key, offset, pagination_size(page)), details) + + +@list.command() +@list_params('DNS') +@click.option('-t', '--type', type=click.Choice(['ip', 'domain']), help=f'Filter by type of the seeds') +def seeds(chariot, type, filter, details, offset, page): + """ List seeds + + Retrieve and display a list of seeds. + + \b + Example usages: + - praetorian chariot list seeds + - praetorian chariot list seeds --type ip + - praetorian chariot list seeds --type domain --filter example.com + - praetorian chariot list seeds --details + - praetorian chariot list seeds --page all + """ + if filter and not type: + error('When the DNS filter is specified, you also need to specify the type of the filter: ip or domain.') + + render_list_results(chariot.seeds.list(type, filter, offset, pagination_size(page)), details) + +@list.command() +@list_params('statistic type and/or filter dates') +@click.option('-f', '--filter', default='', help='Filter by statistic type or name') +@click.option('--from', 'from_date', help='Start date (YYYY-MM-DD)') +@click.option('--to', 'to_date', help='End date (YYYY-MM-DD)') +@click.option('-d', '--details', is_flag=True, default=False, help='Show detailed information') +@click.option('-o', '--offset', default='', help='List results from an offset') +@click.option('-p', '--page', type=click.Choice(('first', 'all')), default='first', + help='Pagination mode. "all" pages up to 1000 pages.', show_default=True) +@click.option('--help-stats', is_flag=True, help='Show detailed information about statistic types') +def stats(chariot, filter, from_date, to_date, details, offset, page, help_stats): + """ List statistics + + Retrieve and display a list of statistics with optional date range filtering. + Use --help-stats for detailed information about available statistic types. + + \b + Example usages: + - praetorian chariot list statistics + - praetorian chariot list statistics --filter "my#status" + - praetorian chariot list statistics --from 2025-01-01 --to 2024-01-31 + - praetorian chariot list statistics --details + - praetorian chariot list statistics --page all + - praetorian chariot list statistics --help-stats + """ + if help_stats: + click.echo(chariot.stats.util.get_statistics_help()) + return + + # Map common filter aliases to StatsFilter values + filter_map = { + 'risks': chariot.stats.util.RISKS, + 'risk_events': chariot.stats.util.RISK_EVENTS, + 'assets_by_status': chariot.stats.util.ASSETS_BY_STATUS, + 'assets_by_class': chariot.stats.util.ASSETS_BY_CLASS, + 'seeds': chariot.stats.util.SEEDS + } + + # Use mapped filter if available, otherwise use raw filter string + actual_filter = filter_map.get(filter, filter) + + render_list_results( + chariot.stats.list( + actual_filter, + from_date, + to_date, + offset, + pagination_size(page) + ), + details + ) diff --git a/praetorian_cli/handlers/update.py b/praetorian_cli/handlers/update.py index a1bb315..84837e5 100644 --- a/praetorian_cli/handlers/update.py +++ b/praetorian_cli/handlers/update.py @@ -3,7 +3,7 @@ from praetorian_cli.handlers.chariot import chariot from praetorian_cli.handlers.cli_decorators import cli_handler from praetorian_cli.handlers.utils import AssetPriorities -from praetorian_cli.sdk.model.globals import Risk +from praetorian_cli.sdk.model.globals import Risk, Seed, Attribute @chariot.group() @@ -50,3 +50,42 @@ def risk(chariot, key, status, comment): - praetorian chariot update risk "#risk#www.example.com#open-ssh-port" --status RH --comment "John stopped sshd on the server" """ chariot.risks.update(key, status, comment) + + +@update.command() +@cli_handler +@click.argument('key', required=True) +@click.option('-s', '--status', type=click.Choice([s.value for s in Seed]), required=True, + help='The status of the seed') +def seed(chariot, key, status): + """ Update the status of a seed + + \b + Argument: + - KEY: the key of an existing seed + + \b + Example usages: + - praetorian chariot update seed "#seed#domain#example.com" -s A + - praetorian chariot update seed "#seed#ip#1.1.1.0/24" -s F + """ + chariot.seeds.update(key, status) + + +@update.command() +@cli_handler +@click.argument('key', required=True) +@click.option('-s', '--status', type=click.Choice([s.value for s in Attribute]), required=True, + help='The status of the seed') +def attribute(chariot, key, status): + """ Update the status of an attribute + + \b + Argument: + - KEY: the key of an existing attribute + + \b + Example usages: + - praetorian chariot update attribute "#attribute#surface#provided#seed#domain#example.com" -s D + """ + chariot.attributes.update(key, status) diff --git a/praetorian_cli/handlers/utils.py b/praetorian_cli/handlers/utils.py index e9dd5c2..3b68dab 100644 --- a/praetorian_cli/handlers/utils.py +++ b/praetorian_cli/handlers/utils.py @@ -28,7 +28,7 @@ def render_offset(offset): def pagination_size(page): - return 1000 if page == 'all' else 1 + return 10000 if page == 'all' else 1 def print_json(data): @@ -37,7 +37,7 @@ def print_json(data): def error(message, quit=True): - click.secho('ERROR: ', fg='red', nl=False) - click.echo(message) + click.secho('ERROR: ', fg='red', nl=False, err=True) + click.echo(message, err=True) if quit: exit(1) diff --git a/praetorian_cli/sdk/chariot.py b/praetorian_cli/sdk/chariot.py index c4a0fa1..a97149d 100644 --- a/praetorian_cli/sdk/chariot.py +++ b/praetorian_cli/sdk/chariot.py @@ -12,7 +12,9 @@ from praetorian_cli.sdk.entities.jobs import Jobs from praetorian_cli.sdk.entities.risks import Risks from praetorian_cli.sdk.entities.search import Search +from praetorian_cli.sdk.entities.seeds import Seeds from praetorian_cli.sdk.entities.webhook import Webhook +from praetorian_cli.sdk.entities.stats import Stats from praetorian_cli.sdk.keychain import Keychain @@ -21,6 +23,7 @@ class Chariot: def __init__(self, keychain: Keychain): self.keychain = keychain self.assets = Assets(self) + self.seeds = Seeds(self) self.risks = Risks(self) self.accounts = Accounts(self) self.integrations = Integrations(self) @@ -30,25 +33,23 @@ def __init__(self, keychain: Keychain): self.attributes = Attributes(self) self.search = Search(self) self.webhook = Webhook(self) + self.stats = Stats(self) def my(self, params: dict, pages=1) -> {}: - my_resp = dict() + final_resp = dict() for _ in range(pages): resp = requests.get(f'{self.keychain.base_url()}/my', params=params, headers=self.keychain.headers()) process_failure(resp) resp = resp.json() - for key, value in resp.items(): - if key in my_resp and isinstance(value, list): - my_resp[key].extend(value) - else: - my_resp[key] = value - if 'offset' in resp: - params['offset'] = json.dumps(resp['offset']) - else: - my_resp.pop('offset', None) + extend(final_resp, resp) + + if 'offset' not in resp: break - return my_resp + + params['offset'] = json.dumps(resp['offset']) + + return final_resp def post(self, type: str, params): resp = requests.post(f'{self.keychain.base_url()}/{type}', @@ -146,3 +147,18 @@ def process_failure(response): if not response.ok: message = f'[{response.status_code}] Request failed' + (f'\nError: {response.text}' if response.text else '') raise Exception(message) + + +def extend(accumulate, new): + for key, value in new.items(): + if isinstance(value, list): + if key in accumulate: + accumulate[key].extend(value) + else: + accumulate[key] = value + elif isinstance(value, dict): + if key not in accumulate: + accumulate[key] = dict() + extend(accumulate[key], value) + + return accumulate diff --git a/praetorian_cli/sdk/entities/accounts.py b/praetorian_cli/sdk/entities/accounts.py index 85b14d3..99acb69 100644 --- a/praetorian_cli/sdk/entities/accounts.py +++ b/praetorian_cli/sdk/entities/accounts.py @@ -9,7 +9,7 @@ def get(self, key): """ Get details of an account """ return self.api.search.by_exact_key(key) - def list(self, username_filter='', offset=None, pages=1000): + def list(self, username_filter='', offset=None, pages=10000): """ List accounts of collaborators and also list the master accounts that the current principal can access. diff --git a/praetorian_cli/sdk/entities/assets.py b/praetorian_cli/sdk/entities/assets.py index 0dbbcfb..243c36e 100644 --- a/praetorian_cli/sdk/entities/assets.py +++ b/praetorian_cli/sdk/entities/assets.py @@ -58,7 +58,7 @@ def delete(self, key): """ return self.api.delete('asset', key) - def list(self, prefix_filter='', offset=None, pages=1000): + def list(self, prefix_filter='', offset=None, pages=10000): """ List assets Arguments: diff --git a/praetorian_cli/sdk/entities/attributes.py b/praetorian_cli/sdk/entities/attributes.py index 37f3f48..ec60c0c 100644 --- a/praetorian_cli/sdk/entities/attributes.py +++ b/praetorian_cli/sdk/entities/attributes.py @@ -13,11 +13,15 @@ def get(self, key): """ Get details of an attribute """ return self.api.search.by_exact_key(key) + def update(self, key, status): + """ Update an attribute """ + return self.api.upsert('attribute', dict(key=key, status=status)) + def delete(self, key): """ Delete an attribute """ return self.api.delete('attribute', key) - def list(self, prefix_filter='', source_key=None, offset=None, pages=1000): + def list(self, prefix_filter='', source_key=None, offset=None, pages=10000): """ List attribute, optionally prefix-filtered by the portion of the key after '#attribute#' """ if source_key: diff --git a/praetorian_cli/sdk/entities/definitions.py b/praetorian_cli/sdk/entities/definitions.py index df95623..8d3654f 100644 --- a/praetorian_cli/sdk/entities/definitions.py +++ b/praetorian_cli/sdk/entities/definitions.py @@ -22,7 +22,7 @@ def get(self, definition_name, download_directory=os.getcwd()): file.write(content) return download_path - def list(self, name_filter='', offset=None, pages=1000): + def list(self, name_filter='', offset=None, pages=10000): """ List the definition names, optionally prefix-filtered by a definition name """ definitions, next_offset = self.api.search.by_key_prefix(f'#file#definitions/{name_filter}', offset, pages) names = [d['name'][12:] for d in definitions] diff --git a/praetorian_cli/sdk/entities/files.py b/praetorian_cli/sdk/entities/files.py index bc48082..11f75dc 100644 --- a/praetorian_cli/sdk/entities/files.py +++ b/praetorian_cli/sdk/entities/files.py @@ -16,7 +16,7 @@ def get(self, chariot_filepath, download_directory=os.getcwd()): """ download a file """ return self.api.download(chariot_filepath, download_directory) - def list(self, prefix_filter='', offset=None, pages=1000): + def list(self, prefix_filter='', offset=None, pages=10000): """ List the files, optionally prefix-filtered by portion of the key after '#file#'. File keys read '#file#{filepath}' """ return self.api.search.by_key_prefix(f'#file#{prefix_filter}', offset, pages) diff --git a/praetorian_cli/sdk/entities/integrations.py b/praetorian_cli/sdk/entities/integrations.py index 491d744..acfc6a5 100644 --- a/praetorian_cli/sdk/entities/integrations.py +++ b/praetorian_cli/sdk/entities/integrations.py @@ -12,7 +12,7 @@ def get(self, key): """ Get details of an integration """ return self.api.search.by_exact_key(key) - def list(self, name_filter='', offset=None, pages=1000): + def list(self, name_filter='', offset=None, pages=10000): """ List integrations, optionally filtered by the name of the integrations, such as github, amazon, gcp, etc. """ results, next_offset = self.api.search.by_key_prefix('#account#', offset, pages) diff --git a/praetorian_cli/sdk/entities/jobs.py b/praetorian_cli/sdk/entities/jobs.py index f15c1c5..f97bed6 100644 --- a/praetorian_cli/sdk/entities/jobs.py +++ b/praetorian_cli/sdk/entities/jobs.py @@ -5,15 +5,18 @@ class Jobs: def __init__(self, api): self.api = api - def add(self, target_key): + def add(self, target_key, capabilities=[]): """ Add a job for an asset or an attribute """ - return self.api.force_add('job', dict(key=target_key)) + params = dict(key=target_key) + if capabilities: + params = params | dict(capabilities=capabilities) + return self.api.force_add('job', params) def get(self, key): """ Get details of a job """ return self.api.search.by_exact_key(key) - def list(self, prefix_filter='', offset=None, pages=1000): + def list(self, prefix_filter='', offset=None, pages=10000): """ List jobs, optionally prefix-filtered by the portion of the key after '#job#' """ return self.api.search.by_key_prefix(f'#job#{prefix_filter}', offset, pages) diff --git a/praetorian_cli/sdk/entities/risks.py b/praetorian_cli/sdk/entities/risks.py index 62702d3..8bc2487 100644 --- a/praetorian_cli/sdk/entities/risks.py +++ b/praetorian_cli/sdk/entities/risks.py @@ -71,7 +71,7 @@ def delete(self, key, comment=None): params = dict() return self.api.delete('risk', key, params) - def list(self, prefix_filter='', offset=None, pages=1000): + def list(self, prefix_filter='', offset=None, pages=10000): """ List risks Arguments: @@ -79,7 +79,6 @@ def list(self, prefix_filter='', offset=None, pages=1000): Supply this to perform prefix-filtering of the risk keys after the "#risk#" portion of the key. Risk keys read '#risk#{asset_dns}#{risk_name}' - offset: str The offset of the page you want to retrieve results. If this is not supplied, this function retrieves from the first page. diff --git a/praetorian_cli/sdk/entities/search.py b/praetorian_cli/sdk/entities/search.py index 43fdaf1..256f692 100644 --- a/praetorian_cli/sdk/entities/search.py +++ b/praetorian_cli/sdk/entities/search.py @@ -6,7 +6,7 @@ def __init__(self, api): def count(self, search_term) -> {}: return self.api.count(dict(key=search_term)) - def by_key_prefix(self, key_prefix, offset=None, pages=1000) -> tuple: + def by_key_prefix(self, key_prefix, offset=None, pages=10000) -> tuple: return self.by_term(key_prefix, offset, pages) def by_exact_key(self, key, get_attributes=False) -> {}: @@ -17,19 +17,19 @@ def by_exact_key(self, key, get_attributes=False) -> {}: hit['attributes'] = attributes return hit - def by_source(self, source, offset=None, pages=1000) -> tuple: + def by_source(self, source, offset=None, pages=10000) -> tuple: return self.by_term(f'source:{source}', offset, pages) - def by_status(self, status_prefix, offset=None, pages=1000) -> tuple: + def by_status(self, status_prefix, offset=None, pages=10000) -> tuple: return self.by_term(f'status:{status_prefix}', offset, pages) - def by_name(self, name_prefix, offset=None, pages=1000) -> tuple: + def by_name(self, name_prefix, offset=None, pages=10000) -> tuple: return self.by_term(f'name:{name_prefix}', offset, pages) - def by_ip(self, ip_prefix, offset=None, pages=1000) -> tuple: + def by_ip(self, ip_prefix, offset=None, pages=10000) -> tuple: return self.by_term(f'ip:{ip_prefix}', offset, pages) - def by_dns(self, dns_prefix, offset=None, pages=1000) -> tuple: + def by_dns(self, dns_prefix, offset=None, pages=10000) -> tuple: return self.by_term(f'dns:{dns_prefix}', offset, pages) def by_term(self, search_term, offset=None, pages=1000, exact=False) -> tuple: @@ -53,7 +53,7 @@ def by_term(self, search_term, offset=None, pages=1000, exact=False) -> tuple: def flatten_results(results): - if type(results) == list: + if isinstance(results, list): return results flattened = [] for key in results.keys(): diff --git a/praetorian_cli/sdk/entities/seeds.py b/praetorian_cli/sdk/entities/seeds.py new file mode 100644 index 0000000..36ab48f --- /dev/null +++ b/praetorian_cli/sdk/entities/seeds.py @@ -0,0 +1,56 @@ +from praetorian_cli.handlers.utils import error +from praetorian_cli.sdk.model.globals import Seed + + +class Seeds: + """ The methods in this class are to be assessed from sdk.seeds, where sdk is an instance + of Chariot. """ + + def __init__(self, api): + self.api = api + + def add(self, dns, status=Seed.PENDING.value): + """ Add a seed """ + return self.api.upsert('seed', dict(dns=dns, status=status)) + + def get(self, key, details=False): + """ Get details of a seed """ + seed = self.api.search.by_exact_key(key, details) + return seed + + def update(self, key, status): + """ Update a seed; only status field makes sense to be updated. """ + seed = self.api.search.by_exact_key(key) + if seed: + # the seed PUT endpoint is different from other PUT endpoints. This one has to + # take the DNS of the original seed, instead of the key of the seed record. + # TODO, 2024-12-23, peter: check with Noah as to why. Ideally, we should + # standardize to how other endpoints do it + return self.api.upsert('seed', dict(dns=seed['dns'], status=status)) + else: + error(f'Seed {key} is not found.') + + def delete(self, key): + """ Delete a seed """ + seed = self.api.search.by_exact_key(key) + if seed: + # TODO, 2024-12-23, peter: check with Noah why this is different from + # deleting assets and risks + return self.api.upsert('seed', dict(dns=seed['dns'], status=Seed.DELETED.value)) + else: + error(f'Seed {key} is not found.') + + def list(self, type='', prefix_filter='', offset=None, pages=10000): + """ List seeds """ + prefix_term = '#seed#' + if type: + prefix_term = f'{prefix_term}{type}#' + if prefix_filter: + prefix_term = f'{prefix_term}{prefix_filter}' + + return self.api.search.by_key_prefix(prefix_term, offset, pages) + + def attributes(self, key): + """ list associated attributes """ + attributes, _ = self.api.search.by_source(key) + return attributes diff --git a/praetorian_cli/sdk/entities/stats.py b/praetorian_cli/sdk/entities/stats.py new file mode 100644 index 0000000..12073cf --- /dev/null +++ b/praetorian_cli/sdk/entities/stats.py @@ -0,0 +1,121 @@ +class StatsUtil: + """Helper class for building statistics filters""" + + # Main categories + RISKS = "my#status" # Risk statistics by status/severity + RISK_EVENTS = "event#risk" # Risk event statistics + ASSETS_BY_STATUS = "asset#status" # Asset statistics by status - NOTE this is just to differentiate from RISKS; the actual prefix is the same + ASSETS_BY_CLASS = "class##asset##" # Asset statistics by class + SEEDS = "class##seed" # Seed statistics by class + + # All possible risk statuses + RISK_STATUSES = ["T", "O", "R", "I", "D"] + + # All possible asset statuses + ASSET_STATUSES = ["A", "P", "D", "F", "AL", "AH"] + + @staticmethod + def risks_by_status(status=None, severity=None): + """Build filter for risk statistics by status and/or severity""" + filter = "my#status" + if status: + filter += f":{status}" + if severity: + filter += f"#{severity}" + return filter + + @staticmethod + def get_statistics_help(): + """Returns simplified help text for statistics""" + return """ + Available Statistics Filters: + 1. Risks & Status: + --filter risks : All risk statistics + --filter risk_events : All risk event statistics + --filter "my#status:O#H" : Open high severity risks + + 2. Assets: + --filter assets_by_status : All asset statistics by status (A,P,D,F,AL,AH) + --filter assets_by_class : All asset statistics by class + --filter seeds : All seed statistics by class + + Examples: + 1. Current risk counts: + $ chariot list statistics --filter risks --to now + + 2. Risk event history: + $ chariot list statistics --filter risk_events --from 2024-01-01 + + 3. Current asset status: + $ chariot list statistics --filter assets_by_status --to now + """ + +class Stats: + """ The methods in this class are to be assessed from sdk.statistics, where sdk is an instance + of Chariot. """ + + def __init__(self, api): + self.api = api + self.util = StatsUtil + + def list(self, prefix_filter='', from_date=None, to_date=None, offset=None, pages=1000): + """List statistics with optional date range filtering""" + # Handle the shorthands + if prefix_filter == self.util.RISKS: + all_stats = [] + for status in self.util.RISK_STATUSES: + risk_filter = self.util.risks_by_status(status) + stats, _ = self._query_single(risk_filter, from_date, to_date, offset, pages) + all_stats.extend(stats) + return all_stats, None + elif prefix_filter == self.util.RISK_EVENTS: + # events require double pounds before event type + return self._query_single("event##risk#", from_date, to_date, offset, pages) + elif prefix_filter == self.util.ASSETS_BY_STATUS: + all_stats = [] + for status in self.util.ASSET_STATUSES: + asset_filter = f"my#status:{status}" + stats, _ = self._query_single(asset_filter, from_date, to_date, offset, pages) + all_stats.extend(stats) + return all_stats, None + elif prefix_filter == self.util.ASSETS_BY_CLASS: + return self._query_single("class##asset#", from_date, to_date, offset, pages) + elif prefix_filter == self.util.SEEDS: + return self._query_single("class##seed", from_date, to_date, offset, pages) + else: + return self._query_single(prefix_filter, from_date, to_date, offset, pages) + + def _query_single(self, prefix_filter, from_date, to_date, offset, pages): + """Make a single query with the given parameters""" + params = {} + + if from_date or to_date: + base_key = f'#statistic#{prefix_filter}' if prefix_filter else '#statistic' + if from_date: + params['key'] = f'{base_key}#{from_date}' + else: + params['key'] = base_key + if to_date: + params['to'] = f'{base_key}#{to_date}' + else: + params['to'] = f'{base_key}#now' + else: + params['key'] = f'#statistic#{prefix_filter}' + + if offset: + params['offset'] = offset + + results = self.api.my(params, pages) + stats = self._flatten_results(results) + + next_offset = results.get('offset') + return stats, next_offset + + def _flatten_results(self, results): + if isinstance(results, list): + return results + flattened = [] + for value in results.values(): + if isinstance(value, (list, dict)): + flattened.extend(self._flatten_results(value)) + return flattened diff --git a/praetorian_cli/sdk/entities/webhook.py b/praetorian_cli/sdk/entities/webhook.py index 7b831d8..27c0694 100644 --- a/praetorian_cli/sdk/entities/webhook.py +++ b/praetorian_cli/sdk/entities/webhook.py @@ -34,5 +34,6 @@ def delete(self): return None def webhook_url(self, pin): - username = b64encode(self.api.keychain.username().encode('utf8')) - return f'{self.api.keychain.base_url()}/hook/{username.decode("utf8").rstrip("=")}/{pin}' + # Use current_principal() instead of username() + username = b64encode(self.api.accounts.current_principal().encode('utf8')) + return f'{self.api.keychain.base_url()}/hook/{username.decode("utf8").rstrip("=")}/{pin}' diff --git a/praetorian_cli/sdk/keychain.py b/praetorian_cli/sdk/keychain.py index 35c01be..368e40b 100644 --- a/praetorian_cli/sdk/keychain.py +++ b/praetorian_cli/sdk/keychain.py @@ -1,4 +1,5 @@ from configparser import ConfigParser +from os import environ from os.path import join, split from pathlib import Path from time import time @@ -51,14 +52,25 @@ def load(self): error(f'Could not find the "{self.profile}" profile in {self.filepath}. Run "praetorian configure" to fix.') profile = self.config[self.profile] - if not all(['username' in profile, 'password' in profile, 'api' in profile, 'client_id' in profile]): + if 'api' not in profile or 'client_id' not in profile: error(f'Keychain profile "{self.profile}" is corrupted or incomplete. Run "praetorian configure" to fix.') + self.load_env('username', 'PRAETORIAN_CLI_USERNAME') + self.load_env('password', 'PRAETORIAN_CLI_PASSWORD') + if self.account is None: self.account = self.config.get(self.profile, 'account', fallback=None) return self + def load_env(self, config_name, env_name): + if not self.config.get(self.profile, config_name, fallback=None): + if env_name in environ: + self.config.set(self.profile, config_name, environ[env_name]) + else: + error( + f'{config_name} not in keychain or the {env_name} env variable. Run "praetorian configure" to fix.') + def token(self): """ Authenticate to AWS Cognito and get the token. Cache the token until expiry. """ if not self.token_cache or time() >= (self.token_expiry - 10): @@ -107,10 +119,14 @@ def configure(username, password, profile=DEFAULT_PROFILE, api=DEFAULT_API, clie 'client_id': client_id, 'api': api, 'user_pool_id': user_pool_id, - 'username': username, - 'password': password } + if username: + new_profile['username'] = username + + if password: + new_profile['password'] = password + if account: new_profile['account'] = account diff --git a/praetorian_cli/sdk/model/globals.py b/praetorian_cli/sdk/model/globals.py index e6b117f..f8ca357 100644 --- a/praetorian_cli/sdk/model/globals.py +++ b/praetorian_cli/sdk/model/globals.py @@ -16,6 +16,25 @@ class Asset(Enum): PENDING_LOW = 'PL' +class Seed(Enum): + REJECTED = 'FR' + ACTIVE = Asset.ACTIVE.value + ACTIVE_LOW = Asset.ACTIVE_LOW.value + DELETED = Asset.DELETED.value + PENDING = Asset.PENDING.value + PENDING_LOW = Asset.PENDING_LOW.value + FROZEN = Asset.FROZEN.value + FROZEN_LOW = Asset.FROZEN_LOW.value + + +class Attribute(Enum): + ACTIVE = Asset.ACTIVE.value + ACTIVE_LOW = Asset.ACTIVE_LOW.value + PENDING = Asset.PENDING.value + PENDING_LOW = Asset.PENDING_LOW.value + DELETED = Asset.DELETED.value + + class Risk(Enum): TRIAGE_INFO = 'TI' TRIAGE_LOW = 'TL' @@ -56,8 +75,50 @@ class Risk(Enum): class AddRisk(Enum): """ AddRisk is a subset of Risk. These are the only valid statuses when creating manual risks """ - TRIAGE_INFO = 'TI' - TRIAGE_LOW = 'TL' - TRIAGE_MEDIUM = 'TM' - TRIAGE_HIGH = 'TH' - TRIAGE_CRITICAL = 'TC' + TRIAGE_INFO = Risk.TRIAGE_INFO.value + TRIAGE_LOW = Risk.TRIAGE_LOW.value + TRIAGE_MEDIUM = Risk.TRIAGE_MEDIUM.value + TRIAGE_HIGH = Risk.TRIAGE_HIGH.value + TRIAGE_CRITICAL = Risk.TRIAGE_CRITICAL.value + + +CAPABILITIES = ( + 'favicon', + 'reverse-csp', + 'nuclei', + 'whois', + 'subdomain', + 'csp-mine', + 'tls-mine', + 'portscan', + 'github', + 'github-repository', + 'secrets', + 'amazon', + 'azure', + 'gcp', + 'ns1', + 'cloudflare', + 'gato', + 'crowdstrike', + 'crawler', + 'gitlab', + 'ssh', + 'azuread-discovery', + 'edgar', + 'nessus', + 'nessus-import', + 'insightvm', + 'insightvm-import', + 'qualys', + 'qualys-import', + 'burp-enterprise', + 'ip', + 'cidr', + 'website', + 'reverse-whois', + 'digitalocean', + 'burp-internal', + 'seed-import', + 'builtwith' +) diff --git a/praetorian_cli/sdk/model/utils.py b/praetorian_cli/sdk/model/utils.py index df17074..7434b85 100644 --- a/praetorian_cli/sdk/model/utils.py +++ b/praetorian_cli/sdk/model/utils.py @@ -10,3 +10,11 @@ def risk_key(dns, name): def attribute_key(name, value, source_key): return f'#attribute#{name}#{value}{source_key}' + + +def seed_key(type, dns): + return f'#seed#{type}#{dns}' + + +def seed_status(type, status_code): + return f'{type}#{status_code}' diff --git a/praetorian_cli/sdk/test/test_attribute.py b/praetorian_cli/sdk/test/test_attribute.py index 6f79221..b10768a 100644 --- a/praetorian_cli/sdk/test/test_attribute.py +++ b/praetorian_cli/sdk/test/test_attribute.py @@ -1,5 +1,6 @@ import pytest +from praetorian_cli.sdk.model.globals import Attribute from praetorian_cli.sdk.test.utils import make_test_values, clean_test_entities, setup_chariot @@ -26,6 +27,10 @@ def test_get_attribute(self): assert a['value'] == self.attribute_value assert a['source'] == self.asset_key + def test_update_attribute(self): + self.sdk.attributes.update(self.asset_attribute_key, Attribute.DELETED.value) + assert self.sdk.attributes.get(self.asset_attribute_key)['status'] == Attribute.DELETED.value + def test_delete_attribute(self): self.sdk.attributes.delete(self.asset_attribute_key) assert self.sdk.attributes.get(self.asset_attribute_key) is None diff --git a/praetorian_cli/sdk/test/test_extend.py b/praetorian_cli/sdk/test/test_extend.py new file mode 100644 index 0000000..e4a4df2 --- /dev/null +++ b/praetorian_cli/sdk/test/test_extend.py @@ -0,0 +1,45 @@ +import pytest + +from praetorian_cli.sdk.chariot import extend + + +@pytest.mark.coherence +class TestExtend: + + def test_both_empty(self): + assert extend(dict(), dict()) == dict() + + def test_empty_accumulate(self): + assert extend(dict(), dict(c=[1, 2], d=[3, 4])) == dict(c=[1, 2], d=[3, 4]) + + def test_empty_new(self): + assert extend(dict(c=[1, 2], d=[3, 4]), dict()) == dict(c=[1, 2], d=[3, 4]) + + def test_no_overlap(self): + assert extend(dict(a=[1, 2], b=[4, 5]), dict(c=[7, 8])) == dict(a=[1, 2], b=[4, 5], c=[7, 8]) + + def test_overlap(self): + assert extend(dict(a=[1, 2], b=[5, 6]), dict(c=[7, 8], a=[3, 4])) == dict(a=[1, 2, 3, 4], b=[5, 6], c=[7, 8]) + + def test_dict_in_new(self): + assert extend(dict(a=[1], b=[2]), dict(c=dict(d=[3], e=[4]))) == dict(a=[1], b=[2], c=dict(d=[3], e=[4])) + + def test_dict_in_accumulate(self): + assert extend(dict(c=dict(d=[3], e=[4])), dict(a=[1], b=[2])) == dict(a=[1], b=[2], c=dict(d=[3], e=[4])) + + def test_extend_array_in_dict(self): + assert extend(dict(c=dict(d=[3], e=[4])), dict(c=dict(d=[5]))) == dict(c=dict(d=[3, 5], e=[4])) + + def test_new_array_in_new(self): + assert extend(dict(c=dict(e=[4])), dict(c=dict(d=[5]))) == dict(c=dict(d=[5], e=[4])) + + def test_new_dict_in_new(self): + assert extend(dict(a=[1]), dict(b=dict(c=[5]))) == dict(a=[1], b=dict(c=[5])) + + def test_unexpected_data_type(self): + assert extend(dict(), dict(a=dict(b="1", c=[1, 2]))) == dict(a=dict(c=[1, 2])) + + def test_deeper(self): + assert (extend(dict(a=dict(b=dict(c=dict(d=[1]), e=[3]), f=[1])), + dict(a=dict(b=dict(c=dict(d=[2]), e=[4])))) == + dict(a=dict(b=dict(c=dict(d=[1, 2]), e=[3, 4]), f=[1]))) diff --git a/praetorian_cli/sdk/test/test_job.py b/praetorian_cli/sdk/test/test_job.py index ac7c56e..832659b 100644 --- a/praetorian_cli/sdk/test/test_job.py +++ b/praetorian_cli/sdk/test/test_job.py @@ -1,5 +1,6 @@ import pytest +from praetorian_cli.sdk.model.utils import asset_key from praetorian_cli.sdk.test.utils import make_test_values, clean_test_entities, setup_chariot @@ -11,12 +12,12 @@ def setup_class(self): make_test_values(self) def test_add_job(self): - result = self.sdk.assets.add(self.asset_dns, self.asset_name) - asset_key = result['key'] - self.sdk.jobs.add(asset_key) + result = self.sdk.assets.add(self.asset_dns, self.asset_dns) + self.sdk.jobs.add(result['key']) jobs, _ = self.sdk.jobs.list(self.asset_dns) assert len(jobs) > 0 assert jobs[0]['dns'] == self.asset_dns def teardown_class(self): clean_test_entities(self.sdk, self) + self.sdk.assets.delete(asset_key(self.asset_dns, self.asset_dns)) diff --git a/praetorian_cli/sdk/test/test_risk.py b/praetorian_cli/sdk/test/test_risk.py index bb7642d..db58766 100644 --- a/praetorian_cli/sdk/test/test_risk.py +++ b/praetorian_cli/sdk/test/test_risk.py @@ -24,7 +24,7 @@ def test_get_risk(self): def test_list_risks(self): results, _ = self.sdk.risks.list() - assert len(results) > 1 + assert len(results) > 0 assert any(r['dns'] == self.asset_dns for r in results) def test_update_risk(self): diff --git a/praetorian_cli/sdk/test/test_seed.py b/praetorian_cli/sdk/test/test_seed.py new file mode 100644 index 0000000..e228278 --- /dev/null +++ b/praetorian_cli/sdk/test/test_seed.py @@ -0,0 +1,41 @@ +import pytest + +from praetorian_cli.sdk.model.globals import Seed +from praetorian_cli.sdk.model.utils import seed_status +from praetorian_cli.sdk.test.utils import make_test_values, clean_test_entities, setup_chariot + + +@pytest.mark.coherence +class TestSeed: + + def setup_class(self): + self.sdk = setup_chariot() + make_test_values(self) + + def test_add_seed(self): + seed = self.sdk.seeds.add(self.seed_dns) + assert seed['key'] == self.seed_key + + def test_get_seed(self): + a = self.get_seed() + assert a['dns'] == self.seed_dns + assert a['status'] == seed_status('domain', Seed.PENDING.value) + + def test_list_seed(self): + results, _ = self.sdk.seeds.list('domain', self.seed_dns) + assert len(results) == 1 + assert results[0]['dns'] == self.seed_dns + + def test_update_seed(self): + self.sdk.seeds.update(self.seed_key, Seed.FROZEN.value) + assert self.get_seed()['status'] == seed_status('domain', Seed.FROZEN.value) + + def test_delete_seed(self): + self.sdk.seeds.delete(self.seed_key) + assert self.sdk.seeds.get(self.seed_key)['status'] == seed_status('domain', Seed.DELETED.value) + + def get_seed(self): + return self.sdk.seeds.get(self.seed_key) + + def teardown_class(self): + clean_test_entities(self.sdk, self) diff --git a/praetorian_cli/sdk/test/test_z_cli.py b/praetorian_cli/sdk/test/test_z_cli.py index 72c2197..4fcbb9d 100644 --- a/praetorian_cli/sdk/test/test_z_cli.py +++ b/praetorian_cli/sdk/test/test_z_cli.py @@ -1,10 +1,11 @@ import os +from subprocess import run import pytest -from praetorian_cli.sdk.model.globals import AddRisk, Asset, Risk -from praetorian_cli.sdk.test.utils import verify_cli, epoch_micro, random_ip, make_test_values, clean_test_entities, \ - setup_chariot +from praetorian_cli.sdk.model.globals import AddRisk, Asset, Risk, Seed, Attribute +from praetorian_cli.sdk.model.utils import seed_status +from praetorian_cli.sdk.test.utils import epoch_micro, random_ip, make_test_values, clean_test_entities, setup_chariot @pytest.mark.cli @@ -16,46 +17,79 @@ def setup_class(self): def test_asset_cli(self): o = make_test_values(lambda: None) - verify_cli(f'add asset -n {o.asset_name} -d {o.asset_dns}') + self.verify(f'add asset -n {o.asset_name} -d {o.asset_dns}') - verify_cli('list assets -p all', [o.asset_key]) - verify_cli(f'list assets -f "{o.asset_dns}"', [o.asset_key]) - verify_cli(f'list assets -p first -f "{o.asset_dns}"', [o.asset_key]) - verify_cli(f'list assets -p all -f "{o.asset_dns}"', [o.asset_key]) - verify_cli(f'list assets -d -f "{o.asset_dns}"', [o.asset_key, '"key"', '"data"']) + self.verify('list assets -p all', [o.asset_key]) + self.verify(f'list assets -f "{o.asset_dns}"', [o.asset_key]) + self.verify(f'list assets -f "{o.asset_dns}" -p first', [o.asset_key]) + self.verify(f'list assets -f "{o.asset_dns}" -p all', [o.asset_key]) + self.verify(f'list assets -f "{o.asset_dns}" -d', [o.asset_key, '"key"', '"data"']) - verify_cli(f'list assets -f {epoch_micro()}') + self.verify(f'list assets -f {epoch_micro()}') - verify_cli(f'get asset "{o.asset_key}"', [o.asset_key, f'"status": "{Asset.ACTIVE.value}"']) - verify_cli(f'get asset -d "{o.asset_key}"', ['"attributes"', '"associated_risks"']) + self.verify(f'get asset "{o.asset_key}"', [o.asset_key, f'"status": "{Asset.ACTIVE.value}"']) + self.verify(f'get asset -d "{o.asset_key}"', ['"attributes"', '"associated_risks"']) - verify_cli(f'update asset -p discover "{o.asset_key}"') - verify_cli(f'get asset "{o.asset_key}"', [o.asset_key, f'"status": "{Asset.ACTIVE_LOW.value}"']) + self.verify(f'update asset -p discover "{o.asset_key}"') + self.verify(f'get asset "{o.asset_key}"', [o.asset_key, f'"status": "{Asset.ACTIVE_LOW.value}"']) - verify_cli(f'delete asset "{o.asset_key}"') - verify_cli(f'get asset "{o.asset_key}"', [f'"status": "{Asset.DELETED.value}"']) + self.verify(f'delete asset "{o.asset_key}"') + self.verify(f'get asset "{o.asset_key}"', [f'"status": "{Asset.DELETED.value}"']) + + clean_test_entities(self.sdk, o) + + def test_seed_cli(self): + o = make_test_values(lambda: None) + + self.verify(f'add seed -d {o.seed_dns}') + + self.verify('list seeds -p all', [o.seed_key]) + self.verify('list seeds -t domain -p all', [o.seed_key]) + self.verify(f'list seeds -t domain -f "{o.seed_dns}"', [o.seed_key]) + self.verify(f'list seeds -t domain -f "{o.seed_dns}" -p first', [o.seed_key]) + self.verify(f'list seeds -t domain -f "{o.seed_dns}" -p all', [o.seed_key]) + self.verify(f'list seeds -t domain -f "{o.seed_dns}" -p first', [o.seed_key]) + self.verify(f'list seeds -t domain -f "{o.seed_dns}" -d', [o.seed_dns, '"key"', '"data"']) + self.verify(f'list seeds -t ip -f "{o.seed_dns}"') + self.verify(f'list seeds -f "{o.seed_dns}"', [], + ["When the DNS filter is specified, you also need to specify the type of the filter"]) + + self.verify(f'list seeds -t domain -f {epoch_micro()}') + + self.verify(f'get seed "{o.seed_key}"', + [o.seed_key, f'"status": "{seed_status("domain", Seed.PENDING.value)}"']) + self.verify(f'get seed -d "{o.seed_key}"', ['"attributes"']) + + self.verify(f'update seed -s {Seed.FROZEN.value} "{o.seed_key}"') + self.verify(f'get seed "{o.seed_key}"', + [o.seed_key, f'"status": "{seed_status("domain", Seed.FROZEN.value)}"']) + + self.verify(f'delete seed "{o.seed_key}"') + self.verify(f'get seed "{o.seed_key}"', [f'"status": "{seed_status("domain", Seed.DELETED.value)}"']) clean_test_entities(self.sdk, o) def test_risk_cli(self): o = make_test_values(lambda: None) - verify_cli(f'add asset -n {o.asset_name} -d {o.asset_dns}') + self.verify(f'add asset -n {o.asset_name} -d {o.asset_dns}') - verify_cli(f'add risk {o.risk_name} -a "{o.asset_key}" -s {AddRisk.TRIAGE_HIGH.value}') + self.verify(f'add risk {o.risk_name} -a "{o.asset_key}" -s {AddRisk.TRIAGE_HIGH.value}') - verify_cli('list risks -p all', [o.risk_key]) - verify_cli(f'list risks -f "{o.asset_dns}"', [o.risk_key]) - verify_cli(f'list risks -d -f "{o.asset_dns}"', [o.risk_key, '"key"', '"data"']) - verify_cli(f'list risks -f {epoch_micro()}') + self.verify('list risks -p all', [o.risk_key]) + self.verify(f'list risks -f "{o.asset_dns}"', [o.risk_key]) + self.verify(f'list risks -f "{o.asset_dns}" -p first', [o.risk_key]) + self.verify(f'list risks -f "{o.asset_dns}" -p all', [o.risk_key]) + self.verify(f'list risks -f "{o.asset_dns}" -d', [o.risk_key, '"key"', '"data"']) + self.verify(f'list risks -f {epoch_micro()}') - verify_cli(f'get risk "{o.risk_key}"', [o.risk_key, f'"status": "{AddRisk.TRIAGE_HIGH.value}"']) - verify_cli(f'get risk -d "{o.risk_key}"', ['"attributes"', '"affected_assets"']) + self.verify(f'get risk "{o.risk_key}"', [o.risk_key, f'"status": "{AddRisk.TRIAGE_HIGH.value}"']) + self.verify(f'get risk -d "{o.risk_key}"', ['"attributes"', '"affected_assets"']) - verify_cli(f'update risk "{o.risk_key}" -s {Risk.OPEN_LOW.value}') - verify_cli(f'get risk "{o.risk_key}"', [o.risk_key, f'"status": "{Risk.OPEN_LOW.value}"']) + self.verify(f'update risk "{o.risk_key}" -s {Risk.OPEN_LOW.value}') + self.verify(f'get risk "{o.risk_key}"', [o.risk_key, f'"status": "{Risk.OPEN_LOW.value}"']) - verify_cli(f'delete risk "{o.risk_key}"') - verify_cli(f'get risk "{o.risk_key}"', [f'"status": "{Risk.DELETED_LOW.value}"']) + self.verify(f'delete risk "{o.risk_key}"') + self.verify(f'get risk "{o.risk_key}"', [f'"status": "{Risk.DELETED_LOW.value}"']) clean_test_entities(self.sdk, o) @@ -67,11 +101,11 @@ def test_definition_cli(self): with open(local_filepath, 'w') as f: f.write(content) - verify_cli(f'add definition {local_filepath} -n {definition_name}') - verify_cli(f'list definitions -f {definition_name}', [definition_name]) - verify_cli(f'list definitions -f {definition_name} -p all', [definition_name]) - verify_cli(f'list definitions -f {definition_name} -p first', [definition_name]) - verify_cli(f'get definition {definition_name}', ['Saved', definition_name]) + self.verify(f'add definition {local_filepath} -n {definition_name}') + self.verify(f'list definitions -f {definition_name}', [definition_name]) + self.verify(f'list definitions -f {definition_name} -p first', [definition_name]) + self.verify(f'list definitions -f {definition_name} -p all', [definition_name]) + self.verify(f'get definition {definition_name}', ['Saved', definition_name]) with open(definition_name, 'r') as f: assert f.read() == content @@ -81,122 +115,144 @@ def test_definition_cli(self): def test_attribute_cli(self): o = make_test_values(lambda: None) - verify_cli(f'add asset -n {o.asset_name} -d {o.asset_dns}') - verify_cli(f'add attribute -n {o.attribute_name} -v {o.attribute_value} -k "{o.asset_key}"') + self.verify(f'add asset -n {o.asset_name} -d {o.asset_dns}') + self.verify(f'add attribute -n {o.attribute_name} -v {o.attribute_value} -k "{o.asset_key}"') + + self.verify('list attributes -p all', [o.asset_attribute_key]) + self.verify(f'list attributes -f {o.attribute_name} -p all', [o.asset_attribute_key]) + self.verify(f'list attributes -k "{o.asset_key}" -p all', [o.asset_attribute_key]) + self.verify(f'list attributes -k "{o.asset_key}" -d -p all', [o.asset_attribute_key, '"key"', '"data"']) - verify_cli('list attributes -p all', [o.asset_attribute_key]) - verify_cli(f'list attributes -f {o.attribute_name} -p all', [o.asset_attribute_key]) - verify_cli(f'list attributes -k "{o.asset_key}" -p all', [o.asset_attribute_key]) - verify_cli(f'list attributes -k "{o.asset_key}" -d -p all', [o.asset_attribute_key, '"key"', '"data"']) + self.verify(f'get attribute "{o.asset_attribute_key}"', [o.asset_attribute_key, '"key"', '"name"']) - verify_cli(f'get attribute "{o.asset_attribute_key}"', [o.asset_attribute_key, '"key"', '"name"']) + self.verify(f'update attribute -s {Attribute.DELETED.value} "{o.asset_attribute_key}"') + self.verify(f'get attribute "{o.asset_attribute_key}"', + [o.asset_attribute_key, f'"status": "{Attribute.DELETED.value}"']) - verify_cli(f'delete attribute "{o.asset_attribute_key}"') - verify_cli(f'get attribute "{o.asset_attribute_key}"') + self.verify(f'delete attribute "{o.asset_attribute_key}"') + self.verify(f'get attribute "{o.asset_attribute_key}"') clean_test_entities(self.sdk, o) def test_search_cli(self): o = make_test_values(lambda: None) - verify_cli(f'add asset -n {o.asset_name} -d {o.asset_dns}') + self.verify(f'add asset -n {o.asset_name} -d {o.asset_dns}') - verify_cli(f'search -t "#asset#{o.asset_dns}"', [o.asset_key]) - verify_cli(f'search -t "#asset#{o.asset_dns}" -d', [o.asset_key, '"key"', '"data"']) - verify_cli(f'search -t "#asset#{o.asset_dns}" -c', ['"A": 1']) + self.verify(f'search -t "#asset#{o.asset_dns}" -p all', [o.asset_key]) + self.verify(f'search -t "#asset#{o.asset_dns}" -d -p all', [o.asset_key, '"key"', '"data"']) + self.verify(f'search -t "#asset#{o.asset_dns}" -c -p all', ['"A": 1']) - verify_cli(f'search -t "source:{o.asset_key}"', ['surface#provided', o.asset_key, 'attribute']) - verify_cli(f'search -t "ip:{o.asset_name}"', [o.asset_key]) - verify_cli(f'search -t "name:{o.asset_name}"', [o.asset_key]) - verify_cli(f'search -t "dns:{o.asset_dns}"', [o.asset_key]) + self.verify(f'search -t "source:{o.asset_key}" -p all', ['surface#provided', o.asset_key, 'attribute']) + self.verify(f'search -t "ip:{o.asset_name}" -p all', [o.asset_key]) + self.verify(f'search -t "name:{o.asset_name}" -p all', [o.asset_key]) + self.verify(f'search -t "dns:{o.asset_dns}" -p all', [o.asset_key]) - verify_cli(f'search -t "source:{o.asset_key}"', ['surface#provided', o.asset_key, 'attribute']) - verify_cli(f'search -t "ip:{o.asset_name}"', [o.asset_key]) - verify_cli(f'search -t "name:{o.asset_name}"', [o.asset_key]) - verify_cli(f'search -t "dns:{o.asset_dns}"', [o.asset_key]) - verify_cli(f'search -t "status:{Asset.ACTIVE.value}"', [o.asset_key]) + self.verify(f'search -t "source:{o.asset_key}" -p all', ['surface#provided', o.asset_key, 'attribute']) + self.verify(f'search -t "ip:{o.asset_name}" -p all', [o.asset_key]) + self.verify(f'search -t "name:{o.asset_name}" -p all', [o.asset_key]) + self.verify(f'search -t "dns:{o.asset_dns}" -p all', [o.asset_key]) + self.verify(f'search -t "status:{Asset.ACTIVE.value}" -p all', [o.asset_key]) - verify_cli(f'add attribute -n {o.attribute_name} -v {o.attribute_value} -k "{o.asset_key}"') + self.verify(f'add attribute -n {o.attribute_name} -v {o.attribute_value} -k "{o.asset_key}"') - verify_cli(f'search -t "name:{o.attribute_name}"', [o.asset_key, 'attribute']) + self.verify(f'search -t "name:{o.attribute_name}" -p all', [o.asset_key, 'attribute']) clean_test_entities(self.sdk, o) def test_webhook_cli(self): - verify_cli(f'delete webhook', ignore_stdout=True) + self.verify(f'delete webhook', ignore_stdout=True) - verify_cli(f'add webhook', ['amazonaws.com/', '/hook/', 'https://']) - verify_cli(f'get webhook', ['amazonaws.com/', '/hook/', 'https://']) - verify_cli(f'add webhook', ['There is an existing webhook.']) - verify_cli(f'delete webhook', ['Webhook successfully deleted.']) - verify_cli(f'delete webhook', ['No webhook previously exists.']) + self.verify(f'add webhook', ['amazonaws.com/', '/hook/', 'https://']) + self.verify(f'get webhook', ['amazonaws.com/', '/hook/', 'https://']) + self.verify(f'add webhook', ['There is an existing webhook.']) + self.verify(f'delete webhook', ['Webhook successfully deleted.']) + self.verify(f'delete webhook', ['No webhook previously exists.']) def test_account_cli(self): o = make_test_values(lambda: None) - verify_cli(f'link account {o.email}') - verify_cli(f'list accounts', [o.email]) - verify_cli(f'list accounts -d', [o.email, '"key"']) - verify_cli(f'list accounts -f {o.email}', [o.email]) - verify_cli(f'unlink account {o.email}') - verify_cli(f'list accounts -f {o.email}') + self.verify(f'link account {o.email}') + self.verify(f'list accounts', [o.email]) + self.verify(f'list accounts -d', [o.email, '"key"']) + self.verify(f'list accounts -f {o.email}', [o.email]) + self.verify(f'unlink account {o.email}') + self.verify(f'list accounts -f {o.email}') def test_integration_cli(self): - verify_cli('list integrations', ignore_stdout=True) - verify_cli('list integrations -d', ignore_stdout=True) + self.verify('list integrations', ignore_stdout=True) + self.verify('list integrations -d', ignore_stdout=True) def test_help_cli(self): - verify_cli('--help', ignore_stdout=True) - verify_cli('list --help', ignore_stdout=True) - verify_cli('list assets --help', ignore_stdout=True) - verify_cli('list risks --help', ignore_stdout=True) - verify_cli('list accounts --help', ignore_stdout=True) - verify_cli('list integrations --help', ignore_stdout=True) - verify_cli('list jobs --help', ignore_stdout=True) - verify_cli('list files --help', ignore_stdout=True) - verify_cli('list definitions --help', ignore_stdout=True) - verify_cli('list attributes --help', ignore_stdout=True) - - verify_cli('get --help', ignore_stdout=True) - verify_cli('get asset --help', ignore_stdout=True) - verify_cli('get risk --help', ignore_stdout=True) - verify_cli('get account --help', ignore_stdout=True) - verify_cli('get integration --help', ignore_stdout=True) - verify_cli('get job --help', ignore_stdout=True) - verify_cli('get file --help', ignore_stdout=True) - verify_cli('get definition --help', ignore_stdout=True) - verify_cli('get attribute --help', ignore_stdout=True) - verify_cli('get webhook --help', ignore_stdout=True) - - verify_cli('add --help', ignore_stdout=True) - verify_cli('add asset --help', ignore_stdout=True) - verify_cli('add risk --help', ignore_stdout=True) - verify_cli('add attribute --help', ignore_stdout=True) - verify_cli('add job --help', ignore_stdout=True) - verify_cli('add file --help', ignore_stdout=True) - verify_cli('add definition --help', ignore_stdout=True) - verify_cli('add webhook --help', ignore_stdout=True) - - verify_cli('imports --help', ignore_stdout=True) - verify_cli('imports qualys --help', ignore_stdout=True) - verify_cli('imports insightvm --help', ignore_stdout=True) - verify_cli('imports nessus --help', ignore_stdout=True) - - verify_cli('link --help', ignore_stdout=True) - verify_cli('link account --help', ignore_stdout=True) - - verify_cli('unlink --help', ignore_stdout=True) - verify_cli('unlink account --help', ignore_stdout=True) - - verify_cli('delete --help', ignore_stdout=True) - verify_cli('delete asset --help', ignore_stdout=True) - verify_cli('delete risk --help', ignore_stdout=True) - verify_cli('delete attribute --help', ignore_stdout=True) - verify_cli('delete webhook --help', ignore_stdout=True) - - verify_cli('update --help', ignore_stdout=True) - verify_cli('update asset --help', ignore_stdout=True) - verify_cli('update risk --help', ignore_stdout=True) - - verify_cli('search --help', ignore_stdout=True) - verify_cli('script --help', ignore_stdout=True) - verify_cli('purge --help', ignore_stdout=True) + self.verify('--help', ignore_stdout=True) + self.verify('list --help', ignore_stdout=True) + self.verify('list assets --help', ignore_stdout=True) + self.verify('list risks --help', ignore_stdout=True) + self.verify('list accounts --help', ignore_stdout=True) + self.verify('list integrations --help', ignore_stdout=True) + self.verify('list jobs --help', ignore_stdout=True) + self.verify('list files --help', ignore_stdout=True) + self.verify('list definitions --help', ignore_stdout=True) + self.verify('list attributes --help', ignore_stdout=True) + + self.verify('get --help', ignore_stdout=True) + self.verify('get asset --help', ignore_stdout=True) + self.verify('get risk --help', ignore_stdout=True) + self.verify('get account --help', ignore_stdout=True) + self.verify('get integration --help', ignore_stdout=True) + self.verify('get job --help', ignore_stdout=True) + self.verify('get file --help', ignore_stdout=True) + self.verify('get definition --help', ignore_stdout=True) + self.verify('get attribute --help', ignore_stdout=True) + self.verify('get webhook --help', ignore_stdout=True) + + self.verify('add --help', ignore_stdout=True) + self.verify('add asset --help', ignore_stdout=True) + self.verify('add risk --help', ignore_stdout=True) + self.verify('add attribute --help', ignore_stdout=True) + self.verify('add job --help', ignore_stdout=True) + self.verify('add file --help', ignore_stdout=True) + self.verify('add definition --help', ignore_stdout=True) + self.verify('add webhook --help', ignore_stdout=True) + + self.verify('imports --help', ignore_stdout=True) + self.verify('imports qualys --help', ignore_stdout=True) + self.verify('imports insightvm --help', ignore_stdout=True) + self.verify('imports nessus --help', ignore_stdout=True) + + self.verify('link --help', ignore_stdout=True) + self.verify('link account --help', ignore_stdout=True) + + self.verify('unlink --help', ignore_stdout=True) + self.verify('unlink account --help', ignore_stdout=True) + + self.verify('delete --help', ignore_stdout=True) + self.verify('delete asset --help', ignore_stdout=True) + self.verify('delete risk --help', ignore_stdout=True) + self.verify('delete attribute --help', ignore_stdout=True) + self.verify('delete webhook --help', ignore_stdout=True) + + self.verify('update --help', ignore_stdout=True) + self.verify('update asset --help', ignore_stdout=True) + self.verify('update risk --help', ignore_stdout=True) + + self.verify('search --help', ignore_stdout=True) + self.verify('script --help', ignore_stdout=True) + self.verify('purge --help', ignore_stdout=True) + + def verify(self, command, expected_stdout=[], expected_stderr=[], ignore_stdout=False): + result = run(f'praetorian --profile "{self.sdk.keychain.profile}" chariot {command}', capture_output=True, + text=True, shell=True) + if expected_stdout: + for out in expected_stdout: + assert out in result.stdout, f'CLI "{command}" does not contain {out} in stdout; instead, got {result.stdout}' + else: + if not ignore_stdout: + assert len(result.stdout) == 0, \ + f'CLI "{command}" should not have content in stdout; instead, got {result.stdout}' + + if expected_stderr: + for err in expected_stderr: + assert err in result.stderr, f'CLI "{command}" of CLI does not contain {out} in stderr; instead, got {result.stderr}' + else: + assert len(result.stderr) == 0, \ + f'CLI "{command}" should not have content in stderr; instead, got {result.stderr}' diff --git a/praetorian_cli/sdk/test/utils.py b/praetorian_cli/sdk/test/utils.py index 2e71a57..8706a76 100644 --- a/praetorian_cli/sdk/test/utils.py +++ b/praetorian_cli/sdk/test/utils.py @@ -1,11 +1,10 @@ import os import time from random import randint -from subprocess import run from praetorian_cli.sdk.chariot import Chariot from praetorian_cli.sdk.keychain import Keychain -from praetorian_cli.sdk.model.utils import risk_key, asset_key, attribute_key +from praetorian_cli.sdk.model.utils import risk_key, asset_key, attribute_key, seed_key def epoch_micro(): @@ -28,6 +27,8 @@ def make_test_values(o): o.asset_dns = random_dns() o.asset_name = random_ip() o.asset_key = asset_key(o.asset_dns, o.asset_name) + o.seed_dns = random_dns() + o.seed_key = seed_key('domain', o.seed_dns) o.risk_name = f'test-risk-name-{epoch_micro()}' o.risk_key = risk_key(o.asset_dns, o.risk_name) o.comment = f'Test comment {epoch_micro()}' @@ -47,23 +48,5 @@ def clean_test_entities(sdk, o): sdk.assets.delete(o.asset_key) -def verify_cli(command, expected_stdout=[], expected_stderr=[], ignore_stdout=False): - result = run(f'praetorian chariot {command}', capture_output=True, text=True, shell=True) - if expected_stdout: - for out in expected_stdout: - assert out in result.stdout, f'CLI "{command}" does not contain {out} in stdout; instead, got {result.stdout}' - else: - if not ignore_stdout: - assert len(result.stdout) == 0, \ - f'CLI "{command}" should not have content in stdout; instead, got {result.stdout}' - - if expected_stderr: - for err in expected_stderr: - assert err in result.stderr, f'CLI "{command}" of CLI does not contain {out} in stderr; instead, got {result.stderr}' - else: - assert len(result.stderr) == 0, \ - f'CLI "{command}" should not have content in stderr; instead, got {result.stderr}' - - def setup_chariot(): return Chariot(Keychain(os.environ.get('CHARIOT_TEST_PROFILE'))) diff --git a/setup.cfg b/setup.cfg index 8e30f26..e80f609 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = praetorian-cli -version = 1.5.3 +version = 1.5.9 author = Praetorian author_email = support@praetorian.com description = For interacting with the Chariot API