Skip to content

Commit

Permalink
feat: allow running aggregations on server side (#6)
Browse files Browse the repository at this point in the history
cray0000 authored Sep 6, 2024

Verified

This commit was signed with the committer’s verified signature.
1 parent 108c0d3 commit 26892c6
Showing 4 changed files with 50 additions and 16 deletions.
8 changes: 4 additions & 4 deletions packages/backend/features/serverAggregate.js
Original file line number Diff line number Diff line change
@@ -15,10 +15,10 @@ export default function serverAggregate (backend, { models = {}, ...options } =
collection,
aggregationName,
(queryParams, shareRequest) => {
const session = shareRequest.agent.connectSession
// TODO: rewrite to use $ here, or create a separate root $ for each user
// const model = global.__clients[userId].model
const context = { session, collection }
const session = shareRequest.agent.connectSession || {}
const isServer = shareRequest.agent.stream?.isServer
// should match the context in teamplay/orm/sub.js
const context = { session, collection, isServer }
return aggregation(queryParams, context)
}
)
13 changes: 7 additions & 6 deletions packages/server-aggregate/server.js
Original file line number Diff line number Diff line change
@@ -17,19 +17,20 @@ export default (backend, { customCheck } = {}) => {
const handleQuery = async (shareRequest) => {
const { query, collection } = shareRequest

const queryName = query.$aggregationName
let queryParams = query.$params

if (query.$aggregate) {
const { stream } = shareRequest.agent
// allow any aggregations initiated from the server code
if (stream?.isServer && !stream?.checkServerAccess) return
// deny any direct aggregations made from the client
throw new ShareDBAccessError(ERR_ACCESS_ONLY_SERVER_AGGREATE, `
access denied - only server-queries for $aggregate are allowed
access denied - only server-queries for $aggregate are allowed from the client
collection: '${collection}',
query: \n${JSON.stringify(query, null, 2)}
`)
}
if (!queryName && !queryParams) return

queryParams = queryParams || {}
const { $aggregationName: queryName, $params: queryParams = {} } = query
if (!queryName && !queryParams) return

const queryFunction = QUERIES[collection + '.' + queryName]

41 changes: 37 additions & 4 deletions packages/teamplay/orm/sub.js
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ import Signal, { SEGMENTS, isPublicCollectionSignal, isPublicDocumentSignal } fr
import { docSubscriptions } from './Doc.js'
import { querySubscriptions, getQuerySignal } from './Query.js'
import { aggregationSubscriptions, getAggregationSignal } from './Aggregation.js'
import isServer from '../utils/isServer.js'

export default function sub ($signal, params) {
// TODO: temporarily disable support for multiple subscriptions
@@ -20,24 +21,45 @@ export default function sub ($signal, params) {
if (arguments.length !== 2) throw Error(ERRORS.subQueryArguments(...arguments))
return query$($signal[SEGMENTS][0], params)
} else if (isClientAggregationFunction($signal)) {
params = $signal(sanitizeAggregationParams(params))
if (Array.isArray(params)) params = { $aggregate: params }
return aggregation$($signal.collection, params)
return getAggregationFromFunction($signal, $signal.collection, params)
} else if (isAggregationHeader($signal)) {
params = {
$aggregationName: $signal.name,
$params: sanitizeAggregationParams(params)
}
return aggregation$($signal.collection, params)
} else if (isAggregationFunction($signal)) {
throw Error(ERRORS.gotAggregationFunction($signal))
if (isServer) {
if (!params?.$collection) throw Error(ERRORS.subServerAggregationCollection($signal, params))
params = { ...params }
const collection = params.$collection
delete params.$collection
return getAggregationFromFunction($signal, collection, params)
} else {
throw Error(ERRORS.gotAggregationFunction($signal))
}
} else if (typeof $signal === 'function' && !($signal instanceof Signal)) {
return api$($signal, params)
} else {
throw Error('Invalid args passed for sub()')
}
}

function getAggregationFromFunction (fn, collection, params) {
params = sanitizeAggregationParams(params) // clones it, so mutation becomes safe
let session
if (params.$session) {
session = params.$session
delete params.$session
}
session ??= {}
// should match the context in @teamplay/backend/features/serverAggregate.js
const context = { collection, session, isServer }
params = fn(params, context)
if (Array.isArray(params)) params = { $aggregate: params }
return aggregation$(collection, params)
}

function doc$ ($doc) {
const promise = docSubscriptions.subscribe($doc)
if (!promise) return $doc
@@ -104,5 +126,16 @@ const ERRORS = {
Got:
${aggregationFn.toString()}
`,
subServerAggregationCollection: ($signal, params) => `
sub($$aggregation, params):
Server-side aggregation function must receive the collection name from the params.
Make sure you pass the collection name as $collection in the params object
when running aggregation from the server code:
sub($$aggregation, { $collection: 'collectionName', ...actualParams })
Got:
Aggregation: ${$signal}
Params: ${params}
`
}
4 changes: 2 additions & 2 deletions packages/teamplay/test/sub$.js
Original file line number Diff line number Diff line change
@@ -252,10 +252,10 @@ describe('$sub() function. Aggregations', () => {
afterEachTestGc()

it('subscribe to aggregation, modify it', async () => {
const $$activeGames = aggregation(gamesCollection, ({ active }) => {
const $$activeGames = aggregation(({ active }) => {
return [{ $match: { active } }]
})
const $activeGames = await sub($$activeGames, { active: true })
const $activeGames = await sub($$activeGames, { $collection: gamesCollection, active: true })
assert.equal($activeGames.get().length, 2)
assert.deepEqual(
sanitizeAggregations(_get(['$aggregations'])),

0 comments on commit 26892c6

Please sign in to comment.