Skip to content

Commit

Permalink
Merge pull request #550 from unchainedshop/worker-lambda-compat
Browse files Browse the repository at this point in the history
Worker lambda compat
  • Loading branch information
pozylon authored Feb 28, 2023
2 parents a28777e + ebf1db0 commit 5c6c69c
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 114 deletions.
19 changes: 2 additions & 17 deletions packages/api/src/resolvers/mutations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ import removeBookmark from './bookmarks/removeBookmark.js';
import addWork from './worker/addWork.js';
import allocateWork from './worker/allocateWork.js';
import finishWork from './worker/finishWork.js';
import processNextWork from './worker/processNextWork.js';
import removeWork from './worker/removeWork.js';
import doWork from './worker/doWork.js';
import heartbeat from './users/heartbeat.js';
import createEnrollment from './enrollments/createEnrollment.js';
import terminateEnrollment from './enrollments/terminateEnrollment.js';
Expand Down Expand Up @@ -202,15 +202,12 @@ export default {
markPaymentCredentialsPreferred,
),
removePaymentCredentials: acl(actions.managePaymentCredentials)(removePaymentCredentials),

createLanguage: acl(actions.manageLanguages)(createLanguage),
updateLanguage: acl(actions.manageLanguages)(updateLanguage),
removeLanguage: acl(actions.manageLanguages)(removeLanguage),

createCountry: acl(actions.manageCountries)(createCountry),
updateCountry: acl(actions.manageCountries)(updateCountry),
removeCountry: acl(actions.manageCountries)(removeCountry),

createProduct: acl(actions.manageProducts)(createProduct),
publishProduct: acl(actions.manageProducts)(publishProduct),
unpublishProduct: acl(actions.manageProducts)(unpublishProduct),
Expand All @@ -237,11 +234,9 @@ export default {
createProductVariationOption: acl(actions.manageProducts)(createProductVariationOption),
addProductAssignment: acl(actions.manageProducts)(addProductAssignment),
removeProductAssignment: acl(actions.manageProducts)(removeProductAssignment),

createCurrency: acl(actions.manageCurrencies)(createCurrency),
updateCurrency: acl(actions.manageCurrencies)(updateCurrency),
removeCurrency: acl(actions.manageCurrencies)(removeCurrency),

createCart: acl(actions.createCart)(createCart),
addCartProduct: acl(actions.updateCart)(addCartProduct),
addMultipleCartProducts: acl(actions.updateCart)(addMultipleCartProducts),
Expand All @@ -265,32 +260,26 @@ export default {
rejectOrder: acl(actions.markOrderRejected)(rejectOrder),
payOrder: acl(actions.markOrderPaid)(payOrder),
deliverOrder: acl(actions.markOrderDelivered)(deliverOrder),

createEnrollment: acl(actions.createEnrollment)(createEnrollment),
terminateEnrollment: acl(actions.updateEnrollment)(terminateEnrollment),
activateEnrollment: acl(actions.updateEnrollment)(activateEnrollment),
updateEnrollment: acl(actions.updateEnrollment)(updateEnrollment),

createPaymentProvider: acl(actions.managePaymentProviders)(createPaymentProvider),
updatePaymentProvider: acl(actions.managePaymentProviders)(updatePaymentProvider),
removePaymentProvider: acl(actions.managePaymentProviders)(removePaymentProvider),

createDeliveryProvider: acl(actions.manageDeliveryProviders)(createDeliveryProvider),
updateDeliveryProvider: acl(actions.manageDeliveryProviders)(updateDeliveryProvider),
removeDeliveryProvider: acl(actions.manageDeliveryProviders)(removeDeliveryProvider),

createWarehousingProvider: acl(actions.manageWarehousingProviders)(createWarehousingProvider),
updateWarehousingProvider: acl(actions.manageWarehousingProviders)(updateWarehousingProvider),
removeWarehousingProvider: acl(actions.manageWarehousingProviders)(removeWarehousingProvider),
exportToken: acl(actions.updateToken)(exportToken),

createFilter: acl(actions.manageFilters)(createFilter),
updateFilter: acl(actions.manageFilters)(updateFilter),
removeFilter: acl(actions.manageFilters)(removeFilter),
updateFilterTexts: acl(actions.manageFilters)(updateFilterTexts),
removeFilterOption: acl(actions.manageFilters)(removeFilterOption),
createFilterOption: acl(actions.manageFilters)(createFilterOption),

createAssortment: acl(actions.manageAssortments)(createAssortment),
addAssortmentMedia: acl(actions.manageAssortments)(addAssortmentMedia),
prepareAssortmentMediaUpload: acl(actions.manageAssortments)(prepareAssortmentMediaUpload),
Expand All @@ -310,27 +299,23 @@ export default {
addAssortmentFilter: acl(actions.manageAssortments)(addAssortmentFilter),
removeAssortmentFilter: acl(actions.manageAssortments)(removeAssortmentFilter),
reorderAssortmentFilters: acl(actions.manageAssortments)(reorderAssortmentFilters),

createProductReview: acl(actions.reviewProduct)(createProductReview),
updateProductReview: acl(actions.updateProductReview)(updateProductReview),
removeProductReview: acl(actions.updateProductReview)(removeProductReview),
addProductReviewVote: acl(actions.voteProductReview)(addProductReviewVote),
removeProductReviewVote: acl(actions.voteProductReview)(removeProductReviewVote),

requestQuotation: acl(actions.requestQuotation)(requestQuotation),
rejectQuotation: acl(actions.answerQuotation)(rejectQuotation),
verifyQuotation: acl(actions.manageQuotations)(verifyQuotation),
makeQuotationProposal: acl(actions.manageQuotations)(makeQuotationProposal),

bookmark: acl(actions.bookmarkProduct)(bookmark),
createBookmark: acl(actions.manageBookmarks)(createBookmark),
removeBookmark: acl(actions.manageBookmarks)(removeBookmark),

addWork: acl(actions.manageWorker)(addWork),
allocateWork: acl(actions.manageWorker)(allocateWork),
finishWork: acl(actions.manageWorker)(finishWork),
removeWork: acl(actions.manageWorker)(removeWork),
doWork: acl(actions.manageWorker)(doWork),
processNextWork: acl(actions.manageWorker)(processNextWork),
signPaymentProviderForCredentialRegistration: acl(actions.registerPaymentCredentials)(
signPaymentProviderForCredentialRegistration,
),
Expand Down
17 changes: 0 additions & 17 deletions packages/api/src/resolvers/mutations/worker/doWork.ts

This file was deleted.

20 changes: 20 additions & 0 deletions packages/api/src/resolvers/mutations/worker/processNextWork.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Context, Root } from '@unchainedshop/types/api.js';
import { log } from '@unchainedshop/logger';

export default async function processNextWork(
root: Root,
data: {
worker?: string;
},
context: Context,
) {
const { worker } = data;

log(`mutation processNextWork ${worker}`, {
userId: context.userId,
});

const work = await context.modules.worker.processNextWork(worker, context);

return work;
}
11 changes: 8 additions & 3 deletions packages/api/src/schema/mutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,8 @@ export default [
removeBookmark(bookmarkId: ID!): Bookmark!
"""
Add work to the work queue. Each type has its own input shape
Add work to the work queue. Each type has its own input shape. If you pinpoint the worker by setting it
during creation, the work will be only run by the worker who identifies as that worker.
"""
addWork(
type: WorkType!
Expand All @@ -818,6 +819,7 @@ export default [
originalWorkId: ID
scheduled: Timestamp
retries: Int! = 20
worker: String
): Work
"""
Expand All @@ -827,9 +829,12 @@ export default [
allocateWork(types: [WorkType], worker: String): Work
"""
Trigger a registered plugin for "type" to actually do the work with given "input".
This will pick up non-external work, execute, await result and finish
it up on the target system. This function allows you to do work queue "ticks"
from outside instead of waiting for default Cron and Event Listener to trigger
and can be helpful in serverless environments.
"""
doWork(type: WorkType!, input: JSON): WorkOutput
processNextWork(worker: String): Work
"""
Register a work attempt manually.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,6 @@ export const configureAccountsModule = async ({

disableTOTP: async (userId, code) => {
await accountsPassword.twoFactor.unset(userId, code);
// https://github.com/accounts-js/accounts/issues/1181
const wait = async (time: number) => {
return new Promise((resolve) => {
setTimeout(() => {
resolve(true);
}, time);
});
};
await wait(500);
return true;
},

Expand Down
87 changes: 58 additions & 29 deletions packages/core-worker/src/module/configureWorkerModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,32 @@ export const configureWorkerModule = async ({

const mutations = generateDbMutations<Work>(WorkQueue, WorkQueueSchema) as ModuleMutations<Work>;

const allocateWork: WorkerModule['allocateWork'] = async ({ types, worker = UNCHAINED_WORKER_ID }) => {
// Find a work item that is scheduled for now and is not started.
// Also:
// - Restrict by types and worker if provided
// - Sort by default queue order
const query = buildQuerySelector({
status: [WorkStatus.NEW],
scheduled: { end: new Date() },
worker: { $in: [null, worker] },
...(types ? { type: { $in: types } } : {}),
});
const result = await WorkQueue.findOneAndUpdate(
query,
{
$set: { started: new Date(), worker },
},
{ sort: buildSortOptions(defaultSort), returnDocument: 'after' },
);

WorkerDirector.events.emit(WorkerEventTypes.ALLOCATED, {
work: result.value,
});

return result.value;
};

const finishWork: WorkerModule['finishWork'] = async (
workId,
{
Expand Down Expand Up @@ -147,6 +173,26 @@ export const configureWorkerModule = async ({
return work;
};

const processNextWork: WorkerModule['processNextWork'] = async (workerId, unchainedAPI) => {
const work = await allocateWork({
types: WorkerDirector.getActivePluginTypes(false),
worker: workerId,
});

if (work) {
const output = await WorkerDirector.doWork(work, unchainedAPI);

return finishWork(work._id, {
...output,
finished: work.finished || new Date(),
started: work.started,
worker: workerId,
});
}

return null;
};

return {
// Queries
activeWorkTypes: async () => {
Expand Down Expand Up @@ -216,7 +262,15 @@ export const configureWorkerModule = async ({
},

// Mutations
addWork: async ({ type, input, priority = 0, scheduled, originalWorkId, retries = 20 }) => {
addWork: async ({
type,
input,
priority = 0,
scheduled,
originalWorkId,
worker = null,
retries = 20,
}) => {
if (!WorkerDirector.getAdapter(type)) {
throw new Error(`No plugin registered for type ${type}`);
}
Expand All @@ -230,6 +284,7 @@ export const configureWorkerModule = async ({
originalWorkId,
retries,
created,
worker,
});

logger.info(`${type} scheduled @ ${new Date(scheduled || created).toISOString()}`, {
Expand Down Expand Up @@ -260,31 +315,7 @@ export const configureWorkerModule = async ({
return work;
},

allocateWork: async ({ types, worker = UNCHAINED_WORKER_ID }) => {
// Find a work item that is scheduled for now and is not started.
// Also:
// - Restrict by types and worker if provided
// - Sort by default queue order
const query = buildQuerySelector({
status: [WorkStatus.NEW],
scheduled: { end: new Date() },
worker: { $in: [null, worker] },
...(types ? { type: { $in: types } } : {}),
});
const result = await WorkQueue.findOneAndUpdate(
query,
{
$set: { started: new Date(), worker },
},
{ sort: buildSortOptions(defaultSort), returnDocument: 'after' },
);

WorkerDirector.events.emit(WorkerEventTypes.ALLOCATED, {
work: result.value,
});

return result.value;
},
allocateWork,

ensureOneWork: async ({ type, input, priority = 0, scheduled, originalWorkId, retries = 20 }) => {
const created = new Date();
Expand Down Expand Up @@ -335,9 +366,7 @@ export const configureWorkerModule = async ({
}
},

doWork: (work, unchainedAPI) => {
return WorkerDirector.doWork(work, unchainedAPI);
},
processNextWork,

finishWork,

Expand Down
23 changes: 2 additions & 21 deletions packages/core-worker/src/workers/BaseWorker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IWorker, Work } from '@unchainedshop/types/worker.js';
import { IWorker } from '@unchainedshop/types/worker.js';
import later from '@breejs/later';
import { log } from '@unchainedshop/logger';
import os from 'os';
Expand Down Expand Up @@ -75,27 +75,8 @@ export const BaseWorker: IWorker<WorkerParams> = {

const processRecursively = async (recursionCounter = 0) => {
if (maxWorkItemCount && maxWorkItemCount < recursionCounter) return null;

const work = await unchainedAPI.modules.worker.allocateWork({
types: WorkerDirector.getActivePluginTypes(false),
worker: workerId,
});

let doneWork: Work | null = null;

if (work) {
const output = await unchainedAPI.modules.worker.doWork(work, unchainedAPI);

doneWork = await unchainedAPI.modules.worker.finishWork(work._id, {
...output,
finished: work.finished || new Date(),
started: work.started,
worker: workerId,
});
}

const doneWork = await unchainedAPI.modules.worker.processNextWork(workerId, unchainedAPI);
if (doneWork) return processRecursively(recursionCounter + 1);

return null;
};

Expand Down
3 changes: 2 additions & 1 deletion packages/types/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export interface WorkData {
priority?: number;
retries?: number;
scheduled?: Date;
worker?: string;
}

export interface WorkResult<Result> {
Expand Down Expand Up @@ -87,7 +88,7 @@ export type WorkerModule = {

allocateWork: (doc: { types: Array<string>; worker: string }) => Promise<Work>;

doWork: (work: Work, unchainedAPI: UnchainedCore) => Promise<WorkResult<any>>;
processNextWork: (worker: string, unchainedAPI: UnchainedCore) => Promise<Work>;

rescheduleWork: (work: Work, scheduled: Date, unchainedAPI: UnchainedCore) => Promise<Work>;

Expand Down
Loading

0 comments on commit 5c6c69c

Please sign in to comment.