From 26892c6742bf52a182ea8c6a9f888bc50826f745 Mon Sep 17 00:00:00 2001 From: Pavel Zhukov Date: Fri, 6 Sep 2024 21:32:13 +0300 Subject: [PATCH] feat: allow running aggregations on server side (#6) --- packages/backend/features/serverAggregate.js | 8 ++-- packages/server-aggregate/server.js | 13 ++++--- packages/teamplay/orm/sub.js | 41 ++++++++++++++++++-- packages/teamplay/test/sub$.js | 4 +- 4 files changed, 50 insertions(+), 16 deletions(-) diff --git a/packages/backend/features/serverAggregate.js b/packages/backend/features/serverAggregate.js index 6fe6c0d..dbaf247 100644 --- a/packages/backend/features/serverAggregate.js +++ b/packages/backend/features/serverAggregate.js @@ -15,10 +15,10 @@ export default function serverAggregate (backend, { models = {}, ...options } = collection, aggregationName, (queryParams, shareRequest) => { - const session = shareRequest.agent.connectSession - // TODO: rewrite to use $ here, or create a separate root $ for each user - // const model = global.__clients[userId].model - const context = { session, collection } + const session = shareRequest.agent.connectSession || {} + const isServer = shareRequest.agent.stream?.isServer + // should match the context in teamplay/orm/sub.js + const context = { session, collection, isServer } return aggregation(queryParams, context) } ) diff --git a/packages/server-aggregate/server.js b/packages/server-aggregate/server.js index a05c719..ea6255b 100644 --- a/packages/server-aggregate/server.js +++ b/packages/server-aggregate/server.js @@ -17,19 +17,20 @@ export default (backend, { customCheck } = {}) => { const handleQuery = async (shareRequest) => { const { query, collection } = shareRequest - const queryName = query.$aggregationName - let queryParams = query.$params - if (query.$aggregate) { + const { stream } = shareRequest.agent + // allow any aggregations initiated from the server code + if (stream?.isServer && !stream?.checkServerAccess) return + // deny any direct aggregations made from the client throw new ShareDBAccessError(ERR_ACCESS_ONLY_SERVER_AGGREATE, ` - access denied - only server-queries for $aggregate are allowed + access denied - only server-queries for $aggregate are allowed from the client collection: '${collection}', query: \n${JSON.stringify(query, null, 2)} `) } - if (!queryName && !queryParams) return - queryParams = queryParams || {} + const { $aggregationName: queryName, $params: queryParams = {} } = query + if (!queryName && !queryParams) return const queryFunction = QUERIES[collection + '.' + queryName] diff --git a/packages/teamplay/orm/sub.js b/packages/teamplay/orm/sub.js index 3d02922..3cd2bbe 100644 --- a/packages/teamplay/orm/sub.js +++ b/packages/teamplay/orm/sub.js @@ -3,6 +3,7 @@ import Signal, { SEGMENTS, isPublicCollectionSignal, isPublicDocumentSignal } fr import { docSubscriptions } from './Doc.js' import { querySubscriptions, getQuerySignal } from './Query.js' import { aggregationSubscriptions, getAggregationSignal } from './Aggregation.js' +import isServer from '../utils/isServer.js' export default function sub ($signal, params) { // TODO: temporarily disable support for multiple subscriptions @@ -20,9 +21,7 @@ export default function sub ($signal, params) { if (arguments.length !== 2) throw Error(ERRORS.subQueryArguments(...arguments)) return query$($signal[SEGMENTS][0], params) } else if (isClientAggregationFunction($signal)) { - params = $signal(sanitizeAggregationParams(params)) - if (Array.isArray(params)) params = { $aggregate: params } - return aggregation$($signal.collection, params) + return getAggregationFromFunction($signal, $signal.collection, params) } else if (isAggregationHeader($signal)) { params = { $aggregationName: $signal.name, @@ -30,7 +29,15 @@ export default function sub ($signal, params) { } return aggregation$($signal.collection, params) } else if (isAggregationFunction($signal)) { - throw Error(ERRORS.gotAggregationFunction($signal)) + if (isServer) { + if (!params?.$collection) throw Error(ERRORS.subServerAggregationCollection($signal, params)) + params = { ...params } + const collection = params.$collection + delete params.$collection + return getAggregationFromFunction($signal, collection, params) + } else { + throw Error(ERRORS.gotAggregationFunction($signal)) + } } else if (typeof $signal === 'function' && !($signal instanceof Signal)) { return api$($signal, params) } else { @@ -38,6 +45,21 @@ export default function sub ($signal, params) { } } +function getAggregationFromFunction (fn, collection, params) { + params = sanitizeAggregationParams(params) // clones it, so mutation becomes safe + let session + if (params.$session) { + session = params.$session + delete params.$session + } + session ??= {} + // should match the context in @teamplay/backend/features/serverAggregate.js + const context = { collection, session, isServer } + params = fn(params, context) + if (Array.isArray(params)) params = { $aggregate: params } + return aggregation$(collection, params) +} + function doc$ ($doc) { const promise = docSubscriptions.subscribe($doc) if (!promise) return $doc @@ -104,5 +126,16 @@ const ERRORS = { Got: ${aggregationFn.toString()} + `, + subServerAggregationCollection: ($signal, params) => ` + sub($$aggregation, params): + Server-side aggregation function must receive the collection name from the params. + Make sure you pass the collection name as $collection in the params object + when running aggregation from the server code: + sub($$aggregation, { $collection: 'collectionName', ...actualParams }) + + Got: + Aggregation: ${$signal} + Params: ${params} ` } diff --git a/packages/teamplay/test/sub$.js b/packages/teamplay/test/sub$.js index 265ec08..d92977e 100644 --- a/packages/teamplay/test/sub$.js +++ b/packages/teamplay/test/sub$.js @@ -252,10 +252,10 @@ describe('$sub() function. Aggregations', () => { afterEachTestGc() it('subscribe to aggregation, modify it', async () => { - const $$activeGames = aggregation(gamesCollection, ({ active }) => { + const $$activeGames = aggregation(({ active }) => { return [{ $match: { active } }] }) - const $activeGames = await sub($$activeGames, { active: true }) + const $activeGames = await sub($$activeGames, { $collection: gamesCollection, active: true }) assert.equal($activeGames.get().length, 2) assert.deepEqual( sanitizeAggregations(_get(['$aggregations'])),