-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_native.py
241 lines (194 loc) · 8.06 KB
/
example_native.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Flask + Requests-OAuthlib example.
This example demonstrates how to integrate a server application with
Authentiq Connect, using standard OAuth 2.0. It uses the popular
requests-oauthlib module to make this trivial in Flask.
As with all plain OAuth 2.0 integrations, we use the UserInfo endpoint to
retrieve the user profile after authorization. Check out our native
AuthentiqJS snippet or an OpenID Connect library to optimise this.
"""
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals
)
import os
import oauthlib.oauth2.rfc6749.errors as oauth2_errors
import requests
from flask import (
Flask,
abort,
g,
redirect,
render_template,
request,
session,
url_for
)
from requests_oauthlib import OAuth2Session
class Config(object):
    """
    Flask configuration container.

    The upper-case attributes are loaded into ``app.config`` through
    ``app.config.from_object(Config)``.
    """

    # Debug mode; the ``__main__`` guard of this module also keys the
    # insecure-transport switch for oauthlib off ``app.debug``.
    DEBUG = True
    TESTING = False

    # Flask signs the session cookie (which carries the OAuth state and
    # token in this demo) with this key. The built-in default is public
    # demo material only: set SECRET_KEY in the environment for anything
    # that is reachable by others.
    SECRET_KEY = os.environ.get("SECRET_KEY", "wu8EiPh2LeeChaikoh3doo2n")
# Provider endpoints; every URL is derived from the single base address.
AUTHENTIQ_BASE = "https://connect.authentiq.io/"
AUTHORIZE_URL = "".join((AUTHENTIQ_BASE, "authorize"))
TOKEN_URL = "".join((AUTHENTIQ_BASE, "token"))
USERINFO_URL = "".join((AUTHENTIQ_BASE, "userinfo"))

# Credentials of the example client registered at Authentiq Connect.
# Either value can be overridden through the process environment.
CLIENT_ID = os.getenv("CLIENT_ID", "examples-flask-native")
CLIENT_SECRET = os.getenv("CLIENT_SECRET", "ed25519")

# Personal details requested from the user. The "scopes_supported" key of
# the JSON document below lists the scopes the provider currently supports:
#
#   https://connect.authentiq.io/.well-known/openid-configuration
#
REQUESTED_SCOPES = ["openid", "aq:name", "email", "aq:push"]

# Value handed to the templates as ``display``.
DISPLAY = "modal"

# Local port the demo listens on; the redirect URL is built from it and is
# compared against the URL Flask generates for the "authorized" view.
PORT = 8000
REDIRECT_URL = "http://localhost:%d/authorized" % (PORT,)
# The WSGI application object; routes below are registered on it.
app = Flask(import_name=__name__)
# Copy the upper-case attributes of Config into the application config.
app.config.from_object(obj=Config)
@app.before_request
def requests_session():
    """
    Populate ``g.user`` and ``g.userinfo`` before every request.

    Both globals start out as ``None``. When the session holds a token, the
    provider's UserInfo endpoint is queried with it; on success ``g.user``
    is set to the ``sub`` claim and ``g.userinfo`` to the whole response.

    Any failure that means "this token does not identify a user" (body that
    is not JSON, missing ``sub``, an OAuth 2.0 error, HTTP 401) is logged
    and leaves both globals at ``None``. Any other HTTP error status from
    the UserInfo endpoint aborts the request with 502.
    """
    g.user = g.userinfo = None

    # TODO: all this stuff is plain oauth2, for oidc it's much simpler
    # should support both flows here for demo purposes

    if "token" not in session:
        return

    # Pass in our token; requests-oauthlib will take care of setting the
    # access_token in the Authorization header.
    authentiq = OAuth2Session(CLIENT_ID, token=session.get("token"))

    # Note that this request can be optimized out if using an OIDC or
    # native Authentiq Connect client.
    try:
        response = authentiq.get(USERINFO_URL)
        # requests does not raise on 4xx/5xx by itself; without this call
        # the HTTPError handler below could never run.
        response.raise_for_status()
        userinfo = response.json()
        subject = userinfo["sub"]

    # Body was not JSON, or it was JSON without a "sub" claim (for example
    # an error document).
    except (ValueError, KeyError):
        app.logger.warning("No user is signed in")

    except oauth2_errors.OAuth2Error as e:
        code = e.status_code or 400
        description = "Provider returned: " + (e.description or e.error)
        app.logger.error("%d: %s", code, description)

    # The HTTP request to the UserInfo endpoint failed.
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 401:
            # The provider rejected the token. Treat the visitor as signed
            # out rather than failing every route, /logout included.
            app.logger.warning("No user is signed in")
        else:
            reason = e.response.reason if e.response is not None else None
            abort(502, description="Request to userinfo endpoint failed: " +
                  (reason or str(e)))

    else:
        # Store user and userinfo as Flask globals.
        g.user = subject
        g.userinfo = userinfo
        app.logger.info("User %s is signed in", g.user)
@app.route("/")
def index():
    """
    Render the landing page.

    When no user is signed in (``g.user is None``) a new OAuth 2.0
    authorization request is prepared and its ``state`` value is stored in
    the session, where the ``authorized`` view reads it back. The template
    receives the provider and client settings plus that state (``None``
    when a user is already signed in).

    Raises:
        RuntimeError: if the URL Flask generates for the ``authorized``
            view differs from ``REDIRECT_URL``.
    """
    state = None

    # If the user is not logged in, then create a new session.
    if g.user is None:
        callback_url = url_for("authorized", _external=True)

        # Check if redirect_uri matches the one registered with the example
        # client. This is an explicit check rather than an ``assert`` so
        # that it still runs when Python is started with -O.
        if callback_url != REDIRECT_URL:
            raise RuntimeError(
                "For this demo to work correctly, please make sure it is "
                "hosted on localhost, so that the redirect URL is exactly " +
                REDIRECT_URL + ".")

        # Initialise an authentication session. Here we pass in scope and
        # redirect_uri explicitly, though when omitted defaults will be
        # taken from the registered client.
        authentiq = OAuth2Session(
            client_id=CLIENT_ID,
            scope=REQUESTED_SCOPES,
            redirect_uri=callback_url,
        )

        # Only the client state is needed here; the generated authorization
        # URL itself is not used by this view.
        _, state = authentiq.authorization_url(AUTHORIZE_URL)

        # Save state to match it in the response.
        session["state"] = state

    # Hand the sign-in parameters to the page template.
    return render_template("index.html",
                           provider_uri=AUTHENTIQ_BASE,
                           client_id=CLIENT_ID,
                           scope=" ".join(REQUESTED_SCOPES),
                           redirect_uri=REDIRECT_URL,
                           state=state,
                           display=DISPLAY,
                           logout_uri=url_for(".logout"))
def _render_authorized():
    """Render ``authorized.html`` with the provider and client settings."""
    return render_template("authorized.html",
                           provider_uri=AUTHENTIQ_BASE,
                           client_id=CLIENT_ID,
                           redirect_uri=REDIRECT_URL,
                           state=session.get("state"),
                           display=DISPLAY,
                           redirect_to=url_for(".index"))


@app.route("/authorized")
def authorized():
    """
    OAuth 2.0 redirection point.

    If the provider sent an ``error`` parameter, the error is logged and the
    ``authorized.html`` page is rendered without requesting a token.
    Otherwise the authorization response is exchanged for a token, which is
    stored in ``session["token"]`` before the same page is rendered.

    Aborts with the error's status code (default 400) when the token
    exchange raises an OAuth 2.0 error, and with the upstream status code
    (default 502) when the HTTP request to the token endpoint fails.
    """
    # Pass in our client side crypto state; requests-oauthlib will
    # take care of matching it in the OAuth2 response.
    authentiq = OAuth2Session(CLIENT_ID, state=session.get("state"))

    error = request.args.get("error")
    if error is not None:
        try:
            oauth2_errors.raise_from_error(error, request.args)
        except oauth2_errors.OAuth2Error as e:
            code = e.status_code or 400
            description = "Provider returned: " + (e.description or e.error)
            app.logger.error("%d: %s", code, description)
            return _render_authorized()

    try:
        # Use our client_secret to exchange the authorization code for a
        # token. Requests-oauthlib parses the redirected URL for us.
        # The token will contain the access_token, a refresh_token, and the
        # scope the end-user consented to.
        token = authentiq.fetch_token(TOKEN_URL,
                                      client_secret=CLIENT_SECRET,
                                      authorization_response=request.url)
        session["token"] = token

        # Never write the token itself to the log: it carries the access
        # and refresh secrets. The granted scope is enough for tracing.
        app.logger.info("Received token with scope: %s", token.get("scope"))

    # The incoming request looks flaky, let's not handle it further.
    except oauth2_errors.OAuth2Error as e:
        description = "Request to token endpoint failed: " + \
                      (e.description or e.error)
        abort(e.status_code or 400, description=description)

    # The HTTP request to the token endpoint failed.
    except requests.exceptions.HTTPError as e:
        code = e.response.status_code or 502
        description = "Request to token endpoint failed: " + e.response.reason
        abort(code, description=description)

    # Display the structure, use userinfo["sub"] as the user's UUID.
    return _render_authorized()
@app.route("/logout")
def logout():
    """
    Drop the ``user`` and ``token`` entries from the session, ignoring any
    that are absent, then redirect to the index view.
    """
    for stale_key in ("user", "token"):
        session.pop(stale_key, None)

    return redirect(url_for(".index"))
if __name__ == "__main__":
    # Allow insecure oauth2 when debugging. ``os`` is already imported at
    # the top of the module, so no local import is needed here.
    if app.debug:
        os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

    # Explicitly set `host=localhost` in order to get the correct
    # redirect_uri.
    app.run(host="localhost", port=PORT)