From f23c7a1a1f9ddbf3154534e49e7f900f08cbb7db Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 16:05:16 -0700 Subject: [PATCH 01/15] New database/table, new trial for test --- dbos-config.yaml | 2 +- ..._tables.js => 20240429155106_create_account_tables.js} | 8 ++++---- scripts/staging_test.py | 5 ++++- 3 files changed, 9 insertions(+), 6 deletions(-) rename migrations/{20240408151906_create_subscription_tables.js => 20240429155106_create_account_tables.js} (68%) diff --git a/dbos-config.yaml b/dbos-config.yaml index 308b571..08ddc4d 100644 --- a/dbos-config.yaml +++ b/dbos-config.yaml @@ -8,7 +8,7 @@ database: port: 5432 username: 'postgres' password: ${PGPASSWORD} - app_db_name: 'cloudsub' + app_db_name: 'cloud_account' connectionTimeoutMillis: 3000 app_db_client: 'knex' migrate: diff --git a/migrations/20240408151906_create_subscription_tables.js b/migrations/20240429155106_create_account_tables.js similarity index 68% rename from migrations/20240408151906_create_subscription_tables.js rename to migrations/20240429155106_create_account_tables.js index 212c4b4..336dcf7 100644 --- a/migrations/20240408151906_create_subscription_tables.js +++ b/migrations/20240429155106_create_account_tables.js @@ -1,10 +1,10 @@ const { Knex } = require("knex"); exports.up = async function(knex) { - await knex.schema.createTable('subscriptions', table => { - table.text('auth0_user_id').primary(); + await knex.schema.createTable('accounts', table => { + table.text('auth0_subject_id').primary(); + table.text('email').notNullable(); table.text('stripe_customer_id').notNullable(); - table.text('dbos_plan').notNullable().defaultTo('free'); table.bigInteger('created_at') .notNullable() .defaultTo(knex.raw('(EXTRACT(EPOCH FROM now())*1000)::bigint')); @@ -15,5 +15,5 @@ exports.up = async function(knex) { }; exports.down = async function(knex) { - return knex.schema.dropTable('subscriptions'); + return knex.schema.dropTable('accounts'); }; diff --git a/scripts/staging_test.py b/scripts/staging_test.py index 363d7c0..a367dff 100644 --- a/scripts/staging_test.py +++ b/scripts/staging_test.py @@ -28,10 +28,13 @@ def test_endpoints(path: str): raise Exception("No Stripe customer found for test email") customer_id = customers.data[0].id - # Create a subscription that uses the default test payment + # Create a subscription that sets a trial that ends in 1 day. subscription = stripe.Subscription.create( customer=customer_id, items=[{"price": config.stripe_pro_price}], + trial_from_plan=True, + trial_period_days=1, + trial_settings={"end_behavior": "cancel"}, ) time.sleep(30) # Wait for subscription to take effect From b09d5eac1d293a04ad2f09e0d6d12de30864f9e5 Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 17:32:01 -0700 Subject: [PATCH 02/15] New schema, refactor --- dbos-config.yaml | 2 +- .../20240429155106_create_account_tables.js | 1 + src/operations.ts | 47 ++--- src/utils.ts | 187 ++++++++++-------- 4 files changed, 135 insertions(+), 102 deletions(-) diff --git a/dbos-config.yaml b/dbos-config.yaml index 08ddc4d..caad9c7 100644 --- a/dbos-config.yaml +++ b/dbos-config.yaml @@ -17,11 +17,11 @@ database: - npx knex migrate:rollback application: STRIPE_WEBHOOK_SECRET: ${STRIPE_WEBHOOK_SECRET} - STRIPE_DBOS_PRO_PRICE: ${STRIPE_DBOS_PRO_PRICE} env: DBOS_DOMAIN: ${DBOS_DOMAIN} STRIPE_SECRET_KEY: ${STRIPE_SECRET_KEY} DBOS_DEPLOY_REFRESH_TOKEN: ${DBOS_DEPLOY_REFRESH_TOKEN} + STRIPE_DBOS_PRO_PRICE: ${STRIPE_DBOS_PRO_PRICE} http: cors_middleware: true credentials: true diff --git a/migrations/20240429155106_create_account_tables.js b/migrations/20240429155106_create_account_tables.js index 336dcf7..665678c 100644 --- a/migrations/20240429155106_create_account_tables.js +++ b/migrations/20240429155106_create_account_tables.js @@ -11,6 +11,7 @@ exports.up = async function(knex) { table.bigInteger('updated_at') .notNullable() .defaultTo(knex.raw('(EXTRACT(EPOCH FROM now())*1000)::bigint')); + table.index('stripe_customer_id'); }); }; diff --git a/src/operations.ts b/src/operations.ts index e7dac9d..9988b29 100644 --- a/src/operations.ts +++ b/src/operations.ts @@ -18,37 +18,32 @@ const dbosJWT = jwt({ issuer: `https://${DBOSLoginDomain}/`, audience: 'dbos-cloud-api' }); +let lastTokenFetch = 0; // These endpoints can only be called with an authenticated user on DBOS cloud @Authentication(Utils.userAuthMiddleware) @KoaMiddleware(dbosJWT) export class CloudSubscription { @RequiredRole(['user']) - @PostApi('/create-customer-portal') - static async createCustomerPortal(ctxt: HandlerContext) { - const authUser = ctxt.authenticatedUser; - const sessionURL = await ctxt.invoke(Utils).createPortal(authUser); + @PostApi('/subscribe') + static async subscribePlan(ctxt: HandlerContext, @ArgSource(ArgSources.BODY) plan: string) { + if (plan !== DBOSProPlanString) { throw new DBOSResponseError("Invalid DBOS Plan", 400); } + const auth0UserID = ctxt.authenticatedUser; + const userEmail = ctxt.koaContext.state.user["https://dbos.dev/email"] as string; + const sessionURL = await ctxt.invoke(Utils).createSubscription(auth0UserID, userEmail).then(x => x.getResult()); if (!sessionURL) { - ctxt.logger.error("Failed to create a customer portal!"); - throw new DBOSResponseError("Failed to create customer portal!", 500); + throw new DBOSResponseError("Failed to create a checkout session!"); } return { url: sessionURL }; } - // This function redirects user to a subscription page @RequiredRole(['user']) - @PostApi('/subscribe') - static async subscribePlan(ctxt: HandlerContext, @ArgSource(ArgSources.BODY) plan: string) { - // Validate argument - if (plan !== DBOSProPlanString) { - ctxt.logger.error(`Invalid DBOS plan: ${plan}`); - throw new DBOSResponseError("Invalid DBOS Plan", 400); - } - - const authUser = ctxt.authenticatedUser; - const sessionURL = await ctxt.invoke(Utils).createCheckout(authUser); + @PostApi('/create-customer-portal') + static async createCustomerPortal(ctxt: HandlerContext) { + const auth0User = ctxt.authenticatedUser; + const sessionURL = await ctxt.invoke(Utils).createStripeCustomerPortal(auth0User).then(x => x.getResult()); if (!sessionURL) { - throw new Error("Failed to create a checkout session!"); + throw new DBOSResponseError("Failed to create customer portal!", 500); } return { url: sessionURL }; } @@ -74,10 +69,18 @@ export class StripeWebhook { throw new DBOSResponseError("Webhook Error", 400); } - // Fetch auth0 credential every 6 hours. - const ts = Date.now(); - const uuidStr = 'authtoken-' + (ts - (ts % 21600000)).toString(); - await ctxt.invoke(Utils, uuidStr).retrieveCloudCredential(); + // Fetch auth0 credential every 12 hours. + // TODO: use cron job. + try { + const ts = Date.now(); + if ((ts - lastTokenFetch) > 43200000) { + const uuidStr = 'authtoken-' + (ts - (ts % 43200000)).toString(); + await ctxt.invoke(Utils, uuidStr).retrieveCloudCredential(); + lastTokenFetch = ts; + } + } catch (err) { + ctxt.logger.error(err); + } // Handle the event. // Use event ID as the idempotency key for the workflow, making sure once-and-only-once execution. diff --git a/src/utils.ts b/src/utils.ts index c2c8ff8..43d9be7 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -11,87 +11,136 @@ export const DBOSLoginDomain = DBOS_DOMAIN === "cloud.dbos.dev" ? "login.dbos.de const DBOS_DEPLOY_REFRESH_TOKEN = process.env.DBOS_DEPLOY_REFRESH_TOKEN; // eslint-disable-next-line no-secrets/no-secrets const DBOSAuth0ClientID = DBOS_DOMAIN === 'cloud.dbos.dev' ? '6p7Sjxf13cyLMkdwn14MxlH7JdhILled' : 'G38fLmVErczEo9ioCFjVIHea6yd0qMZu'; +const DBOSProStripePrice = process.env.STRIPE_DBOS_PRO_PRICE ?? ""; let dbosAuth0Token: string; export class Utils { // eslint-disable-next-line @typescript-eslint/require-await static async userAuthMiddleware(ctxt: MiddlewareContext) { - ctxt.logger.debug("Request: " + JSON.stringify(ctxt.koaContext.request)); if (ctxt.requiredRole.length > 0) { if (!ctxt.koaContext.state.user) { throw new DBOSResponseError("No authenticated DBOS User!", 401); } - ctxt.logger.debug(ctxt.koaContext.state.user); const authenticatedUser = ctxt.koaContext.state.user["sub"] as string; if (!authenticatedUser) { throw new DBOSResponseError("No valid DBOS user found!", 401); } + const userEmail = ctxt.koaContext.state.user["https://dbos.dev/email"] as string; + if (!userEmail) { + throw new DBOSResponseError("No email found for the authenticated user", 400); + } return { authenticatedRoles: ['user'], authenticatedUser: authenticatedUser }; } } - @Communicator() - static async retrieveStripeCustomer(ctxt: CommunicatorContext, authUser: string): Promise { - // Look up customer from stripe - const customers = await stripe.customers.search({ - query: `metadata["auth0_user_id"]:"${authUser}"`, + @Communicator({intervalSeconds: 10, maxAttempts: 2}) + static async createStripeBillingPortal(_ctxt: CommunicatorContext, customerID: string): Promise { + const session = await stripe.billingPortal.sessions.create({ + customer: customerID, + return_url: 'https://dbos.dev' }); + return session.url; + } - let customerID = ""; - if (customers.data.length === 1) { - customerID = customers.data[0].id; - } else { - ctxt.logger.error(`Failed to look up customer from stripe: ${customers.data.length}`); - throw new DBOSResponseError("Failed to look up customer from stripe", 400); + @Workflow() + static async createStripeCustomerPortal(ctxt: WorkflowContext, auth0UserID: string): Promise { + const stripeCustomerID = await ctxt.invoke(Utils).findStripeCustomerID(auth0UserID); + if (!stripeCustomerID) { + ctxt.logger.error(`Cannot find stripe customer for user ${auth0UserID}`); + return null; } - return customerID; + const sessionURL = await ctxt.invoke(Utils).createStripeBillingPortal(stripeCustomerID); + return sessionURL; } - @Communicator() - static async createPortal(ctxt: CommunicatorContext, authUser: string): Promise { - const customerID = await Utils.retrieveStripeCustomer(ctxt, authUser); // Directly invoke another communicator - const session = await stripe.billingPortal.sessions.create({ - customer: customerID, - return_url: 'https://dbos.dev' + @Communicator({intervalSeconds: 10}) + static async createStripeCheckout(_ctxt: CommunicatorContext, stripeCustomerID: string): Promise { + const session = await stripe.checkout.sessions.create({ + customer: stripeCustomerID, + billing_address_collection: 'auto', + line_items: [ + { + price: DBOSProStripePrice, + quantity: 1, + }, + ], + mode: 'subscription', + success_url: `https://docs.dbos.dev`, + cancel_url: `https://www.dbos.dev/pricing`, }); return session.url; } - @Communicator() - static async createCheckout(ctxt: CommunicatorContext, authUser: string): Promise { - const customerID = await Utils.retrieveStripeCustomer(ctxt, authUser); // Directly invoke another communicator - const prices = await stripe.prices.retrieve(ctxt.getConfig("STRIPE_DBOS_PRO_PRICE") as string); - const session = await stripe.checkout.sessions.create({ - customer: customerID, - billing_address_collection: 'auto', - line_items: [ - { - price: prices.id, - quantity: 1, - }, - ], - mode: 'subscription', - success_url: `https://docs.dbos.dev`, - cancel_url: `https://www.dbos.dev/pricing`, - }); - return session.url; + @Transaction({readOnly: true}) + static async findStripeCustomerID(ctxt: TransactionContext, auth0UserID: string): Promise { + const client = ctxt.client; + const res = await client("accounts") + .select("stripe_customer_id") + .where("auth0_subject_id", auth0UserID).first(); + if (!res) { + return undefined; + } + return res.stripe_customer_id; } - // Find the Auth0 user info from stripe customer - @Communicator() - static async findAuth0User(ctxt: CommunicatorContext, customerID: string): Promise { - const customer = await stripe.customers.retrieve(customerID) as Stripe.Customer; - const dbosAuthID = customer.metadata["auth0_user_id"]; - if (!dbosAuthID) { - ctxt.logger.error(`Cannot find DBOS Auth ID from ${customerID}`); - throw new Error(`Cannot find DBOS Auth ID for customer ${customerID}`); + @Transaction({readOnly: true}) + static async findAuth0UserID(ctxt: TransactionContext, stripeCustomerID: string): Promise { + const client = ctxt.client; + const res = await client("accounts") + .select("auth0_subject_id") + .where("stripe_customer_id", stripeCustomerID).first(); + if (!res) { + return undefined; + } + return res.auth0_subject_id; + } + + @Transaction() + static async recordStripeCustomer(ctxt: TransactionContext, auth0UserID: string, stripeCustomerID: string, email: string): Promise { + const client = ctxt.client; + const res = await client("accounts") + .insert<{ rowCount: number }>({ + auth0_subject_id: auth0UserID, + stripe_customer_id: stripeCustomerID, + email: email, + }).onConflict("auth0_subject_id").ignore(); + if (res.rowCount !== 1) { + throw new Error(`Failed to record stripe customer ${stripeCustomerID} for user ${auth0UserID}`); } - return dbosAuthID; + } + + @Communicator({intervalSeconds: 10, maxAttempts: 2}) + static async createStripeCustomer(ctxt: CommunicatorContext, auth0UserID: string, userEmail: string): Promise { + const customer = await stripe.customers.create({ + email: userEmail, + description: "Automatically generated by DBOS", + metadata: { + auth0_user_id: auth0UserID, + }, + }); + ctxt.logger.info(`Created stripe customer ${customer.id} for user ${auth0UserID}`); + return customer.id; + } + + @Workflow() + static async createSubscription(ctxt: WorkflowContext, auth0UserID: string, userEmail: string): Promise { + // First, look up the customer from the accounts table + let stripeCustomerID = await ctxt.invoke(Utils).findStripeCustomerID(auth0UserID); + + // If customer is not found, create a new customer in stripe, and record in our database + if (!stripeCustomerID) { + stripeCustomerID = await ctxt.invoke(Utils).createStripeCustomer(auth0UserID, userEmail); + await ctxt.invoke(Utils).recordStripeCustomer(auth0UserID, stripeCustomerID, userEmail); + } + + // Finally, create a Stripe subscription. + const res = await ctxt.invoke(Utils).createStripeCheckout(stripeCustomerID); + return res; } @Communicator() - static async retrieveSubscription(ctxt: CommunicatorContext, subscriptionID: string): Promise { + static async retrieveSubscription(_ctxt: CommunicatorContext, subscriptionID: string): Promise { const subscription = await stripe.subscriptions.retrieve(subscriptionID); return { id: subscriptionID, @@ -101,22 +150,6 @@ export class Utils { }; } - @Transaction() - static async updateDBRecord(ctxt: TransactionContext, dbosAuthID: string, stripeCustomerID: string, plan: string): Promise{ - const check = `SELECT dbos_plan FROM subscriptions WHERE auth0_user_id = ?`; - const { rows } = await ctxt.client.raw(check, [dbosAuthID]) as { rows: dbos_subscriptions[]}; - if (rows.length > 0) { - const currentPlan = rows[0].dbos_plan; - if (currentPlan === plan) { - ctxt.logger.info(`User ${dbosAuthID} already has plan ${plan}`); - return false; - } - } - // Use knex to upsert into subscriptions table. - const query = `INSERT INTO subscriptions (auth0_user_id, stripe_customer_id, dbos_plan, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT (auth0_user_id) DO UPDATE SET dbos_plan = EXCLUDED.dbos_plan, updated_at = EXCLUDED.updated_at`; - await ctxt.client.raw(query, [dbosAuthID, stripeCustomerID, plan, Date.now()]); - return true; - } @Communicator({intervalSeconds: 10}) static async retrieveCloudCredential(ctxt: CommunicatorContext): Promise { @@ -179,24 +212,20 @@ export class Utils { @Workflow() static async subscriptionWorkflow(ctxt: WorkflowContext, subscriptionID: string, customerID: string) { // Check subscription from stripe and only active the account if plan is active. - const proPrice = ctxt.getConfig("STRIPE_DBOS_PRO_PRICE") as string; - const dbosAuthID = await ctxt.invoke(Utils).findAuth0User(customerID); + const dbosAuthID = await ctxt.invoke(Utils).findAuth0UserID(customerID); + if (!dbosAuthID) { + throw new Error(`Cannot find auth0 user for stripe customer ${customerID}`); + } const subscription = await ctxt.invoke(Utils).retrieveSubscription(subscriptionID); if (subscription.status === 'incomplete' || subscription.status === 'incomplete_expired') { ctxt.logger.info(`Subscription ${subscriptionID} is incomplete. Do nothing.`); return; - } else if (subscription.status === 'active' && subscription.price === proPrice) { + } else if (subscription.status === 'active' && subscription.price === DBOSProStripePrice) { ctxt.logger.info(`Subscription ${subscriptionID} is active for user ${dbosAuthID}`); - const needUpdate = await ctxt.invoke(Utils).updateDBRecord(dbosAuthID, customerID, 'pro'); - if (needUpdate) { - await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'pro'); - } - } else if (subscription.status === 'canceled' && subscription.price === proPrice) { + await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'pro'); + } else if (subscription.status === 'canceled' && subscription.price === DBOSProStripePrice) { ctxt.logger.info(`Subscription ${subscriptionID} is canceled for user ${dbosAuthID}`); - const needUpdate = await ctxt.invoke(Utils).updateDBRecord(dbosAuthID, customerID, 'free'); - if (needUpdate) { - await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'free'); - } + await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'free'); } else { ctxt.logger.warn(`Unknown subscription status: ${subscription.status}; or price: ${subscription.price}; user ${dbosAuthID}`); } @@ -210,10 +239,10 @@ interface RefreshTokenAuthResponse { token_type: string; } -interface dbos_subscriptions { - auth0_user_id: string; +interface Accounts { + auth0_subject_id: string; + email: string; stripe_customer_id: string; - dbos_plan: string; created_at: number; updated_at: number; } From 33a9ed82d2a9559a94d612270d81c2e4c787de14 Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 17:48:21 -0700 Subject: [PATCH 03/15] Add tests for transactions --- src/operations.test.ts | 11 ++++ src/operations.ts | 8 +-- src/utils.ts | 135 ++++++++++++++++++++++------------------- 3 files changed, 88 insertions(+), 66 deletions(-) diff --git a/src/operations.test.ts b/src/operations.test.ts index 7772d74..65c67ee 100644 --- a/src/operations.test.ts +++ b/src/operations.test.ts @@ -4,15 +4,26 @@ import request from "supertest"; describe("cors-tests", () => { let testRuntime: TestingRuntime; + const auth0TestID = "testauth0123"; + const stripeTestID = "teststripe123"; + const testEmail = "testemail@dbos.dev"; beforeAll(async () => { testRuntime = await createTestingRuntime([CloudSubscription, Utils]); + await testRuntime.queryUserDB(`DELETE FROM accounts WHERE auth0_subject_id='${auth0TestID}';`); }); afterAll(async () => { await testRuntime.destroy(); }); + test("account-management", async () => { + // Check our transactions are correct + await expect(testRuntime.invoke(Utils).recordStripeCustomer(auth0TestID, stripeTestID, testEmail)).resolves.toBeFalsy(); // No error + await expect(testRuntime.invoke(Utils).findStripeCustomerID(auth0TestID)).resolves.toBe(stripeTestID); + await expect(testRuntime.invoke(Utils).findAuth0UserID(stripeTestID)).resolves.toBe(auth0TestID); + }); + test("subscribe-cors", async () => { const req = { plan: "dbospro", diff --git a/src/operations.ts b/src/operations.ts index 9988b29..72ab996 100644 --- a/src/operations.ts +++ b/src/operations.ts @@ -8,7 +8,7 @@ import { DBOSLoginDomain, stripe, Utils } from './utils'; export { Utils } from './utils'; const DBOSProPlanString = "dbospro"; -const dbosJWT = jwt({ +const auth0JwtVerifier = jwt({ secret: koaJwtSecret({ jwksUri: `https://${DBOSLoginDomain}/.well-known/jwks.json`, cache: true, @@ -22,7 +22,7 @@ let lastTokenFetch = 0; // These endpoints can only be called with an authenticated user on DBOS cloud @Authentication(Utils.userAuthMiddleware) -@KoaMiddleware(dbosJWT) +@KoaMiddleware(auth0JwtVerifier) export class CloudSubscription { @RequiredRole(['user']) @PostApi('/subscribe') @@ -90,13 +90,13 @@ export class StripeWebhook { case 'customer.subscription.deleted': case 'customer.subscription.updated': { const subscription = event.data.object as Stripe.Subscription; - await ctxt.invoke(Utils, event.id).subscriptionWorkflow(subscription.id, subscription.customer as string); + await ctxt.invoke(Utils, event.id).stripeWebhookWorkflow(subscription.id, subscription.customer as string); break; } case 'checkout.session.completed': { const checkout = event.data.object as Stripe.Checkout.Session; if (checkout.mode === 'subscription') { - await ctxt.invoke(Utils, event.id).subscriptionWorkflow(checkout.subscription as string, checkout.customer as string); + await ctxt.invoke(Utils, event.id).stripeWebhookWorkflow(checkout.subscription as string, checkout.customer as string); } break; } diff --git a/src/utils.ts b/src/utils.ts index 43d9be7..5289afd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -34,13 +34,42 @@ export class Utils { } } - @Communicator({intervalSeconds: 10, maxAttempts: 2}) - static async createStripeBillingPortal(_ctxt: CommunicatorContext, customerID: string): Promise { - const session = await stripe.billingPortal.sessions.create({ - customer: customerID, - return_url: 'https://dbos.dev' - }); - return session.url; + @Workflow() + static async stripeWebhookWorkflow(ctxt: WorkflowContext, subscriptionID: string, customerID: string) { + // Check subscription from stripe and only active the account if plan is active. + const dbosAuthID = await ctxt.invoke(Utils).findAuth0UserID(customerID); + if (!dbosAuthID) { + throw new Error(`Cannot find auth0 user for stripe customer ${customerID}`); + } + const subscription = await ctxt.invoke(Utils).retrieveSubscription(subscriptionID); + if (subscription.status === 'incomplete' || subscription.status === 'incomplete_expired') { + ctxt.logger.info(`Subscription ${subscriptionID} is incomplete. Do nothing.`); + return; + } else if (subscription.status === 'active' && subscription.price === DBOSProStripePrice) { + ctxt.logger.info(`Subscription ${subscriptionID} is active for user ${dbosAuthID}`); + await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'pro'); + } else if (subscription.status === 'canceled' && subscription.price === DBOSProStripePrice) { + ctxt.logger.info(`Subscription ${subscriptionID} is canceled for user ${dbosAuthID}`); + await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'free'); + } else { + ctxt.logger.warn(`Unknown subscription status: ${subscription.status}; or price: ${subscription.price}; user ${dbosAuthID}`); + } + } + + @Workflow() + static async createSubscription(ctxt: WorkflowContext, auth0UserID: string, userEmail: string): Promise { + // First, look up the customer from the accounts table + let stripeCustomerID = await ctxt.invoke(Utils).findStripeCustomerID(auth0UserID); + + // If customer is not found, create a new customer in stripe, and record in our database + if (!stripeCustomerID) { + stripeCustomerID = await ctxt.invoke(Utils).createStripeCustomer(auth0UserID, userEmail); + await ctxt.invoke(Utils).recordStripeCustomer(auth0UserID, stripeCustomerID, userEmail); + } + + // Finally, create a Stripe subscription. + const res = await ctxt.invoke(Utils).createStripeCheckout(stripeCustomerID); + return res; } @Workflow() @@ -54,23 +83,9 @@ export class Utils { return sessionURL; } - @Communicator({intervalSeconds: 10}) - static async createStripeCheckout(_ctxt: CommunicatorContext, stripeCustomerID: string): Promise { - const session = await stripe.checkout.sessions.create({ - customer: stripeCustomerID, - billing_address_collection: 'auto', - line_items: [ - { - price: DBOSProStripePrice, - quantity: 1, - }, - ], - mode: 'subscription', - success_url: `https://docs.dbos.dev`, - cancel_url: `https://www.dbos.dev/pricing`, - }); - return session.url; - } + /** + * Transactions managing user accounts + */ @Transaction({readOnly: true}) static async findStripeCustomerID(ctxt: TransactionContext, auth0UserID: string): Promise { @@ -110,6 +125,37 @@ export class Utils { } } + /** + * Communicators interacting with Stripe + */ + + @Communicator({intervalSeconds: 10, maxAttempts: 2}) + static async createStripeBillingPortal(_ctxt: CommunicatorContext, customerID: string): Promise { + const session = await stripe.billingPortal.sessions.create({ + customer: customerID, + return_url: 'https://dbos.dev' + }); + return session.url; + } + + @Communicator({intervalSeconds: 10}) + static async createStripeCheckout(_ctxt: CommunicatorContext, stripeCustomerID: string): Promise { + const session = await stripe.checkout.sessions.create({ + customer: stripeCustomerID, + billing_address_collection: 'auto', + line_items: [ + { + price: DBOSProStripePrice, + quantity: 1, + }, + ], + mode: 'subscription', + success_url: `https://docs.dbos.dev`, + cancel_url: `https://www.dbos.dev/pricing`, + }); + return session.url; + } + @Communicator({intervalSeconds: 10, maxAttempts: 2}) static async createStripeCustomer(ctxt: CommunicatorContext, auth0UserID: string, userEmail: string): Promise { const customer = await stripe.customers.create({ @@ -123,22 +169,6 @@ export class Utils { return customer.id; } - @Workflow() - static async createSubscription(ctxt: WorkflowContext, auth0UserID: string, userEmail: string): Promise { - // First, look up the customer from the accounts table - let stripeCustomerID = await ctxt.invoke(Utils).findStripeCustomerID(auth0UserID); - - // If customer is not found, create a new customer in stripe, and record in our database - if (!stripeCustomerID) { - stripeCustomerID = await ctxt.invoke(Utils).createStripeCustomer(auth0UserID, userEmail); - await ctxt.invoke(Utils).recordStripeCustomer(auth0UserID, stripeCustomerID, userEmail); - } - - // Finally, create a Stripe subscription. - const res = await ctxt.invoke(Utils).createStripeCheckout(stripeCustomerID); - return res; - } - @Communicator() static async retrieveSubscription(_ctxt: CommunicatorContext, subscriptionID: string): Promise { const subscription = await stripe.subscriptions.retrieve(subscriptionID); @@ -150,6 +180,9 @@ export class Utils { }; } + /** + * Communicators interacting with DBOS Cloud + */ @Communicator({intervalSeconds: 10}) static async retrieveCloudCredential(ctxt: CommunicatorContext): Promise { @@ -208,28 +241,6 @@ export class Utils { throw err; } } - - @Workflow() - static async subscriptionWorkflow(ctxt: WorkflowContext, subscriptionID: string, customerID: string) { - // Check subscription from stripe and only active the account if plan is active. - const dbosAuthID = await ctxt.invoke(Utils).findAuth0UserID(customerID); - if (!dbosAuthID) { - throw new Error(`Cannot find auth0 user for stripe customer ${customerID}`); - } - const subscription = await ctxt.invoke(Utils).retrieveSubscription(subscriptionID); - if (subscription.status === 'incomplete' || subscription.status === 'incomplete_expired') { - ctxt.logger.info(`Subscription ${subscriptionID} is incomplete. Do nothing.`); - return; - } else if (subscription.status === 'active' && subscription.price === DBOSProStripePrice) { - ctxt.logger.info(`Subscription ${subscriptionID} is active for user ${dbosAuthID}`); - await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'pro'); - } else if (subscription.status === 'canceled' && subscription.price === DBOSProStripePrice) { - ctxt.logger.info(`Subscription ${subscriptionID} is canceled for user ${dbosAuthID}`); - await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'free'); - } else { - ctxt.logger.warn(`Unknown subscription status: ${subscription.status}; or price: ${subscription.price}; user ${dbosAuthID}`); - } - } } interface RefreshTokenAuthResponse { From 50d14bc41bdb1778756b21b461218bae2ae44f4e Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 17:57:50 -0700 Subject: [PATCH 04/15] fix --- scripts/staging_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/staging_test.py b/scripts/staging_test.py index a367dff..8d03cd8 100644 --- a/scripts/staging_test.py +++ b/scripts/staging_test.py @@ -32,7 +32,6 @@ def test_endpoints(path: str): subscription = stripe.Subscription.create( customer=customer_id, items=[{"price": config.stripe_pro_price}], - trial_from_plan=True, trial_period_days=1, trial_settings={"end_behavior": "cancel"}, ) From e6292b4e4cbcd83e463765fb75c559f312fc097e Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 18:04:25 -0700 Subject: [PATCH 05/15] fix --- scripts/staging_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/staging_test.py b/scripts/staging_test.py index 8d03cd8..8b76555 100644 --- a/scripts/staging_test.py +++ b/scripts/staging_test.py @@ -33,7 +33,7 @@ def test_endpoints(path: str): customer=customer_id, items=[{"price": config.stripe_pro_price}], trial_period_days=1, - trial_settings={"end_behavior": "cancel"}, + trial_settings={"end_behavior": {"missing_payment_method": "cancel"}}, ) time.sleep(30) # Wait for subscription to take effect From 5fff44262a4c34d001bc357604ca0850e5d3d41e Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 18:33:31 -0700 Subject: [PATCH 06/15] test all endpoints --- scripts/staging_test.py | 27 ++++++++++++++++++++++++++- scripts/utils.py | 7 +++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/scripts/staging_test.py b/scripts/staging_test.py index 8b76555..6964fcb 100644 --- a/scripts/staging_test.py +++ b/scripts/staging_test.py @@ -2,7 +2,9 @@ import json import os import time -from utils import (login, run_subprocess) + +import requests +from utils import (login, run_subprocess, get_credentials) from config import config import stripe @@ -22,6 +24,29 @@ def test_endpoints(path: str): if json_data['SubscriptionPlan'] != "free": raise Exception("Free tier check failed") + # Test the subscribe endpoint + credentials = get_credentials(path) + token = credentials['token'] + url = f"https://subscribe-dbos.{config.dbos_domain}/subscribe" + headers = { + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json' + } + data = { + 'plan': 'dbospro' + } + res = requests.post(url, headers=headers, data=json.dumps(data)) + assert res.status_code == 200, f"Cloud subscribe endpoint failed: {res.status_code} - {res.text}" + + # Test customer portal endpoint + url = f"https://subscribe-dbos.{config.dbos_domain}/create-customer-portal" + headers = { + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json' + } + res = requests.post(url, headers=headers) + assert res.status_code == 200, f"Cloud create-customer-portal endpoint failed: {res.status_code} - {res.text}" + # Look up customer ID customers = stripe.Customer.list(email=config.test_email, limit=1) if len(customers) == 0: diff --git a/scripts/utils.py b/scripts/utils.py index 875e9c7..ddbcd33 100644 --- a/scripts/utils.py +++ b/scripts/utils.py @@ -23,3 +23,10 @@ def login(path: str, is_deploy: bool = False): # Automated login using the refresh token refresh_token = config.deploy_refresh_token if is_deploy else config.test_refresh_token run_subprocess(['npx', 'dbos-cloud', 'login', '--with-refresh-token', refresh_token], path, check=True) + +def get_credentials(path: str): + credentials_path = os.path.join(path, '.dbos', 'credentials') + if not os.path.exists(credentials_path): + raise Exception(f'Could not find credentials file {credentials_path}') + with open(credentials_path, 'r') as f: + return json.load(f) \ No newline at end of file From d1345b7ee66c2167040981bccf90d820e50797ad Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 19:55:27 -0700 Subject: [PATCH 07/15] Handle more status --- src/utils.ts | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index 5289afd..ef10aaf 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -36,23 +36,31 @@ export class Utils { @Workflow() static async stripeWebhookWorkflow(ctxt: WorkflowContext, subscriptionID: string, customerID: string) { - // Check subscription from stripe and only active the account if plan is active. + // Retrieve the updated subscription from Stripe + const subscription = await ctxt.invoke(Utils).retrieveSubscription(subscriptionID); + if (subscription.price !== DBOSProStripePrice) { + throw new Error(`Unknown price: ${subscription.price}; customer ${customerID}; subscription ${subscriptionID}`); + } + + // Map the Stripe customer ID to DBOS Cloud user ID const dbosAuthID = await ctxt.invoke(Utils).findAuth0UserID(customerID); if (!dbosAuthID) { throw new Error(`Cannot find auth0 user for stripe customer ${customerID}`); } - const subscription = await ctxt.invoke(Utils).retrieveSubscription(subscriptionID); - if (subscription.status === 'incomplete' || subscription.status === 'incomplete_expired') { - ctxt.logger.info(`Subscription ${subscriptionID} is incomplete. Do nothing.`); - return; - } else if (subscription.status === 'active' && subscription.price === DBOSProStripePrice) { - ctxt.logger.info(`Subscription ${subscriptionID} is active for user ${dbosAuthID}`); - await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'pro'); - } else if (subscription.status === 'canceled' && subscription.price === DBOSProStripePrice) { - ctxt.logger.info(`Subscription ${subscriptionID} is canceled for user ${dbosAuthID}`); - await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'free'); - } else { - ctxt.logger.warn(`Unknown subscription status: ${subscription.status}; or price: ${subscription.price}; user ${dbosAuthID}`); + + // Send a request to the DBOS Cloud admin API to change the user's subscription status. + switch (subscription.status) { + case 'active': + case 'trialing': + await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'pro'); + break; + case 'canceled': + case 'unpaid': + case 'paused': + await ctxt.invoke(Utils).updateCloudEntitlement(dbosAuthID, 'free'); + break; + default: + ctxt.logger.info(`Do nothing for ${subscription.status} status.`); } } @@ -170,8 +178,9 @@ export class Utils { } @Communicator() - static async retrieveSubscription(_ctxt: CommunicatorContext, subscriptionID: string): Promise { + static async retrieveSubscription(ctxt: CommunicatorContext, subscriptionID: string): Promise { const subscription = await stripe.subscriptions.retrieve(subscriptionID); + ctxt.logger.info(`Subscription ${subscriptionID} is ${subscription.status} for customer ${subscription.customer as string}`); return { id: subscriptionID, customer: subscription.customer as string, @@ -235,9 +244,9 @@ export class Utils { }; try { const response = await axios.request(entitlementRequest); - ctxt.logger.info(`Update entitlement response: ${response.status}`); + ctxt.logger.info(`Update entitlement for ${dbosAuth0Token} to plan ${plan}, response: ${response.status}`); } catch (err) { - ctxt.logger.error(`Failed to update entitlement: ${(err as Error).message}`); + ctxt.logger.error(`Failed to update ${dbosAuth0Token} to ${plan}: ${(err as Error).message}`); throw err; } } From 6f127056e461cc812473ee779372889d62ba424e Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 20:14:20 -0700 Subject: [PATCH 08/15] clean up --- scripts/auth0_post_login.js | 69 ------------------------------------- src/operations.ts | 3 +- src/utils.ts | 15 ++++---- 3 files changed, 9 insertions(+), 78 deletions(-) delete mode 100644 scripts/auth0_post_login.js diff --git a/scripts/auth0_post_login.js b/scripts/auth0_post_login.js deleted file mode 100644 index 553fc64..0000000 --- a/scripts/auth0_post_login.js +++ /dev/null @@ -1,69 +0,0 @@ -/** -* Handler that will be called during the execution of an Auth0 PostLogin flow. -* -* @param {Event} event - Details about the user and the context in which they are logging in. -* @param {PostLoginAPI} api - Interface whose methods can be used to change the behavior of the login. -*/ -exports.onExecutePostLogin = async (event, api) => { - try { - /** - * Check for data integrity, if stripe_customer_id already exists, then just return - */ - if (event.user.app_metadata.stripe_customer_id) { - console.log(`app_metadata for new user already has stripe_customer_id property.`); - return; - } - - if (event.user.email === 'dbos-cloud-subscription@dbos.dev') { - console.log(`skip stripe account creation for dbos-cloud-subscription user`); - return; - } - - /** - * Initialize Stripe library - */ - const stripe = require("stripe")(event.secrets.STRIPE_SECRET_KEY); - - - /** - * Check if existing Stripe Customer - */ - const stripeCustomerMatchesByEmail = await stripe.customers.list({ - email: event.user.email, - }); - - if (stripeCustomerMatchesByEmail.data.length > 0) { - // This means someone has created an account in stripe, bypassing DBOS - const error = `Stripe Customer with email ${event.user.email} already exists.`; - console.error(error); - - api.access.deny( - "We could not create your payment account. Do you already have an account with the same email?\n" + - "Please contact support for assistance." - ); - return; - } - - /** - * Create Stripe Customer - */ - const newStripeCustomer = await stripe.customers.create({ - email: event.user.email, - description: "Automatically generated by an Auth0 Action", - metadata: { auth0_user_id: event.user.user_id }, - }); - - /** - * Add Stripe Customer ID to app_metadata - */ - api.user.setAppMetadata("stripe_customer_id", newStripeCustomer.id); - - } catch (error) { - console.error(error.message); - - api.access.deny( - "We could not login to your account.\n" + - "Please try again or contact support for assistance." - ); - } -}; \ No newline at end of file diff --git a/src/operations.ts b/src/operations.ts index 72ab996..5111b6e 100644 --- a/src/operations.ts +++ b/src/operations.ts @@ -74,8 +74,7 @@ export class StripeWebhook { try { const ts = Date.now(); if ((ts - lastTokenFetch) > 43200000) { - const uuidStr = 'authtoken-' + (ts - (ts % 43200000)).toString(); - await ctxt.invoke(Utils, uuidStr).retrieveCloudCredential(); + await ctxt.invoke(Utils).retrieveCloudCredential(); lastTokenFetch = ts; } } catch (err) { diff --git a/src/utils.ts b/src/utils.ts index ef10aaf..aad7ef7 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -203,11 +203,11 @@ export class Utils { method: 'POST', url: `https://${DBOSLoginDomain}/oauth/token`, headers: { 'content-type': 'application/x-www-form-urlencoded' }, - data: new URLSearchParams({ + data: { grant_type: "refresh_token", client_id: DBOSAuth0ClientID, refresh_token: refreshToken - }), + }, }; try { const response = await axios.request(loginRequest); @@ -221,15 +221,16 @@ export class Utils { } } - @Communicator({intervalSeconds: 10}) + // This will retry the request every 10 seconds, up to 20 times, with a backoff rate of 1.2 + @Communicator({intervalSeconds: 10, maxAttempts: 20, backoffRate: 1.2}) static async updateCloudEntitlement(ctxt: CommunicatorContext, dbosAuthID: string, plan: string) { - // Obtain cloud credentials - let token = dbosAuth0Token; + const token = dbosAuth0Token; if (!token) { - token = await Utils.retrieveCloudCredential(ctxt); + ctxt.logger.error("No access token found, aborting update entitlement"); + return; } - // Update the entitlement in DBOS Cloud + // Update the user's subscription status in DBOS Cloud const entitlementRequest = { method: 'POST', url: `https://${DBOS_DOMAIN}/admin/v1alpha1/users/update-sub`, From 6daeeda7dc3a0d93adec25fbb8d33372009945a7 Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 20:17:56 -0700 Subject: [PATCH 09/15] clean up logging --- src/utils.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index aad7ef7..a149909 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -245,9 +245,9 @@ export class Utils { }; try { const response = await axios.request(entitlementRequest); - ctxt.logger.info(`Update entitlement for ${dbosAuth0Token} to plan ${plan}, response: ${response.status}`); + ctxt.logger.info(`Update entitlement for ${dbosAuthID} to plan ${plan}, response: ${response.status}`); } catch (err) { - ctxt.logger.error(`Failed to update ${dbosAuth0Token} to ${plan}: ${(err as Error).message}`); + ctxt.logger.error(`Failed to update ${dbosAuthID} to ${plan}: ${(err as Error).message}`); throw err; } } From 29bf0c13644a100b6b76ef9a8bb1084c606c278a Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 20:23:06 -0700 Subject: [PATCH 10/15] all env --- dbos-config.yaml | 3 +-- src/operations.ts | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbos-config.yaml b/dbos-config.yaml index caad9c7..94caa65 100644 --- a/dbos-config.yaml +++ b/dbos-config.yaml @@ -15,13 +15,12 @@ database: - npx knex migrate:latest rollback: - npx knex migrate:rollback -application: - STRIPE_WEBHOOK_SECRET: ${STRIPE_WEBHOOK_SECRET} env: DBOS_DOMAIN: ${DBOS_DOMAIN} STRIPE_SECRET_KEY: ${STRIPE_SECRET_KEY} DBOS_DEPLOY_REFRESH_TOKEN: ${DBOS_DEPLOY_REFRESH_TOKEN} STRIPE_DBOS_PRO_PRICE: ${STRIPE_DBOS_PRO_PRICE} + STRIPE_WEBHOOK_SECRET: ${STRIPE_WEBHOOK_SECRET} http: cors_middleware: true credentials: true diff --git a/src/operations.ts b/src/operations.ts index 5111b6e..552410e 100644 --- a/src/operations.ts +++ b/src/operations.ts @@ -8,6 +8,7 @@ import { DBOSLoginDomain, stripe, Utils } from './utils'; export { Utils } from './utils'; const DBOSProPlanString = "dbospro"; +const StripeWebhookSecret = process.env.STRIPE_WEBHOOK_SECRET || ""; const auth0JwtVerifier = jwt({ secret: koaJwtSecret({ jwksUri: `https://${DBOSLoginDomain}/.well-known/jwks.json`, @@ -63,10 +64,10 @@ export class StripeWebhook { const payload: string = req.rawBody; let event: Stripe.Event; try { - event = stripe.webhooks.constructEvent(payload, sigHeader, ctxt.getConfig("STRIPE_WEBHOOK_SECRET") as string); + event = stripe.webhooks.constructEvent(payload, sigHeader, StripeWebhookSecret); } catch (err) { ctxt.logger.error(err); - throw new DBOSResponseError("Webhook Error", 400); + throw new DBOSResponseError("Unable to verify event from Stripe", 400); } // Fetch auth0 credential every 12 hours. From 62bc5d2876f76825bb892618b73ce6201f7f29ea Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 21:15:30 -0700 Subject: [PATCH 11/15] Cleaner handler --- src/operations.ts | 31 +++++-------------------------- src/utils.ts | 31 +++++++++++++++++++++---------- 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/src/operations.ts b/src/operations.ts index 552410e..8224181 100644 --- a/src/operations.ts +++ b/src/operations.ts @@ -4,11 +4,10 @@ import { HandlerContext, ArgSource, ArgSources, PostApi, DBOSResponseError, Requ import Stripe from 'stripe'; import jwt from "koa-jwt"; import { koaJwtSecret } from "jwks-rsa"; -import { DBOSLoginDomain, stripe, Utils } from './utils'; +import { DBOSLoginDomain, Utils } from './utils'; export { Utils } from './utils'; const DBOSProPlanString = "dbospro"; -const StripeWebhookSecret = process.env.STRIPE_WEBHOOK_SECRET || ""; const auth0JwtVerifier = jwt({ secret: koaJwtSecret({ jwksUri: `https://${DBOSLoginDomain}/.well-known/jwks.json`, @@ -19,7 +18,6 @@ const auth0JwtVerifier = jwt({ issuer: `https://${DBOSLoginDomain}/`, audience: 'dbos-cloud-api' }); -let lastTokenFetch = 0; // These endpoints can only be called with an authenticated user on DBOS cloud @Authentication(Utils.userAuthMiddleware) @@ -54,37 +52,18 @@ export class CloudSubscription { export class StripeWebhook { @PostApi('/stripe_webhook') static async stripeWebhook(ctxt: HandlerContext) { - // Make sure the request is actually from Stripe. + // Verify the request is actually from Stripe. const req = ctxt.koaContext.request; - const sigHeader = req.headers['stripe-signature']; - if (typeof sigHeader !== 'string') { - throw new DBOSResponseError("Invalid stripe request", 400); - } - - const payload: string = req.rawBody; let event: Stripe.Event; try { - event = stripe.webhooks.constructEvent(payload, sigHeader, StripeWebhookSecret); + event = Utils.verifyStripeEvent(req.rawBody as string, req.headers['stripe-signature']); } catch (err) { ctxt.logger.error(err); throw new DBOSResponseError("Unable to verify event from Stripe", 400); } - // Fetch auth0 credential every 12 hours. - // TODO: use cron job. - try { - const ts = Date.now(); - if ((ts - lastTokenFetch) > 43200000) { - await ctxt.invoke(Utils).retrieveCloudCredential(); - lastTokenFetch = ts; - } - } catch (err) { - ctxt.logger.error(err); - } - - // Handle the event. - // Use event ID as the idempotency key for the workflow, making sure once-and-only-once execution. - // Invoke the workflow but don't wait for it to finish. Fast response to Stripe. + // Invoke the workflow asynchronously and quickly response to Stripe. + // Use event.id as the workflow idempotency key to guarantee exactly once processing. switch (event.type) { case 'customer.subscription.created': case 'customer.subscription.deleted': diff --git a/src/utils.ts b/src/utils.ts index a149909..97bbc5c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -12,8 +12,7 @@ const DBOS_DEPLOY_REFRESH_TOKEN = process.env.DBOS_DEPLOY_REFRESH_TOKEN; // eslint-disable-next-line no-secrets/no-secrets const DBOSAuth0ClientID = DBOS_DOMAIN === 'cloud.dbos.dev' ? '6p7Sjxf13cyLMkdwn14MxlH7JdhILled' : 'G38fLmVErczEo9ioCFjVIHea6yd0qMZu'; const DBOSProStripePrice = process.env.STRIPE_DBOS_PRO_PRICE ?? ""; - -let dbosAuth0Token: string; +const StripeWebhookSecret = process.env.STRIPE_WEBHOOK_SECRET ?? ""; export class Utils { // eslint-disable-next-line @typescript-eslint/require-await @@ -192,7 +191,8 @@ export class Utils { /** * Communicators interacting with DBOS Cloud */ - + static dbosAuth0Token: string|undefined; + static lastTokenFetch = 0; @Communicator({intervalSeconds: 10}) static async retrieveCloudCredential(ctxt: CommunicatorContext): Promise { const refreshToken = DBOS_DEPLOY_REFRESH_TOKEN; @@ -212,22 +212,33 @@ export class Utils { try { const response = await axios.request(loginRequest); const tokenResponse = response.data as RefreshTokenAuthResponse; - dbosAuth0Token = tokenResponse.access_token; - return dbosAuth0Token; + Utils.dbosAuth0Token = tokenResponse.access_token; + return tokenResponse.access_token; } catch (err) { ctxt.logger.error(`Failed to get access token: ${(err as Error).message}`); - dbosAuth0Token = ""; + Utils.dbosAuth0Token = undefined; throw err; } } + static verifyStripeEvent(payload: string, sigHeader: unknown) { + if (typeof sigHeader !== 'string') { + throw new Error("Invalid stripe request, no stripe-signature header"); + } + const event = stripe.webhooks.constructEvent(payload, sigHeader, StripeWebhookSecret); + return event; + } + // This will retry the request every 10 seconds, up to 20 times, with a backoff rate of 1.2 @Communicator({intervalSeconds: 10, maxAttempts: 20, backoffRate: 1.2}) static async updateCloudEntitlement(ctxt: CommunicatorContext, dbosAuthID: string, plan: string) { - const token = dbosAuth0Token; - if (!token) { - ctxt.logger.error("No access token found, aborting update entitlement"); - return; + let token = Utils.dbosAuth0Token; + const ts = Date.now(); + // Fetch auth0 credential every 12 hours. + // TODO: use cron job. + if (!token || (ts - Utils.lastTokenFetch) > 43200000) { + token = await Utils.retrieveCloudCredential(ctxt); + Utils.lastTokenFetch = ts; } // Update the user's subscription status in DBOS Cloud From 5d1f93255feb72913dc6390ad603154e55decccd Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 21:35:17 -0700 Subject: [PATCH 12/15] Cleaner deploy script --- scripts/dbos_deploy.py | 8 ++++---- scripts/utils.py | 9 ++++++++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/scripts/dbos_deploy.py b/scripts/dbos_deploy.py index ff62351..21838f0 100644 --- a/scripts/dbos_deploy.py +++ b/scripts/dbos_deploy.py @@ -1,6 +1,6 @@ # This script is used to automatically deploy this subscription app to DBOS Cloud import os -from utils import (login, run_subprocess) +from utils import (login, run_subprocess, generate_password) from config import config script_dir = os.path.dirname(os.path.abspath(__file__)) @@ -9,9 +9,9 @@ def deploy(path: str): output = run_subprocess(['npx', 'dbos-cloud', 'database', 'status', config.db_name], path, check=False) if "error" in output: - raise Exception(f"Database {config.db_name} errored!") - - # run_subprocess(['npx', 'dbos-cloud', 'applications', 'register', '--database', DB_NAME], path, check=False) + # Provision a database + run_subprocess(['npx', 'dbos-cloud', 'db', 'provision', config.db_name, '-U', config.deploy_username, '-W', generate_password()], path) + run_subprocess(['npx', 'dbos-cloud', 'applications', 'register', '--database', config.db_name], path, check=False) run_subprocess(['npx', 'dbos-cloud', 'applications', 'deploy'], path) if __name__ == "__main__": diff --git a/scripts/utils.py b/scripts/utils.py index ddbcd33..3f39002 100644 --- a/scripts/utils.py +++ b/scripts/utils.py @@ -1,5 +1,7 @@ import json import os +import random +import string import subprocess import requests @@ -29,4 +31,9 @@ def get_credentials(path: str): if not os.path.exists(credentials_path): raise Exception(f'Could not find credentials file {credentials_path}') with open(credentials_path, 'r') as f: - return json.load(f) \ No newline at end of file + return json.load(f) + +def generate_password(): + characters = string.ascii_letters + string.digits + random_string = ''.join(random.choice(characters) for _ in range(16)) + return random_string \ No newline at end of file From 7df6dfff888bfffa4b5177ff296bb8c5fecfb9e5 Mon Sep 17 00:00:00 2001 From: Qian Li Date: Mon, 29 Apr 2024 21:36:56 -0700 Subject: [PATCH 13/15] Cleaner deploy script --- scripts/dbos_deploy.py | 6 +++--- scripts/utils.py | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/scripts/dbos_deploy.py b/scripts/dbos_deploy.py index 21838f0..00edc87 100644 --- a/scripts/dbos_deploy.py +++ b/scripts/dbos_deploy.py @@ -7,12 +7,12 @@ app_dir = os.path.join(script_dir, "..") def deploy(path: str): - output = run_subprocess(['npx', 'dbos-cloud', 'database', 'status', config.db_name], path, check=False) + output = run_subprocess(['npx', 'dbos-cloud', 'db', 'status', config.db_name], path, check=False) if "error" in output: # Provision a database run_subprocess(['npx', 'dbos-cloud', 'db', 'provision', config.db_name, '-U', config.deploy_username, '-W', generate_password()], path) - run_subprocess(['npx', 'dbos-cloud', 'applications', 'register', '--database', config.db_name], path, check=False) - run_subprocess(['npx', 'dbos-cloud', 'applications', 'deploy'], path) + run_subprocess(['npx', 'dbos-cloud', 'app', 'register', '--database', config.db_name], path, check=False) + run_subprocess(['npx', 'dbos-cloud', 'app', 'deploy'], path) if __name__ == "__main__": login(app_dir, is_deploy=True) diff --git a/scripts/utils.py b/scripts/utils.py index 3f39002..1b6f8da 100644 --- a/scripts/utils.py +++ b/scripts/utils.py @@ -4,8 +4,6 @@ import string import subprocess -import requests - from config import config def run_subprocess(command, path: str, check: bool = True, silent: bool = False): From ca1cd7eeb313b6bfc7e99cfdda895c922f0b7923 Mon Sep 17 00:00:00 2001 From: Qian Li Date: Tue, 30 Apr 2024 16:04:17 -0700 Subject: [PATCH 14/15] Clean up events --- src/operations.ts | 8 +------- src/utils.ts | 9 +++++++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/operations.ts b/src/operations.ts index 8224181..bce2962 100644 --- a/src/operations.ts +++ b/src/operations.ts @@ -54,13 +54,7 @@ export class StripeWebhook { static async stripeWebhook(ctxt: HandlerContext) { // Verify the request is actually from Stripe. const req = ctxt.koaContext.request; - let event: Stripe.Event; - try { - event = Utils.verifyStripeEvent(req.rawBody as string, req.headers['stripe-signature']); - } catch (err) { - ctxt.logger.error(err); - throw new DBOSResponseError("Unable to verify event from Stripe", 400); - } + const event = Utils.verifyStripeEvent(req.rawBody as string, req.headers['stripe-signature']); // Invoke the workflow asynchronously and quickly response to Stripe. // Use event.id as the workflow idempotency key to guarantee exactly once processing. diff --git a/src/utils.ts b/src/utils.ts index 97bbc5c..ee20fb9 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -223,9 +223,14 @@ export class Utils { static verifyStripeEvent(payload: string, sigHeader: unknown) { if (typeof sigHeader !== 'string') { - throw new Error("Invalid stripe request, no stripe-signature header"); + throw new DBOSResponseError("Invalid stripe request, no stripe-signature header", 400); + } + let event: Stripe.Event; + try { + event = stripe.webhooks.constructEvent(payload, sigHeader, StripeWebhookSecret); + } catch (err) { + throw new DBOSResponseError("Unable to verify event from Stripe", 400); } - const event = stripe.webhooks.constructEvent(payload, sigHeader, StripeWebhookSecret); return event; } From 7d707a9bd186ca36ef51ef6f2ced01d055e31eb8 Mon Sep 17 00:00:00 2001 From: Qian Li Date: Tue, 30 Apr 2024 16:08:21 -0700 Subject: [PATCH 15/15] clean up test --- .github/workflows/test-staging.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/test-staging.yml b/.github/workflows/test-staging.yml index 1d7fcf9..ad232b2 100644 --- a/.github/workflows/test-staging.yml +++ b/.github/workflows/test-staging.yml @@ -4,8 +4,6 @@ on: schedule: # Runs every six hours - cron: '0 */6 * * *' - push: - branches: [ "main" ] workflow_dispatch: jobs: