From d92a9963bafd31b8776a5f7131311669751ef43b Mon Sep 17 00:00:00 2001 From: Pascal Kaufmann Date: Tue, 28 Feb 2023 01:53:19 +0100 Subject: [PATCH 1/3] Remove doWork --- packages/api/src/resolvers/mutations/index.ts | 2 - .../src/resolvers/mutations/worker/doWork.ts | 17 ---- packages/api/src/schema/mutation.ts | 9 +- .../src/module/configureWorkerModule.ts | 87 ++++++++++++------- .../core-worker/src/workers/BaseWorker.ts | 23 +---- packages/types/worker.ts | 3 +- tests/worker.test.js | 20 +---- 7 files changed, 68 insertions(+), 93 deletions(-) delete mode 100644 packages/api/src/resolvers/mutations/worker/doWork.ts diff --git a/packages/api/src/resolvers/mutations/index.ts b/packages/api/src/resolvers/mutations/index.ts index 4aa63931d4..ec2995c5d0 100755 --- a/packages/api/src/resolvers/mutations/index.ts +++ b/packages/api/src/resolvers/mutations/index.ts @@ -131,7 +131,6 @@ import addWork from './worker/addWork.js'; import allocateWork from './worker/allocateWork.js'; import finishWork from './worker/finishWork.js'; import removeWork from './worker/removeWork.js'; -import doWork from './worker/doWork.js'; import heartbeat from './users/heartbeat.js'; import createEnrollment from './enrollments/createEnrollment.js'; import terminateEnrollment from './enrollments/terminateEnrollment.js'; @@ -330,7 +329,6 @@ export default { allocateWork: acl(actions.manageWorker)(allocateWork), finishWork: acl(actions.manageWorker)(finishWork), removeWork: acl(actions.manageWorker)(removeWork), - doWork: acl(actions.manageWorker)(doWork), signPaymentProviderForCredentialRegistration: acl(actions.registerPaymentCredentials)( signPaymentProviderForCredentialRegistration, ), diff --git a/packages/api/src/resolvers/mutations/worker/doWork.ts b/packages/api/src/resolvers/mutations/worker/doWork.ts deleted file mode 100644 index 1e38b17260..0000000000 --- a/packages/api/src/resolvers/mutations/worker/doWork.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { log } from '@unchainedshop/logger'; -import { Context, Root } from '@unchainedshop/types/api.js'; -import { Work } from '@unchainedshop/types/worker.js'; -import { WorkTypeInvalidError } from '../../../errors.js'; - -export default async function doWork(root: Root, work: Work, context: Context) { - const { modules, userId } = context; - const { type, input } = work; - - log(`mutation doWork ${type} ${input}`, { - userId, - }); - - if (!type) throw new WorkTypeInvalidError({ type }); - - return modules.worker.doWork(work, context); -} diff --git a/packages/api/src/schema/mutation.ts b/packages/api/src/schema/mutation.ts index 0b0053ad1f..9ae7900717 100644 --- a/packages/api/src/schema/mutation.ts +++ b/packages/api/src/schema/mutation.ts @@ -809,7 +809,8 @@ export default [ removeBookmark(bookmarkId: ID!): Bookmark! """ - Add work to the work queue. Each type has its own input shape + Add work to the work queue. Each type has its own input shape. If you pinpoint the worker by setting it + during creation, the work will be only run by the worker who identifies as that worker. """ addWork( type: WorkType! @@ -818,6 +819,7 @@ export default [ originalWorkId: ID scheduled: Timestamp retries: Int! = 20 + worker: String ): Work """ @@ -826,11 +828,6 @@ export default [ """ allocateWork(types: [WorkType], worker: String): Work - """ - Trigger a registered plugin for "type" to actually do the work with given "input". - """ - doWork(type: WorkType!, input: JSON): WorkOutput - """ Register a work attempt manually. Note: Usually, work attempts are handled internally by the inbuilt cron diff --git a/packages/core-worker/src/module/configureWorkerModule.ts b/packages/core-worker/src/module/configureWorkerModule.ts index 817d9778d8..60fa0b74a6 100644 --- a/packages/core-worker/src/module/configureWorkerModule.ts +++ b/packages/core-worker/src/module/configureWorkerModule.ts @@ -98,6 +98,32 @@ export const configureWorkerModule = async ({ const mutations = generateDbMutations(WorkQueue, WorkQueueSchema) as ModuleMutations; + const allocateWork: WorkerModule['allocateWork'] = async ({ types, worker = UNCHAINED_WORKER_ID }) => { + // Find a work item that is scheduled for now and is not started. + // Also: + // - Restrict by types and worker if provided + // - Sort by default queue order + const query = buildQuerySelector({ + status: [WorkStatus.NEW], + scheduled: { end: new Date() }, + worker: { $in: [null, worker] }, + ...(types ? { type: { $in: types } } : {}), + }); + const result = await WorkQueue.findOneAndUpdate( + query, + { + $set: { started: new Date(), worker }, + }, + { sort: buildSortOptions(defaultSort), returnDocument: 'after' }, + ); + + WorkerDirector.events.emit(WorkerEventTypes.ALLOCATED, { + work: result.value, + }); + + return result.value; + }; + const finishWork: WorkerModule['finishWork'] = async ( workId, { @@ -147,6 +173,26 @@ export const configureWorkerModule = async ({ return work; }; + const processWork: WorkerModule['processWork'] = async (workerId, unchainedAPI) => { + const work = await allocateWork({ + types: WorkerDirector.getActivePluginTypes(false), + worker: workerId, + }); + + if (work) { + const output = await WorkerDirector.doWork(work, unchainedAPI); + + return finishWork(work._id, { + ...output, + finished: work.finished || new Date(), + started: work.started, + worker: workerId, + }); + } + + return null; + }; + return { // Queries activeWorkTypes: async () => { @@ -216,7 +262,15 @@ export const configureWorkerModule = async ({ }, // Mutations - addWork: async ({ type, input, priority = 0, scheduled, originalWorkId, retries = 20 }) => { + addWork: async ({ + type, + input, + priority = 0, + scheduled, + originalWorkId, + worker = null, + retries = 20, + }) => { if (!WorkerDirector.getAdapter(type)) { throw new Error(`No plugin registered for type ${type}`); } @@ -230,6 +284,7 @@ export const configureWorkerModule = async ({ originalWorkId, retries, created, + worker, }); logger.info(`${type} scheduled @ ${new Date(scheduled || created).toISOString()}`, { @@ -260,31 +315,7 @@ export const configureWorkerModule = async ({ return work; }, - allocateWork: async ({ types, worker = UNCHAINED_WORKER_ID }) => { - // Find a work item that is scheduled for now and is not started. - // Also: - // - Restrict by types and worker if provided - // - Sort by default queue order - const query = buildQuerySelector({ - status: [WorkStatus.NEW], - scheduled: { end: new Date() }, - worker: { $in: [null, worker] }, - ...(types ? { type: { $in: types } } : {}), - }); - const result = await WorkQueue.findOneAndUpdate( - query, - { - $set: { started: new Date(), worker }, - }, - { sort: buildSortOptions(defaultSort), returnDocument: 'after' }, - ); - - WorkerDirector.events.emit(WorkerEventTypes.ALLOCATED, { - work: result.value, - }); - - return result.value; - }, + allocateWork, ensureOneWork: async ({ type, input, priority = 0, scheduled, originalWorkId, retries = 20 }) => { const created = new Date(); @@ -335,9 +366,7 @@ export const configureWorkerModule = async ({ } }, - doWork: (work, unchainedAPI) => { - return WorkerDirector.doWork(work, unchainedAPI); - }, + processWork, finishWork, diff --git a/packages/core-worker/src/workers/BaseWorker.ts b/packages/core-worker/src/workers/BaseWorker.ts index 88c57162ec..ebd1409932 100644 --- a/packages/core-worker/src/workers/BaseWorker.ts +++ b/packages/core-worker/src/workers/BaseWorker.ts @@ -1,4 +1,4 @@ -import { IWorker, Work } from '@unchainedshop/types/worker.js'; +import { IWorker } from '@unchainedshop/types/worker.js'; import later from '@breejs/later'; import { log } from '@unchainedshop/logger'; import os from 'os'; @@ -75,27 +75,8 @@ export const BaseWorker: IWorker = { const processRecursively = async (recursionCounter = 0) => { if (maxWorkItemCount && maxWorkItemCount < recursionCounter) return null; - - const work = await unchainedAPI.modules.worker.allocateWork({ - types: WorkerDirector.getActivePluginTypes(false), - worker: workerId, - }); - - let doneWork: Work | null = null; - - if (work) { - const output = await unchainedAPI.modules.worker.doWork(work, unchainedAPI); - - doneWork = await unchainedAPI.modules.worker.finishWork(work._id, { - ...output, - finished: work.finished || new Date(), - started: work.started, - worker: workerId, - }); - } - + const doneWork = await unchainedAPI.modules.worker.processWork(workerId, unchainedAPI); if (doneWork) return processRecursively(recursionCounter + 1); - return null; }; diff --git a/packages/types/worker.ts b/packages/types/worker.ts index 6a64f92728..689c60afa2 100644 --- a/packages/types/worker.ts +++ b/packages/types/worker.ts @@ -48,6 +48,7 @@ export interface WorkData { priority?: number; retries?: number; scheduled?: Date; + worker?: string; } export interface WorkResult { @@ -87,7 +88,7 @@ export type WorkerModule = { allocateWork: (doc: { types: Array; worker: string }) => Promise; - doWork: (work: Work, unchainedAPI: UnchainedCore) => Promise>; + processWork: (worker: string, unchainedAPI: UnchainedCore) => Promise; rescheduleWork: (work: Work, scheduled: Date, unchainedAPI: UnchainedCore) => Promise; diff --git a/tests/worker.test.js b/tests/worker.test.js index c5c22ebee2..e47b809680 100644 --- a/tests/worker.test.js +++ b/tests/worker.test.js @@ -284,22 +284,6 @@ describe('Worker Module', () => { expect(allocateWorkResult.errors).toBeUndefined(); - const doWorkResult = await graphqlFetchAsAdminUser({ - query: /* GraphQL */ ` - mutation doWork($type: WorkType!, $input: JSON) { - doWork(type: $type, input: $input) { - result - error - success - } - } - `, - variables: { type: 'HEARTBEAT' }, - }); - - expect(doWorkResult.errors).toBeUndefined(); - expect(doWorkResult.data.doWork.success).toBe(true); - const finishWorkResult = await graphqlFetchAsAdminUser({ query: /* GraphQL */ ` mutation finishWork( @@ -322,7 +306,9 @@ describe('Worker Module', () => { `, variables: { workId: addWorkResult.data.addWork._id, - ...doWorkResult.data.doWork, + success: true, + result: {}, + worker: "TEST-GRAPHQL" }, }); From 3fbea7ee1711be8ab0dc1b6b097acfe2db3c983d Mon Sep 17 00:00:00 2001 From: Pascal Kaufmann Date: Tue, 28 Feb 2023 02:08:10 +0100 Subject: [PATCH 2/3] Add processNextWork mutation --- packages/api/src/resolvers/mutations/index.ts | 17 ++-------------- .../mutations/worker/processNextWork.ts | 20 +++++++++++++++++++ packages/api/src/schema/mutation.ts | 8 ++++++++ .../src/module/configureWorkerModule.ts | 4 ++-- .../core-worker/src/workers/BaseWorker.ts | 2 +- packages/types/worker.ts | 2 +- 6 files changed, 34 insertions(+), 19 deletions(-) create mode 100644 packages/api/src/resolvers/mutations/worker/processNextWork.ts diff --git a/packages/api/src/resolvers/mutations/index.ts b/packages/api/src/resolvers/mutations/index.ts index ec2995c5d0..8de80a8552 100755 --- a/packages/api/src/resolvers/mutations/index.ts +++ b/packages/api/src/resolvers/mutations/index.ts @@ -130,6 +130,7 @@ import removeBookmark from './bookmarks/removeBookmark.js'; import addWork from './worker/addWork.js'; import allocateWork from './worker/allocateWork.js'; import finishWork from './worker/finishWork.js'; +import processNextWork from './worker/processNextWork.js'; import removeWork from './worker/removeWork.js'; import heartbeat from './users/heartbeat.js'; import createEnrollment from './enrollments/createEnrollment.js'; @@ -201,15 +202,12 @@ export default { markPaymentCredentialsPreferred, ), removePaymentCredentials: acl(actions.managePaymentCredentials)(removePaymentCredentials), - createLanguage: acl(actions.manageLanguages)(createLanguage), updateLanguage: acl(actions.manageLanguages)(updateLanguage), removeLanguage: acl(actions.manageLanguages)(removeLanguage), - createCountry: acl(actions.manageCountries)(createCountry), updateCountry: acl(actions.manageCountries)(updateCountry), removeCountry: acl(actions.manageCountries)(removeCountry), - createProduct: acl(actions.manageProducts)(createProduct), publishProduct: acl(actions.manageProducts)(publishProduct), unpublishProduct: acl(actions.manageProducts)(unpublishProduct), @@ -236,11 +234,9 @@ export default { createProductVariationOption: acl(actions.manageProducts)(createProductVariationOption), addProductAssignment: acl(actions.manageProducts)(addProductAssignment), removeProductAssignment: acl(actions.manageProducts)(removeProductAssignment), - createCurrency: acl(actions.manageCurrencies)(createCurrency), updateCurrency: acl(actions.manageCurrencies)(updateCurrency), removeCurrency: acl(actions.manageCurrencies)(removeCurrency), - createCart: acl(actions.createCart)(createCart), addCartProduct: acl(actions.updateCart)(addCartProduct), addMultipleCartProducts: acl(actions.updateCart)(addMultipleCartProducts), @@ -264,32 +260,26 @@ export default { rejectOrder: acl(actions.markOrderRejected)(rejectOrder), payOrder: acl(actions.markOrderPaid)(payOrder), deliverOrder: acl(actions.markOrderDelivered)(deliverOrder), - createEnrollment: acl(actions.createEnrollment)(createEnrollment), terminateEnrollment: acl(actions.updateEnrollment)(terminateEnrollment), activateEnrollment: acl(actions.updateEnrollment)(activateEnrollment), updateEnrollment: acl(actions.updateEnrollment)(updateEnrollment), - createPaymentProvider: acl(actions.managePaymentProviders)(createPaymentProvider), updatePaymentProvider: acl(actions.managePaymentProviders)(updatePaymentProvider), removePaymentProvider: acl(actions.managePaymentProviders)(removePaymentProvider), - createDeliveryProvider: acl(actions.manageDeliveryProviders)(createDeliveryProvider), updateDeliveryProvider: acl(actions.manageDeliveryProviders)(updateDeliveryProvider), removeDeliveryProvider: acl(actions.manageDeliveryProviders)(removeDeliveryProvider), - createWarehousingProvider: acl(actions.manageWarehousingProviders)(createWarehousingProvider), updateWarehousingProvider: acl(actions.manageWarehousingProviders)(updateWarehousingProvider), removeWarehousingProvider: acl(actions.manageWarehousingProviders)(removeWarehousingProvider), exportToken: acl(actions.updateToken)(exportToken), - createFilter: acl(actions.manageFilters)(createFilter), updateFilter: acl(actions.manageFilters)(updateFilter), removeFilter: acl(actions.manageFilters)(removeFilter), updateFilterTexts: acl(actions.manageFilters)(updateFilterTexts), removeFilterOption: acl(actions.manageFilters)(removeFilterOption), createFilterOption: acl(actions.manageFilters)(createFilterOption), - createAssortment: acl(actions.manageAssortments)(createAssortment), addAssortmentMedia: acl(actions.manageAssortments)(addAssortmentMedia), prepareAssortmentMediaUpload: acl(actions.manageAssortments)(prepareAssortmentMediaUpload), @@ -309,26 +299,23 @@ export default { addAssortmentFilter: acl(actions.manageAssortments)(addAssortmentFilter), removeAssortmentFilter: acl(actions.manageAssortments)(removeAssortmentFilter), reorderAssortmentFilters: acl(actions.manageAssortments)(reorderAssortmentFilters), - createProductReview: acl(actions.reviewProduct)(createProductReview), updateProductReview: acl(actions.updateProductReview)(updateProductReview), removeProductReview: acl(actions.updateProductReview)(removeProductReview), addProductReviewVote: acl(actions.voteProductReview)(addProductReviewVote), removeProductReviewVote: acl(actions.voteProductReview)(removeProductReviewVote), - requestQuotation: acl(actions.requestQuotation)(requestQuotation), rejectQuotation: acl(actions.answerQuotation)(rejectQuotation), verifyQuotation: acl(actions.manageQuotations)(verifyQuotation), makeQuotationProposal: acl(actions.manageQuotations)(makeQuotationProposal), - bookmark: acl(actions.bookmarkProduct)(bookmark), createBookmark: acl(actions.manageBookmarks)(createBookmark), removeBookmark: acl(actions.manageBookmarks)(removeBookmark), - addWork: acl(actions.manageWorker)(addWork), allocateWork: acl(actions.manageWorker)(allocateWork), finishWork: acl(actions.manageWorker)(finishWork), removeWork: acl(actions.manageWorker)(removeWork), + processNextWork: acl(actions.manageWorker)(processNextWork), signPaymentProviderForCredentialRegistration: acl(actions.registerPaymentCredentials)( signPaymentProviderForCredentialRegistration, ), diff --git a/packages/api/src/resolvers/mutations/worker/processNextWork.ts b/packages/api/src/resolvers/mutations/worker/processNextWork.ts new file mode 100644 index 0000000000..34b0f28e06 --- /dev/null +++ b/packages/api/src/resolvers/mutations/worker/processNextWork.ts @@ -0,0 +1,20 @@ +import { Context, Root } from '@unchainedshop/types/api.js'; +import { log } from '@unchainedshop/logger'; + +export default async function processNextWork( + root: Root, + data: { + worker?: string; + }, + context: Context, +) { + const { worker } = data; + + log(`mutation processNextWork ${worker}`, { + userId: context.userId, + }); + + const work = await context.modules.worker.processNextWork(worker, context); + + return work; +} diff --git a/packages/api/src/schema/mutation.ts b/packages/api/src/schema/mutation.ts index 9ae7900717..ce9df752a4 100644 --- a/packages/api/src/schema/mutation.ts +++ b/packages/api/src/schema/mutation.ts @@ -828,6 +828,14 @@ export default [ """ allocateWork(types: [WorkType], worker: String): Work + """ + This will pick up non-external work, execute, await result and finish + it up on the target system. This function allows you to do work queue "ticks" + from outside instead of waiting for default Cron and Event Listener to trigger + and can be helpful in serverless environments. + """ + processNextWork(worker: String): Work + """ Register a work attempt manually. Note: Usually, work attempts are handled internally by the inbuilt cron diff --git a/packages/core-worker/src/module/configureWorkerModule.ts b/packages/core-worker/src/module/configureWorkerModule.ts index 60fa0b74a6..4d6a42b3bd 100644 --- a/packages/core-worker/src/module/configureWorkerModule.ts +++ b/packages/core-worker/src/module/configureWorkerModule.ts @@ -173,7 +173,7 @@ export const configureWorkerModule = async ({ return work; }; - const processWork: WorkerModule['processWork'] = async (workerId, unchainedAPI) => { + const processNextWork: WorkerModule['processNextWork'] = async (workerId, unchainedAPI) => { const work = await allocateWork({ types: WorkerDirector.getActivePluginTypes(false), worker: workerId, @@ -366,7 +366,7 @@ export const configureWorkerModule = async ({ } }, - processWork, + processNextWork, finishWork, diff --git a/packages/core-worker/src/workers/BaseWorker.ts b/packages/core-worker/src/workers/BaseWorker.ts index ebd1409932..fee152a812 100644 --- a/packages/core-worker/src/workers/BaseWorker.ts +++ b/packages/core-worker/src/workers/BaseWorker.ts @@ -75,7 +75,7 @@ export const BaseWorker: IWorker = { const processRecursively = async (recursionCounter = 0) => { if (maxWorkItemCount && maxWorkItemCount < recursionCounter) return null; - const doneWork = await unchainedAPI.modules.worker.processWork(workerId, unchainedAPI); + const doneWork = await unchainedAPI.modules.worker.processNextWork(workerId, unchainedAPI); if (doneWork) return processRecursively(recursionCounter + 1); return null; }; diff --git a/packages/types/worker.ts b/packages/types/worker.ts index 689c60afa2..ea4af14f16 100644 --- a/packages/types/worker.ts +++ b/packages/types/worker.ts @@ -88,7 +88,7 @@ export type WorkerModule = { allocateWork: (doc: { types: Array; worker: string }) => Promise; - processWork: (worker: string, unchainedAPI: UnchainedCore) => Promise; + processNextWork: (worker: string, unchainedAPI: UnchainedCore) => Promise; rescheduleWork: (work: Work, scheduled: Date, unchainedAPI: UnchainedCore) => Promise; From ebf1db05e07d7d9ed3680f5527ebdf645d1a2e5c Mon Sep 17 00:00:00 2001 From: Pascal Kaufmann Date: Tue, 28 Feb 2023 02:12:52 +0100 Subject: [PATCH 3/3] Fix remove accounts-js workaround for disableTOTP --- .../src/module/configureAccountsModule.ts | 9 --------- 1 file changed, 9 deletions(-) diff --git a/packages/core-accountsjs/src/module/configureAccountsModule.ts b/packages/core-accountsjs/src/module/configureAccountsModule.ts index ebd882911a..ac2f814b7f 100644 --- a/packages/core-accountsjs/src/module/configureAccountsModule.ts +++ b/packages/core-accountsjs/src/module/configureAccountsModule.ts @@ -213,15 +213,6 @@ export const configureAccountsModule = async ({ disableTOTP: async (userId, code) => { await accountsPassword.twoFactor.unset(userId, code); - // https://github.com/accounts-js/accounts/issues/1181 - const wait = async (time: number) => { - return new Promise((resolve) => { - setTimeout(() => { - resolve(true); - }, time); - }); - }; - await wait(500); return true; },