diff --git a/packages/sanity/package.json b/packages/sanity/package.json index cbc8d39d151..3e514785e8e 100644 --- a/packages/sanity/package.json +++ b/packages/sanity/package.json @@ -252,6 +252,7 @@ "rimraf": "^3.0.2", "rxjs": "^7.8.0", "rxjs-exhaustmap-with-trailing": "^2.1.1", + "rxjs-mergemap-array": "^0.1.0", "sanity-diff-patch": "^3.0.2", "scroll-into-view-if-needed": "^3.0.3", "semver": "^7.3.5", diff --git a/packages/sanity/src/core/preview/createGlobalListener.ts b/packages/sanity/src/core/preview/createGlobalListener.ts index ac76417b386..7be43f93e31 100644 --- a/packages/sanity/src/core/preview/createGlobalListener.ts +++ b/packages/sanity/src/core/preview/createGlobalListener.ts @@ -19,6 +19,7 @@ export function createGlobalListener(client: SanityClient) { includePreviousRevision: false, includeMutations: false, visibility: 'query', + effectFormat: 'mendoza', tag: 'preview.global', }, ) diff --git a/packages/sanity/src/core/preview/createObserveDocument.ts b/packages/sanity/src/core/preview/createObserveDocument.ts new file mode 100644 index 00000000000..7cf8de5082e --- /dev/null +++ b/packages/sanity/src/core/preview/createObserveDocument.ts @@ -0,0 +1,89 @@ +import {type MutationEvent, type SanityClient, type WelcomeEvent} from '@sanity/client' +import {type SanityDocument} from '@sanity/types' +import {memoize, uniq} from 'lodash' +import {type RawPatch} from 'mendoza' +import {EMPTY, finalize, type Observable, of} from 'rxjs' +import {concatMap, map, scan, shareReplay} from 'rxjs/operators' + +import {type ApiConfig} from './types' +import {applyMutationEventEffects} from './utils/applyMendozaPatch' +import {debounceCollect} from './utils/debounceCollect' + +export function createObserveDocument({ + mutationChannel, + client, +}: { + client: SanityClient + mutationChannel: Observable +}) { + const getBatchFetcher = memoize( + function getBatchFetcher(apiConfig: {dataset: string; projectId: string}) { + const _client = client.withConfig(apiConfig) + + function batchFetchDocuments(ids: [string][]) { + return _client.observable + .fetch(`*[_id in $ids]`, {ids: uniq(ids.flat())}, {tag: 'preview.observe-document'}) + .pipe( + // eslint-disable-next-line max-nested-callbacks + map((result) => ids.map(([id]) => result.find((r: {_id: string}) => r._id === id))), + ) + } + return debounceCollect(batchFetchDocuments, 100) + }, + (apiConfig) => apiConfig.dataset + apiConfig.projectId, + ) + + const MEMO: Record> = {} + + function observeDocument(id: string, apiConfig?: ApiConfig) { + const _apiConfig = apiConfig || { + dataset: client.config().dataset!, + projectId: client.config().projectId!, + } + const fetchDocument = getBatchFetcher(_apiConfig) + return mutationChannel.pipe( + concatMap((event) => { + if (event.type === 'welcome') { + return fetchDocument(id).pipe(map((document) => ({type: 'sync' as const, document}))) + } + return event.documentId === id ? of(event) : EMPTY + }), + scan((current: SanityDocument | undefined, event) => { + if (event.type === 'sync') { + return event.document + } + if (event.type === 'mutation') { + return applyMutationEvent(current, event) + } + //@ts-expect-error - this should never happen + throw new Error(`Unexpected event type: "${event.type}"`) + }, undefined), + ) + } + return function memoizedObserveDocument(id: string, apiConfig?: ApiConfig) { + const key = apiConfig ? `${id}-${JSON.stringify(apiConfig)}` : id + if (!(key in MEMO)) { + MEMO[key] = observeDocument(id, apiConfig).pipe( + finalize(() => delete MEMO[key]), + shareReplay({bufferSize: 1, refCount: true}), + ) + } + return MEMO[key] + } +} + +function applyMutationEvent(current: SanityDocument | undefined, event: MutationEvent) { + if (event.previousRev !== current?._rev) { + console.warn('Document out of sync, skipping mutation') + return current + } + if (!event.effects) { + throw new Error( + 'Mutation event is missing effects. Is the listener set up with effectFormat=mendoza?', + ) + } + return applyMutationEventEffects( + current, + event as {effects: {apply: RawPatch}; previousRev: string; resultRev: string}, + ) +} diff --git a/packages/sanity/src/core/preview/documentPreviewStore.ts b/packages/sanity/src/core/preview/documentPreviewStore.ts index 154c7e56c26..9a3a6d862f0 100644 --- a/packages/sanity/src/core/preview/documentPreviewStore.ts +++ b/packages/sanity/src/core/preview/documentPreviewStore.ts @@ -1,14 +1,21 @@ -import {type MutationEvent, type SanityClient, type WelcomeEvent} from '@sanity/client' +import { + type MutationEvent, + type QueryParams, + type SanityClient, + type WelcomeEvent, +} from '@sanity/client' import {type PrepareViewOptions, type SanityDocument} from '@sanity/types' -import {type Observable} from 'rxjs' +import {combineLatest, type Observable} from 'rxjs' import {distinctUntilChanged, filter, map} from 'rxjs/operators' import {isRecord} from '../util' import {createPreviewAvailabilityObserver} from './availability' import {createGlobalListener} from './createGlobalListener' +import {createObserveDocument} from './createObserveDocument' import {createPathObserver} from './createPathObserver' import {createPreviewObserver} from './createPreviewObserver' import {createObservePathsDocumentPair} from './documentPair' +import {createDocumentIdSetObserver, type DocumentIdSetObserverState} from './liveDocumentIdSet' import {createObserveFields} from './observeFields' import { type ApiConfig, @@ -56,6 +63,43 @@ export interface DocumentPreviewStore { id: string, paths: PreviewPath[], ) => Observable> + + /** + * Observes a set of document IDs that matches the given groq-filter. The document ids are returned in ascending order and will update in real-time + * Whenever a document appears or disappears from the set, a new array with the updated set of IDs will be pushed to subscribers. + * The query is performed once, initially, and thereafter the set of ids are patched based on the `appear` and `disappear` + * transitions on the received listener events. + * This provides a lightweight way of subscribing to a list of ids for simple cases where you just want to subscribe to a set of documents ids + * that matches a particular filter. + * @hidden + * @beta + * @param filter - A groq filter to use for the document set + * @param params - Parameters to use with the groq filter + * @param options - Options for the observer + */ + unstable_observeDocumentIdSet: ( + filter: string, + params?: QueryParams, + options?: { + /** + * Where to insert new items into the set. Defaults to 'sorted' which is based on the lexicographic order of the id + */ + insert?: 'sorted' | 'prepend' | 'append' + }, + ) => Observable + + /** + * Observe a complete document with the given ID + * @hidden + * @beta + */ + unstable_observeDocument: (id: string) => Observable + /** + * Observe a list of complete documents with the given IDs + * @hidden + * @beta + */ + unstable_observeDocuments: (ids: string[]) => Observable<(SanityDocument | undefined)[]> } /** @internal */ @@ -79,6 +123,7 @@ export function createDocumentPreviewStore({ map((event) => (event.type === 'welcome' ? {type: 'connected' as const} : event)), ) + const observeDocument = createObserveDocument({client, mutationChannel: globalListener}) const observeFields = createObserveFields({client: versionedClient, invalidationChannel}) const observePaths = createPathObserver({observeFields}) @@ -92,6 +137,10 @@ export function createDocumentPreviewStore({ ) } + const observeDocumentIdSet = createDocumentIdSetObserver( + versionedClient.withConfig({apiVersion: '2024-07-22'}), + ) + const observeForPreview = createPreviewObserver({observeDocumentTypeFromId, observePaths}) const observeDocumentPairAvailability = createPreviewAvailabilityObserver( versionedClient, @@ -110,6 +159,10 @@ export function createDocumentPreviewStore({ observeForPreview, observeDocumentTypeFromId, + unstable_observeDocumentIdSet: observeDocumentIdSet, + unstable_observeDocument: observeDocument, + unstable_observeDocuments: (ids: string[]) => + combineLatest(ids.map((id) => observeDocument(id))), unstable_observeDocumentPairAvailability: observeDocumentPairAvailability, unstable_observePathsDocumentPair: observePathsDocumentPair, } diff --git a/packages/sanity/src/core/preview/liveDocumentIdSet.ts b/packages/sanity/src/core/preview/liveDocumentIdSet.ts new file mode 100644 index 00000000000..49d8401cde1 --- /dev/null +++ b/packages/sanity/src/core/preview/liveDocumentIdSet.ts @@ -0,0 +1,112 @@ +import {type QueryParams, type SanityClient} from '@sanity/client' +import {sortedIndex} from 'lodash' +import {of} from 'rxjs' +import {distinctUntilChanged, filter, map, mergeMap, scan, tap} from 'rxjs/operators' + +export type DocumentIdSetObserverState = { + status: 'reconnecting' | 'connected' + documentIds: string[] +} + +interface LiveDocumentIdSetOptions { + insert?: 'sorted' | 'prepend' | 'append' +} + +export function createDocumentIdSetObserver(client: SanityClient) { + return function observe( + queryFilter: string, + params?: QueryParams, + options: LiveDocumentIdSetOptions = {}, + ) { + const {insert: insertOption = 'sorted'} = options + + const query = `*[${queryFilter}]._id` + function fetchFilter() { + return client.observable + .fetch(query, params, { + tag: 'preview.observe-document-set.fetch', + }) + .pipe( + tap((result) => { + if (!Array.isArray(result)) { + throw new Error( + `Expected query to return array of documents, but got ${typeof result}`, + ) + } + }), + ) + } + return client.observable + .listen(query, params, { + visibility: 'transaction', + events: ['welcome', 'mutation', 'reconnect'], + includeResult: false, + includeMutations: false, + tag: 'preview.observe-document-set.listen', + }) + .pipe( + mergeMap((event) => { + return event.type === 'welcome' + ? fetchFilter().pipe(map((result) => ({type: 'fetch' as const, result}))) + : of(event) + }), + scan( + ( + state: DocumentIdSetObserverState | undefined, + event, + ): DocumentIdSetObserverState | undefined => { + if (event.type === 'reconnect') { + return { + documentIds: state?.documentIds || [], + ...state, + status: 'reconnecting' as const, + } + } + if (event.type === 'fetch') { + return {...state, status: 'connected' as const, documentIds: event.result} + } + if (event.type === 'mutation') { + if (event.transition === 'update') { + // ignore updates, as we're only interested in documents appearing and disappearing from the set + return state + } + if (event.transition === 'appear') { + return { + status: 'connected', + documentIds: insert(state?.documentIds || [], event.documentId, insertOption), + } + } + if (event.transition === 'disappear') { + return { + status: 'connected', + documentIds: state?.documentIds + ? state.documentIds.filter((id) => id !== event.documentId) + : [], + } + } + } + return state + }, + undefined, + ), + distinctUntilChanged(), + filter( + (state: DocumentIdSetObserverState | undefined): state is DocumentIdSetObserverState => + state !== undefined, + ), + ) + } +} + +function insert(array: T[], element: T, strategy: 'sorted' | 'prepend' | 'append') { + let index + if (strategy === 'prepend') { + index = 0 + } else if (strategy === 'append') { + index = array.length + } else { + index = sortedIndex(array, element) + } + + return array.toSpliced(index, 0, element) +} diff --git a/packages/sanity/src/core/preview/useLiveDocumentIdSet.ts b/packages/sanity/src/core/preview/useLiveDocumentIdSet.ts new file mode 100644 index 00000000000..2fa3eff626d --- /dev/null +++ b/packages/sanity/src/core/preview/useLiveDocumentIdSet.ts @@ -0,0 +1,47 @@ +import {type QueryParams} from '@sanity/client' +import {useMemo} from 'react' +import {useObservable} from 'react-rx' +import {scan} from 'rxjs/operators' + +import {useDocumentPreviewStore} from '../store/_legacy/datastores' +import {type DocumentIdSetObserverState} from './liveDocumentIdSet' + +const INITIAL_STATE = {status: 'loading' as const, documentIds: []} + +export type LiveDocumentSetState = + | {status: 'loading'; documentIds: string[]} + | DocumentIdSetObserverState + +/** + * @internal + * @beta + * Returns document ids that matches the provided GROQ-filter, and loading state + * The document ids are returned in ascending order and will update in real-time + * Whenever a document appears or disappears from the set, a new array with the updated set of IDs will be returned. + * This provides a lightweight way of subscribing to a list of ids for simple cases where you just want the documents ids + * that matches a particular filter. + */ +export function useLiveDocumentIdSet( + filter: string, + params?: QueryParams, + options: { + // how to insert new document ids. Defaults to `sorted` + insert?: 'sorted' | 'prepend' | 'append' + } = {}, +) { + const documentPreviewStore = useDocumentPreviewStore() + const observable = useMemo( + () => + documentPreviewStore.unstable_observeDocumentIdSet(filter, params, options).pipe( + scan( + (currentState: LiveDocumentSetState, nextState) => ({ + ...currentState, + ...nextState, + }), + INITIAL_STATE, + ), + ), + [documentPreviewStore, filter, params, options], + ) + return useObservable(observable, INITIAL_STATE) +} diff --git a/packages/sanity/src/core/preview/useLiveDocumentSet.ts b/packages/sanity/src/core/preview/useLiveDocumentSet.ts new file mode 100644 index 00000000000..16c5c27be24 --- /dev/null +++ b/packages/sanity/src/core/preview/useLiveDocumentSet.ts @@ -0,0 +1,34 @@ +import {type QueryParams} from '@sanity/client' +import {type SanityDocument} from '@sanity/types' +import {useMemo} from 'react' +import {useObservable} from 'react-rx' +import {map} from 'rxjs/operators' +import {mergeMapArray} from 'rxjs-mergemap-array' + +import {useDocumentPreviewStore} from '../store' + +const INITIAL_VALUE = {loading: true, documents: []} + +/** + * @internal + * @beta + * + * Observes a set of documents matching the filter and returns an array of complete documents + * A new array will be pushed whenever a document in the set changes + * Document ids are returned in ascending order + * Any sorting beyond that must happen client side + */ +export function useLiveDocumentSet( + groqFilter: string, + params?: QueryParams, +): {loading: boolean; documents: SanityDocument[]} { + const documentPreviewStore = useDocumentPreviewStore() + const observable = useMemo(() => { + return documentPreviewStore.unstable_observeDocumentIdSet(groqFilter, params).pipe( + map((state) => (state.documentIds || []) as string[]), + mergeMapArray((id) => documentPreviewStore.unstable_observeDocument(id)), + map((docs) => ({loading: false, documents: docs as SanityDocument[]})), + ) + }, [documentPreviewStore, groqFilter, params]) + return useObservable(observable, INITIAL_VALUE) +} diff --git a/packages/sanity/src/core/preview/utils/applyMendozaPatch.ts b/packages/sanity/src/core/preview/utils/applyMendozaPatch.ts new file mode 100644 index 00000000000..9644eb60c33 --- /dev/null +++ b/packages/sanity/src/core/preview/utils/applyMendozaPatch.ts @@ -0,0 +1,44 @@ +import {type SanityDocument} from '@sanity/types' +import {applyPatch, type RawPatch} from 'mendoza' + +function omitRev(document: SanityDocument | undefined) { + if (document === undefined) { + return undefined + } + const {_rev, ...doc} = document + return doc +} + +/** + * + * @param document - The document to apply the patch to + * @param patch - The mendoza patch to apply + * @param baseRev - The revision of the document that the patch is calculated from. This is used to ensure that the patch is applied to the correct revision of the document + */ +export function applyMendozaPatch( + document: SanityDocument | undefined, + patch: RawPatch, + baseRev: string, +): SanityDocument | undefined { + if (baseRev !== document?._rev) { + throw new Error( + 'Invalid document revision. The provided patch is calculated from a different revision than the current document', + ) + } + const next = applyPatch(omitRev(document), patch) + return next === null ? undefined : next +} + +export function applyMutationEventEffects( + document: SanityDocument | undefined, + event: {effects: {apply: RawPatch}; previousRev: string; resultRev: string}, +) { + if (!event.effects) { + throw new Error( + 'Mutation event is missing effects. Is the listener set up with effectFormat=mendoza?', + ) + } + const next = applyMendozaPatch(document, event.effects.apply, event.previousRev) + // next will be undefined in case of deletion + return next ? {...next, _rev: event.resultRev} : undefined +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bfe3d0c8b7f..1d931ce341a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1659,6 +1659,9 @@ importers: rxjs-exhaustmap-with-trailing: specifier: ^2.1.1 version: 2.1.1(rxjs@7.8.1) + rxjs-mergemap-array: + specifier: ^0.1.0 + version: 0.1.0(rxjs@7.8.1) sanity-diff-patch: specifier: ^3.0.2 version: 3.0.2 @@ -10019,6 +10022,12 @@ packages: peerDependencies: rxjs: 7.x + rxjs-mergemap-array@0.1.0: + resolution: {integrity: sha512-19fXxPXN4X8LPWu7fg/nyX+nr0G97qSNXhEvF32cdgWuoyUVQ4MrFr+UL4HGip6iO5kbZOL4puAjPeQ/D5qSlA==} + engines: {node: '>=18.0.0'} + peerDependencies: + rxjs: 7.x + rxjs@6.6.7: resolution: {integrity: sha512-hTdwr+7yYNIT5n4AMYp85KA6yw2Va0FLa3Rguvbpa4W3I5xynaBZo41cM3XM+4Q6fRMj3sBYIR1VAmZMXYJvRQ==} engines: {npm: '>=2.0.0'} @@ -21463,6 +21472,10 @@ snapshots: dependencies: rxjs: 7.8.1 + rxjs-mergemap-array@0.1.0(rxjs@7.8.1): + dependencies: + rxjs: 7.8.1 + rxjs@6.6.7: dependencies: tslib: 1.14.1