From fee7832d6f1116feac6b08ab3450f4ed1d706f7b Mon Sep 17 00:00:00 2001 From: Alexander Ioannidis Date: Thu, 1 Nov 2018 16:54:30 +0100 Subject: [PATCH] cli: add basic token management commands * Addresses #186. --- invenio_oauth2server/cli.py | 100 ++++++++++++++++++++++++++++++++++++ setup.py | 3 ++ tests/conftest.py | 7 +++ tests/test_cli.py | 62 ++++++++++++++++++++++ 4 files changed, 172 insertions(+) create mode 100644 invenio_oauth2server/cli.py create mode 100644 tests/test_cli.py diff --git a/invenio_oauth2server/cli.py b/invenio_oauth2server/cli.py new file mode 100644 index 00000000..5f77d17c --- /dev/null +++ b/invenio_oauth2server/cli.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2018 CERN. +# +# Invenio is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""CLI commands for managing the OAuth server.""" + +from functools import wraps + +import click +from flask.cli import with_appcontext +from invenio_accounts.models import User +from invenio_db import db +from werkzeug.local import LocalProxy + +from .models import Client, Token +from .validators import validate_scopes + + +def lazy_result(f): + """Decorate function to return LazyProxy.""" + @wraps(f) + def decorated(ctx, param, value): + return LocalProxy(lambda: f(ctx, param, value)) + return decorated + + +@lazy_result +def process_user(ctx, param, value): + """Return a user if exists.""" + if value: + if value.isdigit(): + user = User.query.get(str(value)) + else: + user = User.query.filter(User.email == value).one_or_none() + return user + + +@lazy_result +def process_scopes(ctx, param, values): + """Return a user if exists.""" + validate_scopes(values) + return values + + +@click.group() +def tokens(): + """OAuth2 server token commands.""" + + +@tokens.command('create') +@click.option('-n', '--name', required=True) +@click.option( + '-u', '--user', required=True, callback=process_user, + help='User ID or email.') +@click.option( + '-s', '--scope', 'scopes', multiple=True, callback=process_scopes) +@click.option('-i', '--internal', is_flag=True) +@with_appcontext +def tokens_create(name, user, scopes, internal): + """Create a personal OAuth token.""" + token = Token.create_personal( + name, user.id, scopes=scopes, is_internal=internal) + db.session.commit() + click.secho(token.access_token, fg='blue') + + +@tokens.command('delete') +@click.option('-n', '--name') +@click.option('-u', '--user', callback=process_user, help='User ID or email.') +@click.option('--token', 'read_access_token', is_flag=True) +@click.option('--force', is_flag=True) +@with_appcontext +def tokens_delete(name=None, user=None, read_access_token=None, force=False): + """Delete a personal OAuth token.""" + if not (name or user) and not read_access_token: + click.get_current_context().fail( + 'You have to pass either a "name" and "user" or the "token"') + if name and user: + client = Client.query.filter( + Client.user_id == user.id, + Client.name == name, + Client.is_internal.is_(True)).one() + token = Token.query.filter( + Token.user_id == user.id, + Token.is_personal.is_(True), + Token.client_id == client.client_id).one() + elif read_access_token: + access_token = click.prompt('Token', hide_input=True) + token = Token.query.filter(Token.access_token == access_token).one() + else: + click.get_current_context().fail('No token was found with provided') + if force or click.confirm('Are you sure you want to delete the token?'): + db.session.delete(token) + db.session.commit() + click.secho( + 'Token "{}" deleted.'.format(token.access_token), fg='yellow') diff --git a/setup.py b/setup.py index b5202682..5fed9a43 100644 --- a/setup.py +++ b/setup.py @@ -102,6 +102,9 @@ include_package_data=True, platforms='any', entry_points={ + 'flask.commands': [ + 'tokens = invenio_oauth2server.cli:tokens', + ], 'invenio_admin.views': [ 'invenio_oauth2server_clients_adminview = ' 'invenio_oauth2server.admin:oauth2server_clients_adminview', diff --git a/tests/conftest.py b/tests/conftest.py index c2f1d598..49ac7987 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -17,6 +17,7 @@ import pytest from flask import Flask, url_for +from flask.cli import ScriptInfo from flask.views import MethodView from flask_babelex import Babel from flask_breadcrumbs import Breadcrumbs @@ -119,6 +120,12 @@ def api_app_with_test_view(api_app): return api_app +@pytest.fixture +def script_info(app): + """Get ScriptInfo object for testing CLI.""" + return ScriptInfo(create_app=lambda info: app) + + @pytest.fixture def settings_fixture(app): """Fixture for testing settings views.""" diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 00000000..10f4a188 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2018 CERN. +# +# Invenio is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +from click.testing import CliRunner +from invenio_db import db + +from invenio_oauth2server.cli import tokens_create, tokens_delete +from invenio_oauth2server.models import Client, Token + + +""" +$ invenio tokens create "my-token" \ + --email foobar@example.com --internal \ + --scopes user:email,records:read +abcdef1234567890abcdef1234567890 + +# Delete a token +$ invenio tokens delete --name "my-token" --email foobar@example.com +Token "abcdef1234567890abcdef1234567890" deleted + +# ...or... +$ invenio tokens delete +Enter token: +Token "abcdef1234567890abcdef1234567890" deleted +""" + + +def test_cli_tokens(app, script_info, settings_fixture): + """Test create user CLI.""" + runner = CliRunner() + + result = runner.invoke( + tokens_create, + ['--name', 'test-token', + '--user', 'info@inveniosoftware.org'], + obj=script_info) + assert result.exit_code == 0 + access_token = result.output.strip() + + with app.app_context(): + client = Client.query.one() + assert client.user.email == 'info@inveniosoftware.org' + + token = Token.query.one() + assert token.access_token == access_token + assert token.scopes == [] + + result = runner.invoke( + tokens_delete, + ['--name', 'test-token', + '--user', 'info@inveniosoftware.org', + '--force'], + obj=script_info) + assert result.exit_code == 0 + + with app.app_context(): + assert Token.query.count() == 0