From e7288751163f2551ebf2069ea5ef6f66efadf32b Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 1 Nov 2024 15:56:35 +0000 Subject: [PATCH] Collections: 0.3.0 dev (#809) * collections: fix an issue where the query isn't sent to the server * collections: update GET request structure * chanteset * Implement custom stream parser * lint * collections: add mor flexibility to the stream parser * collections: write the cursor back * version: collections 0.3.0 * remove comment --- packages/collections/CHANGELOG.md | 10 ++ packages/collections/package.json | 2 +- packages/collections/src/collections.js | 151 +++++++++++++---- packages/collections/src/mock.js | 39 ++++- packages/collections/test/Adaptor.test.js | 159 +++++++++++++++++- packages/collections/test/mock/server.test.js | 44 +++-- packages/satusehat/ast.json | 53 ++++-- 7 files changed, 371 insertions(+), 87 deletions(-) diff --git a/packages/collections/CHANGELOG.md b/packages/collections/CHANGELOG.md index d1f83daf5..08fe91fdb 100644 --- a/packages/collections/CHANGELOG.md +++ b/packages/collections/CHANGELOG.md @@ -1,5 +1,15 @@ # @openfn/language-collections +## 0.3.0 + +### Minor Changes + +- 1e472ed: Update new GET request structure Fix streaming API + +### Patch Changes + +- 32e5a03: Fix an issue where the query object isn't getting sent to the server + ## 0.2.0 ### Minor Changes diff --git a/packages/collections/package.json b/packages/collections/package.json index d50c80235..4c9c6cb8d 100644 --- a/packages/collections/package.json +++ b/packages/collections/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/language-collections", - "version": "0.2.0", + "version": "0.3.0", "description": "OpenFn collections adaptor", "type": "module", "exports": { diff --git a/packages/collections/src/collections.js b/packages/collections/src/collections.js index 43ff57743..d431c7eb9 100644 --- a/packages/collections/src/collections.js +++ b/packages/collections/src/collections.js @@ -5,6 +5,7 @@ import chain from 'stream-chain'; import parser from 'stream-json'; import Pick from 'stream-json/filters/Pick'; import streamArray from 'stream-json/streamers/StreamArray'; +import streamValues from 'stream-json/streamers/StreamValues'; import { createServer } from './mock'; @@ -37,6 +38,8 @@ export const setMockClient = mockClient => { * @property {string} createdAfter - matches values that were created after the end of the provided date * @property {string} updatedBefore - matches values that were updated before the start of the provided date * @property {string} updatedAfter - matches values that were updated after the end of the provided date* + * @property {number} limit - limit the maximum amount of results. Defaults to 1000. + * @property {string} cursor - set the cursor position to start searching from a specific index. */ /** @@ -44,11 +47,12 @@ export const setMockClient = mockClient => { * For large datasets, we recommend using each(), which streams data. * You can pass a specific key as a string to only fetch one item, or pass a query * with a key-pattern or a date filter. + * If not all matching values are returned, the cursor position is written to state.data * @public * @function * @param {string} name - The name of the collection to fetch from * @param {string|QueryOptions} query - A string key or key pattern (with wildcards '*') to fetch, or a query object - * @state data - the downloaded values as an array unless a specific string was specified + * @state data - the downloaded values as an array unless a specific key was specified, in which case state.data is the value * @example Get a specific value from a collection * collections.get('my-collection', '556e0a62') * @example Get a range of values from a collection with a key pattern @@ -64,34 +68,40 @@ export function get(name, query = {}) { } const { key, ...rest } = expandQuery(resolvedQuery); - const response = await request( - state, - getClient(state), - `${resolvedName}/${key}`, - { query: rest } - ); + let q; + let path = resolvedName; + if (key.match(/\*/) || Object.keys(rest).length) { + // request many + q = resolvedQuery; + } else { + // request one + path = `${resolvedName}/${key}`; + } + + const response = await request(state, getClient(state), path, { query: q }); let data; - if (!key.match(/\*/) || Object.keys(resolvedQuery).length === 0) { - // If one specific item was requested, write it straight to state.data - const body = await response.body.json(); - const item = body.items[0]; - if (item) { - item.value = JSON.parse(item.value); - } - data = item; - console.log(`Fetched "${key}" from collection "${name}"`); - } else { + if (q) { // build a response array data = []; console.log(`Downloading data from collection "${name}"...`); - await streamResponse(response, item => { + const cursor = await streamResponse(response, item => { item.value = JSON.parse(item.value); data.push(item); }); + data.cursor = cursor; console.log(`Fetched "${data.length}" values from collection "${name}"`); + } else { + // If one specific item was requested, write it straight to state.data + const body = await response.body.json(); + if (body.value) { + data = JSON.parse(body.value); + console.log(`Fetched "${key}" from collection "${name}"`); + } else { + data = {}; + console.warn(`Key "${key}" not found in collection "${name}"`); + } } - state.data = data; return state; }; @@ -211,6 +221,7 @@ export function remove(name, query = {}, options = {}) { * @param {string} name - The name of the collection to remove from * @param {string|QueryOptions} query - A string key or key pattern (with wildcards '*') to remove, or a query object * @param {function} callback - A callback invoked for each item `(state, value, key) => void` + * @state data.cursor - if values are still left on the server, a cursor string will be written to state.data * @example Iterate over a range of values with wildcards * collections.each('my-collection', 'record-2024*-appointment-*', (state, value, key) => { * state.cumulativeCost += value.cost; @@ -224,34 +235,100 @@ export function each(name, query = {}, callback = () => {}) { return async state => { const [resolvedName, resolvedQuery] = expandReferences(state, name, query); - const { key, ...rest } = expandQuery(resolvedQuery); + let q; + if (typeof resolvedQuery === 'string') { + q = { key: resolvedQuery }; + } else { + q = resolvedQuery; + } - const response = await request( - state, - getClient(state), - `${resolvedName}/${key}`, - { query: rest } - ); + const { key, ...rest } = expandQuery(resolvedQuery); + const response = await request(state, getClient(state), resolvedName, { + query: q, + }); + console.log(`each response`, response.statusCode) - await streamResponse(response, async ({ value, key }) => { + const cursor = await streamResponse(response, async ({ value, key }) => { await callback(state, JSON.parse(value), key); }); + state.data = { + cursor + }; + return state; }; } export const streamResponse = async (response, onValue) => { - const pipeline = chain([ - response.body, - parser(), - new Pick({ filter: 'items' }), - new streamArray(), - ]); - - for await (const { key, value } of pipeline) { - await onValue(value); + const pipeline = chain([response.body, parser()]); + + let isInsideItems = false; + let cursor; + const it = pipeline.iterator(); + + const waitFor = async (...names) => { + while (true) { + const next = await it.next(); + if (next.done) { + return; + } + if (names.includes(next.value.name)) { + return next; + } + } + }; + + for await (const token of it) { + // This block finds the cursor key and extracts it + if (!isInsideItems && token.name === 'startKey') { + const next = await waitFor('keyValue'); + + if (next.value.value === 'cursor') { + const strValue = await waitFor('stringChunk', 'nullValue'); + if (strValue.name === 'nullValue') { + continue + } + cursor = strValue.value.value; + } + + if (next.value.value === 'items') { + isInsideItems = true; + await waitFor('startArray'); + } + } + + // This lock will parse a key/value pair + // the streamer make a lot of assumptuions about this data structure + // So if it ever changes, we'll need to come back and modify it + // TODO can we leverage json-stream to just generically parse an object at this point? + if (isInsideItems && token.name === 'startObject') { + let key; + let value; + + while (!key || !value) { + const nextKey = await waitFor('keyValue'); + if (nextKey.value.value === 'key') { + key = await waitFor('stringValue'); + } + if (nextKey.value.value === 'value') { + value = await waitFor('stringValue'); + } + } + + await onValue({ + key: key.value.value, + value: value.value.value, + }); + + waitFor('endObject'); + } + if (isInsideItems && token.name === 'endArray') { + // This doesn't really matter but, just for the record, let's close out the array + isInsideItems = false; + } } + return cursor }; export const expandQuery = query => { @@ -299,7 +376,7 @@ export const request = (state, client, path, options = {}) => { Object.assign(headers, options?.headers); const { headers: _h, query: _q, ...otherOptions } = options; - const query = parseQuery(options.query); + const query = parseQuery(options); const args = { path: nodepath.join(basePath, path), headers, diff --git a/packages/collections/src/mock.js b/packages/collections/src/mock.js index 6770e2cf9..b4447cbac 100644 --- a/packages/collections/src/mock.js +++ b/packages/collections/src/mock.js @@ -16,8 +16,7 @@ export function API() { collections = api.collections = {}; }; - // Note that the mock allows data in any format, - // but the real API only takes strings + // This is a string store: values are expected to be strings const upsert = (name, key, value) => { if (!(name in collections)) { throw new Error(COLLECTION_NOT_FOUND); @@ -88,6 +87,8 @@ const parsePath = path => { // eslint-disable-next-line no-param-reassign path = `/${path}`; } + // eslint-disable-next-line no-param-reassign + path = path.split('?')[0]; let [_, _collections, name, key] = path.split('/'); return { name, key }; }; @@ -118,12 +119,38 @@ export function createServer(url = 'https://app.openfn.org') { try { let { name, key } = parsePath(req.path); - if (!key) { - key = '*'; + + let body; + let statusCode = 200; + + if (key) { + // get one + const result = api.byKey(name, key); + if (!result) { + body = {}; + statusCode = 204; + } else { + body = { + key, + value: result, + }; + } + } else { + // get many + + // TODO a little confused about undici's handling of query + const params = new URLSearchParams(req.query || req.path.split('?')[1]); + key = params.get('key') ?? '*'; + + const { items } = api.fetch(name, key); + body = { + cursor: ['xxx'], // TODO what will we do about cursor? + items, + }; } - const body = api.fetch(name, key); + return { - statusCode: 200, + statusCode, data: JSON.stringify(body), responseOptions: { headers: { 'Content-Type': 'application/json' }, diff --git a/packages/collections/test/Adaptor.test.js b/packages/collections/test/Adaptor.test.js index a632fd4c5..e7bb7beed 100644 --- a/packages/collections/test/Adaptor.test.js +++ b/packages/collections/test/Adaptor.test.js @@ -1,6 +1,7 @@ import { expect } from 'chai'; +import { MockAgent} from 'undici'; import { createServer } from '../src/mock.js'; -import { setMockClient } from '../src/collections.js'; +import { setMockClient, streamResponse } from '../src/collections.js'; import * as collections from '../src/collections.js'; const client = createServer(); @@ -70,7 +71,7 @@ describe('each', () => { expect(count).to.eql(3); }); - it('should iterate over some items', async () => { + it('should iterate over some items with a key pattern', async () => { const { state } = init([ ['az', { id: 'a' }], ['bz', { id: 'b' }], @@ -88,6 +89,24 @@ describe('each', () => { expect(count).to.eql(1); }); + it('should iterate over some with an object query', async () => { + const { state } = init([ + ['az', { id: 'a' }], + ['bz', { id: 'b' }], + ['cz', { id: 'c' }], + ]); + + let count = 0; + + await collections.each(COLLECTION, { key: 'b*' }, (_state, value, key) => { + count++; + expect(key).to.eql('bz'); + expect(value).to.eql({ id: 'b' }); + })(state); + + expect(count).to.eql(1); + }); + it('should support an async callback', async () => { const { state } = init([ ['az', { id: 'a' }], @@ -100,6 +119,17 @@ describe('each', () => { expect(Date.now() - start).to.be.greaterThan(100); }); + + it('should write the cursor back to state', async () => { + const { state } = init([ + ['az', { id: 'a' }], + ['cz', { id: 'c' }], + ]); + + await collections.each(COLLECTION, '*')(state); + + expect(state.data.cursor).to.equal('xxx') + }); }); describe('get', () => { @@ -122,8 +152,7 @@ describe('get', () => { const result = await collections.get(COLLECTION, 'x')(state); expect(result.data).to.eql({ - key: 'x', - value: { id: 'x' }, + id: 'x', }); }); @@ -133,8 +162,7 @@ describe('get', () => { const result = await collections.get(COLLECTION, { key: 'x' })(state); expect(result.data).to.eql({ - key: 'x', - value: { id: 'x' }, + id: 'x', }); }); @@ -170,6 +198,19 @@ describe('get', () => { ]); }); + it('should write the cursor to state', async () => { + const { state } = init([ + ['a-1', { id: 'a' }], + ['b-2', { id: 'b' }], + ['c-3', { id: 'c' }], + ]); + + const result = await collections.get(COLLECTION, 'b*')(state); + + // TODO this is a dummy mock cursor + expect(result.data.cursor).to.eql('xxx'); + }); + it('should expand references', async () => { const { state } = init([ ['a-1', { id: 'a' }], @@ -312,3 +353,109 @@ describe('utils', () => { }); }); }); + +describe('streamResponse', () => { + let client; + + before(() => { + const mockAgent = new MockAgent({ connections: 1 }); + mockAgent.disableNetConnect(); + client = mockAgent.get('https://app.openfn.org'); + }); + + it('should stream a response with an item and cursor', async () => { + client.intercept({ path: '/collections/my-collection' }).reply(200, { + cursor: 'b', + items: [{ + key: 'a', + value: "str" + }], + }); + + + const response = await client.request({ + method: 'GET', + path: '/collections/my-collection' + }); + + let callbackValue; + const cursor = await streamResponse(response, ({ key, value }) => { + callbackValue = value; + }) + + expect(callbackValue).to.eql('str') + expect(cursor).to.equal('b') + }) + + it('should stream a response with an item and null cursor', async () => { + client.intercept({ path: '/collections/my-collection' }).reply(200, { + cursor: null, + items: [{ + key: 'a', + value: "str" + }], + }); + + + const response = await client.request({ + method: 'GET', + path: '/collections/my-collection' + }); + + let callbackValue; + const cursor = await streamResponse(response, ({ key, value }) => { + callbackValue = value; + }) + + expect(callbackValue).to.eql('str') + expect(cursor).to.equal(null) + }) + + it('should handle the cursor key coming last', async () => { + client.intercept({ path: '/collections/my-collection' }).reply(200, { + items: [{ + key: 'a', + value: "str" + }], + cursor: 'b', + }); + + + const response = await client.request({ + method: 'GET', + path: '/collections/my-collection' + }); + + let callbackValue; + const cursor = await streamResponse(response, ({ key, value }) => { + callbackValue = value; + }) + + expect(callbackValue).to.eql('str') + expect(cursor).to.equal('b') + }) + + it('should handle key value pairs in a different order', async () => { + client.intercept({ path: '/collections/my-collection' }).reply(200, { + items: [{ + value: "str", + key: 'a', + }], + cursor: 'b', + }); + + + const response = await client.request({ + method: 'GET', + path: '/collections/my-collection' + }); + + let callbackValue; + const cursor = await streamResponse(response, ({ key, value }) => { + callbackValue = value; + }) + + expect(callbackValue).to.eql('str') + expect(cursor).to.equal('b') + }) +}) \ No newline at end of file diff --git a/packages/collections/test/mock/server.test.js b/packages/collections/test/mock/server.test.js index 0f51853e0..559127e1a 100644 --- a/packages/collections/test/mock/server.test.js +++ b/packages/collections/test/mock/server.test.js @@ -41,62 +41,60 @@ describe('GET', () => { expect(response.statusCode).to.equal(403); }); - it('should consume all results as a stream', async () => { + it('/collection/name/key should return a single item', async () => { api.createCollection('my-collection'); api.upsert('my-collection', 'x', { id: 'x' }); - api.upsert('my-collection', 'y', { id: 'y' }); - api.upsert('my-collection', 'z', { id: 'z' }); const response = await request({ method: 'GET', - path: 'collections/my-collection', + path: 'collections/my-collection/x', }); - const results = []; - await streamResponse(response, item => results.push(item)); + const item = await response.body.json(); - expect(results).to.eql([ - { key: 'x', value: { id: 'x' } }, - { key: 'y', value: { id: 'y' } }, - { key: 'z', value: { id: 'z' } }, - ]); + expect(response.statusCode).to.eql(200); + expect(item).to.eql({ key: 'x', value: { id: 'x' } }); }); - it('should consume a single result as a stream', async () => { + it('/collection/name should stream all results', async () => { api.createCollection('my-collection'); - api.upsert('my-collection', 'x', { id: 'x' }); - api.upsert('my-collection', 'y', { id: 'y' }); - api.upsert('my-collection', 'z', { id: 'z' }); + api.upsert('my-collection', 'x', 'xx'); + api.upsert('my-collection', 'y', 'yy'); + api.upsert('my-collection', 'z', 'zz'); const response = await request({ method: 'GET', - path: 'collections/my-collection/y', + path: 'collections/my-collection', }); const results = []; await streamResponse(response, item => results.push(item)); - expect(results).to.eql([{ key: 'y', value: { id: 'y' } }]); + expect(results).to.eql([ + { key: 'x', value: 'xx' }, + { key: 'y', value: 'yy' }, + { key: 'z', value: 'zz' }, + ]); }); - it('should consume results as a stream with a wildcard', async () => { + it('/collection/name?key=* should stream some results', async () => { api.createCollection('my-collection'); - api.upsert('my-collection', 'ax', { id: 'x' }); - api.upsert('my-collection', 'ay', { id: 'y' }); - api.upsert('my-collection', 'az', { id: 'z' }); + api.upsert('my-collection', 'ax', 'x'); + api.upsert('my-collection', 'ay', 'y'); + api.upsert('my-collection', 'az', 'z'); const response = await request({ method: 'GET', - path: 'collections/my-collection/*z', + path: 'collections/my-collection?key=*z', }); const results = []; await streamResponse(response, item => results.push(item)); - expect(results).to.eql([{ key: 'az', value: { id: 'z' } }]); + expect(results).to.eql([{ key: 'az', value: 'z' }]); }); }); diff --git a/packages/satusehat/ast.json b/packages/satusehat/ast.json index 6da6c2c98..339166aa4 100644 --- a/packages/satusehat/ast.json +++ b/packages/satusehat/ast.json @@ -8,7 +8,7 @@ "callback" ], "docs": { - "description": "Make a GET request to Satusehat", + "description": "Make a GET request to Satusehat. Use this to fetch resources directly from the Satusehat REST API.\nYou can pass Satusehat query parameters as an object of key value pairs, which will map to parameters\nin the URL.", "tags": [ { "title": "public", @@ -17,7 +17,13 @@ }, { "title": "example", - "description": "get(\"Organization\", {\"name\": \"somename\"})" + "description": "get(\"Organization/abcde\")", + "caption": "Get a resource by Id. Equivalent to GET `/Organization/abcde`" + }, + { + "title": "example", + "description": "get('/Patient', {\n identifier:'https://fhir.kemkes.go.id/id/nik|9271060312000001'\n});", + "caption": "Get resources with a query. Equivalent to GET `/Patient?identifier=https://fhir.kemkes.go.id/id/nik|9271060312000001`" }, { "title": "function", @@ -35,7 +41,7 @@ }, { "title": "param", - "description": "Optional request params such as name.", + "description": "Optional object of query parameters to include in the request", "type": { "type": "NameExpression", "name": "object" @@ -58,6 +64,10 @@ "type": "NameExpression", "name": "Operation" } + }, + { + "title": "state", + "description": "{SatusehatHttpState}" } ] }, @@ -72,11 +82,12 @@ "callback" ], "docs": { - "description": "Make a POST request to Satusehat", + "description": "Make a POST request to Satusehat. Use this to send resources directly to Satusehat REST API.\nYou can pass Satusehat body data as a JSON FHIR object.", "tags": [ { "title": "example", - "description": "post(\n \"Organization\",\n { \"resourceType\": \"Organization\", \"active\": true,\n }\n);" + "description": "post(\n \"Encounter\",\n { \"resourceType\": \"Encounter\",\n ...state.data,\n }\n);", + "caption": "Create a resource encounter" }, { "title": "function", @@ -99,7 +110,7 @@ }, { "title": "param", - "description": "Object or JSON which defines data that will be used to create a given instance of resource", + "description": "JSON FHIR object to create a resource", "type": { "type": "NameExpression", "name": "object" @@ -108,7 +119,7 @@ }, { "title": "param", - "description": "Optional request params.", + "description": "Optional object of query parameters to include in the request", "type": { "type": "NameExpression", "name": "Object" @@ -134,6 +145,10 @@ "type": "NameExpression", "name": "Operation" } + }, + { + "title": "state", + "description": "{SatusehatHttpState}" } ] }, @@ -148,11 +163,12 @@ "callback" ], "docs": { - "description": "Make a PUT request to Satusehat", + "description": "Make a PUT request to Satusehat. Use this to directly update resources on Satusehat REST API.\nYou can pass Satusehat body data as a JSON FHIR object. You can also pass Satusehat query parameters as an object of key value pairs, which will map to parameters\nin the URL.", "tags": [ { "title": "example", - "description": "put(\n \"Organization/123\",\n { \"resourceType\": \"Organization\", \"active\": false,\n }\n);" + "description": "put(\n \"Organization/abcde\",\n { \"resourceType\": \"Organization\", \"active\": false,\n }\n);", + "caption": "Update a resource. Equivalent to PUT `/Organization/abcde`" }, { "title": "function", @@ -175,7 +191,7 @@ }, { "title": "param", - "description": "Object or JSON which defines data that will be used to update a given instance of resource", + "description": "JSON FHIR object to update the resource", "type": { "type": "NameExpression", "name": "object" @@ -184,7 +200,7 @@ }, { "title": "param", - "description": "Optional request params.", + "description": "Optional object of query parameters to include in the request", "type": { "type": "NameExpression", "name": "Object" @@ -210,6 +226,10 @@ "type": "NameExpression", "name": "Operation" } + }, + { + "title": "state", + "description": "{SatusehatHttpState}" } ] }, @@ -224,11 +244,12 @@ "callback" ], "docs": { - "description": "Make a PATCH request to Satusehat", + "description": "Make a PATCH request to Satusehat. Use this to directly update resources on Satusehat REST API.\nYou can pass Satusehat an array of objects which contains `op`, `path`, and `value` as the body. You can also pass Satusehat query parameters as an object of key value pairs, which will map to parameters\nin the URL.", "tags": [ { "title": "example", - "description": "patch(\n \"Organization/123\",\n [{\n\"op\": \"replace\", // Operation - `replace` is the only one used to change a specific property or element\n \"path\": \"/language\", // Path - The name of property/element of resource to be replaced\n \"value\": \"id\" // Value- The value to be replaced\n}]\n\n);" + "description": "patch(\n \"Organization/abcde\",\n [{\n\"op\": \"replace\", // Operation - `replace` is the only one used to change a specific property or element\n \"path\": \"/language\", // Path - The name of property/element of resource to be replaced\n \"value\": \"id\" // Value- The value to be replaced\n}]\n\n);", + "caption": "Update a resource. Equivalent to PATCH `/Organization/abcde`" }, { "title": "function", @@ -260,7 +281,7 @@ }, { "title": "param", - "description": "Optional request params.", + "description": "Optional object of query parameters to include in the request.", "type": { "type": "NameExpression", "name": "Object" @@ -286,6 +307,10 @@ "type": "NameExpression", "name": "Operation" } + }, + { + "title": "state", + "description": "{SatusehatHttpState}" } ] },