Skip to content
This repository has been archived by the owner on Mar 10, 2024. It is now read-only.

Commit

Permalink
feat: Iterate across multiple entity types with dynamic table ref dur…
Browse files Browse the repository at this point in the history
…ing sync
  • Loading branch information
tonyxiao committed Jan 31, 2024
1 parent 82af8df commit 54d2154
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 47 deletions.
12 changes: 9 additions & 3 deletions packages/sdk/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@
"schema": {
"type": "object",
"properties": {
"hasNextPage": {
"type": "boolean"
"nextPageCursor": {
"type": [
"string",
"null"
]
},
"items": {
"type": "array",
Expand All @@ -101,7 +104,6 @@
}
},
"required": [
"hasNextPage",
"items"
]
}
Expand Down Expand Up @@ -702,6 +704,10 @@
},
"last_name": {
"type": "string"
},
"raw_data": {
"type": "object",
"additionalProperties": {}
}
},
"required": [
Expand Down
5 changes: 4 additions & 1 deletion packages/sdk/openapi.types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export interface components {
id: string
first_name: string
last_name: string
raw_data?: {
[key: string]: unknown
}
}
/**
* Error
Expand Down Expand Up @@ -201,7 +204,7 @@ export interface operations {
200: {
content: {
'application/json': {
hasNextPage: boolean
nextPageCursor?: string | null
items: components['schemas']['sales-engagement.contact'][]

Check warning on line 208 in packages/sdk/openapi.types.d.ts

View workflow job for this annotation

GitHub Actions / Run type checks, lint, and tests

Array type using 'T[]' is forbidden for non-simple types. Use 'Array<T>' instead
}
}
Expand Down
4 changes: 3 additions & 1 deletion packages/worker/postgres/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ import {env} from '../env'
import * as schema from './schema'

export const pgClient = postgres(env.POSTGRES_URL)
export const db = drizzle(pgClient, {schema})
export const db = drizzle(pgClient, {schema, logger: !!process.env['DEBUG']})

export {migrate} from 'drizzle-orm/postgres-js/migrator'
4 changes: 2 additions & 2 deletions packages/worker/postgres/schema-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ export function getCommonObjectTable<TName extends string>(tableName: TName) {
supaglueApplicationId: text('_supaglue_application_id').notNull(),
supaglueProviderName: text('_supaglue_provider_name').notNull(),
supaglueCustomerId: text('_supaglue_customer_id').notNull(),
id: text('id').notNull(),
supaglueEmittedAt: timestamp('_supaglue_emitted_at', {
precision: 3,
mode: 'string',
}).notNull(),
id: text('id').notNull(),
createdAt: timestamp('created_at', {precision: 3, mode: 'string'}),
updatedAt: timestamp('updated_at', {precision: 3, mode: 'string'}),
isDeleted: boolean('is_deleted').default(false).notNull(),
Expand Down Expand Up @@ -57,11 +57,11 @@ export function getProviderObjectTable<TName extends string>(
supaglueApplicationId: text('_supaglue_application_id').notNull(),
supaglueProviderName: text('_supaglue_provider_name').notNull(),
supaglueCustomerId: text('_supaglue_customer_id').notNull(),
id: text('id').notNull(),
supaglueEmittedAt: timestamp('_supaglue_emitted_at', {
precision: 3,
mode: 'string',
}).notNull(),
id: text('id').notNull(),
supaglueLastModifiedAt: timestamp('_supaglue_last_modified_at', {
precision: 3,
mode: 'string',
Expand Down
76 changes: 47 additions & 29 deletions packages/worker/routines.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import type {SendEventPayload} from 'inngest/helpers/types'
import {nango} from './env'
import type {Events} from './events'
import {db} from './postgres'
import {engagementSequences, syncLog} from './postgres/schema'
import {syncLog} from './postgres/schema'
import {getCommonObjectTable} from './postgres/schema-factory'
import {dbUpsert} from './postgres/upsert'

/**
Expand Down Expand Up @@ -62,39 +63,56 @@ export async function syncConnection({
},
})

// TODO: Need to figure out if stepFunction behaves as expected...
let cursor = null as null | string | undefined
do {
cursor = await step.run(`sync-page-${cursor}`, async () => {
const res = await supaglue.GET('/engagement/v2/sequences', {
params: {query: {cursor}},
// Load this from a config please...
const vertical = 'engagement' as const
const entitiesToSync = ['contacts', 'sequences'] as const

for (const entity of entitiesToSync) {
let cursor = null as null | string | undefined
do {
cursor = await step.run(`${entity}-sync-page-${cursor}`, async () => {
const res = await supaglue.GET(`/${vertical}/v2/${entity}`, {
params: {query: {cursor}},
})
console.log(
`Syncing ${vertical} ${entity} count=`,
res.data.items.length,
)

const table = getCommonObjectTable(`${vertical}_${entity}`)

// TODO: Do migration for our table...
// migrate(db, {migrationsTable: []})

await dbUpsert(
db,
table,
res.data.items.map(({raw_data, ...item}) => ({
supaglueApplicationId: '$YOUR_APPLICATION_ID',
supaglueCustomerId: connectionId, // '$YOUR_CUSTOMER_ID',
supaglueProviderName: providerConfigKey,
id: item.id,
lastModifiedAt: new Date().toISOString(),
supaglueEmittedAt: new Date().toISOString(),
isDeleted: false,
// Workaround jsonb support issue... https://github.com/drizzle-team/drizzle-orm/issues/724
rawData: sql`${raw_data ?? ''}::jsonb`,
supaglueUnifiedData: sql`${item}::jsonb`,
})),
{shallowMergeJsonbColumns: ['supaglueUnifiedData']},
)
return res.data.nextPageCursor
})
console.log('Syncing sequences count=', res.data.items.length)
await dbUpsert(
db,
engagementSequences,
res.data.items.map(({raw_data, ...item}) => ({
supaglueApplicationId: '$YOUR_APPLICATION_ID',
supaglueCustomerId: connectionId, // '$YOUR_CUSTOMER_ID',
supaglueProviderName: providerConfigKey,
id: item.id,
lastModifiedAt: new Date().toISOString(),
supaglueEmittedAt: new Date().toISOString(),
isDeleted: false,
// Workaround jsonb support issue... https://github.com/drizzle-team/drizzle-orm/issues/724
rawData: sql`${raw_data}::jsonb`,
supaglueUnifiedData: sql`${item}::jsonb`,
})),
{shallowMergeJsonbColumns: ['supaglueUnifiedData']},
)
return res.data.nextPageCursor
})
break
} while (cursor)
break
} while (cursor)
}

console.log('[syncConnection] Complete', {
connectionId,
providerConfigKey,
eventId: event.id,
})
}

// Later...
// Need to figure out if stepFunction behaves as expected...
2 changes: 1 addition & 1 deletion verticals/vertical-sales-engagement/commonModels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export const contact = z
// updated_at: z.string(),
// is_deleted: z.boolean(),
// last_modified_at: z.string(),
// raw_data: z.object({}).catchall(z.any()).optional(),
raw_data: z.record(z.unknown()).optional(),
})
.openapi({ref: 'sales-engagement.contact'})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ export const outreachProvider = {
links: (defaultLinks) => [...proxyLinks, ...defaultLinks],
}),
listContacts: async ({instance}) => {
const res = await instance.GET('/prospects', {params: {query: {}}})
const res = await instance.GET('/prospects', {
params: {query: {'page[size]': 1}},
})
return {hasNextPage: true, items: res.data.data?.map(mappers.contact) ?? []}
},
listSequences: async ({instance, input}) => {
Expand All @@ -66,7 +68,7 @@ export const outreachProvider = {
'',
) as '/sequences')
: '/sequences',
// {params: {query: {'page[size]': 1}}},
{params: {query: {'page[size]': 1}}},
)
return {
nextPageCursor: listResponse.parse(res.data).links?.next,
Expand Down
25 changes: 17 additions & 8 deletions verticals/vertical-sales-engagement/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const salesEngagementRouter = trpc.router({
.input(zPaginationParams.nullish())
.output(
z.object({
hasNextPage: z.boolean(),
nextPageCursor: z.string().nullish(),
items: z.array(commonModels.contact),
}),
)
Expand Down Expand Up @@ -73,15 +73,24 @@ export const salesEngagementRouter = trpc.router({
.input(
z.object({
record: z.object({
contact_id: z.string().openapi({example: '9f3e97fd-4d5d-4efc-959d-bbebfac079f5'}),
mailbox_id: z.string().openapi({example: 'ae4be028-9078-4850-a0bf-d2112b7c4d11'}),
sequence_id: z.string().openapi({example: 'b854e510-1c40-4ef6-ade4-8eb35f49d331'}),
user_id: z.string().nullish().openapi({example: 'c590dc63-8e43-48a4-8154-1fbb00ac936b'})
})
})
contact_id: z
.string()
.openapi({example: '9f3e97fd-4d5d-4efc-959d-bbebfac079f5'}),
mailbox_id: z
.string()
.openapi({example: 'ae4be028-9078-4850-a0bf-d2112b7c4d11'}),
sequence_id: z
.string()
.openapi({example: 'b854e510-1c40-4ef6-ade4-8eb35f49d331'}),
user_id: z
.string()
.nullish()
.openapi({example: 'c590dc63-8e43-48a4-8154-1fbb00ac936b'}),
}),
}),
)
.output(z.object({record: z.object({id: z.string()}).optional()}))
.query(async ({input, ctx}) => proxyCallProvider({input, ctx}))
.query(async ({input, ctx}) => proxyCallProvider({input, ctx})),
})

export type SalesEngagementProvider<TInstance> = ProviderFromRouter<
Expand Down

0 comments on commit 54d2154

Please sign in to comment.