-
Notifications
You must be signed in to change notification settings - Fork 6
/
weibo.py
127 lines (101 loc) · 3.65 KB
/
weibo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# -*- coding: utf-8 -*-
"""Python sina weibo sdk.
Relies on `requests` to do the dirty work, so it is much simpler and cleaner
than the official SDK.
For more info, refer to:
http://lxyu.github.io/weibo/
"""
from __future__ import absolute_import
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode
import json
import time
import requests
class Client(object):
    """Thin client for the Sina Weibo HTTP API built on a `requests` session.

    A client becomes usable ("activated") in one of three ways:

    * pass a previously obtained ``token`` dict to the constructor,
    * call :meth:`set_code` with an OAuth2 authorization code, or
    * call :meth:`set_token` directly.

    Alternatively, ``username``/``password`` may be given, in which case they
    are installed as the session's auth credentials and every API call also
    carries ``source=<api_key>``.
    """

    # Activation state. These class-level defaults make a client that has not
    # been activated yet safe to inspect (``alive`` simply reports False
    # instead of raising AttributeError); ``set_token`` overrides them on the
    # instance.
    token = None
    uid = None
    access_token = None
    expires_at = None

    def __init__(self, api_key, api_secret, redirect_uri, token=None,
                 username=None, password=None):
        """Create a client.

        :param api_key: application key, sent as ``client_id`` / ``source``.
        :param api_secret: application secret, sent as ``client_secret``.
        :param redirect_uri: OAuth2 redirect URI registered for the app.
        :param token: optional token dict (see :meth:`set_token`); when given
            the client is activated immediately.
        :param username: optional user name for session-level auth.
        :param password: optional password for session-level auth.
        """
        # const define
        self.site = 'https://api.weibo.com/'
        self.authorization_url = self.site + 'oauth2/authorize'
        self.token_url = self.site + 'oauth2/access_token'
        self.api_url = self.site + '2/'

        # init basic info
        self.client_id = api_key
        self.client_secret = api_secret
        self.redirect_uri = redirect_uri

        self.session = requests.session()
        # Credentials are only installed when both parts are provided.
        if username and password:
            self.session.auth = username, password

        # activate client directly if given token
        if token:
            self.set_token(token)

    @property
    def authorize_url(self):
        """URL the user must visit to grant access to this application."""
        params = {
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri
        }
        return "{0}?{1}".format(self.authorization_url, urlencode(params))

    @property
    def alive(self):
        """``True`` while the current token has not expired.

        Returns ``False`` when the client has never been activated or when
        the stored ``expires_at`` is empty.
        """
        if self.expires_at:
            return self.expires_at > time.time()
        return False

    def set_code(self, authorization_code):
        """Activate client by authorization_code.

        Exchanges the code for an access token at ``token_url`` and stores
        the result via :meth:`set_token`.

        :raises RuntimeError: if the token endpoint answers with an error.
        """
        params = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'grant_type': 'authorization_code',
            'code': authorization_code,
            'redirect_uri': self.redirect_uri
        }
        # NOTE(review): this deliberately uses a plain `requests.post`, not
        # the session -- presumably so session-level auth is not sent to the
        # token endpoint; confirm before changing.
        res = requests.post(self.token_url, data=params)
        token = json.loads(res.text)
        self._assert_error(token)

        # Convert the relative lifetime into an absolute unix timestamp so
        # that `alive` can compare it against the clock later.
        token[u'expires_at'] = int(time.time()) + int(token.pop(u'expires_in'))
        self.set_token(token)

    def set_token(self, token):
        """Directly activate client by access_token.

        :param token: mapping with at least the keys ``uid``,
            ``access_token`` and ``expires_at``.
        :raises KeyError: if one of those keys is missing.
        """
        self.token = token
        self.uid = token['uid']
        self.access_token = token['access_token']
        self.expires_at = token['expires_at']

        # Every request made through the session now carries the token.
        self.session.params = {'access_token': self.access_token}

    def _assert_error(self, d):
        """Assert if json response is error.

        :raises RuntimeError: with ``"<error_code> <error>"`` as message when
            the decoded response contains both ``error_code`` and ``error``.
        """
        if 'error_code' in d and 'error' in d:
            raise RuntimeError("{0} {1}".format(
                d.get("error_code", ""), d.get("error", "")))

    def _build_request(self, uri, params):
        """Return the full ``.json`` URL for *uri*; tag *params* if needed.

        *params* is modified in place: when the session uses
        username/password auth, ``source`` is set to the application key.
        """
        url = "{0}{1}.json".format(self.api_url, uri)

        # for username/password client auth
        if self.session.auth:
            params['source'] = self.client_id
        return url

    def get(self, uri, **kwargs):
        """Request resource by get method.

        :param uri: API path relative to ``api_url``, without ``.json``.
        :param kwargs: query parameters.
        :returns: the decoded JSON response.
        :raises RuntimeError: if the API answers with an error payload.
        """
        url = self._build_request(uri, kwargs)
        res = json.loads(self.session.get(url, params=kwargs).text)
        self._assert_error(res)
        return res

    def post(self, uri, **kwargs):
        """Request resource by post method.

        :param uri: API path relative to ``api_url``, without ``.json``.
        :param kwargs: form fields; a ``pic`` entry, if present, is sent as
            an uploaded file instead of a form field.
        :returns: the decoded JSON response.
        :raises RuntimeError: if the API answers with an error payload.
        """
        url = self._build_request(uri, kwargs)

        if "pic" in kwargs:
            # Move the picture out of the form data into the file upload.
            files = {"pic": kwargs.pop("pic")}
            response = self.session.post(url, data=kwargs, files=files)
        else:
            response = self.session.post(url, data=kwargs)

        res = json.loads(response.text)
        self._assert_error(res)
        return res