Skip to content

Commit

Permalink
Collections: 0.3.0 dev (#809)
Browse files Browse the repository at this point in the history
* collections: fix an issue where the query isn't sent to the server

* collections: update GET request structure

* chanteset

* Implement custom stream parser

* lint

* collections: add mor flexibility to the stream parser

* collections: write the cursor back

* version: collections 0.3.0

* remove comment
  • Loading branch information
josephjclark authored Nov 1, 2024
1 parent c165da2 commit e728875
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 87 deletions.
10 changes: 10 additions & 0 deletions packages/collections/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/language-collections

## 0.3.0

### Minor Changes

- 1e472ed: Update new GET request structure Fix streaming API

### Patch Changes

- 32e5a03: Fix an issue where the query object isn't getting sent to the server

## 0.2.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/collections/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/language-collections",
"version": "0.2.0",
"version": "0.3.0",
"description": "OpenFn collections adaptor",
"type": "module",
"exports": {
Expand Down
151 changes: 114 additions & 37 deletions packages/collections/src/collections.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import chain from 'stream-chain';
import parser from 'stream-json';
import Pick from 'stream-json/filters/Pick';
import streamArray from 'stream-json/streamers/StreamArray';
import streamValues from 'stream-json/streamers/StreamValues';

import { createServer } from './mock';

Expand Down Expand Up @@ -37,18 +38,21 @@ export const setMockClient = mockClient => {
* @property {string} createdAfter - matches values that were created after the end of the provided date
* @property {string} updatedBefore - matches values that were updated before the start of the provided date
* @property {string} updatedAfter - matches values that were updated after the end of the provided date*
* @property {number} limit - limit the maximum amount of results. Defaults to 1000.
* @property {string} cursor - set the cursor position to start searching from a specific index.
*/

/**
* Fetch one or more values from a collection.
* For large datasets, we recommend using each(), which streams data.
* You can pass a specific key as a string to only fetch one item, or pass a query
* with a key-pattern or a date filter.
* If not all matching values are returned, the cursor position is written to state.data
* @public
* @function
* @param {string} name - The name of the collection to fetch from
* @param {string|QueryOptions} query - A string key or key pattern (with wildcards '*') to fetch, or a query object
* @state data - the downloaded values as an array unless a specific string was specified
* @state data - the downloaded values as an array unless a specific key was specified, in which case state.data is the value
* @example <caption>Get a specific value from a collection</caption>
* collections.get('my-collection', '556e0a62')
* @example <caption>Get a range of values from a collection with a key pattern</caption>
Expand All @@ -64,34 +68,40 @@ export function get(name, query = {}) {
}
const { key, ...rest } = expandQuery(resolvedQuery);

const response = await request(
state,
getClient(state),
`${resolvedName}/${key}`,
{ query: rest }
);
let q;
let path = resolvedName;
if (key.match(/\*/) || Object.keys(rest).length) {
// request many
q = resolvedQuery;
} else {
// request one
path = `${resolvedName}/${key}`;
}

const response = await request(state, getClient(state), path, { query: q });

let data;
if (!key.match(/\*/) || Object.keys(resolvedQuery).length === 0) {
// If one specific item was requested, write it straight to state.data
const body = await response.body.json();
const item = body.items[0];
if (item) {
item.value = JSON.parse(item.value);
}
data = item;
console.log(`Fetched "${key}" from collection "${name}"`);
} else {
if (q) {
// build a response array
data = [];
console.log(`Downloading data from collection "${name}"...`);
await streamResponse(response, item => {
const cursor = await streamResponse(response, item => {
item.value = JSON.parse(item.value);
data.push(item);
});
data.cursor = cursor;
console.log(`Fetched "${data.length}" values from collection "${name}"`);
} else {
// If one specific item was requested, write it straight to state.data
const body = await response.body.json();
if (body.value) {
data = JSON.parse(body.value);
console.log(`Fetched "${key}" from collection "${name}"`);
} else {
data = {};
console.warn(`Key "${key}" not found in collection "${name}"`);
}
}

state.data = data;
return state;
};
Expand Down Expand Up @@ -211,6 +221,7 @@ export function remove(name, query = {}, options = {}) {
* @param {string} name - The name of the collection to remove from
* @param {string|QueryOptions} query - A string key or key pattern (with wildcards '*') to remove, or a query object
* @param {function} callback - A callback invoked for each item `(state, value, key) => void`
* @state data.cursor - if values are still left on the server, a cursor string will be written to state.data
* @example <caption>Iterate over a range of values with wildcards</caption>
* collections.each('my-collection', 'record-2024*-appointment-*', (state, value, key) => {
* state.cumulativeCost += value.cost;
Expand All @@ -224,34 +235,100 @@ export function each(name, query = {}, callback = () => {}) {
return async state => {
const [resolvedName, resolvedQuery] = expandReferences(state, name, query);

const { key, ...rest } = expandQuery(resolvedQuery);
let q;
if (typeof resolvedQuery === 'string') {
q = { key: resolvedQuery };
} else {
q = resolvedQuery;
}

const response = await request(
state,
getClient(state),
`${resolvedName}/${key}`,
{ query: rest }
);
const { key, ...rest } = expandQuery(resolvedQuery);
const response = await request(state, getClient(state), resolvedName, {
query: q,
});
console.log(`each response`, response.statusCode)

await streamResponse(response, async ({ value, key }) => {
const cursor = await streamResponse(response, async ({ value, key }) => {
await callback(state, JSON.parse(value), key);
});

state.data = {
cursor
};

return state;
};
}

export const streamResponse = async (response, onValue) => {
const pipeline = chain([
response.body,
parser(),
new Pick({ filter: 'items' }),
new streamArray(),
]);

for await (const { key, value } of pipeline) {
await onValue(value);
const pipeline = chain([response.body, parser()]);

let isInsideItems = false;
let cursor;
const it = pipeline.iterator();

const waitFor = async (...names) => {
while (true) {
const next = await it.next();
if (next.done) {
return;
}
if (names.includes(next.value.name)) {
return next;
}
}
};

for await (const token of it) {
// This block finds the cursor key and extracts it
if (!isInsideItems && token.name === 'startKey') {
const next = await waitFor('keyValue');

if (next.value.value === 'cursor') {
const strValue = await waitFor('stringChunk', 'nullValue');
if (strValue.name === 'nullValue') {
continue
}
cursor = strValue.value.value;
}

if (next.value.value === 'items') {
isInsideItems = true;
await waitFor('startArray');
}
}

// This lock will parse a key/value pair
// the streamer make a lot of assumptuions about this data structure
// So if it ever changes, we'll need to come back and modify it
// TODO can we leverage json-stream to just generically parse an object at this point?
if (isInsideItems && token.name === 'startObject') {
let key;
let value;

while (!key || !value) {
const nextKey = await waitFor('keyValue');
if (nextKey.value.value === 'key') {
key = await waitFor('stringValue');
}
if (nextKey.value.value === 'value') {
value = await waitFor('stringValue');
}
}

await onValue({
key: key.value.value,
value: value.value.value,
});

waitFor('endObject');
}
if (isInsideItems && token.name === 'endArray') {
// This doesn't really matter but, just for the record, let's close out the array
isInsideItems = false;
}
}
return cursor
};

export const expandQuery = query => {
Expand Down Expand Up @@ -299,7 +376,7 @@ export const request = (state, client, path, options = {}) => {
Object.assign(headers, options?.headers);

const { headers: _h, query: _q, ...otherOptions } = options;
const query = parseQuery(options.query);
const query = parseQuery(options);
const args = {
path: nodepath.join(basePath, path),
headers,
Expand Down
39 changes: 33 additions & 6 deletions packages/collections/src/mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ export function API() {
collections = api.collections = {};
};

// Note that the mock allows data in any format,
// but the real API only takes strings
// This is a string store: values are expected to be strings
const upsert = (name, key, value) => {
if (!(name in collections)) {
throw new Error(COLLECTION_NOT_FOUND);
Expand Down Expand Up @@ -88,6 +87,8 @@ const parsePath = path => {
// eslint-disable-next-line no-param-reassign
path = `/${path}`;
}
// eslint-disable-next-line no-param-reassign
path = path.split('?')[0];
let [_, _collections, name, key] = path.split('/');
return { name, key };
};
Expand Down Expand Up @@ -118,12 +119,38 @@ export function createServer(url = 'https://app.openfn.org') {

try {
let { name, key } = parsePath(req.path);
if (!key) {
key = '*';

let body;
let statusCode = 200;

if (key) {
// get one
const result = api.byKey(name, key);
if (!result) {
body = {};
statusCode = 204;
} else {
body = {
key,
value: result,
};
}
} else {
// get many

// TODO a little confused about undici's handling of query
const params = new URLSearchParams(req.query || req.path.split('?')[1]);
key = params.get('key') ?? '*';

const { items } = api.fetch(name, key);
body = {
cursor: ['xxx'], // TODO what will we do about cursor?
items,
};
}
const body = api.fetch(name, key);

return {
statusCode: 200,
statusCode,
data: JSON.stringify(body),
responseOptions: {
headers: { 'Content-Type': 'application/json' },
Expand Down
Loading

0 comments on commit e728875

Please sign in to comment.