From 3ba83243d23eea66a4c3a9d2dc55f6b75a0f33ea Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Thu, 31 Oct 2024 09:12:31 +0300 Subject: [PATCH 01/14] remove axios --- packages/salesforce/package.json | 3 +-- pnpm-lock.yaml | 3 --- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/salesforce/package.json b/packages/salesforce/package.json index 4524cae62..14d4c31db 100644 --- a/packages/salesforce/package.json +++ b/packages/salesforce/package.json @@ -34,7 +34,6 @@ "dependencies": { "@openfn/language-common": "1.15.2", "any-ascii": "^0.3.2", - "axios": "^1.7.7", "jsforce": "^1.11.1", "lodash": "^4.17.21" }, @@ -52,4 +51,4 @@ "type": "module", "types": "types/index.d.ts", "main": "dist/index.cjs" -} +} \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d205c76a6..8a815c1bd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1669,9 +1669,6 @@ importers: any-ascii: specifier: ^0.3.2 version: 0.3.2 - axios: - specifier: ^1.7.7 - version: 1.7.7 jsforce: specifier: ^1.11.1 version: 1.11.1 From 70ff07760c029f5759bd412b026d50c72fcfe325 Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Wed, 17 Jul 2024 13:49:40 +0300 Subject: [PATCH 02/14] move helper functions to Utils --- packages/salesforce/src/Utils.js | 136 +++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 packages/salesforce/src/Utils.js diff --git a/packages/salesforce/src/Utils.js b/packages/salesforce/src/Utils.js new file mode 100644 index 000000000..62e47e85b --- /dev/null +++ b/packages/salesforce/src/Utils.js @@ -0,0 +1,136 @@ +import jsforce from 'jsforce'; + +function getConnection(state, options) { + const { apiVersion } = state.configuration; + + const apiVersionRegex = /^\d{2}\.\d$/; + + if (apiVersion && apiVersionRegex.test(apiVersion)) { + options.version = apiVersion; + } else { + options.version = '47.0'; + } + console.log('Using Salesforce API version:', options.version); + + return new jsforce.Connection(options); +} + +async function createBasicAuthConnection(state) { + const { loginUrl, username, password, securityToken } = state.configuration; + + const connection = getConnection(state, { loginUrl }); + + await connection + .login(username, securityToken ? password + securityToken : password) + .catch(e => { + console.error(`Failed to connect to salesforce as ${username}`); + throw e; + }); + + console.info(`Connected to salesforce as ${username}.`); + + return { + ...state, + connection, + }; +} + +function createAccessTokenConnection(state) { + const { instance_url, access_token } = state.configuration; + + const connection = getConnection(state, { + instanceUrl: instance_url, + accessToken: access_token, + }); + + console.log(`Connected with ${connection._sessionType} session type`); + + return { + ...state, + connection, + }; +} + +/** + * Creates a connection to Salesforce using Basic Auth or OAuth. + * @function createConnection + * @private + * @param {State} state - Runtime state. + * @returns {State} + */ +export function createConnection(state) { + if (state.connection) { + return state; + } + + const { access_token } = state.configuration; + + return access_token + ? createAccessTokenConnection(state) + : createBasicAuthConnection(state); +} + +/** + * Removes state.connection from state. + * @example + * removeConnection(state) + * @function + * @private + * @param {State} state + * @returns {State} + */ +export function removeConnection(state) { + delete state.connection; + return state; +} + +export async function pollJobResult(conn, job, pollInterval, pollTimeout) { + let attempt = 0; + + const maxPollingAttempts = Math.floor(pollTimeout / pollInterval); + + while (attempt < maxPollingAttempts) { + // Make an HTTP GET request to check the job status + const jobInfo = await conn + .request({ + method: 'GET', + url: `/services/data/v${conn.version}/jobs/query/${job.id}`, + headers: { + 'Content-Type': 'application/json', + }, + }) + .catch(error => { + console.log('Failed to fetch job information', error); + }); + + if (jobInfo && jobInfo.state === 'JobComplete') { + const response = await conn.request({ + method: 'GET', + url: `/services/data/v${conn.version}/jobs/query/${job.id}/results`, + headers: { + 'Content-Type': 'application/json', + }, + }); + + console.log('Job result retrieved', response.length); + return response; + } else { + // Handle maxPollingAttempts + if (attempt + 1 === maxPollingAttempts) { + console.error( + 'Maximum polling attempt reached, Please increase pollInterval and pollTimeout' + ); + throw new Error(`Polling time out. Job Id = ${job.id}`); + } + console.log( + `Attempt ${attempt + 1} - Job ${jobInfo.id} is still in ${ + jobInfo.state + }:` + ); + } + + // Wait for the polling interval before the next attempt + await new Promise(resolve => setTimeout(resolve, pollInterval)); + attempt++; + } +} From ba26aaa728b8efc54ffd30c970dde04fc1874321 Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Thu, 1 Aug 2024 12:12:49 +0300 Subject: [PATCH 03/14] function cleanup --- packages/salesforce/ast.json | 81 -- packages/salesforce/src/Adaptor.js | 1038 ++++++++-------------- packages/salesforce/test/Adaptor.test.js | 105 +-- 3 files changed, 381 insertions(+), 843 deletions(-) diff --git a/packages/salesforce/ast.json b/packages/salesforce/ast.json index 73e1dc224..edf306eb4 100644 --- a/packages/salesforce/ast.json +++ b/packages/salesforce/ast.json @@ -687,87 +687,6 @@ }, "valid": true }, - { - "name": "createIf", - "params": [ - "logical", - "sObject", - "attrs" - ], - "docs": { - "description": "Create a new sObject if conditions are met.\n\n**The `createIf()` function has been deprecated. Use `fnIf(condition,create())` instead.**", - "tags": [ - { - "title": "public", - "description": null, - "type": null - }, - { - "title": "example", - "description": "createIf(true, 'obj_name', {\n attr1: \"foo\",\n attr2: \"bar\"\n})" - }, - { - "title": "function", - "description": null, - "name": null - }, - { - "title": "param", - "description": "a logical statement that will be evaluated.", - "type": { - "type": "NameExpression", - "name": "boolean" - }, - "name": "logical" - }, - { - "title": "param", - "description": "API name of the sObject.", - "type": { - "type": "NameExpression", - "name": "string" - }, - "name": "sObject" - }, - { - "title": "param", - "description": "Field attributes for the new object.", - "type": { - "type": "UnionType", - "elements": [ - { - "type": "NameExpression", - "name": "object" - }, - { - "type": "TypeApplication", - "expression": { - "type": "NameExpression", - "name": "Array" - }, - "applications": [ - { - "type": "NameExpression", - "name": "object" - } - ] - } - ] - }, - "name": "attrs" - }, - { - "title": "returns", - "description": null, - "type": { - "type": "NameExpression", - "name": "Operation" - } - } - ] - }, - "valid": true - }, { "name": "upsert", "params": [ diff --git a/packages/salesforce/src/Adaptor.js b/packages/salesforce/src/Adaptor.js index eb79c37e8..97e0b1852 100644 --- a/packages/salesforce/src/Adaptor.js +++ b/packages/salesforce/src/Adaptor.js @@ -20,8 +20,8 @@ import { } from '@openfn/language-common'; import { expandReferences as newExpandReferences } from '@openfn/language-common/util'; +import * as util from './Utils'; -import jsforce from 'jsforce'; import flatten from 'lodash/flatten'; let anyAscii = undefined; @@ -35,331 +35,26 @@ const loadAnyAscii = state => }); /** - * Adds a lookup relation or 'dome insert' to a record. - * @public - * @example - * Data Sourced Value: - * relationship("relationship_name__r", "externalID on related object", dataSource("path")) - * Fixed Value: - * relationship("relationship_name__r", "externalID on related object", "hello world") - * @function - * @param {string} relationshipName - `__r` relationship field on the record. - * @param {string} externalId - Salesforce ExternalID field. - * @param {string} dataSource - resolvable source. - * @returns {object} - */ -export function relationship(relationshipName, externalId, dataSource) { - return field(relationshipName, state => { - if (typeof dataSource == 'function') { - return { [externalId]: dataSource(state) }; - } - return { [externalId]: dataSource }; - }); -} - -/** - * Prints the total number of all available sObjects and pushes the result to `state.references`. - * @public - * @example - * describeAll() - * @function - * @returns {Operation} - */ -export function describeAll() { - return state => { - const { connection } = state; - - return connection.describeGlobal().then(result => { - const { sobjects } = result; - console.log(`Retrieved ${sobjects.length} sObjects`); - - return { - ...state, - references: [sobjects, ...state.references], - }; - }); - }; -} - -/** - * Prints an sObject metadata and pushes the result to state.references - * @public - * @example - * describe('obj_name') + * Executes an operation. * @function - * @param {string} sObject - API name of the sObject. - * @returns {Operation} + * @private + * @param {Operation} operations - Operations + * @returns {State} */ -export function describe(sObject) { - return state => { - const { connection } = state; - - const objectName = expandReferences(sObject)(state); - - return connection - .sobject(objectName) - .describe() - .then(result => { - console.log('Label : ' + result.label); - console.log('Num of Fields : ' + result.fields.length); - - return { - ...state, - references: [result, ...state.references], - }; - }); +export function execute(...operations) { + const initialState = { + references: [], + data: null, + configuration: {}, }; -} -/** - * Retrieves a Salesforce sObject(s). - * @public - * @example - * retrieve('ContentVersion', '0684K0000020Au7QAE/VersionData'); - * @function - * @param {string} sObject - The sObject to retrieve - * @param {string} id - The id of the record - * @param {function} callback - A callback to execute once the record is retrieved - * @returns {Operation} - */ -export function retrieve(sObject, id, callback) { return state => { - const { connection } = state; - - const finalId = expandReferences(id)(state); - - return connection - .sobject(sObject) - .retrieve(finalId) - .then(result => { - return { - ...state, - references: [result, ...state.references], - }; - }) - .then(state => { - if (callback) { - return callback(state); - } - return state; - }); - }; -} - -/** - * Execute an SOQL query. - * Note that in an event of a query error, - * error logs will be printed but the operation will not throw the error. - * - * The Salesforce query API is subject to rate limits, {@link https://sforce.co/3W9zyaQ See for more details}. - * @public - * @example - * query(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`); - * @example Query more records if next records are available - * query(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`, { autoFetch: true }); - * @function - * @param {string} qs - A query string. Must be less than `4000` characters in WHERE clause - * @param {object} options - Options passed to the bulk api. - * @param {boolean} [options.autoFetch=false] - Fetch next records if available. - * @param {function} callback - A callback to execute once the record is retrieved - * @returns {Operation} - */ -export function query(qs, options = {}, callback = s => s) { - return async state => { - const { connection } = state; - const [resolvedQs, resolvedOptions] = newExpandReferences( - state, - qs, - options - ); - console.log(`Executing query: ${resolvedQs}`); - const autoFetch = resolvedOptions.autoFetch || resolvedOptions.autofetch; - - if (autoFetch) { - console.log('autoFetch is enabled: all records will be downloaded'); - } - - const result = { - done: true, - totalSize: 0, - records: [], - }; - - const processRecords = async res => { - const { done, totalSize, records, nextRecordsUrl } = res; - - result.done = done; - result.totalSize = totalSize; - result.records.push(...records); - - if (!done && !autoFetch && nextRecordsUrl) { - result.nextRecordsUrl = nextRecordsUrl; - } - if (!done && autoFetch) { - console.log('Fetched records so far:', result.records.length); - console.log('Fetching next records...'); - - try { - const newResult = await connection.request({ url: nextRecordsUrl }); - await processRecords(newResult); - } catch (err) { - const { message, errorCode } = err; - console.error(`Error ${errorCode}: ${message}`); - throw err; - } - } - }; - - try { - const qResult = await connection.query(resolvedQs); - if (qResult.totalSize > 0) { - console.log('Total records:', qResult.totalSize); - await processRecords(qResult); - console.log('Done ✔ retrieved records:', result.records.length); - } else { - console.log('No records found.'); - } - } catch (err) { - const { message, errorCode } = err; - console.log(`Error ${errorCode}: ${message}`); - throw err; - } - - console.log( - 'Results retrieved and pushed to position [0] of the references array.' - ); - - const nextState = { - ...state, - references: [result, ...state.references], - }; - return callback(nextState); - }; -} - -async function pollJobResult(conn, job, pollInterval, pollTimeout) { - let attempt = 0; - - const maxPollingAttempts = Math.floor(pollTimeout / pollInterval); - - while (attempt < maxPollingAttempts) { - // Make an HTTP GET request to check the job status - const jobInfo = await conn - .request({ - method: 'GET', - url: `/services/data/v${conn.version}/jobs/query/${job.id}`, - headers: { - 'Content-Type': 'application/json', - }, - }) - .catch(error => { - console.log('Failed to fetch job information', error); - }); - - if (jobInfo && jobInfo.state === 'JobComplete') { - const response = await conn.request({ - method: 'GET', - url: `/services/data/v${conn.version}/jobs/query/${job.id}/results`, - headers: { - 'Content-Type': 'application/json', - }, - }); - - console.log('Job result retrieved', response.length); - return response; - } else { - // Handle maxPollingAttempts - if (attempt + 1 === maxPollingAttempts) { - console.error( - 'Maximum polling attempt reached, Please increase pollInterval and pollTimeout' - ); - throw new Error(`Polling time out. Job Id = ${job.id}`); - } - console.log( - `Attempt ${attempt + 1} - Job ${jobInfo.id} is still in ${ - jobInfo.state - }:` - ); - } - - // Wait for the polling interval before the next attempt - await new Promise(resolve => setTimeout(resolve, pollInterval)); - attempt++; - } -} - -const defaultOptions = { - pollTimeout: 90000, // in ms - pollInterval: 3000, // in ms -}; -/** - * Execute an SOQL Bulk Query. - * This function uses bulk query to efficiently query large data sets and reduce the number of API requests. - * `bulkQuery()` uses {@link https://sforce.co/4azgczz Bulk API v.2.0 Query} which is available in API version 47.0 and later. - * This API is subject to {@link https://sforce.co/4b6kn6z rate limits}. - * @public - * @example - * The results will be available on `state.data` - * bulkQuery(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`); - * @example - * bulkQuery( - * (state) => - * `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`, - * { pollTimeout: 10000, pollInterval: 6000 } - * ); - * @function - * @param {string} qs - A query string. - * @param {object} options - Options passed to the bulk api. - * @param {integer} [options.pollTimeout=90000] - Polling timeout in milliseconds. - * @param {integer} [options.pollInterval=3000] - Polling interval in milliseconds. - * @param {function} callback - A callback to execute once the record is retrieved - * @returns {Operation} - */ -export function bulkQuery(qs, options, callback) { - return async state => { - const { connection } = state; - const [resolvedQs, resolvedOptions] = newExpandReferences( - state, - qs, - options - ); - - if (parseFloat(connection.version) < 47.0) - throw new Error('bulkQuery requires API version 47.0 and later'); - - const { pollTimeout, pollInterval } = { - ...defaultOptions, - ...resolvedOptions, - }; - - console.log(`Executing query: ${resolvedQs}`); - - const queryJob = await connection.request({ - method: 'POST', - url: `/services/data/v${connection.version}/jobs/query`, - body: JSON.stringify({ - operation: 'query', - query: resolvedQs, - }), - headers: { - 'Content-Type': 'application/json', - }, - }); - - const result = await pollJobResult( - connection, - queryJob, - pollInterval, - pollTimeout - ); - - const nextState = { - ...composeNextState(state, result), - result, - }; - if (callback) return callback(nextState); - - return nextState; + return commonExecute( + loadAnyAscii, + util.createConnection, + ...flatten(operations), + util.removeConnection + )({ ...initialState, ...state }); }; } @@ -370,14 +65,13 @@ export function bulkQuery(qs, options, callback) { * bulk( * "Patient__c", * "insert", - * { failOnError: true }, - * (state) => state.someArray.map((x) => ({ Age__c: x.age, Name: x.name })) + * (state) => state.patients.map((x) => ({ Age__c: x.age, Name: x.name })), + * { failOnError: true } * ); * @example Bulk upsert * bulk( * "vera__Beneficiary__c", * "upsert", - * { extIdField: "vera__Result_UID__c" }, * [ * { * vera__Reporting_Period__c: 2023, @@ -385,40 +79,42 @@ export function bulkQuery(qs, options, callback) { * "vera__Indicator__r.vera__ExtId__c": 1001, * vera__Result_UID__c: "1001_2023_Uganda", * }, - * ] + * ], + * { extIdField: "vera__Result_UID__c" } * ); * @function - * @param {string} sObject - API name of the sObject. + * @param {string} sObjectName - API name of the sObject. * @param {string} operation - The bulk operation to be performed.Eg "insert" | "update" | "upsert" + * @param {array} records - an array of records, or a function which returns an array. * @param {object} options - Options passed to the bulk api. - * @param {integer} [options.pollTimeout=240000] - Polling timeout in milliseconds. - * @param {integer} [options.pollInterval=6000] - Polling interval in milliseconds. * @param {string} [options.extIdField] - External id field. + * @param {boolean} [options.allowNoOp=false] - Skipping bulk operation if no records. * @param {boolean} [options.failOnError=false] - Fail the operation on error. - * @param {array} records - an array of records, or a function which returns an array. + * @param {integer} [options.pollInterval=6000] - Polling interval in milliseconds. + * @param {integer} [options.pollTimeout=240000] - Polling timeout in milliseconds. * @returns {Operation} */ -export function bulk(sObject, operation, options, records) { +export function bulk(sObjectName, operation, records, options = {}) { return state => { const { connection } = state; const [ - resolvedSObject, + resolvedSObjectName, resolvedOperation, - resolvedOptions, resolvedRecords, - ] = newExpandReferences(state, sObject, operation, options, records); + resolvedOptions, + ] = expandReferences(state, sObjectName, operation, records, options); const { failOnError = false, allowNoOp = false, - pollTimeout, - pollInterval, + pollTimeout = 240000, + pollInterval = 6000, } = resolvedOptions; if (allowNoOp && resolvedRecords.length === 0) { console.info( - `No items in ${resolvedSObject} array. Skipping bulk ${resolvedOperation} operation.` + `No items in ${resolvedSObjectName} array. Skipping bulk ${resolvedOperation} operation.` ); return state; } @@ -432,17 +128,14 @@ export function bulk(sObject, operation, options, records) { chunkedBatches.map( chunkedBatch => new Promise((resolve, reject) => { - const timeout = pollTimeout || 240000; - const interval = pollInterval || 6000; - console.info( - `Creating bulk ${resolvedOperation} job for ${resolvedSObject} with ${chunkedBatch.length} records` + `Creating bulk ${resolvedOperation} job for ${resolvedSObjectName} with ${chunkedBatch.length} records` ); const job = connection.bulk.createJob( - resolvedSObject, + resolvedSObjectName, resolvedOperation, - options + resolvedOptions ); job.on('error', err => reject(err)); @@ -464,7 +157,7 @@ export function bulk(sObject, operation, options, records) { console.info(batchInfo); const batchId = batchInfo.id; var batch = job.batch(batchId); - batch.poll(interval, timeout); + batch.poll(pollInterval, pollTimeout); }) .then(async res => { await job.close(); @@ -475,8 +168,8 @@ export function bulk(sObject, operation, options, records) { }); errors.forEach(err => { - err[`${options.extIdField}`] = - chunkedBatch[err.position - 1][options.extIdField]; + err[`${resolvedOptions.extIdField}`] = + chunkedBatch[err.position - 1][resolvedOptions.extIdField]; }); if (failOnError && errors.length > 0) { @@ -489,13 +182,135 @@ export function bulk(sObject, operation, options, records) { }); }) ) - ).then(arrayOfResults => { + ).then(results => { console.log('Merging results arrays.'); - const merged = [].concat.apply([], arrayOfResults); - return { ...state, references: [merged, ...state.references] }; + return composeNextState(state, results.flat()); }); }; } +/** + * Execute an SOQL Bulk Query. + * This function uses bulk query to efficiently query large data sets and reduce the number of API requests. + * `bulkQuery()` uses {@link https://sforce.co/4azgczz Bulk API v.2.0 Query} which is available in API version 47.0 and later. + * This API is subject to {@link https://sforce.co/4b6kn6z rate limits}. + * @public + * @example + * The results will be available on `state.data` + * bulkQuery(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`); + * @example + * bulkQuery( + * (state) => + * `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`, + * { pollTimeout: 10000, pollInterval: 6000 } + * ); + * @function + * @param {string} qs - A query string. + * @param {object} options - Options passed to the bulk api. + * @param {integer} [options.pollTimeout=90000] - Polling timeout in milliseconds. + * @param {integer} [options.pollInterval=3000] - Polling interval in milliseconds. + * @returns {Operation} + */ +export function bulkQuery(qs, options = {}) { + return async state => { + const { connection } = state; + const [resolvedQs, resolvedOptions] = expandReferences(state, qs, options); + + if (parseFloat(connection.version) < 47.0) + throw new Error('bulkQuery requires API version 47.0 and later'); + + const { pollTimeout = 90000, pollInterval = 3000 } = resolvedOptions; + + console.log(`Executing query: ${resolvedQs}`); + + const queryJob = await connection.request({ + method: 'POST', + url: `/services/data/v${connection.version}/jobs/query`, + body: JSON.stringify({ + operation: 'query', + query: resolvedQs, + }), + headers: { + 'Content-Type': 'application/json', + }, + }); + + const result = await util.pollJobResult( + connection, + queryJob, + pollInterval, + pollTimeout + ); + + return composeNextState(state, result); + }; +} + +/** + * Create a new sObject record(s). + * @public + * @example Single record creation + * create("Account", { Name: "My Account #1" }); + * @example Multiple records creation + * create("Account",[{ Name: "My Account #1" }, { Name: "My Account #2" }]); + * @function + * @param {string} sObjectName - API name of the sObject. + * @param {object} records - Field attributes for the new record. + * @returns {Operation} + */ +export function create(sObjectName, records) { + return state => { + let { connection } = state; + const [resolvedSObjectName, resolvedRecords] = expandReferences( + state, + sObjectName, + records + ); + console.info(`Creating ${resolvedSObjectName}`, resolvedRecords); + + return connection + .create(resolvedSObjectName, resolvedRecords) + .then(recordResult => { + console.log('Result : ' + JSON.stringify(recordResult)); + return composeNextState(state, recordResult); + }); + }; +} + +/** + * Fetches and prints metadata for an sObject and pushes the result to `state.data`. + * If `sObjectName` is not specified, it will print the total number of all available sObjects and push the result to `state.data`. + * @public + * @example Fetch metadata for all available sObjects + * describe() + * @example Fetch metadata for Account sObject + * describe('Account') + * @function + * @param {string} [sObjectName] - The API name of the sObject. If omitted, fetches metadata for all sObjects. + * @returns {Operation} + */ +export function describe(sObjectName) { + return state => { + const { connection } = state; + + const [resolvedSObjectName] = expandReferences(state, sObjectName); + + return resolvedSObjectName + ? connection + .sobject(resolvedSObjectName) + .describe() + .then(result => { + console.log('Label : ' + result.label); + console.log('Num of Fields : ' + result.fields.length); + + return composeNextState(state, result); + }) + : connection.describeGlobal().then(result => { + const { sobjects } = result; + console.log(`Retrieved ${sobjects.length} sObjects`); + return composeNextState(state, result); + }); + }; +} /** * Delete records of an object. @@ -503,132 +318,205 @@ export function bulk(sObject, operation, options, records) { * @example * destroy('obj_name', [ * '0060n00000JQWHYAA5', - * '0090n00000JQEWHYAA5 + * '0090n00000JQEWHYAA5' * ], { failOnError: true }) * @function - * @param {string} sObject - API name of the sObject. - * @param {object} attrs - Array of IDs of records to delete. + * @param {string} sObjectName - API name of the sObject. + * @param {object} ids - Array of IDs of records to delete. * @param {object} options - Options for the destroy delete operation. * @returns {Operation} */ -export function destroy(sObject, attrs, options) { +export function destroy(sObjectName, ids, options = {}) { return state => { const { connection } = state; - const finalAttrs = expandReferences(attrs)(state); - const { failOnError } = options; - console.info(`Deleting ${sObject} records`); + const [resolvedSObjectName, resolvedIds, resolvedOptions] = + expandReferences(state, sObjectName, ids, options); + + const { failOnError = false } = resolvedOptions; + + console.info(`Deleting ${resolvedSObjectName} records`); return connection - .sobject(sObject) - .del(finalAttrs) + .sobject(resolvedSObjectName) + .del(resolvedIds) .then(function (result) { const successes = result.filter(r => r.success); + const failures = result.filter(r => !r.success); + console.log( 'Sucessfully deleted: ', JSON.stringify(successes, null, 2) ); - const failures = result.filter(r => !r.success); - console.log('Failed to delete: ', JSON.stringify(failures, null, 2)); + if (failures.length > 0) { + console.log('Failed to delete: ', JSON.stringify(failures, null, 2)); - if (failOnError && result.some(r => !r.success)) - throw 'Some deletes failed; exiting with failure code.'; + if (failOnError) + throw 'Some deletes failed; exiting with failure code.'; + } - return { - ...state, - references: [result, ...state.references], - }; + return composeNextState(state, result); }); }; } /** - * Create a new sObject record(s). - * @public - * @example Single record creation - * create("Account", { Name: "My Account #1" }); - * @example Multiple records creation - * create("Account",[{ Name: "My Account #1" }, { Name: "My Account #2" }]); - * @function - * @param {string} sObject - API name of the sObject. - * @param {object} attrs - Field attributes for the new record. + * Send a GET HTTP request using connected session information. + * @example + * get('/actions/custom/flow/POC_OpenFN_Test_Flow'); + * @param {string} path - The Salesforce API endpoint, Relative to request from + * @param {object} options - Request query parameters and headers * @returns {Operation} */ -export function create(sObject, attrs) { - return state => { - let { connection } = state; - const finalAttrs = expandReferences(attrs)(state); - console.info(`Creating ${sObject}`, finalAttrs); - - return connection.create(sObject, finalAttrs).then(function (recordResult) { - console.log('Result : ' + JSON.stringify(recordResult)); - return { - ...state, - references: [recordResult, ...state.references], - }; - }); +export function get(path, options = {}) { + return async state => { + const { connection } = state; + const [resolvedPath, resolvedOptions] = expandReferences( + state, + path, + options + ); + const { headers, ...query } = resolvedOptions; + console.log(`GET: ${resolvedPath}`); + const requestOptions = { + url: resolvedPath, + method: 'GET', + query, + headers: { 'content-type': 'application/json', ...headers }, + }; + + const result = await connection.request(requestOptions); + + return composeNextState(state, result); }; } - /** - * Alias for "create(sObject, attrs)". + * Alias for "create(sObjectName, attrs)". * @public * @example Single record creation * insert("Account", { Name: "My Account #1" }); * @example Multiple records creation * insert("Account",[{ Name: "My Account #1" }, { Name: "My Account #2" }]); * @function - * @param {string} sObject - API name of the sObject. - * @param {object} attrs - Field attributes for the new record. + * @param {string} sObjectName - API name of the sObject. + * @param {object} records - Field attributes for the new record. * @returns {Operation} */ -export function insert(sObject, attrs) { - return create(sObject, attrs); +export function insert(sObjectName, records) { + return create(sObjectName, records); } /** - * Create a new sObject if conditions are met. + * Send a POST HTTP request using connected session information. * - * **The `createIf()` function has been deprecated. Use `fnIf(condition,create())` instead.** + * @example + * post('/actions/custom/flow/POC_OpenFN_Test_Flow', { inputs: [{}] }); + * @param {string} path - The Salesforce API endpoint, Relative to request from + * @param {object} data - A JSON Object request body + * @param {object} options - Request options + * @param {object} [options.headers] - Object of request headers + * @param {object} [options.query] - A JSON Object request body + * @returns {Operation} + */ +export function post(path, data, options = {}) { + return async state => { + const { connection } = state; + const [resolvedPath, resolvedData, resolvedOptions] = expandReferences( + state, + path, + data, + options + ); + const { query, headers } = resolvedOptions; + + console.log(`POST: ${resolvedPath}`); + + const requestOptions = { + url: resolvedPath, + method: 'POST', + query, + headers: { 'content-type': 'application/json', ...headers }, + body: JSON.stringify(resolvedData), + }; + + const result = await connection.request(requestOptions); + + return composeNextState(state, result); + }; +} + +/** + * Execute an SOQL query. + * Note that in an event of a query error, + * error logs will be printed but the operation will not throw the error. + * + * The Salesforce query API is subject to rate limits, {@link https://sforce.co/3W9zyaQ See for more details}. * @public * @example - * createIf(true, 'obj_name', { - * attr1: "foo", - * attr2: "bar" - * }) + * query(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`); + * @example Query more records if next records are available + * query(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`, { autoFetch: true }); * @function - * @param {boolean} logical - a logical statement that will be evaluated. - * @param {string} sObject - API name of the sObject. - * @param {(object|object[])} attrs - Field attributes for the new object. + * @param {string} qs - A query string. Must be less than `4000` characters in WHERE clause + * @param {object} options - Options passed to the bulk api. + * @param {boolean} [options.autoFetch=false] - Fetch next records if available. * @returns {Operation} */ -export function createIf(logical, sObject, attrs) { - return state => { - const resolvedLogical = expandReferences(logical)(state); +export function query(qs, options = {}) { + return async state => { + let done = false; + let qResult = null; + let result = []; - console.warn( - `The 'createIf()' function has been deprecated. Use 'fnIf(condition,create())' instead.` - ); + const { connection } = state; + const [resolvedQs, resolvedOptions] = expandReferences(state, qs, options); + const { autoFetch = false } = resolvedOptions; - if (resolvedLogical) { - const { connection } = state; - const finalAttrs = expandReferences(attrs)(state); - console.info(`Creating ${sObject}`, finalAttrs); - return connection - .create(sObject, finalAttrs) - .then(function (recordResult) { - console.log('Result : ' + JSON.stringify(recordResult)); - return { - ...state, - references: [recordResult, ...state.references], - }; - }); + console.log(`Executing query: ${resolvedQs}`); + try { + qResult = await connection.query(resolvedQs); + } catch (err) { + const { message, errorCode } = err; + console.log(`Error ${errorCode}: ${message}`); + throw err; + } + + if (qResult.totalSize > 0) { + console.log('Total records', qResult.totalSize); + + while (!done) { + result.push(qResult); + + if (qResult.done) { + done = true; + } else if (autoFetch) { + console.log( + 'Fetched records so far', + result.map(ref => ref.records).flat().length + ); + console.log('Fetching next records...'); + try { + qResult = await connection.request({ url: qResult.nextRecordsUrl }); + } catch (err) { + const { message, errorCode } = err; + console.log(`Error ${errorCode}: ${message}`); + throw err; + } + } else { + done = true; + } + } + + console.log( + 'Done ✔ retrieved records', + result.map(ref => ref.records).flat().length + ); } else { - console.info(`Not creating ${sObject} because logical is false.`); - return { - ...state, - }; + result.push(qResult); + console.log('No records found.'); } + + return composeNextState(state, result, result?.records); }; } @@ -644,87 +532,33 @@ export function createIf(logical, sObject, attrs) { * { Name: "Record #2", ExtId__c : 'ID-0000002' }, * ]); * @function - * @param {string} sObject - API name of the sObject. - * @magic sObject - $.children[?(!@.meta.system)].name + * @param {string} sObjectName - API name of the sObject. + * @magic sObjectName - $.children[?(!@.meta.system)].name * @param {string} externalId - The external ID of the sObject. * @magic externalId - $.children[?(@.name=="{{args.sObject}}")].children[?(@.meta.externalId)].name - * @param {(object|object[])} attrs - Field attributes for the new object. - * @magic attrs - $.children[?(@.name=="{{args.sObject}}")].children[?(!@.meta.externalId)] + * @param {(object|object[])} records - Field attributes for the new object. + * @magic records - $.children[?(@.name=="{{args.sObject}}")].children[?(!@.meta.externalId)] * @returns {Operation} */ -export function upsert(sObject, externalId, attrs) { +export function upsert(sObjectName, externalId, records) { return state => { const { connection } = state; - const finalAttrs = expandReferences(attrs)(state); + const [resolvedSObjectName, resolvedExternalId, resolvedRecords] = + expandReferences(state, sObjectName, externalId, records); console.info( - `Upserting ${sObject} with externalId`, - externalId, + `Upserting ${resolvedSObjectName} with externalId`, + resolvedExternalId, ':', - finalAttrs + resolvedRecords ); return connection - .upsert(sObject, finalAttrs, externalId) - .then(function (recordResult) { - console.log('Result : ' + JSON.stringify(recordResult)); - return { - ...state, - references: [recordResult, ...state.references], - }; - }); - }; -} - -/** - * Conditionally create a new sObject record, or updates it if it already exists - * - * **The `upsertIf()` function has been deprecated. Use `fnIf(condition,upsert())` instead.** - * @public - * @example - * upsertIf(true, 'obj_name', 'ext_id', { - * attr1: "foo", - * attr2: "bar" - * }) - * @function - * @param {boolean} logical - a logical statement that will be evaluated. - * @param {string} sObject - API name of the sObject. - * @param {string} externalId - ID. - * @param {(object|object[])} attrs - Field attributes for the new object. - * @returns {Operation} - */ -export function upsertIf(logical, sObject, externalId, attrs) { - return state => { - const resolvedLogical = expandReferences(logical)(state); - - console.warn( - `The 'upsertIf()' function has been deprecated. Use 'fnIf(condition,upsert())' instead.` - ); - - if (resolvedLogical) { - const { connection } = state; - const finalAttrs = expandReferences(attrs)(state); - console.info( - `Upserting ${sObject} with externalId`, - externalId, - ':', - finalAttrs - ); + .upsert(resolvedSObjectName, resolvedRecords, resolvedExternalId) + .then(function (result) { + console.log('Result : ' + JSON.stringify(result)); - return connection - .upsert(sObject, finalAttrs, externalId) - .then(function (recordResult) { - console.log('Result : ' + JSON.stringify(recordResult)); - return { - ...state, - references: [recordResult, ...state.references], - }; - }); - } else { - console.info(`Not upserting ${sObject} because logical is false.`); - return { - ...state, - }; - } + return composeNextState(state, result); + }); }; } @@ -742,163 +576,28 @@ export function upsertIf(logical, sObject, externalId, attrs) { * { Id: "0010500000fxbcvAAA", Name: "Updated Account #2" }, * ]); * @function - * @param {string} sObject - API name of the sObject. - * @param {(object|object[])} attrs - Field attributes for the new object. + * @param {string} sObjectName - API name of the sObject. + * @param {(object|object[])} records - Field attributes for the new object. * @returns {Operation} */ -export function update(sObject, attrs) { +export function update(sObjectName, records) { return state => { let { connection } = state; - const finalAttrs = expandReferences(attrs)(state); - console.info(`Updating ${sObject}`, finalAttrs); - - return connection.update(sObject, finalAttrs).then(function (recordResult) { - console.log('Result : ' + JSON.stringify(recordResult)); - return { - ...state, - references: [recordResult, ...state.references], - }; - }); - }; -} - -/** - * Get a reference ID by an index. - * @public - * @example - * reference(0) - * @function - * @param {number} position - Position for references array. - * @returns {State} - */ -export function reference(position) { - return state => state.references[position].id; -} - -function getConnection(state, options) { - const { apiVersion } = state.configuration; - - const apiVersionRegex = /^\d{2}\.\d$/; - - if (apiVersion && apiVersionRegex.test(apiVersion)) { - options.version = apiVersion; - } else { - options.version = '47.0'; - } - console.log('Using Salesforce API version:', options.version); - - return new jsforce.Connection(options); -} - -async function createBasicAuthConnection(state) { - const { loginUrl, username, password, securityToken } = state.configuration; - - const connection = getConnection(state, { loginUrl }); - - await connection - .login(username, securityToken ? password + securityToken : password) - .catch(e => { - console.error(`Failed to connect to salesforce as ${username}`); - throw e; - }); - - console.info(`Connected to salesforce as ${username}.`); - - return { - ...state, - connection, - }; -} - -function createAccessTokenConnection(state) { - const { instance_url, access_token } = state.configuration; - - const connection = getConnection(state, { - instanceUrl: instance_url, - accessToken: access_token, - }); - - console.log(`Connected with ${connection._sessionType} session type`); - - return { - ...state, - connection, - }; -} - -/** - * Creates a connection to Salesforce using Basic Auth or OAuth. - * @function createConnection - * @private - * @param {State} state - Runtime state. - * @returns {State} - */ -function createConnection(state) { - if (state.connection) { - return state; - } - - const { access_token } = state.configuration; - - return access_token - ? createAccessTokenConnection(state) - : createBasicAuthConnection(state); -} - -/** - * Executes an operation. - * @function - * @param {Operation} operations - Operations - * @returns {State} - */ -export function execute(...operations) { - const initialState = { - logger: { - info: console.info.bind(console), - debug: console.log.bind(console), - }, - references: [], - data: null, - configuration: {}, - }; + const [resolvedSObjectName, resolvedRecords] = expandReferences( + state, + sObjectName, + records + ); + console.info(`Updating ${resolvedSObjectName}`, resolvedRecords); - return state => { - // Note: we no longer need `steps` anymore since `commonExecute` - // takes each operation as an argument. - return commonExecute( - loadAnyAscii, - createConnection, - ...flatten(operations), - cleanupState - )({ ...initialState, ...state }); + return connection + .update(resolvedSObjectName, resolvedRecords) + .then(function (result) { + console.log('Result : ' + JSON.stringify(result)); + return composeNextState(state, result); + }); }; } -/** - * Removes unserializable keys from the state. - * @example - * cleanupState(state) - * @function - * @param {State} state - * @returns {State} - */ -function cleanupState(state) { - delete state.connection; - return state; -} - -/** - * Flattens an array of operations. - * @example - * steps( - * createIf(params), - * update(params) - * ) - * @function - * @returns {array} - */ -export function steps(...operations) { - return flatten(operations); -} /** * Transliterates unicode characters to their best ASCII representation @@ -924,20 +623,18 @@ export function toUTF8(input) { * method: 'POST', * json: { inputs: [{}] }, * }); - * @param {string} url - Relative or absolute URL to request from - * @param {object} options - Request options + * @param {string} url - Relative to request from + * @param {object} options - The options for the request. * @param {string} [options.method=GET] - HTTP method to use. Defaults to GET * @param {object} [options.headers] - Object of request headers - * @param {object} [options.json] - A JSON Object request body + * @param {object} [options.json] - A JSON object to send as the request body. * @param {string} [options.body] - HTTP body (in POST/PUT/PATCH methods) - * @param {function} callback - A callback to execute once the request is complete * @returns {Operation} */ - -export function request(path, options, callback = s => s) { +export function request(path, options = {}) { return async state => { const { connection } = state; - const [resolvedPath, resolvedOptions] = newExpandReferences( + const [resolvedPath, resolvedOptions] = expandReferences( state, path, options @@ -955,20 +652,45 @@ export function request(path, options, callback = s => s) { const result = await connection.request(requestOptions); - const nextState = composeNextState(state, result); - - return callback(nextState); + return composeNextState(state, result); }; } -// Note that we expose the entire axios package to the user here. -import axios from 'axios'; -export { axios }; +/** + * Retrieves a Salesforce sObject(s). + * @public + * @example + * retrieve('ContentVersion', '0684K0000020Au7QAE/VersionData'); + * @function + * @param {string} sObjectName - The sObject to retrieve + * @param {string} id - The id of the record + * @returns {Operation} + */ +export function retrieve(sObjectName, id) { + return state => { + const { connection } = state; + + const [resolvedSObjectName, resolvedId] = expandReferences( + state, + sObjectName, + id + ); + + console.log( + `Retrieving data for sObject '${resolvedSObjectName}' with Id '${resolvedId}'` + ); + return connection + .sobject(resolvedSObjectName) + .retrieve(resolvedId) + .then(result => { + return composeNextState(state, result); + }); + }; +} export { alterState, arrayToString, - beta, chunk, combine, dataPath, diff --git a/packages/salesforce/test/Adaptor.test.js b/packages/salesforce/test/Adaptor.test.js index 5c39b3c6c..1161fbbe5 100644 --- a/packages/salesforce/test/Adaptor.test.js +++ b/packages/salesforce/test/Adaptor.test.js @@ -1,27 +1,10 @@ import chai from 'chai'; import sinon from 'sinon'; -import { - reference, - create, - createIf, - upsert, - upsertIf, - toUTF8, - execute, - query, -} from '../src/Adaptor'; +import { create, upsert, toUTF8, execute, query } from '../src/Adaptor'; const { expect } = chai; describe('Adaptor', () => { - describe('reference', () => { - it('returns the Id of a previous operation', () => { - let state = { references: [{ id: '12345' }] }; - let Id = reference(0)(state); - expect(Id).to.eql('12345'); - }); - }); - describe('create', () => { it('makes a new sObject', done => { const fakeConnection = { @@ -50,59 +33,6 @@ describe('Adaptor', () => { }); }); - describe('createIf', () => { - it("doesn't create a new sObject if a logical is false", done => { - const fakeConnection = { - create: function () { - return Promise.resolve({ Id: 10 }); - }, - }; - let state = { connection: fakeConnection, references: [] }; - - let logical = 1 + 1 == 3; - - let sObject = 'myObject'; - let fields = { field: 'value' }; - - let spy = sinon.spy(fakeConnection, 'create'); - - createIf(logical, sObject, fields)(state); - - expect(spy.called).to.eql(false); - expect(state).to.eql({ connection: fakeConnection, references: [] }); - done(); - }); - - it('makes a new sObject if a logical is true', done => { - const fakeConnection = { - create: function () { - return Promise.resolve({ Id: 10 }); - }, - }; - let state = { connection: fakeConnection, references: [] }; - - let logical = 1 + 1 == 2; - - let sObject = 'myObject'; - let fields = { field: 'value' }; - - let spy = sinon.spy(fakeConnection, 'create'); - - createIf( - logical, - sObject, - fields - )(state) - .then(state => { - expect(spy.args[0]).to.eql([sObject, fields]); - expect(spy.called).to.eql(true); - expect(state.references[0]).to.eql({ Id: 10 }); - }) - .then(done) - .catch(done); - }); - }); - describe('upsert', () => { it('is expected to call `upsert` on the connection', done => { const connection = { @@ -133,39 +63,6 @@ describe('Adaptor', () => { }); }); - describe('upsertIf', () => { - it('upserts if a logical is true', done => { - const fakeConnection = { - upsert: function () { - return Promise.resolve({ Id: 10 }); - }, - }; - let state = { connection: fakeConnection, references: [] }; - - let logical = 1 + 1 == 2; - - let sObject = 'myObject'; - let externalId = 'MyExternalId'; - let fields = { field: 'value' }; - - let spy = sinon.spy(fakeConnection, 'upsert'); - - upsertIf( - logical, - sObject, - externalId, - fields - )(state) - .then(state => { - expect(spy.args[0]).to.eql([sObject, fields, externalId]); - expect(spy.called).to.eql(true); - expect(state.references[0]).to.eql({ Id: 10 }); - }) - .then(done) - .catch(done); - }); - }); - describe('toUTF8', () => { it('Transliterate unicode to ASCII representation', async () => { const state = { From 1333e6870eabea81a77ff238029a40f82b789b1f Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Thu, 1 Aug 2024 12:16:20 +0300 Subject: [PATCH 04/14] wip prepareNextState --- packages/salesforce/ast.json | 595 +++++++++-------------------- packages/salesforce/src/Adaptor.js | 4 +- packages/salesforce/src/Utils.js | 11 + 3 files changed, 191 insertions(+), 419 deletions(-) diff --git a/packages/salesforce/ast.json b/packages/salesforce/ast.json index edf306eb4..5cb28c10f 100644 --- a/packages/salesforce/ast.json +++ b/packages/salesforce/ast.json @@ -1,73 +1,15 @@ { "operations": [ { - "name": "relationship", + "name": "bulk", "params": [ - "relationshipName", - "externalId", - "dataSource" + "sObjectName", + "operation", + "records", + "options" ], "docs": { - "description": "Adds a lookup relation or 'dome insert' to a record.", - "tags": [ - { - "title": "public", - "description": null, - "type": null - }, - { - "title": "example", - "description": "Data Sourced Value:\n relationship(\"relationship_name__r\", \"externalID on related object\", dataSource(\"path\"))\nFixed Value:\n relationship(\"relationship_name__r\", \"externalID on related object\", \"hello world\")" - }, - { - "title": "function", - "description": null, - "name": null - }, - { - "title": "param", - "description": "`__r` relationship field on the record.", - "type": { - "type": "NameExpression", - "name": "string" - }, - "name": "relationshipName" - }, - { - "title": "param", - "description": "Salesforce ExternalID field.", - "type": { - "type": "NameExpression", - "name": "string" - }, - "name": "externalId" - }, - { - "title": "param", - "description": "resolvable source.", - "type": { - "type": "NameExpression", - "name": "string" - }, - "name": "dataSource" - }, - { - "title": "returns", - "description": null, - "type": { - "type": "NameExpression", - "name": "object" - } - } - ] - }, - "valid": true - }, - { - "name": "describeAll", - "params": [], - "docs": { - "description": "Prints the total number of all available sObjects and pushes the result to `state.references`.", + "description": "Create and execute a bulk job.", "tags": [ { "title": "public", @@ -76,41 +18,13 @@ }, { "title": "example", - "description": "describeAll()" - }, - { - "title": "function", - "description": null, - "name": null - }, - { - "title": "returns", - "description": null, - "type": { - "type": "NameExpression", - "name": "Operation" - } - } - ] - }, - "valid": true - }, - { - "name": "describe", - "params": [ - "sObject" - ], - "docs": { - "description": "Prints an sObject metadata and pushes the result to state.references", - "tags": [ - { - "title": "public", - "description": null, - "type": null + "description": "bulk(\n \"Patient__c\",\n \"insert\",\n (state) => state.patients.map((x) => ({ Age__c: x.age, Name: x.name })),\n { failOnError: true }\n);", + "caption": "Bulk insert" }, { "title": "example", - "description": "describe('obj_name')" + "description": "bulk(\n \"vera__Beneficiary__c\",\n \"upsert\",\n [\n {\n vera__Reporting_Period__c: 2023,\n vera__Geographic_Area__c: \"Uganda\",\n \"vera__Indicator__r.vera__ExtId__c\": 1001,\n vera__Result_UID__c: \"1001_2023_Uganda\",\n },\n ],\n { extIdField: \"vera__Result_UID__c\" }\n);", + "caption": "Bulk upsert" }, { "title": "function", @@ -124,151 +38,98 @@ "type": "NameExpression", "name": "string" }, - "name": "sObject" - }, - { - "title": "returns", - "description": null, - "type": { - "type": "NameExpression", - "name": "Operation" - } - } - ] - }, - "valid": true - }, - { - "name": "retrieve", - "params": [ - "sObject", - "id", - "callback" - ], - "docs": { - "description": "Retrieves a Salesforce sObject(s).", - "tags": [ - { - "title": "public", - "description": null, - "type": null - }, - { - "title": "example", - "description": "retrieve('ContentVersion', '0684K0000020Au7QAE/VersionData');" - }, - { - "title": "function", - "description": null, - "name": null + "name": "sObjectName" }, { "title": "param", - "description": "The sObject to retrieve", + "description": "The bulk operation to be performed.Eg \"insert\" | \"update\" | \"upsert\"", "type": { "type": "NameExpression", "name": "string" }, - "name": "sObject" + "name": "operation" }, { "title": "param", - "description": "The id of the record", + "description": "an array of records, or a function which returns an array.", "type": { "type": "NameExpression", - "name": "string" + "name": "array" }, - "name": "id" + "name": "records" }, { "title": "param", - "description": "A callback to execute once the record is retrieved", + "description": "Options passed to the bulk api.", "type": { "type": "NameExpression", - "name": "function" + "name": "object" }, - "name": "callback" + "name": "options" }, { - "title": "returns", - "description": null, + "title": "param", + "description": "External id field.", "type": { - "type": "NameExpression", - "name": "Operation" - } - } - ] - }, - "valid": true - }, - { - "name": "query", - "params": [ - "qs", - "options", - "callback" - ], - "docs": { - "description": "Execute an SOQL query.\nNote that in an event of a query error,\nerror logs will be printed but the operation will not throw the error.\n\nThe Salesforce query API is subject to rate limits, {@link https://sforce.co/3W9zyaQ See for more details}.", - "tags": [ - { - "title": "public", - "description": null, - "type": null - }, - { - "title": "example", - "description": "query(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`);" - }, - { - "title": "example", - "description": "query(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`, { autoFetch: true });", - "caption": "Query more records if next records are available" - }, - { - "title": "function", - "description": null, - "name": null + "type": "OptionalType", + "expression": { + "type": "NameExpression", + "name": "string" + } + }, + "name": "options.extIdField" }, { "title": "param", - "description": "A query string. Must be less than `4000` characters in WHERE clause", + "description": "Skipping bulk operation if no records.", "type": { - "type": "NameExpression", - "name": "string" + "type": "OptionalType", + "expression": { + "type": "NameExpression", + "name": "boolean" + } }, - "name": "qs" + "name": "options.allowNoOp", + "default": "false" }, { "title": "param", - "description": "Options passed to the bulk api.", + "description": "Fail the operation on error.", "type": { - "type": "NameExpression", - "name": "object" + "type": "OptionalType", + "expression": { + "type": "NameExpression", + "name": "boolean" + } }, - "name": "options" + "name": "options.failOnError", + "default": "false" }, { "title": "param", - "description": "Fetch next records if available.", + "description": "Polling interval in milliseconds.", "type": { "type": "OptionalType", "expression": { "type": "NameExpression", - "name": "boolean" + "name": "integer" } }, - "name": "options.autoFetch", - "default": "false" + "name": "options.pollInterval", + "default": "6000" }, { "title": "param", - "description": "A callback to execute once the record is retrieved", + "description": "Polling timeout in milliseconds.", "type": { - "type": "NameExpression", - "name": "function" + "type": "OptionalType", + "expression": { + "type": "NameExpression", + "name": "integer" + } }, - "name": "callback" + "name": "options.pollTimeout", + "default": "240000" }, { "title": "returns", @@ -286,8 +147,7 @@ "name": "bulkQuery", "params": [ "qs", - "options", - "callback" + "options" ], "docs": { "description": "Execute an SOQL Bulk Query.\nThis function uses bulk query to efficiently query large data sets and reduce the number of API requests.\n`bulkQuery()` uses {@link https://sforce.co/4azgczz Bulk API v.2.0 Query} which is available in API version 47.0 and later.\nThis API is subject to {@link https://sforce.co/4b6kn6z rate limits}.", @@ -355,15 +215,6 @@ "name": "options.pollInterval", "default": "3000" }, - { - "title": "param", - "description": "A callback to execute once the record is retrieved", - "type": { - "type": "NameExpression", - "name": "function" - }, - "name": "callback" - }, { "title": "returns", "description": null, @@ -377,15 +228,13 @@ "valid": false }, { - "name": "bulk", + "name": "create", "params": [ - "sObject", - "operation", - "options", + "sObjectName", "records" ], "docs": { - "description": "Create and execute a bulk job.", + "description": "Create a new sObject record(s).", "tags": [ { "title": "public", @@ -394,13 +243,13 @@ }, { "title": "example", - "description": "bulk(\n \"Patient__c\",\n \"insert\",\n { failOnError: true },\n (state) => state.someArray.map((x) => ({ Age__c: x.age, Name: x.name }))\n);", - "caption": "Bulk insert" + "description": "create(\"Account\", { Name: \"My Account #1\" });", + "caption": "Single record creation" }, { "title": "example", - "description": "bulk(\n \"vera__Beneficiary__c\",\n \"upsert\",\n { extIdField: \"vera__Result_UID__c\" },\n [\n {\n vera__Reporting_Period__c: 2023,\n vera__Geographic_Area__c: \"Uganda\",\n \"vera__Indicator__r.vera__ExtId__c\": 1001,\n vera__Result_UID__c: \"1001_2023_Uganda\",\n },\n ]\n);", - "caption": "Bulk upsert" + "description": "create(\"Account\",[{ Name: \"My Account #1\" }, { Name: \"My Account #2\" }]);", + "caption": "Multiple records creation" }, { "title": "function", @@ -414,85 +263,68 @@ "type": "NameExpression", "name": "string" }, - "name": "sObject" + "name": "sObjectName" }, { "title": "param", - "description": "The bulk operation to be performed.Eg \"insert\" | \"update\" | \"upsert\"", + "description": "Field attributes for the new record.", "type": { "type": "NameExpression", - "name": "string" + "name": "object" }, - "name": "operation" + "name": "records" }, { - "title": "param", - "description": "Options passed to the bulk api.", + "title": "returns", + "description": null, "type": { "type": "NameExpression", - "name": "object" - }, - "name": "options" + "name": "Operation" + } + } + ] + }, + "valid": true + }, + { + "name": "describe", + "params": [ + "sObjectName" + ], + "docs": { + "description": "Fetches and prints metadata for an sObject and pushes the result to `state.data`.\nIf `sObjectName` is not specified, it will print the total number of all available sObjects and push the result to `state.data`.", + "tags": [ + { + "title": "public", + "description": null, + "type": null }, { - "title": "param", - "description": "Polling timeout in milliseconds.", - "type": { - "type": "OptionalType", - "expression": { - "type": "NameExpression", - "name": "integer" - } - }, - "name": "options.pollTimeout", - "default": "240000" + "title": "example", + "description": "describe()", + "caption": "Fetch metadata for all available sObjects" }, { - "title": "param", - "description": "Polling interval in milliseconds.", - "type": { - "type": "OptionalType", - "expression": { - "type": "NameExpression", - "name": "integer" - } - }, - "name": "options.pollInterval", - "default": "6000" + "title": "example", + "description": "describe('Account')", + "caption": "Fetch metadata for Account sObject" }, { - "title": "param", - "description": "External id field.", - "type": { - "type": "OptionalType", - "expression": { - "type": "NameExpression", - "name": "string" - } - }, - "name": "options.extIdField" + "title": "function", + "description": null, + "name": null }, { "title": "param", - "description": "Fail the operation on error.", + "description": "The API name of the sObject. If omitted, fetches metadata for all sObjects.", "type": { "type": "OptionalType", "expression": { "type": "NameExpression", - "name": "boolean" + "name": "string" } }, - "name": "options.failOnError", - "default": "false" - }, - { - "title": "param", - "description": "an array of records, or a function which returns an array.", - "type": { - "type": "NameExpression", - "name": "array" - }, - "name": "records" + "name": "sObjectName" }, { "title": "returns", @@ -504,13 +336,13 @@ } ] }, - "valid": false + "valid": true }, { "name": "destroy", "params": [ - "sObject", - "attrs", + "sObjectName", + "ids", "options" ], "docs": { @@ -523,7 +355,7 @@ }, { "title": "example", - "description": "destroy('obj_name', [\n '0060n00000JQWHYAA5',\n '0090n00000JQEWHYAA5\n], { failOnError: true })" + "description": "destroy('obj_name', [\n '0060n00000JQWHYAA5',\n '0090n00000JQEWHYAA5'\n], { failOnError: true })" }, { "title": "function", @@ -537,7 +369,7 @@ "type": "NameExpression", "name": "string" }, - "name": "sObject" + "name": "sObjectName" }, { "title": "param", @@ -546,7 +378,7 @@ "type": "NameExpression", "name": "object" }, - "name": "attrs" + "name": "ids" }, { "title": "param", @@ -570,13 +402,13 @@ "valid": true }, { - "name": "create", + "name": "insert", "params": [ - "sObject", - "attrs" + "sObjectName", + "records" ], "docs": { - "description": "Create a new sObject record(s).", + "description": "Alias for \"create(sObjectName, attrs)\".", "tags": [ { "title": "public", @@ -585,12 +417,12 @@ }, { "title": "example", - "description": "create(\"Account\", { Name: \"My Account #1\" });", + "description": "insert(\"Account\", { Name: \"My Account #1\" });", "caption": "Single record creation" }, { "title": "example", - "description": "create(\"Account\",[{ Name: \"My Account #1\" }, { Name: \"My Account #2\" }]);", + "description": "insert(\"Account\",[{ Name: \"My Account #1\" }, { Name: \"My Account #2\" }]);", "caption": "Multiple records creation" }, { @@ -605,7 +437,7 @@ "type": "NameExpression", "name": "string" }, - "name": "sObject" + "name": "sObjectName" }, { "title": "param", @@ -614,7 +446,7 @@ "type": "NameExpression", "name": "object" }, - "name": "attrs" + "name": "records" }, { "title": "returns", @@ -629,13 +461,13 @@ "valid": true }, { - "name": "insert", + "name": "query", "params": [ - "sObject", - "attrs" + "qs", + "options" ], "docs": { - "description": "Alias for \"create(sObject, attrs)\".", + "description": "Execute an SOQL query.\nNote that in an event of a query error,\nerror logs will be printed but the operation will not throw the error.\n\nThe Salesforce query API is subject to rate limits, {@link https://sforce.co/3W9zyaQ See for more details}.", "tags": [ { "title": "public", @@ -644,13 +476,12 @@ }, { "title": "example", - "description": "insert(\"Account\", { Name: \"My Account #1\" });", - "caption": "Single record creation" + "description": "query(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`);" }, { "title": "example", - "description": "insert(\"Account\",[{ Name: \"My Account #1\" }, { Name: \"My Account #2\" }]);", - "caption": "Multiple records creation" + "description": "query(state=> `SELECT Id FROM Patient__c WHERE Health_ID__c = '${state.data.field1}'`, { autoFetch: true });", + "caption": "Query more records if next records are available" }, { "title": "function", @@ -659,21 +490,34 @@ }, { "title": "param", - "description": "API name of the sObject.", + "description": "A query string. Must be less than `4000` characters in WHERE clause", "type": { "type": "NameExpression", "name": "string" }, - "name": "sObject" + "name": "qs" }, { "title": "param", - "description": "Field attributes for the new record.", + "description": "Options passed to the bulk api.", "type": { "type": "NameExpression", "name": "object" }, - "name": "attrs" + "name": "options" + }, + { + "title": "param", + "description": "Fetch next records if available.", + "type": { + "type": "OptionalType", + "expression": { + "type": "NameExpression", + "name": "boolean" + } + }, + "name": "options.autoFetch", + "default": "false" }, { "title": "returns", @@ -685,14 +529,14 @@ } ] }, - "valid": true + "valid": false }, { "name": "upsert", "params": [ - "sObject", + "sObjectName", "externalId", - "attrs" + "records" ], "docs": { "description": "Create a new sObject record, or updates it if it already exists\nExternal ID field name must be specified in second argument.", @@ -724,11 +568,11 @@ "type": "NameExpression", "name": "string" }, - "name": "sObject" + "name": "sObjectName" }, { "title": "magic", - "description": "sObject - $.children[?(!@.meta.system)].name" + "description": "sObjectName - $.children[?(!@.meta.system)].name" }, { "title": "param", @@ -768,11 +612,11 @@ } ] }, - "name": "attrs" + "name": "records" }, { "title": "magic", - "description": "attrs - $.children[?(@.name==\"{{args.sObject}}\")].children[?(!@.meta.externalId)]" + "description": "records - $.children[?(@.name==\"{{args.sObject}}\")].children[?(!@.meta.externalId)]" }, { "title": "returns", @@ -787,15 +631,13 @@ "valid": true }, { - "name": "upsertIf", + "name": "update", "params": [ - "logical", - "sObject", - "externalId", - "attrs" + "sObjectName", + "records" ], "docs": { - "description": "Conditionally create a new sObject record, or updates it if it already exists\n\n**The `upsertIf()` function has been deprecated. Use `fnIf(condition,upsert())` instead.**", + "description": "Update an sObject record or records.", "tags": [ { "title": "public", @@ -804,22 +646,19 @@ }, { "title": "example", - "description": "upsertIf(true, 'obj_name', 'ext_id', {\n attr1: \"foo\",\n attr2: \"bar\"\n})" + "description": "update(\"Account\", {\n Id: \"0010500000fxbcuAAA\",\n Name: \"Updated Account #1\",\n});", + "caption": "Single record update" + }, + { + "title": "example", + "description": "update(\"Account\", [\n { Id: \"0010500000fxbcuAAA\", Name: \"Updated Account #1\" },\n { Id: \"0010500000fxbcvAAA\", Name: \"Updated Account #2\" },\n]);", + "caption": "Multiple records update" }, { "title": "function", "description": null, "name": null }, - { - "title": "param", - "description": "a logical statement that will be evaluated.", - "type": { - "type": "NameExpression", - "name": "boolean" - }, - "name": "logical" - }, { "title": "param", "description": "API name of the sObject.", @@ -827,16 +666,7 @@ "type": "NameExpression", "name": "string" }, - "name": "sObject" - }, - { - "title": "param", - "description": "ID.", - "type": { - "type": "NameExpression", - "name": "string" - }, - "name": "externalId" + "name": "sObjectName" }, { "title": "param", @@ -863,7 +693,7 @@ } ] }, - "name": "attrs" + "name": "records" }, { "title": "returns", @@ -878,13 +708,12 @@ "valid": true }, { - "name": "update", + "name": "toUTF8", "params": [ - "sObject", - "attrs" + "input" ], "docs": { - "description": "Update an sObject record or records.", + "description": "Transliterates unicode characters to their best ASCII representation", "tags": [ { "title": "public", @@ -893,61 +722,23 @@ }, { "title": "example", - "description": "update(\"Account\", {\n Id: \"0010500000fxbcuAAA\",\n Name: \"Updated Account #1\",\n});", - "caption": "Single record update" - }, - { - "title": "example", - "description": "update(\"Account\", [\n { Id: \"0010500000fxbcuAAA\", Name: \"Updated Account #1\" },\n { Id: \"0010500000fxbcvAAA\", Name: \"Updated Account #2\" },\n]);", - "caption": "Multiple records update" - }, - { - "title": "function", - "description": null, - "name": null + "description": "fn((state) => {\n const s = toUTF8(\"άνθρωποι\");\n console.log(s); // anthropoi\n return state;\n});" }, { "title": "param", - "description": "API name of the sObject.", + "description": "A string with unicode characters", "type": { "type": "NameExpression", "name": "string" }, - "name": "sObject" - }, - { - "title": "param", - "description": "Field attributes for the new object.", - "type": { - "type": "UnionType", - "elements": [ - { - "type": "NameExpression", - "name": "object" - }, - { - "type": "TypeApplication", - "expression": { - "type": "NameExpression", - "name": "Array" - }, - "applications": [ - { - "type": "NameExpression", - "name": "object" - } - ] - } - ] - }, - "name": "attrs" + "name": "input" }, { "title": "returns", - "description": null, + "description": "ASCII representation of input string", "type": { "type": "NameExpression", - "name": "Operation" + "name": "string" } } ] @@ -955,12 +746,13 @@ "valid": true }, { - "name": "reference", + "name": "retrieve", "params": [ - "position" + "sObjectName", + "id" ], "docs": { - "description": "Get a reference ID by an index.", + "description": "Retrieves a Salesforce sObject(s).", "tags": [ { "title": "public", @@ -969,7 +761,7 @@ }, { "title": "example", - "description": "reference(0)" + "description": "retrieve('ContentVersion', '0684K0000020Au7QAE/VersionData');" }, { "title": "function", @@ -978,57 +770,28 @@ }, { "title": "param", - "description": "Position for references array.", + "description": "The sObject to retrieve", "type": { "type": "NameExpression", - "name": "number" + "name": "string" }, - "name": "position" - }, - { - "title": "returns", - "description": null, - "type": { - "type": "NameExpression", - "name": "State" - } - } - ] - }, - "valid": true - }, - { - "name": "toUTF8", - "params": [ - "input" - ], - "docs": { - "description": "Transliterates unicode characters to their best ASCII representation", - "tags": [ - { - "title": "public", - "description": null, - "type": null - }, - { - "title": "example", - "description": "fn((state) => {\n const s = toUTF8(\"άνθρωποι\");\n console.log(s); // anthropoi\n return state;\n});" + "name": "sObjectName" }, { "title": "param", - "description": "A string with unicode characters", + "description": "The id of the record", "type": { "type": "NameExpression", "name": "string" }, - "name": "input" + "name": "id" }, { "title": "returns", - "description": "ASCII representation of input string", + "description": null, "type": { "type": "NameExpression", - "name": "string" + "name": "Operation" } } ] diff --git a/packages/salesforce/src/Adaptor.js b/packages/salesforce/src/Adaptor.js index 97e0b1852..0499a6d27 100644 --- a/packages/salesforce/src/Adaptor.js +++ b/packages/salesforce/src/Adaptor.js @@ -13,13 +13,11 @@ import { execute as commonExecute, - expandReferences, composeNextState, - field, chunk, } from '@openfn/language-common'; -import { expandReferences as newExpandReferences } from '@openfn/language-common/util'; +import { expandReferences } from '@openfn/language-common/util'; import * as util from './Utils'; import flatten from 'lodash/flatten'; diff --git a/packages/salesforce/src/Utils.js b/packages/salesforce/src/Utils.js index 62e47e85b..4b544af4b 100644 --- a/packages/salesforce/src/Utils.js +++ b/packages/salesforce/src/Utils.js @@ -1,4 +1,5 @@ import jsforce from 'jsforce'; +import { composeNextState } from '@openfn/language-common'; function getConnection(state, options) { const { apiVersion } = state.configuration; @@ -134,3 +135,13 @@ export async function pollJobResult(conn, job, pollInterval, pollTimeout) { attempt++; } } + +export const prepareNextState = (state, fullResult) => { + const { records, ...result } = fullResult; + + // console.log(fullResult); + return { + ...composeNextState(state, records || result), + result, + }; +}; From 7771b603a3fb7682e9eea8a67afa6e2ec815ce06 Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Fri, 19 Jul 2024 18:31:47 +0300 Subject: [PATCH 05/14] update prepNextState function --- packages/salesforce/src/Utils.js | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/salesforce/src/Utils.js b/packages/salesforce/src/Utils.js index 4b544af4b..d12303ab4 100644 --- a/packages/salesforce/src/Utils.js +++ b/packages/salesforce/src/Utils.js @@ -136,12 +136,9 @@ export async function pollJobResult(conn, job, pollInterval, pollTimeout) { } } -export const prepareNextState = (state, fullResult) => { - const { records, ...result } = fullResult; - - // console.log(fullResult); +export const prepareNextState = (state, result, data) => { return { - ...composeNextState(state, records || result), + ...composeNextState(state, data ?? result), result, }; }; From 4766ba9d71323df025788e33af7d9f15d05b419d Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Fri, 19 Jul 2024 18:32:52 +0300 Subject: [PATCH 06/14] fix broken test --- packages/salesforce/test/Adaptor.test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/salesforce/test/Adaptor.test.js b/packages/salesforce/test/Adaptor.test.js index 1161fbbe5..7274fb416 100644 --- a/packages/salesforce/test/Adaptor.test.js +++ b/packages/salesforce/test/Adaptor.test.js @@ -26,7 +26,7 @@ describe('Adaptor', () => { .then(state => { expect(spy.args[0]).to.eql([sObject, fields]); expect(spy.called).to.eql(true); - expect(state.references[0]).to.eql({ Id: 10 }); + expect(state.data).to.eql({ Id: 10 }); }) .then(done) .catch(done); @@ -56,7 +56,7 @@ describe('Adaptor', () => { .then(state => { expect(spy.args[0]).to.eql([sObject, fields, externalId]); expect(spy.called).to.eql(true); - expect(state.references[0]).to.eql({ Id: 10 }); + expect(state.data).to.eql({ Id: 10 }); }) .then(done) .catch(done); From 59721be4a3709d74a82c06b63881cbf1bb03a9a1 Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Mon, 22 Jul 2024 12:24:09 +0300 Subject: [PATCH 07/14] add changeset --- .changeset/gentle-oranges-crash.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .changeset/gentle-oranges-crash.md diff --git a/.changeset/gentle-oranges-crash.md b/.changeset/gentle-oranges-crash.md new file mode 100644 index 000000000..6c649b564 --- /dev/null +++ b/.changeset/gentle-oranges-crash.md @@ -0,0 +1,24 @@ +--- +'@openfn/language-salesforce': major +--- + +New API design for salesforce, including adding composeNextState and removing +old code. + +### Major Changes + +- Remove axios dependency +- Remove old/unused functions. `relationship`, `upsertIf`, `createIf`, + `reference`, `steps`, `beta` +- Standardize state mutation in all operation +- Change `bulk` signature to `bulk(operation, sObjectName, records, options)` +- Remove callback support + +### Minor Changes + +- Create a get and post function for all http requests in salesforce + +### Patch Changes + +- Change `cleanupState` to `removeConnection` and tagged it as private function +- Rename `attrs` to `records` From 50981d912081f6244f9fce99e8545f1be2bad74c Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Tue, 23 Jul 2024 09:09:53 +0300 Subject: [PATCH 08/14] wip: add migration guide --- .changeset/gentle-oranges-crash.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.changeset/gentle-oranges-crash.md b/.changeset/gentle-oranges-crash.md index 6c649b564..c9427c43b 100644 --- a/.changeset/gentle-oranges-crash.md +++ b/.changeset/gentle-oranges-crash.md @@ -9,7 +9,7 @@ old code. - Remove axios dependency - Remove old/unused functions. `relationship`, `upsertIf`, `createIf`, - `reference`, `steps`, `beta` + `reference`, `steps`, `beta`, `describeAll()` - Standardize state mutation in all operation - Change `bulk` signature to `bulk(operation, sObjectName, records, options)` - Remove callback support @@ -22,3 +22,5 @@ old code. - Change `cleanupState` to `removeConnection` and tagged it as private function - Rename `attrs` to `records` + +### Migration Guide From b4a9c4275996526b2389013de25b14e3b81132f1 Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Tue, 23 Jul 2024 11:10:42 +0300 Subject: [PATCH 09/14] breakdown changeset into individual updates --- .changeset/gentle-oranges-crash.md | 21 ++++++++++++--------- .changeset/lucky-chicken-notice.md | 7 +++++++ .changeset/wise-trees-carry.md | 6 ++++++ 3 files changed, 25 insertions(+), 9 deletions(-) create mode 100644 .changeset/lucky-chicken-notice.md create mode 100644 .changeset/wise-trees-carry.md diff --git a/.changeset/gentle-oranges-crash.md b/.changeset/gentle-oranges-crash.md index c9427c43b..73c670856 100644 --- a/.changeset/gentle-oranges-crash.md +++ b/.changeset/gentle-oranges-crash.md @@ -5,8 +5,6 @@ New API design for salesforce, including adding composeNextState and removing old code. -### Major Changes - - Remove axios dependency - Remove old/unused functions. `relationship`, `upsertIf`, `createIf`, `reference`, `steps`, `beta`, `describeAll()` @@ -14,13 +12,18 @@ old code. - Change `bulk` signature to `bulk(operation, sObjectName, records, options)` - Remove callback support -### Minor Changes - -- Create a get and post function for all http requests in salesforce - -### Patch Changes +### Migration Guide -- Change `cleanupState` to `removeConnection` and tagged it as private function -- Rename `attrs` to `records` +- Use `describe()` instead of `describeAll()`. +- Use `fnIf(true, upsert())` instead of `upsertIf()` +- Use `fnIf(true, create())` instead of `createIf()` +- Use `bulk(operation, sObjectName, records, options )` instead of + `bulk(operation, sObject, options, records)` ### Migration Guide + +- Replace `describeAll()` with `describe()`. +- Replace `upsertIf()` with `fnIf(true, upsert())`. +- Replace `createIf()` with `fnIf(true, create())`. +- Replace `bulk(operation, sObject, options, records)` with + `bulk(operation, sObjectName, records, options)`. diff --git a/.changeset/lucky-chicken-notice.md b/.changeset/lucky-chicken-notice.md new file mode 100644 index 000000000..0adc131bb --- /dev/null +++ b/.changeset/lucky-chicken-notice.md @@ -0,0 +1,7 @@ +--- +'@openfn/language-salesforce': minor +--- + +- Create a `get()` and `post()` function for all http requests in salesforce + tions, records)` +- Update `describe()` to fetch all available sObjects metadata diff --git a/.changeset/wise-trees-carry.md b/.changeset/wise-trees-carry.md new file mode 100644 index 000000000..8bac0ed1f --- /dev/null +++ b/.changeset/wise-trees-carry.md @@ -0,0 +1,6 @@ +--- +'@openfn/language-salesforce': patch +--- + +- Change `cleanupState` to `removeConnection` and tagged it as private function +- Rename `attrs` to `records` From 77d89e023ab3908b9fd1a02d474e02fe2031add9 Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Tue, 23 Jul 2024 11:39:54 +0300 Subject: [PATCH 10/14] remove duplicate --- .changeset/gentle-oranges-crash.md | 8 -------- 1 file changed, 8 deletions(-) diff --git a/.changeset/gentle-oranges-crash.md b/.changeset/gentle-oranges-crash.md index 73c670856..ba7d83753 100644 --- a/.changeset/gentle-oranges-crash.md +++ b/.changeset/gentle-oranges-crash.md @@ -14,14 +14,6 @@ old code. ### Migration Guide -- Use `describe()` instead of `describeAll()`. -- Use `fnIf(true, upsert())` instead of `upsertIf()` -- Use `fnIf(true, create())` instead of `createIf()` -- Use `bulk(operation, sObjectName, records, options )` instead of - `bulk(operation, sObject, options, records)` - -### Migration Guide - - Replace `describeAll()` with `describe()`. - Replace `upsertIf()` with `fnIf(true, upsert())`. - Replace `createIf()` with `fnIf(true, create())`. From d2c609d42a70df77140e5155f06bb9194b057391 Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Tue, 23 Jul 2024 14:07:53 +0300 Subject: [PATCH 11/14] improve next state composition --- packages/salesforce/src/Utils.js | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/salesforce/src/Utils.js b/packages/salesforce/src/Utils.js index d12303ab4..1cb7e5ef8 100644 --- a/packages/salesforce/src/Utils.js +++ b/packages/salesforce/src/Utils.js @@ -136,9 +136,4 @@ export async function pollJobResult(conn, job, pollInterval, pollTimeout) { } } -export const prepareNextState = (state, result, data) => { - return { - ...composeNextState(state, data ?? result), - result, - }; -}; +export const prepareNextState = (state, data) => composeNextState(state, data); From 11c5c05588d38701678da6984b0848e2b84a4e47 Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Tue, 23 Jul 2024 17:24:35 +0300 Subject: [PATCH 12/14] fix lookup test --- packages/salesforce/test/metadata/lookup.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/salesforce/test/metadata/lookup.test.js b/packages/salesforce/test/metadata/lookup.test.js index 7e8a96d47..f14993b65 100644 --- a/packages/salesforce/test/metadata/lookup.test.js +++ b/packages/salesforce/test/metadata/lookup.test.js @@ -15,7 +15,7 @@ describe('Salesforce lookup tests', async () => { // Unit tests of each query against the cached metadata describe('upsert', () => { it('sObject: should list non-system sObject names', () => { - const results = jp.query(data, queries.upsert.sObject); + const results = jp.query(data, queries.upsert.sObjectName); // Note that there are two sobjects in the model - this should correctly just return 1 expect(results).to.have.lengthOf(1); expect(results).to.include('vera__Beneficiary__c'); From 3f56c875ae5f6803988f7d01c6501920da42596e Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Thu, 1 Aug 2024 12:22:26 +0300 Subject: [PATCH 13/14] remove prepareNextState --- packages/salesforce/src/Utils.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/salesforce/src/Utils.js b/packages/salesforce/src/Utils.js index 1cb7e5ef8..62e47e85b 100644 --- a/packages/salesforce/src/Utils.js +++ b/packages/salesforce/src/Utils.js @@ -1,5 +1,4 @@ import jsforce from 'jsforce'; -import { composeNextState } from '@openfn/language-common'; function getConnection(state, options) { const { apiVersion } = state.configuration; @@ -135,5 +134,3 @@ export async function pollJobResult(conn, job, pollInterval, pollTimeout) { attempt++; } } - -export const prepareNextState = (state, data) => composeNextState(state, data); From ff1081b30dbf3526b950f5348669b4c5fc3b40d7 Mon Sep 17 00:00:00 2001 From: Emmanuel Evance Date: Thu, 1 Aug 2024 12:27:29 +0300 Subject: [PATCH 14/14] fix query autofetch --- packages/salesforce/ast.json | 12 +++- packages/salesforce/src/Adaptor.js | 98 +++++++++++++++++------------- 2 files changed, 66 insertions(+), 44 deletions(-) diff --git a/packages/salesforce/ast.json b/packages/salesforce/ast.json index 5cb28c10f..152355eca 100644 --- a/packages/salesforce/ast.json +++ b/packages/salesforce/ast.json @@ -464,7 +464,8 @@ "name": "query", "params": [ "qs", - "options" + "options", + "callback" ], "docs": { "description": "Execute an SOQL query.\nNote that in an event of a query error,\nerror logs will be printed but the operation will not throw the error.\n\nThe Salesforce query API is subject to rate limits, {@link https://sforce.co/3W9zyaQ See for more details}.", @@ -519,6 +520,15 @@ "name": "options.autoFetch", "default": "false" }, + { + "title": "param", + "description": "A callback to execute once the record is retrieved", + "type": { + "type": "NameExpression", + "name": "function" + }, + "name": "callback" + }, { "title": "returns", "description": null, diff --git a/packages/salesforce/src/Adaptor.js b/packages/salesforce/src/Adaptor.js index 0499a6d27..9d96cbf63 100644 --- a/packages/salesforce/src/Adaptor.js +++ b/packages/salesforce/src/Adaptor.js @@ -458,63 +458,75 @@ export function post(path, data, options = {}) { * @param {string} qs - A query string. Must be less than `4000` characters in WHERE clause * @param {object} options - Options passed to the bulk api. * @param {boolean} [options.autoFetch=false] - Fetch next records if available. + * @param {function} callback - A callback to execute once the record is retrieved * @returns {Operation} */ -export function query(qs, options = {}) { +export function query(qs, options = {}, callback = s => s) { return async state => { - let done = false; - let qResult = null; - let result = []; - const { connection } = state; const [resolvedQs, resolvedOptions] = expandReferences(state, qs, options); - const { autoFetch = false } = resolvedOptions; - console.log(`Executing query: ${resolvedQs}`); + const autoFetch = resolvedOptions.autoFetch || resolvedOptions.autofetch; + + if (autoFetch) { + console.log('autoFetch is enabled: all records will be downloaded'); + } + + const result = { + done: true, + totalSize: 0, + records: [], + }; + + const processRecords = async res => { + const { done, totalSize, records, nextRecordsUrl } = res; + + result.done = done; + result.totalSize = totalSize; + result.records.push(...records); + + if (!done && !autoFetch && nextRecordsUrl) { + result.nextRecordsUrl = nextRecordsUrl; + } + if (!done && autoFetch) { + console.log('Fetched records so far:', result.records.length); + console.log('Fetching next records...'); + + try { + const newResult = await connection.request({ url: nextRecordsUrl }); + await processRecords(newResult); + } catch (err) { + const { message, errorCode } = err; + console.error(`Error ${errorCode}: ${message}`); + throw err; + } + } + }; + try { - qResult = await connection.query(resolvedQs); + const qResult = await connection.query(resolvedQs); + if (qResult.totalSize > 0) { + console.log('Total records:', qResult.totalSize); + await processRecords(qResult); + console.log('Done ✔ retrieved records:', result.records.length); + } else { + console.log('No records found.'); + } } catch (err) { const { message, errorCode } = err; console.log(`Error ${errorCode}: ${message}`); throw err; } - if (qResult.totalSize > 0) { - console.log('Total records', qResult.totalSize); - - while (!done) { - result.push(qResult); - - if (qResult.done) { - done = true; - } else if (autoFetch) { - console.log( - 'Fetched records so far', - result.map(ref => ref.records).flat().length - ); - console.log('Fetching next records...'); - try { - qResult = await connection.request({ url: qResult.nextRecordsUrl }); - } catch (err) { - const { message, errorCode } = err; - console.log(`Error ${errorCode}: ${message}`); - throw err; - } - } else { - done = true; - } - } - - console.log( - 'Done ✔ retrieved records', - result.map(ref => ref.records).flat().length - ); - } else { - result.push(qResult); - console.log('No records found.'); - } + console.log( + 'Results retrieved and pushed to position [0] of the references array.' + ); - return composeNextState(state, result, result?.records); + const nextState = { + ...state, + references: [result, ...state.references], + }; + return callback(nextState); }; }