Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 054128c

Browse files
committedSep 16, 2023
[#] lambda_worker: Add twitter OAuth key retrieve job type
1 parent c83e401 commit 054128c

File tree

4 files changed

+124
-80
lines changed

4 files changed

+124
-80
lines changed
 

‎cmd/lambda_worker/main.go

+40-17
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,18 @@ func handler(ctx context.Context, sqs_event events.SQSEvent) (events.SQSEventRes
7474
case types.QueueActions.ArweaveUpload:
7575
arweaveMsgs[message.Persona] = raw_message.MessageId
7676
case types.QueueActions.Revalidate:
77-
if err := revalidate_single(ctx, &message); err != nil {
77+
if err := revalidateSingle(ctx, &message); err != nil {
7878
fmt.Printf("error revalidating proof record %d: %s\n", message.ProofID, err)
7979
// Ignore failed revalidation job since failed job will still update DB.
8080
// failures = append(failures, events.SQSBatchItemFailure{ItemIdentifier: raw_message.MessageId})
8181
}
82+
case types.QueueActions.TwitterOAuthTokenAcquire: {
83+
err := twitterRetrieveOAuthToken()
84+
if err != nil {
85+
// Ignore errors for now
86+
fmt.Printf("Error when retrieving Twitter OAuth key: %s", err.Error())
87+
}
88+
}
8289
default:
8390
logrus.Warnf("unsupported queue action: %s", message.Action)
8491
failures = append(failures, events.SQSBatchItemFailure{ItemIdentifier: raw_message.MessageId})
@@ -137,7 +144,7 @@ func arweave_upload_many(personas []string) error {
137144
break
138145
}
139146

140-
item, err := arweave_bundle_single(pc, previous)
147+
item, err := arweaveBundleSingle(pc, previous)
141148
if err != nil {
142149
logrus.Errorf("error marshalling proof chain %s: %w", pc.Uuid, err)
143150
break
@@ -169,7 +176,7 @@ func arweave_upload_many(personas []string) error {
169176
return nil
170177
}
171178

172-
func arweave_bundle_single(pc *model.ProofChain, previous *model.ProofChain) (*artypes.BundleItem, error) {
179+
func arweaveBundleSingle(pc *model.ProofChain, previous *model.ProofChain) (*artypes.BundleItem, error) {
173180
previousUuid := ""
174181
previousArweaveID := ""
175182
if previous != nil {
@@ -224,7 +231,7 @@ func arweave_bundle_single(pc *model.ProofChain, previous *model.ProofChain) (*a
224231
return &item, nil
225232
}
226233

227-
func revalidate_single(ctx context.Context, message *types.QueueMessage) error {
234+
func revalidateSingle(ctx context.Context, message *types.QueueMessage) error {
228235
proof := model.Proof{}
229236
tx := model.DB.Preload("ProofChain").Preload("ProofChain.Previous").Where("id = ?", message.ProofID).First(&proof)
230237
if tx.Error != nil {
@@ -233,15 +240,15 @@ func revalidate_single(ctx context.Context, message *types.QueueMessage) error {
233240
return proof.Revalidate()
234241
}
235242

236-
func init_db(cfg aws.Config) {
243+
func initDB(cfg aws.Config) {
237244
model.Init(false) // TODO: should read auto migrate from ENV
238245
}
239246

240247
// func init_sqs(cfg aws.Config) {
241248
// sqs.Init(cfg)
242249
// }
243250

244-
func init_validators() {
251+
func initValidators() {
245252
twitter.Init()
246253
ethereum.Init()
247254
keybase.Init()
@@ -264,19 +271,19 @@ func init() {
264271
logrus.Fatalf("Unable to load AWS config: %s", err)
265272
}
266273
common.CurrentRuntime = common.Runtimes.Lambda
267-
init_config_from_aws_secret()
274+
initConfigFromAWSSecret()
268275
logrus.SetLevel(logrus.InfoLevel)
269276

270-
init_db(cfg)
277+
initDB(cfg)
271278
// init_sqs(cfg)
272-
init_validators()
279+
initValidators()
273280
}
274281

275-
func init_config_from_aws_secret() {
282+
func initConfigFromAWSSecret() {
276283
if initialized {
277284
return
278285
}
279-
secret_name := getE("SECRET_NAME", "")
286+
secretName := getE("SECRET_NAME", "")
280287
region := getE("SECRET_REGION", "")
281288

282289
// Create a Secrets Manager client
@@ -290,7 +297,7 @@ func init_config_from_aws_secret() {
290297

291298
client := secretsmanager.NewFromConfig(cfg)
292299
input := secretsmanager.GetSecretValueInput{
293-
SecretId: aws.String(secret_name),
300+
SecretId: aws.String(secretName),
294301
VersionStage: aws.String("AWSCURRENT"),
295302
}
296303
result, err := client.GetSecretValue(context.Background(), &input)
@@ -319,17 +326,33 @@ func init_config_from_aws_secret() {
319326
initialized = true
320327
}
321328

322-
func getE(env_key, default_value string) string {
323-
result := os.Getenv(env_key)
329+
func getE(envKey, defaultValue string) string {
330+
result := os.Getenv(envKey)
324331
if len(result) == 0 {
325-
if len(default_value) > 0 {
326-
return default_value
332+
if len(defaultValue) > 0 {
333+
return defaultValue
327334
} else {
328-
logrus.Fatalf("ENV %s must be given! Abort.", env_key)
335+
logrus.Fatalf("ENV %s must be given! Abort.", envKey)
329336
return ""
330337
}
331338

332339
} else {
333340
return result
334341
}
335342
}
343+
344+
func twitterRetrieveOAuthToken() (err error) {
345+
type TokenList struct {
346+
Tokens [] twitter.Tokens `json:"tokens"`
347+
}
348+
// TODO: Retrieve existed token from a storage space (i.e., KV / S3)
349+
350+
tokens, err := twitter.GenerateOauthToken()
351+
if err != nil {
352+
return err
353+
}
354+
fmt.Printf("TWITTER OAUTH KEY REGISTERED: %+v", *tokens)
355+
356+
// TODO: save new token to a storage space (i.e., KV / S3)
357+
return nil
358+
}

‎types/mq.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package types
33
type QueueAction string
44

55
var QueueActions = struct {
6-
Revalidate QueueAction
7-
ArweaveUpload QueueAction
6+
Revalidate QueueAction
7+
ArweaveUpload QueueAction
8+
TwitterOAuthTokenAcquire QueueAction
89
}{
9-
Revalidate: "revalidate",
10-
ArweaveUpload: "arweave_upload",
10+
Revalidate: "revalidate",
11+
ArweaveUpload: "arweave_upload",
12+
TwitterOAuthTokenAcquire: "twitter_oauth_token_acquire",
1113
}
1214

1315
// QueueMessage indicates structure of messages in Amazon SQS.

‎validator/twitter/api.go

+60-46
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99
"time"
1010

11+
"github.com/nextdotid/proof_server/util"
1112
"github.com/samber/lo"
1213
"golang.org/x/xerrors"
1314
)
@@ -20,16 +21,18 @@ type APIResponse struct {
2021
Text string `json:"text"`
2122
}
2223

23-
const (
24-
BASIC_AUTH_USERNAME = "3rJOl1ODzm9yZy63FACdg"
25-
BASIC_AUTH_PASSWORD = "5jPoQ5kQvMJFDYRNE8bQ4rHuds4xJqhvgNJM4awaE8"
26-
)
24+
type Tokens struct {
25+
AccessToken string `json:"access_token"`
26+
GuestToken string `json:"guest_token"`
27+
FlowToken string `json:"flow_token"`
28+
OAuthKey string `json:"oauth_key"`
29+
OAuthSecret string `json:"oauth_secret"`
30+
CreatedAt string `json:"created_at"`
31+
}
2732

28-
var (
29-
// TODO: should save accessToken to somewhere else (shared by all Lambda instances)
30-
accessToken string
31-
guestToken string
32-
flowToken string
33+
const (
34+
TWITTER_CONSUMER_KEY = "3rJOl1ODzm9yZy63FACdg"
35+
TWITTER_CONSUMER_SECRET = "5jPoQ5kQvMJFDYRNE8bQ4rHuds4xJqhvgNJM4awaE8"
3336
)
3437

3538
func fetchPostWithAPI(id string, maxRetries int) (tweet *APIResponse, err error) {
@@ -38,7 +41,7 @@ func fetchPostWithAPI(id string, maxRetries int) (tweet *APIResponse, err error)
3841
return nil, nil
3942
}
4043

41-
func setHeaders(req *http.Request, setAccessToken, setGuestToken bool) {
44+
func setHeaders(req *http.Request, tokens *Tokens, setAccessToken, setGuestToken bool) {
4245
req.Header.Set("User-Agent", "TwitterAndroid/9.95.0-release.0 (29950000-r-0) ONEPLUS+A3010/9 (OnePlus;ONEPLUS+A3010;OnePlus;OnePlus3;0;;1;2016)")
4346
req.Header.Set("Content-Type", "application/json")
4447
req.Header.Set("Accept", "application/json")
@@ -49,20 +52,23 @@ func setHeaders(req *http.Request, setAccessToken, setGuestToken bool) {
4952
req.Header.Set("System-User-Agent", "Dalvik/2.1.0 (Linux; U; Android 9; ONEPLUS A3010 Build/PKQ1.181203.001)")
5053
req.Header.Set("X-Twitter-Active-User", "yes")
5154
if setGuestToken {
52-
req.Header.Set("X-Guest-Token", guestToken)
55+
req.Header.Set("X-Guest-Token", tokens.GuestToken)
5356
}
5457
if setAccessToken {
55-
req.Header.Set("Authorization", "Bearer "+accessToken)
58+
req.Header.Set("Authorization", "Bearer "+tokens.AccessToken)
5659
}
5760
}
5861

59-
func GetOauthToken() (err error) {
60-
if flowToken == "" {
61-
if err := getFlowToken(); err != nil {
62-
return err
63-
}
62+
// GenerateOauthToken generates a new Twitter OAuth guest token
63+
// which can be used in calling Official APIs.
64+
func GenerateOauthToken() (tokens *Tokens, err error) {
65+
tokens = new(Tokens)
66+
tokens.CreatedAt = util.TimeToTimestampString(time.Now())
67+
68+
if err := tokens.getFlowToken(); err != nil {
69+
return nil, err
6470
}
65-
l.Infof("Access token: %s\nGuest token: %s\nFlow token: %s\n", accessToken, guestToken, flowToken)
71+
l.Infof("Access token: %s\nGuest token: %s\nFlow token: %s\n", tokens.AccessToken, tokens.GuestToken, tokens.FlowToken)
6672

6773
requestBody := fmt.Sprintf(`{
6874
"flow_token": "%s",
@@ -128,22 +134,22 @@ func GetOauthToken() (err error) {
128134
"location_permission_prompt": 2,
129135
"notifications_permission_prompt": 4
130136
}
131-
}`, flowToken)
137+
}`, tokens.FlowToken)
132138

133139
req, err := http.NewRequest("POST", "https://api.twitter.com/1.1/onboarding/task.json", strings.NewReader(requestBody))
134140
if err != nil {
135-
return err
141+
return nil, err
136142
}
137-
setHeaders(req, true, true)
143+
setHeaders(req, tokens, true, true)
138144

139145
resp, err := new(http.Client).Do(req)
140146
if err != nil {
141-
return err
147+
return nil, err
142148
}
143149

144150
body, err := io.ReadAll(resp.Body)
145151
if err != nil {
146-
return err
152+
return nil, err
147153
}
148154
l.Infof("Response: \n%s\n", body)
149155
type ResponseSubtask struct {
@@ -163,36 +169,37 @@ func GetOauthToken() (err error) {
163169
// Should be "success"
164170
Status string `json:"status"`
165171
// A new flow token, usually ends with ":3"
166-
FlowToken string `json:"flow_token"`
172+
FlowToken string `json:"flow_token"`
167173
Subtasks []ResponseSubtask `json:"subtasks"`
168174
}
169175
response := new(Response)
170176
err = json.Unmarshal(body, response)
171177
if err != nil {
172-
return err
178+
return nil, err
173179
}
174180
if response.Errors != nil {
175-
return xerrors.Errorf("error when getting oauth token: %+v", *response.Errors)
181+
return nil, xerrors.Errorf("error when getting oauth token: %+v", *response.Errors)
176182
}
177183
if response.Status != "success" {
178-
return xerrors.Errorf("wrong API status: %s", response.Status)
184+
return nil, xerrors.Errorf("wrong API status: %s", response.Status)
179185
}
180186

181187
st, found := lo.Find(response.Subtasks, func(subtask ResponseSubtask) bool {
182188
return (subtask.OpenAccount != nil)
183189
})
184190
if !found {
185-
return xerrors.Errorf("oauth token not found in response")
191+
return nil, xerrors.Errorf("oauth token not found in response")
186192
}
187-
flowToken = response.FlowToken
193+
// Update new FlowToken
194+
tokens.FlowToken = response.FlowToken
188195
l.Infof("OAUTH TOKEN REGISTERED: %s --- %s", st.OpenAccount.OauthToken, st.OpenAccount.OauthTokenSecret)
189196

190-
return nil
197+
return tokens, nil
191198
}
192199

193-
func getFlowToken() (err error) {
194-
if guestToken == "" {
195-
if err := getGuestToken(); err != nil {
200+
func (tokens *Tokens) getFlowToken() (err error) {
201+
if tokens.GuestToken == "" {
202+
if err := tokens.getGuestToken(); err != nil {
196203
return err
197204
}
198205
}
@@ -269,7 +276,7 @@ func getFlowToken() (err error) {
269276
if err != nil {
270277
return err
271278
}
272-
setHeaders(req, true, true)
279+
setHeaders(req, tokens, true, true)
273280

274281
resp, err := new(http.Client).Do(req)
275282
if err != nil {
@@ -293,24 +300,24 @@ func getFlowToken() (err error) {
293300
return xerrors.Errorf("empty FlowToken")
294301
}
295302

296-
flowToken = response.FlowToken
303+
tokens.FlowToken = response.FlowToken
297304
return nil
298305
}
299306

300-
func getGuestToken() (err error) {
301-
if guestToken != "" {
307+
func (tokens *Tokens) getGuestToken() (err error) {
308+
if tokens.GuestToken != "" {
302309
return nil
303310
}
304-
if accessToken == "" {
305-
if err = getAccessToken(); err != nil {
311+
if tokens.AccessToken == "" {
312+
if err = tokens.getAccessToken(); err != nil {
306313
return err
307314
}
308315
}
309316
req, err := http.NewRequest("POST", "https://api.twitter.com/1.1/guest/activate.json", nil)
310317
if err != nil {
311318
return err
312319
}
313-
setHeaders(req, true, false)
320+
setHeaders(req, tokens, true, false)
314321
type Response struct {
315322
GuestToken string `json:"guest_token"`
316323
}
@@ -333,13 +340,13 @@ func getGuestToken() (err error) {
333340
if response.GuestToken == "" {
334341
return xerrors.Errorf("Wrong guest token: %s", response.GuestToken)
335342
}
336-
guestToken = response.GuestToken
343+
tokens.GuestToken = response.GuestToken
337344

338345
return nil
339346
}
340347

341-
func getAccessToken() (err error) {
342-
if accessToken != "" {
348+
func (tokens *Tokens) getAccessToken() (err error) {
349+
if tokens.AccessToken != "" {
343350
return nil
344351
}
345352

@@ -351,8 +358,8 @@ func getAccessToken() (err error) {
351358
if err != nil {
352359
return err
353360
}
354-
setHeaders(req, false, false)
355-
req.SetBasicAuth(BASIC_AUTH_USERNAME, BASIC_AUTH_PASSWORD)
361+
setHeaders(req, tokens, false, false)
362+
req.SetBasicAuth(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET)
356363
resp, err := new(http.Client).Do(req)
357364
if err != nil {
358365
return err
@@ -373,6 +380,13 @@ func getAccessToken() (err error) {
373380
return xerrors.Errorf("Wrong bearer token: %s %s", response.TokenType, response.AccessToken)
374381
}
375382

376-
accessToken = response.AccessToken
383+
tokens.AccessToken = response.AccessToken
377384
return nil
378385
}
386+
387+
func (tokens *Tokens) IsExpired() bool {
388+
const EXPIRED_AT = "24h"
389+
expiredAt, _ := time.ParseDuration(EXPIRED_AT)
390+
createdAt, _ := util.TimestampStringToTime(tokens.CreatedAt)
391+
return createdAt.Add(expiredAt).Before(time.Now())
392+
}

‎validator/twitter/api_test.go

+18-13
Original file line numberDiff line numberDiff line change
@@ -8,33 +8,38 @@ import (
88

99
func Test_getAccessToken(t *testing.T) {
1010
t.Run("success", func(t *testing.T) {
11-
require.NoError(t, getAccessToken())
12-
require.NotEmpty(t, accessToken)
13-
t.Logf("Access token: %s", accessToken)
11+
tokens := new(Tokens)
12+
require.NoError(t, tokens.getAccessToken())
13+
require.NotEmpty(t, tokens.AccessToken)
14+
t.Logf("Access token: %s", tokens.AccessToken)
1415
})
1516
}
1617

1718
func Test_getGuestToken(t *testing.T) {
1819
t.Run("success", func(t *testing.T) {
19-
require.NoError(t, getGuestToken())
20-
require.NotEmpty(t, guestToken)
21-
t.Logf("Guest token: %s", guestToken)
20+
tokens := new(Tokens)
21+
require.NoError(t, tokens.getGuestToken())
22+
require.NotEmpty(t, tokens.GuestToken)
23+
t.Logf("Guest token: %s", tokens.GuestToken)
2224
})
2325
}
2426

2527
func Test_FlowToken(t *testing.T) {
2628
t.Run("success", func(t *testing.T) {
27-
require.NoError(t, getFlowToken())
28-
require.NotEmpty(t, flowToken)
29-
t.Logf("Flow token: %s", flowToken)
29+
tokens := new(Tokens)
30+
require.NoError(t, tokens.getFlowToken())
31+
require.NotEmpty(t, tokens.FlowToken)
32+
t.Logf("Flow token: %s", tokens.FlowToken)
3033
})
3134
}
3235

3336
func Test_OauthToken(t *testing.T) {
3437
t.Run("success", func(t *testing.T) {
35-
require.NoError(t, GetOauthToken())
36-
t.Logf("Access token: %s", accessToken)
37-
t.Logf("Guest token: %s", guestToken)
38-
t.Logf("Flow token: %s", flowToken)
38+
tokens, err := GenerateOauthToken()
39+
require.NoError(t, err)
40+
require.False(t, tokens.IsExpired())
41+
t.Logf("Access token: %s", tokens.AccessToken)
42+
t.Logf("Guest token: %s", tokens.GuestToken)
43+
t.Logf("Flow token: %s", tokens.FlowToken)
3944
})
4045
}

0 commit comments

Comments
 (0)
Please sign in to comment.