Skip to content

Commit

Permalink
fix pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
hassan254-prog committed Jan 8, 2024
1 parent 3af2754 commit 756aaaf
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 148 deletions.
44 changes: 7 additions & 37 deletions integration-templates/pipedrive/pipedrive-activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ export default async function fetchData(nango: NangoSync) {
const config = {
...(nango.lastSyncDate ? { params: { since: nango.lastSyncDate?.toISOString() } } : {}),
paginate: {
type: 'cursor',
cursor_path_in_response: 'additional_data.next_cursor',
cursor_name_in_request: 'cursor',
limit_name_in_request: 'limit',
response_path: 'data',
limit: 100
}
};

for await (const activity of paginate(nango, endpoint, config)) {
for await (const activity of nango.paginate({ ...config, endpoint })) {
const mappedActivity: PipeDriveActivity[] = activity.map(mapActivity) || [];
// Save Activitiy
const batchSize: number = mappedActivity.length;
totalRecords += batchSize;
await nango.log(`Saving batch of ${batchSize} activities (total activities: ${totalRecords})`);
Expand All @@ -24,41 +29,6 @@ export default async function fetchData(nango: NangoSync) {
}
}

async function* paginate(nango: NangoSync, endpoint: string, config?: any, queryParams?: Record<string, string | string[]>) {
let cursor: string | undefined;
let callParams = queryParams || {};

while (true) {
if (cursor) {
callParams['cursor'] = `${cursor}`;
}

const resp = await nango.proxy({
method: 'GET',
endpoint: endpoint,
params: {
...(config?.paginate?.limit && { limit: config.paginate.limit }),
...(config?.params?.since && { since: config.params.since }),
...callParams
}
});

const activities = resp.data.data;

if (!activities || activities.length === 0) {
break;
}

yield activities;

if (!resp.data.additional_data || !resp.data.additional_data.next_cursor) {
break;
} else {
cursor = resp.data.additional_data.next_cursor;
}
}
}

function mapActivity(activity: any): PipeDriveActivity {
return {
id: activity.id,
Expand Down
44 changes: 7 additions & 37 deletions integration-templates/pipedrive/pipedrive-deals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ export default async function fetchData(nango: NangoSync) {
const config = {
...(nango.lastSyncDate ? { params: { since: nango.lastSyncDate?.toISOString() } } : {}),
paginate: {
type: 'cursor',
cursor_path_in_response: 'additional_data.next_cursor',
cursor_name_in_request: 'cursor',
limit_name_in_request: 'limit',
response_path: 'data',
limit: 100
}
};

for await (const deal of paginate(nango, endpoint, config)) {
for await (const deal of nango.paginate({ ...config, endpoint })) {
const mappedDeal: PipeDriveDeal[] = deal.map(mapDeal) || [];
// Save Deal
const batchSize: number = mappedDeal.length;
totalRecords += batchSize;
await nango.log(`Saving batch of ${batchSize} deals (total deals: ${totalRecords})`);
Expand All @@ -24,41 +29,6 @@ export default async function fetchData(nango: NangoSync) {
}
}

async function* paginate(nango: NangoSync, endpoint: string, config?: any, queryParams?: Record<string, string | string[]>) {
let cursor: string | undefined;
let callParams = queryParams || {};

while (true) {
if (cursor) {
callParams['cursor'] = `${cursor}`;
}

const resp = await nango.proxy({
method: 'GET',
endpoint: endpoint,
params: {
...(config?.paginate?.limit && { limit: config.paginate.limit }),
...(config?.params?.since && { since: config.params.since }),
...callParams
}
});

const deals = resp.data.data;

if (!deals || deals.length === 0) {
break;
}

yield deals;

if (!resp.data.additional_data || !resp.data.additional_data.next_cursor) {
break;
} else {
cursor = resp.data.additional_data.next_cursor;
}
}
}

function mapDeal(deal: any): PipeDriveDeal {
return {
id: deal.id,
Expand Down
44 changes: 7 additions & 37 deletions integration-templates/pipedrive/pipedrive-organizations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ export default async function fetchData(nango: NangoSync) {
const config = {
...(nango.lastSyncDate ? { params: { since: nango.lastSyncDate?.toISOString() } } : {}),
paginate: {
type: 'cursor',
cursor_path_in_response: 'additional_data.next_cursor',
cursor_name_in_request: 'cursor',
limit_name_in_request: 'limit',
response_path: 'data',
limit: 100
}
};

for await (const organization of paginate(nango, endpoint, config)) {
for await (const organization of nango.paginate({ ...config, endpoint })) {
const mappedOrganization: PipeDriveOrganization[] = organization.map(mapOrganization) || [];
// Save Organization
const batchSize: number = mappedOrganization.length;
totalRecords += batchSize;
await nango.log(`Saving batch of ${batchSize} organizations (total organizations: ${totalRecords})`);
Expand All @@ -24,41 +29,6 @@ export default async function fetchData(nango: NangoSync) {
}
}

async function* paginate(nango: NangoSync, endpoint: string, config?: any, queryParams?: Record<string, string | string[]>) {
let cursor: string | undefined;
let callParams = queryParams || {};

while (true) {
if (cursor) {
callParams['cursor'] = `${cursor}`;
}

const resp = await nango.proxy({
method: 'GET',
endpoint: endpoint,
params: {
...(config?.paginate?.limit && { limit: config.paginate.limit }),
...(config?.params?.since && { since: config.params.since }),
...callParams
}
});

const organizations = resp.data.data;

if (!organizations || organizations.length === 0) {
break;
}

yield organizations;

if (!resp.data.additional_data || !resp.data.additional_data.next_cursor) {
break;
} else {
cursor = resp.data.additional_data.next_cursor;
}
}
}

function mapOrganization(organization: any): PipeDriveOrganization {
return {
id: organization.id,
Expand Down
44 changes: 7 additions & 37 deletions integration-templates/pipedrive/pipedrive-persons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ export default async function fetchData(nango: NangoSync) {
const config = {
...(nango.lastSyncDate ? { params: { since: nango.lastSyncDate?.toISOString() } } : {}),
paginate: {
type: 'cursor',
cursor_path_in_response: 'additional_data.next_cursor',
cursor_name_in_request: 'cursor',
limit_name_in_request: 'limit',
response_path: 'data',
limit: 100
}
};

for await (const person of paginate(nango, endpoint, config)) {
for await (const person of nango.paginate({ ...config, endpoint })) {
const mappedPerson: PipeDrivePerson[] = person.map(mapPerson) || [];
// Save Person
const batchSize: number = mappedPerson.length;
totalRecords += batchSize;
await nango.log(`Saving batch of ${batchSize} persons (total persons: ${totalRecords})`);
Expand All @@ -24,41 +29,6 @@ export default async function fetchData(nango: NangoSync) {
}
}

async function* paginate(nango: NangoSync, endpoint: string, config?: any, queryParams?: Record<string, string | string[]>) {
let cursor: string | undefined;
let callParams = queryParams || {};

while (true) {
if (cursor) {
callParams['cursor'] = `${cursor}`;
}

const resp = await nango.proxy({
method: 'GET',
endpoint: endpoint,
params: {
...(config?.paginate?.limit && { limit: config.paginate.limit }),
...(config?.params?.since && { since: config.params.since }),
...callParams
}
});

const persons = resp.data.data;

if (!persons || persons.length === 0) {
break;
}

yield persons;

if (!resp.data.additional_data || !resp.data.additional_data.next_cursor) {
break;
} else {
cursor = resp.data.additional_data.next_cursor;
}
}
}

function mapPerson(person: any): PipeDrivePerson {
return {
id: person.id,
Expand Down

0 comments on commit 756aaaf

Please sign in to comment.