From 54d21540859ed1e092f4a8f986bfa1b6044253f9 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 31 Jan 2024 02:16:35 -0800 Subject: [PATCH] feat: Iterate across multiple entity types with dynamic table ref during sync --- packages/sdk/openapi.json | 12 ++- packages/sdk/openapi.types.d.ts | 5 +- packages/worker/postgres/index.ts | 4 +- packages/worker/postgres/schema-factory.ts | 4 +- packages/worker/routines.ts | 76 ++++++++++++------- .../vertical-sales-engagement/commonModels.ts | 2 +- .../providers/outreach-provider.ts | 6 +- verticals/vertical-sales-engagement/router.ts | 25 ++++-- 8 files changed, 87 insertions(+), 47 deletions(-) diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json index 88b0f30..a6ae5c8 100644 --- a/packages/sdk/openapi.json +++ b/packages/sdk/openapi.json @@ -90,8 +90,11 @@ "schema": { "type": "object", "properties": { - "hasNextPage": { - "type": "boolean" + "nextPageCursor": { + "type": [ + "string", + "null" + ] }, "items": { "type": "array", @@ -101,7 +104,6 @@ } }, "required": [ - "hasNextPage", "items" ] } @@ -702,6 +704,10 @@ }, "last_name": { "type": "string" + }, + "raw_data": { + "type": "object", + "additionalProperties": {} } }, "required": [ diff --git a/packages/sdk/openapi.types.d.ts b/packages/sdk/openapi.types.d.ts index 5f8cc65..05ff8ec 100644 --- a/packages/sdk/openapi.types.d.ts +++ b/packages/sdk/openapi.types.d.ts @@ -70,6 +70,9 @@ export interface components { id: string first_name: string last_name: string + raw_data?: { + [key: string]: unknown + } } /** * Error @@ -201,7 +204,7 @@ export interface operations { 200: { content: { 'application/json': { - hasNextPage: boolean + nextPageCursor?: string | null items: components['schemas']['sales-engagement.contact'][] } } diff --git a/packages/worker/postgres/index.ts b/packages/worker/postgres/index.ts index 6f3ecad..35dbcd6 100644 --- a/packages/worker/postgres/index.ts +++ b/packages/worker/postgres/index.ts @@ -4,4 +4,6 @@ import {env} from '../env' import * as schema from './schema' export const pgClient = postgres(env.POSTGRES_URL) -export const db = drizzle(pgClient, {schema}) +export const db = drizzle(pgClient, {schema, logger: !!process.env['DEBUG']}) + +export {migrate} from 'drizzle-orm/postgres-js/migrator' diff --git a/packages/worker/postgres/schema-factory.ts b/packages/worker/postgres/schema-factory.ts index 9aa0103..5a412bd 100644 --- a/packages/worker/postgres/schema-factory.ts +++ b/packages/worker/postgres/schema-factory.ts @@ -17,11 +17,11 @@ export function getCommonObjectTable(tableName: TName) { supaglueApplicationId: text('_supaglue_application_id').notNull(), supaglueProviderName: text('_supaglue_provider_name').notNull(), supaglueCustomerId: text('_supaglue_customer_id').notNull(), - id: text('id').notNull(), supaglueEmittedAt: timestamp('_supaglue_emitted_at', { precision: 3, mode: 'string', }).notNull(), + id: text('id').notNull(), createdAt: timestamp('created_at', {precision: 3, mode: 'string'}), updatedAt: timestamp('updated_at', {precision: 3, mode: 'string'}), isDeleted: boolean('is_deleted').default(false).notNull(), @@ -57,11 +57,11 @@ export function getProviderObjectTable( supaglueApplicationId: text('_supaglue_application_id').notNull(), supaglueProviderName: text('_supaglue_provider_name').notNull(), supaglueCustomerId: text('_supaglue_customer_id').notNull(), - id: text('id').notNull(), supaglueEmittedAt: timestamp('_supaglue_emitted_at', { precision: 3, mode: 'string', }).notNull(), + id: text('id').notNull(), supaglueLastModifiedAt: timestamp('_supaglue_last_modified_at', { precision: 3, mode: 'string', diff --git a/packages/worker/routines.ts b/packages/worker/routines.ts index 3d87598..c940ccf 100644 --- a/packages/worker/routines.ts +++ b/packages/worker/routines.ts @@ -4,7 +4,8 @@ import type {SendEventPayload} from 'inngest/helpers/types' import {nango} from './env' import type {Events} from './events' import {db} from './postgres' -import {engagementSequences, syncLog} from './postgres/schema' +import {syncLog} from './postgres/schema' +import {getCommonObjectTable} from './postgres/schema-factory' import {dbUpsert} from './postgres/upsert' /** @@ -62,35 +63,49 @@ export async function syncConnection({ }, }) - // TODO: Need to figure out if stepFunction behaves as expected... - let cursor = null as null | string | undefined - do { - cursor = await step.run(`sync-page-${cursor}`, async () => { - const res = await supaglue.GET('/engagement/v2/sequences', { - params: {query: {cursor}}, + // Load this from a config please... + const vertical = 'engagement' as const + const entitiesToSync = ['contacts', 'sequences'] as const + + for (const entity of entitiesToSync) { + let cursor = null as null | string | undefined + do { + cursor = await step.run(`${entity}-sync-page-${cursor}`, async () => { + const res = await supaglue.GET(`/${vertical}/v2/${entity}`, { + params: {query: {cursor}}, + }) + console.log( + `Syncing ${vertical} ${entity} count=`, + res.data.items.length, + ) + + const table = getCommonObjectTable(`${vertical}_${entity}`) + + // TODO: Do migration for our table... + // migrate(db, {migrationsTable: []}) + + await dbUpsert( + db, + table, + res.data.items.map(({raw_data, ...item}) => ({ + supaglueApplicationId: '$YOUR_APPLICATION_ID', + supaglueCustomerId: connectionId, // '$YOUR_CUSTOMER_ID', + supaglueProviderName: providerConfigKey, + id: item.id, + lastModifiedAt: new Date().toISOString(), + supaglueEmittedAt: new Date().toISOString(), + isDeleted: false, + // Workaround jsonb support issue... https://github.com/drizzle-team/drizzle-orm/issues/724 + rawData: sql`${raw_data ?? ''}::jsonb`, + supaglueUnifiedData: sql`${item}::jsonb`, + })), + {shallowMergeJsonbColumns: ['supaglueUnifiedData']}, + ) + return res.data.nextPageCursor }) - console.log('Syncing sequences count=', res.data.items.length) - await dbUpsert( - db, - engagementSequences, - res.data.items.map(({raw_data, ...item}) => ({ - supaglueApplicationId: '$YOUR_APPLICATION_ID', - supaglueCustomerId: connectionId, // '$YOUR_CUSTOMER_ID', - supaglueProviderName: providerConfigKey, - id: item.id, - lastModifiedAt: new Date().toISOString(), - supaglueEmittedAt: new Date().toISOString(), - isDeleted: false, - // Workaround jsonb support issue... https://github.com/drizzle-team/drizzle-orm/issues/724 - rawData: sql`${raw_data}::jsonb`, - supaglueUnifiedData: sql`${item}::jsonb`, - })), - {shallowMergeJsonbColumns: ['supaglueUnifiedData']}, - ) - return res.data.nextPageCursor - }) - break - } while (cursor) + break + } while (cursor) + } console.log('[syncConnection] Complete', { connectionId, @@ -98,3 +113,6 @@ export async function syncConnection({ eventId: event.id, }) } + +// Later... +// Need to figure out if stepFunction behaves as expected... diff --git a/verticals/vertical-sales-engagement/commonModels.ts b/verticals/vertical-sales-engagement/commonModels.ts index f7b41eb..af5d096 100644 --- a/verticals/vertical-sales-engagement/commonModels.ts +++ b/verticals/vertical-sales-engagement/commonModels.ts @@ -49,7 +49,7 @@ export const contact = z // updated_at: z.string(), // is_deleted: z.boolean(), // last_modified_at: z.string(), - // raw_data: z.object({}).catchall(z.any()).optional(), + raw_data: z.record(z.unknown()).optional(), }) .openapi({ref: 'sales-engagement.contact'}) diff --git a/verticals/vertical-sales-engagement/providers/outreach-provider.ts b/verticals/vertical-sales-engagement/providers/outreach-provider.ts index 07e00be..01b5c07 100644 --- a/verticals/vertical-sales-engagement/providers/outreach-provider.ts +++ b/verticals/vertical-sales-engagement/providers/outreach-provider.ts @@ -54,7 +54,9 @@ export const outreachProvider = { links: (defaultLinks) => [...proxyLinks, ...defaultLinks], }), listContacts: async ({instance}) => { - const res = await instance.GET('/prospects', {params: {query: {}}}) + const res = await instance.GET('/prospects', { + params: {query: {'page[size]': 1}}, + }) return {hasNextPage: true, items: res.data.data?.map(mappers.contact) ?? []} }, listSequences: async ({instance, input}) => { @@ -66,7 +68,7 @@ export const outreachProvider = { '', ) as '/sequences') : '/sequences', - // {params: {query: {'page[size]': 1}}}, + {params: {query: {'page[size]': 1}}}, ) return { nextPageCursor: listResponse.parse(res.data).links?.next, diff --git a/verticals/vertical-sales-engagement/router.ts b/verticals/vertical-sales-engagement/router.ts index 4fc0ac9..2983129 100644 --- a/verticals/vertical-sales-engagement/router.ts +++ b/verticals/vertical-sales-engagement/router.ts @@ -20,7 +20,7 @@ export const salesEngagementRouter = trpc.router({ .input(zPaginationParams.nullish()) .output( z.object({ - hasNextPage: z.boolean(), + nextPageCursor: z.string().nullish(), items: z.array(commonModels.contact), }), ) @@ -73,15 +73,24 @@ export const salesEngagementRouter = trpc.router({ .input( z.object({ record: z.object({ - contact_id: z.string().openapi({example: '9f3e97fd-4d5d-4efc-959d-bbebfac079f5'}), - mailbox_id: z.string().openapi({example: 'ae4be028-9078-4850-a0bf-d2112b7c4d11'}), - sequence_id: z.string().openapi({example: 'b854e510-1c40-4ef6-ade4-8eb35f49d331'}), - user_id: z.string().nullish().openapi({example: 'c590dc63-8e43-48a4-8154-1fbb00ac936b'}) - }) - }) + contact_id: z + .string() + .openapi({example: '9f3e97fd-4d5d-4efc-959d-bbebfac079f5'}), + mailbox_id: z + .string() + .openapi({example: 'ae4be028-9078-4850-a0bf-d2112b7c4d11'}), + sequence_id: z + .string() + .openapi({example: 'b854e510-1c40-4ef6-ade4-8eb35f49d331'}), + user_id: z + .string() + .nullish() + .openapi({example: 'c590dc63-8e43-48a4-8154-1fbb00ac936b'}), + }), + }), ) .output(z.object({record: z.object({id: z.string()}).optional()})) - .query(async ({input, ctx}) => proxyCallProvider({input, ctx})) + .query(async ({input, ctx}) => proxyCallProvider({input, ctx})), }) export type SalesEngagementProvider = ProviderFromRouter<