From 900fe1b6df13679f36111e8a33d55bbc535efc84 Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Wed, 31 Jan 2024 11:13:49 +0100 Subject: [PATCH] Add persist API (#1575) The goal is to remove the database dependency for the runner. Writing customers data (batchSave/.../activityLogs/lastSyncDate) will go through this API --- .github/workflows/build-images.yaml | 7 + .github/workflows/deploy.yaml | 19 +- package-lock.json | 301 +++++++++++++++- package.json | 6 +- packages/cli/docker/docker-compose.yaml | 18 + packages/jobs/lib/runner/render.runner.ts | 5 +- packages/persist/.gitignore | 3 + packages/persist/Dockerfile | 19 + packages/persist/lib/app.ts | 11 + .../lib/controllers/persist.controller.ts | 340 ++++++++++++++++++ packages/persist/lib/server.ts | 87 +++++ packages/persist/lib/server.unit.test.ts | 196 ++++++++++ packages/persist/lib/tracer.ts | 11 + packages/persist/lib/utils/result.ts | 8 + packages/persist/nodemon.json | 6 + packages/persist/package.json | 34 ++ packages/persist/tsconfig.json | 9 + packages/shared/lib/models/Activity.ts | 3 +- packages/shared/lib/sdk/sync.ts | 167 +++++++-- .../shared/lib/services/sync/run.service.ts | 6 +- packages/shared/lib/utils/utils.ts | 4 + scripts/publish.sh | 2 +- tsconfig.build.json | 3 +- 23 files changed, 1213 insertions(+), 52 deletions(-) create mode 100644 packages/persist/.gitignore create mode 100644 packages/persist/Dockerfile create mode 100644 packages/persist/lib/app.ts create mode 100644 packages/persist/lib/controllers/persist.controller.ts create mode 100644 packages/persist/lib/server.ts create mode 100644 packages/persist/lib/server.unit.test.ts create mode 100644 packages/persist/lib/tracer.ts create mode 100644 packages/persist/lib/utils/result.ts create mode 100644 packages/persist/nodemon.json create mode 100644 packages/persist/package.json create mode 100644 packages/persist/tsconfig.json diff --git a/.github/workflows/build-images.yaml b/.github/workflows/build-images.yaml index 66f33f7bb6..3948e83b67 100644 --- a/.github/workflows/build-images.yaml +++ b/.github/workflows/build-images.yaml @@ -4,6 +4,13 @@ concurrency: group: pulls/${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true jobs: + nango-persist: + uses: ./.github/workflows/push-container.yaml + secrets: inherit + with: + package: persist + run-cmd: ts-build + tags: -t nangohq/nango-persist:$GITHUB_SHA ${{ github.ref == 'refs/heads/master' && '-t nangohq/nango-persist:enterprise-$GITHUB_SHA -t nangohq/nango-persist:enterprise -t nangohq/nango-persist:hosted-$GITHUB_SHA -t nangohq/nango-persist:hosted' || '' }} nango-jobs: uses: ./.github/workflows/push-container.yaml secrets: inherit diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index bafd24fc09..e55fd3db6a 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -20,6 +20,7 @@ on: - server - jobs - runner + - persist jobs: deploy_server: @@ -76,4 +77,20 @@ jobs: RUNNER_OWNER_ID: ${{ secrets.RENDER_RUNNER_OWNER_ID }} run: | bash ./scripts/deploy/runners.bash - + depoy_persist: + if: inputs.service == 'persist' + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + - name: Push tag + run: | + docker buildx imagetools create nangohq/nango-persist:${{ github.sha }} --tag nangohq/nango-persist:${{ inputs.stage }} + - name: Deploy persist + run: | + SERVICE_ID=${{ fromJson('{ production: "srv-cmt150ol6cac73apstq0", staging: "srv-cmsfiqqcn0vc73bhcod0" }')[inputs.stage] }} + curl --request POST "https://api.render.com/v1/services/$SERVICE_ID/deploys" --header "authorization: Bearer ${{ secrets.RENDER_API_KEY }}" diff --git a/package-lock.json b/package-lock.json index 39719bf461..f4d81c310e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "packages/node-client", "packages/server", "packages/runner", + "packages/persist", "packages/jobs" ], "dependencies": { @@ -2825,6 +2826,18 @@ "node": ">=12.0.0" } }, + "node_modules/@datadog/native-appsec": { + "version": "7.0.0", + "resolved": "https://registry.npmjs.org/@datadog/native-appsec/-/native-appsec-7.0.0.tgz", + "integrity": "sha512-bywstWFW2hWxzPuS0+mFMVHHL0geulx5yQFtsjfszaH2LTAgk2D+Rt40MKbAoZ8q3tRw2dy6aYQ7svO3ca8jpA==", + "hasInstallScript": true, + "dependencies": { + "node-gyp-build": "^3.9.0" + }, + "engines": { + "node": ">=14" + } + }, "node_modules/@datadog/native-iast-rewriter": { "version": "2.2.1", "license": "Apache-2.0", @@ -2872,6 +2885,22 @@ "node": ">=12" } }, + "node_modules/@datadog/pprof": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/@datadog/pprof/-/pprof-5.0.0.tgz", + "integrity": "sha512-vhNan4SBuNWLpexunDJQ+hNbRAgWdk2qy5Iyh7Nn94uSSHXigAJMAvu4jwMKKQKFfchtobOkWT8GQUWW3tgpFg==", + "hasInstallScript": true, + "dependencies": { + "delay": "^5.0.0", + "node-gyp-build": "<4.0", + "p-limit": "^3.1.0", + "pprof-format": "^2.0.7", + "source-map": "^0.7.4" + }, + "engines": { + "node": ">=14" + } + }, "node_modules/@datadog/sketches-js": { "version": "2.1.0", "license": "Apache-2.0" @@ -3418,6 +3447,10 @@ "resolved": "packages/jobs", "link": true }, + "node_modules/@nangohq/nango-persist": { + "resolved": "packages/persist", + "link": true + }, "node_modules/@nangohq/nango-runner": { "resolved": "packages/runner", "link": true @@ -5135,7 +5168,6 @@ }, "node_modules/@types/body-parser": { "version": "1.19.2", - "dev": true, "license": "MIT", "dependencies": { "@types/connect": "*", @@ -5180,7 +5212,6 @@ }, "node_modules/@types/connect": { "version": "3.4.35", - "dev": true, "license": "MIT", "dependencies": { "@types/node": "*" @@ -5260,7 +5291,6 @@ }, "node_modules/@types/express": { "version": "4.17.14", - "dev": true, "license": "MIT", "dependencies": { "@types/body-parser": "*", @@ -5271,7 +5301,6 @@ }, "node_modules/@types/express-serve-static-core": { "version": "4.17.31", - "dev": true, "license": "MIT", "dependencies": { "@types/node": "*", @@ -5374,7 +5403,6 @@ }, "node_modules/@types/mime": { "version": "3.0.1", - "dev": true, "license": "MIT" }, "node_modules/@types/minimatch": { @@ -5382,6 +5410,15 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/mock-knex": { + "version": "0.4.8", + "resolved": "https://registry.npmjs.org/@types/mock-knex/-/mock-knex-0.4.8.tgz", + "integrity": "sha512-xRoaH9GmsgP5JBdMadzJSg/63HCifgJZsWmCJ5Z1rA36Fg3Y7Yb03dMzMIk5sHnBWcPkWqY/zyDO4nStI+Frbg==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/ms": { "version": "0.7.31", "dev": true, @@ -5473,12 +5510,10 @@ }, "node_modules/@types/qs": { "version": "6.9.7", - "dev": true, "license": "MIT" }, "node_modules/@types/range-parser": { "version": "1.2.4", - "dev": true, "license": "MIT" }, "node_modules/@types/readdir-glob": { @@ -5496,7 +5531,6 @@ }, "node_modules/@types/serve-static": { "version": "1.15.0", - "dev": true, "license": "MIT", "dependencies": { "@types/mime": "*", @@ -6509,10 +6543,11 @@ "license": "MIT" }, "node_modules/axios": { - "version": "1.6.1", - "license": "MIT", + "version": "1.6.7", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.6.7.tgz", + "integrity": "sha512-/hDJGff6/c7u0hDkvkGxR/oy6CbCs8ziCsC7SqmhjfozqiJGc8Z11wrv9z9lYfY4K8l+H9TpjcMDX0xOZmx+RA==", "dependencies": { - "follow-redirects": "^1.15.0", + "follow-redirects": "^1.15.4", "form-data": "^4.0.0", "proxy-from-env": "^1.1.0" } @@ -7584,6 +7619,15 @@ "type": "^1.0.1" } }, + "node_modules/data-uri-to-buffer": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/data-uri-to-buffer/-/data-uri-to-buffer-4.0.1.tgz", + "integrity": "sha512-0R9ikRb668HB7QDxT1vkpuUBtqc53YyAwMwGeUFKRojY/NWKvdZ+9UYtRfGmhqNbRkTSVpMbmyhXipFFv2cb/A==", + "dev": true, + "engines": { + "node": ">= 12" + } + }, "node_modules/datauri": { "version": "4.1.0", "license": "MIT", @@ -7625,6 +7669,94 @@ "node": ">=12.17" } }, + "node_modules/dd-trace": { + "version": "5.2.0", + "resolved": "https://registry.npmjs.org/dd-trace/-/dd-trace-5.2.0.tgz", + "integrity": "sha512-Z5ql3ZKzVW3DPstHPkTPcIPvKljHNtzTYY/WuZRlgT4XK7rMaN0j5nA8LlUh7m+tOPWs05IiKngbYVZjsqhRgA==", + "hasInstallScript": true, + "dependencies": { + "@datadog/native-appsec": "7.0.0", + "@datadog/native-iast-rewriter": "2.2.2", + "@datadog/native-iast-taint-tracking": "1.6.4", + "@datadog/native-metrics": "^2.0.0", + "@datadog/pprof": "5.0.0", + "@datadog/sketches-js": "^2.1.0", + "@opentelemetry/api": "^1.0.0", + "@opentelemetry/core": "^1.14.0", + "crypto-randomuuid": "^1.0.0", + "dc-polyfill": "^0.1.2", + "ignore": "^5.2.4", + "import-in-the-middle": "^1.7.3", + "int64-buffer": "^0.1.9", + "ipaddr.js": "^2.1.0", + "istanbul-lib-coverage": "3.2.0", + "jest-docblock": "^29.7.0", + "koalas": "^1.0.2", + "limiter": "1.1.5", + "lodash.sortby": "^4.7.0", + "lru-cache": "^7.14.0", + "methods": "^1.1.2", + "module-details-from-path": "^1.0.3", + "msgpack-lite": "^0.1.26", + "node-abort-controller": "^3.1.1", + "opentracing": ">=0.12.1", + "path-to-regexp": "^0.1.2", + "pprof-format": "^2.0.7", + "protobufjs": "^7.2.5", + "retry": "^0.13.1", + "semver": "^7.5.4", + "tlhunter-sorted-set": "^0.1.0" + }, + "engines": { + "node": ">=18" + } + }, + "node_modules/dd-trace/node_modules/@datadog/native-iast-rewriter": { + "version": "2.2.2", + "resolved": "https://registry.npmjs.org/@datadog/native-iast-rewriter/-/native-iast-rewriter-2.2.2.tgz", + "integrity": "sha512-13ZBhJpjZ/tiV6rYfyAf/ITye9cyd3x12M/2NKhD4Ivev4N4uKBREAjpArOtzKtPXZ5b6oXwVV4ofT1SHoYyzA==", + "dependencies": { + "lru-cache": "^7.14.0", + "node-gyp-build": "^4.5.0" + }, + "engines": { + "node": ">= 10" + } + }, + "node_modules/dd-trace/node_modules/ipaddr.js": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.1.0.tgz", + "integrity": "sha512-LlbxQ7xKzfBusov6UMi4MFpEg0m+mAm9xyNGEduwXMEDuf4WfzB/RZwMVYEd7IKGvh4IUkEXYxtAVu9T3OelJQ==", + "engines": { + "node": ">= 10" + } + }, + "node_modules/dd-trace/node_modules/lru-cache": { + "version": "7.18.3", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-7.18.3.tgz", + "integrity": "sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA==", + "engines": { + "node": ">=12" + } + }, + "node_modules/dd-trace/node_modules/node-gyp-build": { + "version": "4.8.0", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.8.0.tgz", + "integrity": "sha512-u6fs2AEUljNho3EYTJNBfImO5QTo/J/1Etd+NVdCj7qWKUSN/bSLkZwhDv7I+w/MSC6qJ4cknepkAYykDdK8og==", + "bin": { + "node-gyp-build": "bin.js", + "node-gyp-build-optional": "optional.js", + "node-gyp-build-test": "build-test.js" + } + }, + "node_modules/dd-trace/node_modules/retry": { + "version": "0.13.1", + "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", + "integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==", + "engines": { + "node": ">= 4" + } + }, "node_modules/debug": { "version": "4.3.4", "license": "MIT", @@ -7849,6 +7981,29 @@ "node": ">= 0.8" } }, + "node_modules/encoding": { + "version": "0.1.13", + "resolved": "https://registry.npmjs.org/encoding/-/encoding-0.1.13.tgz", + "integrity": "sha512-ETBauow1T35Y/WZMkio9jiM0Z5xjHHmJ4XmjZOq1l/dXz3lr2sRn87nJy20RupqSh1F2m3HHPSp8ShIPQJrJ3A==", + "optional": true, + "peer": true, + "dependencies": { + "iconv-lite": "^0.6.2" + } + }, + "node_modules/encoding/node_modules/iconv-lite": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", + "integrity": "sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==", + "optional": true, + "peer": true, + "dependencies": { + "safer-buffer": ">= 2.1.2 < 3.0.0" + }, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/end-of-stream": { "version": "1.4.4", "dev": true, @@ -8494,6 +8649,38 @@ "version": "4.2.3", "license": "MIT" }, + "node_modules/fetch-blob": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/fetch-blob/-/fetch-blob-3.2.0.tgz", + "integrity": "sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==", + "dev": true, + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/jimmywarting" + }, + { + "type": "paypal", + "url": "https://paypal.me/jimmywarting" + } + ], + "dependencies": { + "node-domexception": "^1.0.0", + "web-streams-polyfill": "^3.0.3" + }, + "engines": { + "node": "^12.20 || >= 14.13" + } + }, + "node_modules/fetch-blob/node_modules/web-streams-polyfill": { + "version": "3.3.2", + "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.3.2.tgz", + "integrity": "sha512-3pRGuxRF5gpuZc0W+EpwQRmCD7gRqcDOMt688KmdlDAgAyaB1XlN0zq2njfDNm44XVdIouE7pZ6GzbdyH47uIQ==", + "dev": true, + "engines": { + "node": ">= 8" + } + }, "node_modules/fetch-har": { "version": "8.1.5", "license": "ISC", @@ -8739,6 +8926,18 @@ "node": ">= 12.20" } }, + "node_modules/formdata-polyfill": { + "version": "4.0.10", + "resolved": "https://registry.npmjs.org/formdata-polyfill/-/formdata-polyfill-4.0.10.tgz", + "integrity": "sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==", + "dev": true, + "dependencies": { + "fetch-blob": "^3.1.2" + }, + "engines": { + "node": ">=12.20.0" + } + }, "node_modules/forwarded": { "version": "0.2.0", "license": "MIT", @@ -9148,8 +9347,9 @@ } }, "node_modules/import-in-the-middle": { - "version": "1.4.2", - "license": "Apache-2.0", + "version": "1.7.3", + "resolved": "https://registry.npmjs.org/import-in-the-middle/-/import-in-the-middle-1.7.3.tgz", + "integrity": "sha512-R2I11NRi0lI3jD2+qjqyVlVEahsejw7LDnYEbGb47QEFjczE3bZYsmWheCTQA+LFs2DzOQxR7Pms7naHW1V4bQ==", "dependencies": { "acorn": "^8.8.2", "acorn-import-assertions": "^1.9.0", @@ -10215,6 +10415,29 @@ "ufo": "^1.1.2" } }, + "node_modules/mock-knex": { + "version": "0.4.13", + "resolved": "https://registry.npmjs.org/mock-knex/-/mock-knex-0.4.13.tgz", + "integrity": "sha512-UmZlxiJH7bBdzjSWcrLJ1tnLfPNL7GfJO1IWL4sHWfMzLqdA3VAVWhotq1YiyE5NwVcrQdoXj3TGGjhTkBeIcQ==", + "dev": true, + "dependencies": { + "bluebird": "^3.4.1", + "lodash": "^4.14.2", + "semver": "^5.3.0" + }, + "peerDependencies": { + "knex": "> 0.8" + } + }, + "node_modules/mock-knex/node_modules/semver": { + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", + "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", + "dev": true, + "bin": { + "semver": "bin/semver" + } + }, "node_modules/module-details-from-path": { "version": "1.0.3", "license": "MIT" @@ -12931,6 +13154,11 @@ "node": ">=14.0.0" } }, + "node_modules/tlhunter-sorted-set": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/tlhunter-sorted-set/-/tlhunter-sorted-set-0.1.0.tgz", + "integrity": "sha512-eGYW4bjf1DtrHzUYxYfAcSytpOkA44zsr7G2n3PV7yOUR23vmkGe3LL4R+1jL9OsXtbsFOwe8XtbCrabeaEFnw==" + }, "node_modules/tmp": { "version": "0.0.33", "license": "MIT", @@ -14188,6 +14416,16 @@ "url": "https://github.com/sponsors/colinhacks" } }, + "node_modules/zod-express": { + "version": "0.0.8", + "resolved": "https://registry.npmjs.org/zod-express/-/zod-express-0.0.8.tgz", + "integrity": "sha512-zR0EQ6P12zox7v5piKTQCNIhgraVv8ev3Gkt5wxUlxt/3KUkEVrDyCYpfE0O5B2mTGAcfUktdP42dEV6rEKZYA==", + "peerDependencies": { + "@types/express": "^4.17.12", + "express": "^4.18.2", + "zod": "^3.21.4" + } + }, "packages/cli": { "name": "nango", "version": "0.37.9", @@ -14638,6 +14876,43 @@ "node": ">=16.7" } }, + "packages/persist": { + "name": "@nangohq/nango-persist", + "version": "1.0.0", + "license": "SEE LICENSE IN LICENSE FILE IN GIT REPOSITORY", + "dependencies": { + "@nangohq/shared": "^0.37.8", + "dd-trace": "^5.2.0", + "express": "^4.18.2", + "zod": "^3.22.4", + "zod-express": "^0.0.8" + }, + "devDependencies": { + "@types/mock-knex": "^0.4.8", + "@types/node": "^18.7.6", + "mock-knex": "^0.4.13", + "node-fetch": "^3.3.2", + "typescript": "^5.3.3" + } + }, + "packages/persist/node_modules/node-fetch": { + "version": "3.3.2", + "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-3.3.2.tgz", + "integrity": "sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==", + "dev": true, + "dependencies": { + "data-uri-to-buffer": "^4.0.0", + "fetch-blob": "^3.1.4", + "formdata-polyfill": "^4.0.10" + }, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/node-fetch" + } + }, "packages/runner": { "name": "@nangohq/nango-runner", "version": "1.0.0", diff --git a/package.json b/package.json index 8f43b89873..8b19e081fd 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "packages/node-client", "packages/server", "packages/runner", + "packages/persist", "packages/jobs" ], "scripts": { @@ -18,7 +19,7 @@ "lint": "eslint . --ext .ts,.tsx", "lint:fix": "eslint . --ext .ts,.tsx --fix", "install:all": "npm i && cd packages/webapp && npm i && cd ../..", - "ts-build": "tsc -b --clean packages/shared packages/server packages/cli packages/runner packages/jobs && tsc -b tsconfig.build.json && npm run postbuild -ws --if-present && tsc -b packages/webapp/tsconfig.json", + "ts-build": "tsc -b --clean packages/shared packages/server packages/cli packages/runner packages/jobs packages/persist && tsc -b tsconfig.build.json && npm run postbuild -ws --if-present && tsc -b packages/webapp/tsconfig.json", "docker-build": "docker build -f packages/server/Dockerfile -t nango-server:latest .", "webapp-build:hosted": "cd ./packages/webapp && npm run build:hosted && cd ../..", "webapp-build:staging": "cd ./packages/webapp && npm run build:staging && cd ../..", @@ -33,13 +34,14 @@ "build:enterprise": "npm run install:all && npm run ts-build && npm run webapp-build:enterprise", "server:dev:watch": "cd ./packages/server && npm run dev", "jobs:dev:watch": "npm run dev -w @nangohq/nango-jobs", + "persist:dev:watch": "npm run dev -w @nangohq/nango-persist", "webapp:dev:watch": "cd ./packages/webapp && npm run start:hosted", "cli:watch": "cd ./packages/cli && npm run dev", "prepare": "husky install", "build:watch": "tsc -b -w tsconfig.build.json", "dev:watch": "concurrently --kill-others \"npm run build:watch\" \"npm run webapp-build:watch\" \"npm run prettier-watch\" \"npm run shared:watch\" \"npm run cli:watch\"", "watch:dev": "npm run dev:watch", - "dev:watch:apps": "concurrently --kill-others \"npm run server:dev:watch\" \"npm run webapp:dev:watch\" \"npm run jobs:dev:watch\"", + "dev:watch:apps": "concurrently --kill-others \"npm run server:dev:watch\" \"npm run webapp:dev:watch\" \"npm run jobs:dev:watch\" \"npm run persist:dev:watch\"", "watch:dev:apps": "npm run dev:watch:apps", "dw": "npm run dev:watch", "dwa": "npm run dev:watch:apps", diff --git a/packages/cli/docker/docker-compose.yaml b/packages/cli/docker/docker-compose.yaml index 5c69ff3159..265f3de1e3 100644 --- a/packages/cli/docker/docker-compose.yaml +++ b/packages/cli/docker/docker-compose.yaml @@ -68,6 +68,24 @@ services: networks: - nango + nango-persist: + image: nangohq/nango-persist:production + container_name: nango-persist + restart: always + ports: + - '${PERSIST_PORT:-3007}:${PERSIST_PORT:-3007}' + environment: + - NANGO_ENCRYPTION_KEY=${NANGO_ENCRYPTION_KEY} + - NANGO_DB_USER=${NANGO_DB_USER} + - NANGO_DB_PASSWORD=${NANGO_DB_PASSWORD} + - NANGO_DB_HOST=${NANGO_DB_HOST} + - NANGO_DB_NAME=${NANGO_DB_NAME} + - NANGO_DB_SSL=${NANGO_DB_SSL} + depends_on: + - nango-db + networks: + - nango + temporal: image: temporalio/auto-setup container_name: temporal diff --git a/packages/jobs/lib/runner/render.runner.ts b/packages/jobs/lib/runner/render.runner.ts index f7058b6eb1..9f179a6e83 100644 --- a/packages/jobs/lib/runner/render.runner.ts +++ b/packages/jobs/lib/runner/render.runner.ts @@ -1,7 +1,7 @@ import type { Runner } from './runner.js'; import { RunnerType } from './runner.js'; import { getRunnerClient } from '@nangohq/nango-runner'; -import { getEnv } from '@nangohq/shared'; +import { getEnv, getPersistAPIUrl } from '@nangohq/shared'; import api from 'api'; import tracer from '../tracer.js'; @@ -75,7 +75,8 @@ export class RenderRunner implements Runner { { key: 'NODE_OPTIONS', value: '--max-old-space-size=384' }, { key: 'RUNNER_ID', value: runnerId }, { key: 'NOTIFY_IDLE_ENDPOINT', value: `${jobsServiceUrl}/idle` }, - { key: 'IDLE_MAX_DURATION_MS', value: `${25 * 60 * 60 * 1000}` } // 25 hours + { key: 'IDLE_MAX_DURATION_MS', value: `${25 * 60 * 60 * 1000}` }, // 25 hours + { key: 'PERSIST_SERVICE_URL', value: getPersistAPIUrl() } ] }); svc = res.data.service; diff --git a/packages/persist/.gitignore b/packages/persist/.gitignore new file mode 100644 index 0000000000..8c3aba0aad --- /dev/null +++ b/packages/persist/.gitignore @@ -0,0 +1,3 @@ +tsconfig.tsbuildinfo +dist/* +node_modules diff --git a/packages/persist/Dockerfile b/packages/persist/Dockerfile new file mode 100644 index 0000000000..8f556f5bc6 --- /dev/null +++ b/packages/persist/Dockerfile @@ -0,0 +1,19 @@ +FROM node:18-slim + +RUN apt-get update \ + && apt-get install -y ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +ENV SERVER_RUN_MODE=DOCKERIZED + +WORKDIR /nango + +COPY packages/node-client/ packages/node-client/ +COPY packages/shared/ packages/shared/ +COPY packages/persist/ packages/persist/ +COPY package*.json ./ + +RUN npm pkg delete scripts.prepare +RUN npm install --omit=dev + +CMD [ "node", "/nango/packages/persist/dist/app.js", "80", "dockerized-persist" ] diff --git a/packages/persist/lib/app.ts b/packages/persist/lib/app.ts new file mode 100644 index 0000000000..8c827210cb --- /dev/null +++ b/packages/persist/lib/app.ts @@ -0,0 +1,11 @@ +import { server } from './server.js'; + +try { + const port = parseInt(process.env['NANGO_PERSIST_PORT'] || '') || 3007; + server.listen(port, () => { + console.log(`🚀 Persist API ready at http://localhost:${port}`); + }); +} catch (err) { + console.error(`Persist API error: ${err}`); + process.exit(1); +} diff --git a/packages/persist/lib/controllers/persist.controller.ts b/packages/persist/lib/controllers/persist.controller.ts new file mode 100644 index 0000000000..3c862a68c6 --- /dev/null +++ b/packages/persist/lib/controllers/persist.controller.ts @@ -0,0 +1,340 @@ +import type { Request, Response } from 'express'; +import { + setLastSyncDate, + createActivityLogMessage, + LogLevel, + errorManager, + ErrorSourceEnum, + LogActionEnum, + updateSyncJobResult, + DataResponse, + dataService, + syncDataService, + getSyncConfigByJobId, + DataRecord, + UpsertResponse +} from '@nangohq/shared'; +import tracer from '../tracer.js'; +import type { Span } from 'dd-trace'; +import { Result, ok, err } from '../utils/result.js'; + +type persistType = 'save' | 'delete' | 'update'; +type RecordRequest = Request< + { + environmentId: number; + connectionId: string; + syncId: string; + syncJobId: number; + }, + any, + { + model: string; + records: Record[]; + providerConfigKey: string; + nangoConnectionId: number; + activityLogId: number; + trackDeletes: boolean; + lastSyncDate: Date; + }, + any, + Record +>; + +class PersistController { + public async saveLastSyncDate(req: Request<{ syncId: string }, any, { lastSyncDate: Date }, any, Record>, res: Response) { + const { + params: { syncId }, + body: { lastSyncDate } + } = req; + const result = await setLastSyncDate(syncId, lastSyncDate); + if (result) { + res.status(201).send(); + } else { + res.status(500).json({ error: `Failed to save last sync date '${lastSyncDate}' for sync '${syncId}'` }); + } + } + + public async saveActivityLog( + req: Request<{ environmentId: number }, any, { activityLogId: number; level: LogLevel; msg: string }, any, Record>, + res: Response + ) { + const { + params: { environmentId }, + body: { activityLogId, level, msg } + } = req; + const result = await createActivityLogMessage( + { + level, + environment_id: environmentId, + activity_log_id: activityLogId, + content: msg, + timestamp: Date.now() + }, + false + ); + if (result) { + res.status(201).send(); + } else { + res.status(500).json({ error: `Failed to save log ${activityLogId}` }); + } + } + + public async saveRecords(req: RecordRequest, res: Response) { + const { + params: { environmentId, connectionId, syncId, syncJobId }, + body: { model, records, providerConfigKey, nangoConnectionId, trackDeletes, lastSyncDate, activityLogId } + } = req; + const persist = async (dataRecords: DataRecord[]) => { + return await dataService.upsert( + dataRecords, + '_nango_sync_data_records', + 'external_id', + nangoConnectionId, + model, + activityLogId, + environmentId, + trackDeletes, + false + ); + }; + const result = await PersistController.persistRecords({ + persistType: 'save', + environmentId, + connectionId, + providerConfigKey, + nangoConnectionId, + syncId, + syncJobId, + model, + records, + trackDeletes, + lastSyncDate, + activityLogId, + softDelete: false, + persistFunction: persist + }); + PersistController.sendRes(res, result, 'Failed to save records'); + } + + public async deleteRecords(req: RecordRequest, res: Response) { + const { + params: { environmentId, connectionId, syncId, syncJobId }, + body: { model, records, providerConfigKey, nangoConnectionId, trackDeletes, lastSyncDate, activityLogId } + } = req; + const persist = async (dataRecords: DataRecord[]) => { + return await dataService.upsert( + dataRecords, + '_nango_sync_data_records', + 'external_id', + nangoConnectionId, + model, + activityLogId, + environmentId, + trackDeletes, + true + ); + }; + const result = await PersistController.persistRecords({ + persistType: 'delete', + environmentId, + connectionId, + providerConfigKey, + nangoConnectionId, + syncId, + syncJobId, + model, + records, + trackDeletes, + lastSyncDate, + activityLogId, + softDelete: true, + persistFunction: persist + }); + PersistController.sendRes(res, result, 'Failed to delete records'); + } + + public async updateRecords(req: RecordRequest, res: Response) { + const { + params: { environmentId, connectionId, syncId, syncJobId }, + body: { model, records, providerConfigKey, nangoConnectionId, trackDeletes, lastSyncDate, activityLogId } + } = req; + const persist = async (dataRecords: DataRecord[]) => { + return await dataService.updateRecord( + dataRecords, + '_nango_sync_data_records', + 'external_id', + nangoConnectionId, + model, + activityLogId, + environmentId + ); + }; + const result = await PersistController.persistRecords({ + persistType: 'update', + environmentId, + connectionId, + providerConfigKey, + nangoConnectionId, + syncId, + syncJobId, + model, + records, + trackDeletes, + lastSyncDate, + activityLogId, + softDelete: false, + persistFunction: persist + }); + PersistController.sendRes(res, result, 'Failed to update records'); + } + + private static async persistRecords({ + persistType, + environmentId, + connectionId, + providerConfigKey, + nangoConnectionId, + syncId, + syncJobId, + model, + records, + trackDeletes, + lastSyncDate, + activityLogId, + softDelete, + persistFunction + }: { + persistType: persistType; + environmentId: number; + connectionId: string; + providerConfigKey: string; + nangoConnectionId: number; + syncId: string; + syncJobId: number; + model: string; + records: Record[]; + trackDeletes: boolean; + lastSyncDate: Date; + activityLogId: number; + softDelete: boolean; + persistFunction: (records: DataRecord[]) => Promise; + }): Promise> { + const active = tracer.scope().active(); + const span = tracer.startSpan('persistRecords', { + childOf: active as Span, + tags: { + persistType, + environmentId, + connectionId, + providerConfigKey, + nangoConnectionId, + syncId, + syncJobId, + model, + activityLogId + } + }); + const { + success, + error, + response: formattedRecords + } = syncDataService.formatDataRecords( + records as unknown as DataResponse[], + nangoConnectionId, + model, + syncId, + syncJobId, + lastSyncDate, + trackDeletes, + softDelete + ); + + if (!success || formattedRecords === null) { + await createActivityLogMessage({ + level: 'error', + environment_id: environmentId, + activity_log_id: activityLogId, + content: `There was an issue with the batch ${persistType}. ${error?.message}`, + timestamp: Date.now() + }); + const errMsg = `Failed to ${persistType} records ${activityLogId}`; + span.setTag('error', errMsg).finish(); + return err(errMsg); + } + const syncConfig = await getSyncConfigByJobId(syncJobId); + + if (syncConfig && !syncConfig?.models.includes(model)) { + const errMsg = `The model '${model}' is not included in the declared sync models: ${syncConfig.models}.`; + span.setTag('error', errMsg).finish(); + return err(errMsg); + } + + const persistResult = await persistFunction(formattedRecords); + + if (persistResult.success) { + const { summary } = persistResult; + const updatedResults = { + [model]: { + added: summary?.addedKeys.length as number, + updated: summary?.updatedKeys.length as number, + deleted: summary?.deletedKeys?.length as number + } + }; + + span.addTags({ + 'records.count': records.length, + 'records.sizeInBytes': Buffer.byteLength(JSON.stringify(records), 'utf8') + }); + + await createActivityLogMessage({ + level: 'info', + environment_id: environmentId, + activity_log_id: activityLogId, + content: `Batch ${persistType} was a success and resulted in ${JSON.stringify(updatedResults, null, 2)}`, + timestamp: Date.now() + }); + + await updateSyncJobResult(syncJobId, updatedResults, model); + span.finish(); + return ok(void 0); + } else { + const content = `There was an issue with the batch ${persistType}. ${persistResult?.error}`; + + await createActivityLogMessage({ + level: 'error', + environment_id: environmentId, + activity_log_id: activityLogId, + content, + timestamp: Date.now() + }); + + await errorManager.report(content, { + environmentId: environmentId, + source: ErrorSourceEnum.CUSTOMER, + operation: LogActionEnum.SYNC, + metadata: { + connectionId: connectionId, + providerConfigKey: providerConfigKey, + syncId: syncId, + nangoConnectionId: nangoConnectionId, + syncJobId: syncJobId + } + }); + const errMsg = persistResult?.error!; + span.setTag('error', errMsg).finish(); + return err(errMsg); + } + } + + private static sendRes(res: Response, result: Result, errorMsg: string) { + if (result.ok) { + res.status(201).send(); + } else { + res.status(500).json({ + error: `${errorMsg}: ${result.error.message}` + }); + } + } +} + +export default new PersistController(); diff --git a/packages/persist/lib/server.ts b/packages/persist/lib/server.ts new file mode 100644 index 0000000000..ff8d06bdc4 --- /dev/null +++ b/packages/persist/lib/server.ts @@ -0,0 +1,87 @@ +import express from 'express'; +import type { Request, Response, NextFunction } from 'express'; +import { validateRequest } from 'zod-express'; +import { z } from 'zod'; +import persistController from './controllers/persist.controller.js'; +import './tracer.js'; +import { logLevelValues } from '@nangohq/shared'; + +export const server = express(); +server.use(express.json()); + +server.use((req: Request, res: Response, next: NextFunction) => { + next(); + console.log(`[Persist] ${req.method} ${req.path} ${res.statusCode}`); +}); + +server.get('/health', (_req: Request, res: Response) => { + res.json({ status: 'ok' }); +}); +server.put( + '/sync/:syncId', + validateRequest({ + params: z.object({ + syncId: z.string() + }), + body: z.object({ + lastSyncDate: z + .string() + .datetime() + .transform((value) => new Date(value)) + .pipe(z.date()) as unknown as z.ZodDate + }) + }), + persistController.saveLastSyncDate +); + +server.post( + '/environment/:environmentId/log', + validateRequest({ + params: z.object({ + environmentId: z.string().transform(Number).pipe(z.number().int().positive()) as unknown as z.ZodNumber + }), + body: z.object({ + activityLogId: z.number(), + level: z.enum(logLevelValues), + msg: z.string() + }) + }), + persistController.saveActivityLog +); + +const validateRecordsRequest = validateRequest({ + params: z.object({ + environmentId: z.string().transform(Number).pipe(z.number().int().positive()) as unknown as z.ZodNumber, + connectionId: z.string(), + syncId: z.string(), + syncJobId: z.string().transform(Number).pipe(z.number().int().positive()) as unknown as z.ZodNumber + }), + body: z.object({ + model: z.string(), + records: z.any().array().nonempty(), + providerConfigKey: z.string(), + nangoConnectionId: z.number(), + activityLogId: z.number(), + lastSyncDate: z + .string() + .datetime() + .transform((value) => new Date(value)) + .pipe(z.date()) as unknown as z.ZodDate, + trackDeletes: z.boolean() + }) +}); +server.post('/environment/:environmentId/connection/:connectionId/sync/:syncId/job/:syncJobId/records', validateRecordsRequest, persistController.saveRecords); +server.delete( + '/environment/:environmentId/connection/:connectionId/sync/:syncId/job/:syncJobId/records', + validateRecordsRequest, + persistController.deleteRecords +); +server.put('/environment/:environmentId/connection/:connectionId/sync/:syncId/job/:syncJobId/records', validateRecordsRequest, persistController.updateRecords); + +server.use((err: Error, _req: Request, res: Response, next: NextFunction) => { + if (err) { + res.status(500).json({ error: err.message }); + } else { + next(); + } +}); diff --git a/packages/persist/lib/server.unit.test.ts b/packages/persist/lib/server.unit.test.ts new file mode 100644 index 0000000000..8ad0ad7cc8 --- /dev/null +++ b/packages/persist/lib/server.unit.test.ts @@ -0,0 +1,196 @@ +import { expect, describe, it, beforeAll, afterAll, beforeEach, afterEach } from 'vitest'; +import { server } from './server.js'; +import fetch from 'node-fetch'; +import { db, SyncConfig } from '@nangohq/shared'; +import mockDb, { QueryDetails } from 'mock-knex'; + +describe('Persist API', () => { + const port = 3096; + const serverUrl = `http://localhost:${port}`; + let dbTracker: mockDb.Tracker; + + beforeAll(() => { + server.listen(port); + mockDb.mock(db.knex); + dbTracker = mockDb.getTracker(); + dbTracker.install(); + }); + afterAll(() => { + mockDb.unmock(db.knex); + }); + + beforeEach(() => { + dbTracker.install(); + }); + afterEach(() => { + dbTracker.uninstall(); + }); + + it('should server /health', async () => { + const response = await fetch(`${serverUrl}/health`); + expect(response.status).toEqual(200); + expect(await response.json()).toEqual({ status: 'ok' }); + }); + + it('should set last sync date', async () => { + const syncId = 'abc'; + dbTracker.on('query', (query) => { + query.response(true); + }); + const response = await fetch(`${serverUrl}/sync/${syncId}`, { + method: 'PUT', + body: JSON.stringify({ + lastSyncDate: new Date() + }), + headers: { + 'Content-Type': 'application/json' + } + }); + expect(response.status).toEqual(201); + }); + + it('should log', async () => { + dbTracker.on('query', (query) => { + query.response([{ id: 1 }]); + }); + const response = await fetch(`${serverUrl}/environment/123/log`, { + method: 'POST', + body: JSON.stringify({ activityLogId: 456, level: 'info', msg: 'Hello, world!' }), + headers: { + 'Content-Type': 'application/json' + } + }); + expect(response.status).toEqual(201); + }); + + describe('save records', () => { + it('should error if no records', async () => { + const response = await fetch(`${serverUrl}/environment/123/connection/myconn/sync/abc/job/101/records`, { + method: 'POST', + body: JSON.stringify({ + model: 'MyModel', + records: [], + providerConfigKey: 'provider', + nangoConnectionId: 456, + lastSyncDate: new Date(), + trackDeletes: false, + softDelete: true + }), + headers: { + 'Content-Type': 'application/json' + } + }); + expect(response.status).toEqual(400); + const respBody = (await response.json()) as any[]; + expect(respBody[0]['errors']['issues'][0]['path'][0]).toEqual('records'); + expect(respBody[0]['errors']['issues'][0]['message']).toEqual('Array must contain at least 1 element(s)'); + }); + + it('should save records', async () => { + const model = 'MyModel'; + const records = [ + { id: 1, name: 'r1' }, + { id: 2, name: 'r2' } + ]; + dbTracker.on('query', DBTracker.persistQueries(model)); + const response = await fetch(`${serverUrl}/environment/123/connection/myconn/sync/abc/job/101/records`, { + method: 'POST', + body: JSON.stringify({ + model, + records: records, + providerConfigKey: 'provider', + nangoConnectionId: 456, + activityLogId: 12, + lastSyncDate: new Date(), + trackDeletes: false + }), + headers: { + 'Content-Type': 'application/json' + } + }); + expect(response.status).toEqual(201); + }); + }); + + it('should delete records ', async () => { + const model = 'MyModel'; + const records = [ + { id: 1, name: 'r1' }, + { id: 2, name: 'r2' } + ]; + dbTracker.on('query', DBTracker.persistQueries(model)); + const response = await fetch(`${serverUrl}/environment/123/connection/myconn/sync/abc/job/101/records`, { + method: 'DELETE', + body: JSON.stringify({ + model, + records: records, + providerConfigKey: 'provider', + nangoConnectionId: 456, + activityLogId: 12, + lastSyncDate: new Date(), + trackDeletes: false + }), + headers: { + 'Content-Type': 'application/json' + } + }); + expect(response.status).toEqual(201); + }); + + it('should update records ', async () => { + const model = 'MyModel'; + const records = [ + { id: 1, name: 'r1' }, + { id: 2, name: 'r2' } + ]; + dbTracker.on('query', DBTracker.persistQueries(model)); + const response = await fetch(`${serverUrl}/environment/123/connection/myconn/sync/abc/job/101/records`, { + method: 'PUT', + body: JSON.stringify({ + model, + records: records, + providerConfigKey: 'provider', + nangoConnectionId: 456, + activityLogId: 12, + lastSyncDate: new Date(), + trackDeletes: false, + softDelete: true + }), + headers: { + 'Content-Type': 'application/json' + } + }); + expect(response.status).toEqual(201); + }); +}); + +class DBTracker { + public static persistQueries(model: string) { + return (query: QueryDetails, step: number) => { + const steps = [ + () => { + query.response({ models: [model] } as SyncConfig); + }, + () => { + query.response([]); + }, + () => { + query.response([]); + }, + () => { + query.response([]); + }, + () => { + query.response([]); + }, + () => { + query.response({ id: 1 }); + }, + () => { + query.response([]); + } + ]; + steps[step - 1]?.(); + }; + } +} diff --git a/packages/persist/lib/tracer.ts b/packages/persist/lib/tracer.ts new file mode 100644 index 0000000000..fc092f8ba9 --- /dev/null +++ b/packages/persist/lib/tracer.ts @@ -0,0 +1,11 @@ +import tracer from 'dd-trace'; +import { isCloud } from '@nangohq/shared'; + +if (isCloud()) { + tracer.init({ + service: 'nango-persist' + }); + tracer.use('express'); +} + +export default tracer; diff --git a/packages/persist/lib/utils/result.ts b/packages/persist/lib/utils/result.ts new file mode 100644 index 0000000000..4a06d8a221 --- /dev/null +++ b/packages/persist/lib/utils/result.ts @@ -0,0 +1,8 @@ +export type Result = { ok: true; value: T } | { ok: false; error: E }; +export function ok(value: T): Result { + return { ok: true, value }; +} +export function err(errMsg: string): Result { + const e = new Error(errMsg) as E; + return { ok: false, error: e }; +} diff --git a/packages/persist/nodemon.json b/packages/persist/nodemon.json new file mode 100644 index 0000000000..4471f18643 --- /dev/null +++ b/packages/persist/nodemon.json @@ -0,0 +1,6 @@ +{ + "watch": ["lib", "../shared/lib", "../../.env"], + "ext": "ts,json", + "ignore": ["lib/**/*.test.ts"], + "exec": "tsc && tsx -r dotenv/config lib/app.ts dotenv_config_path=./../../.env" +} \ No newline at end of file diff --git a/packages/persist/package.json b/packages/persist/package.json new file mode 100644 index 0000000000..9138f6f326 --- /dev/null +++ b/packages/persist/package.json @@ -0,0 +1,34 @@ +{ + "name": "@nangohq/nango-persist", + "version": "1.0.0", + "description": "Write customers data like records and activity logs", + "type": "module", + "main": "dist/app.js", + "typings": "dist/index.d.ts", + "scripts": { + "build": "rimraf ./dist && tsc", + "start": "node dist/app.js", + "dev": "nodemon" + }, + "keywords": [], + "repository": { + "type": "git", + "url": "git+https://github.com/NangoHQ/nango.git", + "directory": "packages/persist" + }, + "license": "SEE LICENSE IN LICENSE FILE IN GIT REPOSITORY", + "dependencies": { + "@nangohq/shared": "^0.37.8", + "dd-trace": "^5.2.0", + "express": "^4.18.2", + "zod": "^3.22.4", + "zod-express": "^0.0.8" + }, + "devDependencies": { + "@types/mock-knex": "^0.4.8", + "@types/node": "^18.7.6", + "mock-knex": "^0.4.13", + "node-fetch": "^3.3.2", + "typescript": "^5.3.3" + } +} diff --git a/packages/persist/tsconfig.json b/packages/persist/tsconfig.json new file mode 100644 index 0000000000..dd2652e614 --- /dev/null +++ b/packages/persist/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "rootDir": "lib", + "outDir": "dist" + }, + "references": [{ "path": "../shared" }], + "include": ["lib/**/*"] +} \ No newline at end of file diff --git a/packages/shared/lib/models/Activity.ts b/packages/shared/lib/models/Activity.ts index 2faa344551..def9ac7ad7 100644 --- a/packages/shared/lib/models/Activity.ts +++ b/packages/shared/lib/models/Activity.ts @@ -1,6 +1,7 @@ import type { HTTP_VERB, Timestamps } from './Generic.js'; -export type LogLevel = 'info' | 'debug' | 'error' | 'warn' | 'http' | 'verbose' | 'silly'; +export const logLevelValues = ['info', 'debug', 'error', 'warn', 'http', 'verbose', 'silly'] as const; +export type LogLevel = typeof logLevelValues[number]; export type LogAction = | 'account' | 'action' diff --git a/packages/shared/lib/sdk/sync.ts b/packages/shared/lib/sdk/sync.ts index cd2e81509c..182fdc3ccd 100644 --- a/packages/shared/lib/sdk/sync.ts +++ b/packages/shared/lib/sdk/sync.ts @@ -13,8 +13,9 @@ import { Nango } from '@nangohq/node'; import configService from '../services/config.service.js'; import paginateService from '../services/paginate.service.js'; import proxyService from '../services/proxy.service.js'; - -type LogLevel = 'info' | 'debug' | 'error' | 'warn' | 'http' | 'verbose' | 'silly'; +import axios from 'axios'; +import { getPersistAPIUrl } from '../utils/utils.js'; +import type { LogLevel } from '../models/Activity.js'; interface ParamEncoder { (value: any, defaultEncoder: (value: any) => any): any; @@ -214,9 +215,9 @@ export interface NangoProps { dryRun?: boolean; track_deletes?: boolean; attributes?: object | undefined; - logMessages?: unknown[] | undefined; stubbedMetadata?: Metadata | undefined; + usePersistAPI?: boolean; } interface UserLogParameters { @@ -237,6 +238,7 @@ export class NangoAction { environmentId?: number; syncJobId?: number; dryRun?: boolean; + usePersistAPI: boolean; public connectionId?: string; public providerConfigKey?: string; @@ -284,6 +286,8 @@ export class NangoAction { if (config.attributes) { this.attributes = config.attributes; } + + this.usePersistAPI = config.usePersistAPI || false; } public async proxy(config: ProxyConfiguration): Promise> { @@ -414,6 +418,22 @@ export class NangoAction { throw new Error('There is no current activity log stream to log to'); } + if (this.usePersistAPI) { + const response = await persistApi({ + method: 'POST', + url: `/environment/${this.environmentId}/log`, + data: { + activityLogId: this.activityLogId, + level: userDefinedLevel?.level ?? 'info', + msg: content + } + }); + if (response.status > 299) { + throw new Error(`cannot write log with activityLogId '${this.activityLogId}'`); + } + return; + } + await createActivityLogMessage( { level: userDefinedLevel?.level ?? 'info', @@ -511,6 +531,8 @@ export class NangoSync extends NangoAction { logMessages?: unknown[] | undefined = []; stubbedMetadata?: Metadata | undefined = undefined; + private batchSize = 1000; + constructor(config: NangoProps) { super(config); @@ -540,9 +562,18 @@ export class NangoSync extends NangoAction { if (date.toString() === 'Invalid Date') { throw new Error('Invalid Date'); } - const result = await setLastSyncDate(this.syncId as string, date); - - return result; + if (this.usePersistAPI) { + const response = await persistApi({ + method: 'PUT', + url: `/sync/${this.syncId}`, + data: { + lastSyncDate: date + } + }); + return response.status <= 299; + } else { + return await setLastSyncDate(this.syncId as string, date); + } } /** @@ -561,8 +592,37 @@ export class NangoSync extends NangoAction { return true; } - if (!this.nangoConnectionId || !this.syncId || !this.activityLogId || !this.syncJobId) { - throw new Error('Nango Connection Id, Sync Id, Activity Log Id and Sync Job Id are all required'); + if (!this.environmentId || !this.nangoConnectionId || !this.syncId || !this.activityLogId || !this.syncJobId) { + throw new Error('Nango environment Id, Connection Id, Sync Id, Activity Log Id and Sync Job Id are all required'); + } + + if (this.dryRun) { + this.logMessages?.push(`A batch save call would save the following data to the ${model} model:`); + this.logMessages?.push(...results); + return null; + } + + if (this.usePersistAPI) { + for (let i = 0; i < results.length; i += this.batchSize) { + const batch = results.slice(i, i + this.batchSize); + const response = await persistApi({ + method: 'POST', + url: `/environment/${this.environmentId}/connection/${this.connectionId}/sync/${this.syncId}/job/${this.syncJobId}/records`, + data: { + model, + records: batch, + providerConfigKey: this.providerConfigKey, + nangoConnectionId: this.nangoConnectionId, + activityLogId: this.activityLogId, + lastSyncDate: this.lastSyncDate, + trackDeletes: this.track_deletes + } + }); + if (response.status > 299) { + return false; + } + } + return true; } const { @@ -593,12 +653,6 @@ export class NangoSync extends NangoAction { throw error; } - if (this.dryRun) { - this.logMessages?.push(`A batch save call would save the following data to the ${model} model:`); - this.logMessages?.push(...results); - return null; - } - const syncConfig = await getSyncConfigByJobId(this.syncJobId as number); if (syncConfig && !syncConfig?.models.includes(model)) { @@ -675,8 +729,37 @@ export class NangoSync extends NangoAction { return true; } - if (!this.nangoConnectionId || !this.syncId || !this.activityLogId || !this.syncJobId) { - throw new Error('Nango Connection Id, Sync Id, Activity Log Id and Sync Job Id are all required'); + if (!this.environmentId || !this.nangoConnectionId || !this.syncId || !this.activityLogId || !this.syncJobId) { + throw new Error('Nango environment Id, Connection Id, Sync Id, Activity Log Id and Sync Job Id are all required'); + } + + if (this.dryRun) { + this.logMessages?.push(`A batch delete call would delete the following data:`); + this.logMessages?.push(...results); + return null; + } + + if (this.usePersistAPI) { + for (let i = 0; i < results.length; i += this.batchSize) { + const batch = results.slice(i, i + this.batchSize); + const response = await persistApi({ + method: 'DELETE', + url: `/environment/${this.environmentId}/connection/${this.connectionId}/sync/${this.syncId}/job/${this.syncJobId}/records`, + data: { + model, + records: batch, + providerConfigKey: this.providerConfigKey, + nangoConnectionId: this.nangoConnectionId, + activityLogId: this.activityLogId, + lastSyncDate: this.lastSyncDate, + trackDeletes: this.track_deletes + } + }); + if (response.status > 299) { + return false; + } + } + return true; } const { @@ -708,12 +791,6 @@ export class NangoSync extends NangoAction { throw error; } - if (this.dryRun) { - this.logMessages?.push(`A batch delete call would delete the following data:`); - this.logMessages?.push(...results); - return null; - } - const syncConfig = await getSyncConfigByJobId(this.syncJobId as number); if (syncConfig && !syncConfig?.models.includes(model)) { @@ -791,8 +868,37 @@ export class NangoSync extends NangoAction { return true; } - if (!this.nangoConnectionId || !this.activityLogId) { - throw new Error('Nango Connection Id, and Activity Log Id both required'); + if (!this.environmentId || !this.nangoConnectionId || !this.syncId || !this.activityLogId || !this.syncJobId) { + throw new Error('Nango environment Id, Connection Id, Sync Id, Activity Log Id and Sync Job Id are all required'); + } + + if (this.dryRun) { + this.logMessages?.push(`A batch update call would update the following data to the ${model} model:`); + this.logMessages?.push(...results); + return null; + } + + if (this.usePersistAPI) { + for (let i = 0; i < results.length; i += this.batchSize) { + const batch = results.slice(i, i + this.batchSize); + const response = await persistApi({ + method: 'PUT', + url: `/environment/${this.environmentId}/connection/${this.connectionId}/sync/${this.syncId}/job/${this.syncJobId}/records`, + data: { + model, + records: batch, + providerConfigKey: this.providerConfigKey, + nangoConnectionId: this.nangoConnectionId, + activityLogId: this.activityLogId, + lastSyncDate: this.lastSyncDate, + trackDeletes: this.track_deletes + } + }); + if (response.status > 299) { + return false; + } + } + return true; } const { @@ -823,12 +929,6 @@ export class NangoSync extends NangoAction { throw error; } - if (this.dryRun) { - this.logMessages?.push(`A batch update call would update the following data to the ${model} model:`); - this.logMessages?.push(...results); - return null; - } - const responseResults = await updateRecord( formattedResults, '_nango_sync_data_records', @@ -898,3 +998,10 @@ export class NangoSync extends NangoAction { return super.getMetadata(); } } + +const persistApi = axios.create({ + baseURL: getPersistAPIUrl(), + validateStatus: (_status) => { + return true; + } +}); diff --git a/packages/shared/lib/services/sync/run.service.ts b/packages/shared/lib/services/sync/run.service.ts index d5226e0d76..5e209e012c 100644 --- a/packages/shared/lib/services/sync/run.service.ts +++ b/packages/shared/lib/services/sync/run.service.ts @@ -23,6 +23,7 @@ import type { UpsertResponse, UpsertSummary } from '../../models/Data.js'; import { LogActionEnum } from '../../models/Activity.js'; import type { Environment } from '../../models/Environment'; import type { Metadata } from '../../models/Connection'; +import featureflags from '../../utils/featureflags.js'; interface SyncRunConfig { integrationService: IntegrationServiceInterface; @@ -263,6 +264,8 @@ export default class SyncRun { } } + const usePersistAPIGlobally = await featureflags.isEnabled('use-persist-api', 'global', false); + const usePersistAPI = await featureflags.isEnabled('use-persist-api', `${environment?.account_id}`, false); const nangoProps = { host: optionalHost || getApiUrl(), accountId: environment?.account_id as number, @@ -279,7 +282,8 @@ export default class SyncRun { attributes: syncData.attributes, track_deletes: trackDeletes as boolean, logMessages: this.logMessages, - stubbedMetadata: this.stubbedMetadata + stubbedMetadata: this.stubbedMetadata, + usePersistAPI: usePersistAPIGlobally || usePersistAPI }; if (this.debug) { diff --git a/packages/shared/lib/utils/utils.ts b/packages/shared/lib/utils/utils.ts index cd6af45fab..c1f9a4e22d 100644 --- a/packages/shared/lib/utils/utils.ts +++ b/packages/shared/lib/utils/utils.ts @@ -82,6 +82,10 @@ export function getServerPort() { } } +export function getPersistAPIUrl() { + return process.env['PERSIST_SERVICE_URL'] || 'http://localhost:3007'; +} + export function isDev() { return process.env['NODE_ENV'] === NodeEnv.Dev; } diff --git a/scripts/publish.sh b/scripts/publish.sh index 3ad5e188c7..5f2f4ae570 100755 --- a/scripts/publish.sh +++ b/scripts/publish.sh @@ -34,7 +34,7 @@ npm install "@nangohq/node@$VERSION" -w @nangohq/shared # Shared node scripts/flows.js bump_and_npm_publish "@nangohq/shared" "$VERSION" -npm install "@nangohq/shared@$VERSION" -w nango -w @nangohq/nango-server -w @nangohq/nango-jobs -w @nangohq/nango-runner +npm install "@nangohq/shared@$VERSION" -w nango -w @nangohq/nango-server -w @nangohq/nango-jobs -w @nangohq/nango-runner -w @nangohq/persist # CLI bump_and_npm_publish "nango" "$VERSION" diff --git a/tsconfig.build.json b/tsconfig.build.json index b53a3f5ee6..d75c280c70 100644 --- a/tsconfig.build.json +++ b/tsconfig.build.json @@ -7,6 +7,7 @@ { "path": "packages/server" }, { "path": "packages/frontend" }, { "path": "packages/runner" }, - { "path": "packages/jobs" } + { "path": "packages/jobs" }, + { "path": "packages/persist" } ] } \ No newline at end of file