Skip to content

Commit

Permalink
Add invalidation endpoint and close connections if auth fails (#18)
Browse files Browse the repository at this point in the history
Co-authored-by: David Bosschaert <[email protected]>
  • Loading branch information
karlpauls and bosschaert authored Feb 20, 2024
1 parent 1840ee2 commit 525297e
Show file tree
Hide file tree
Showing 4 changed files with 546 additions and 59 deletions.
56 changes: 53 additions & 3 deletions src/edge.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
import { setupWSConnection } from './shareddoc.js';
import { invalidateFromAdmin, setupWSConnection } from './shareddoc.js';

// This is the Edge Worker, built using Durable Objects!

Expand Down Expand Up @@ -42,9 +42,37 @@ async function handleErrors(request, func) {
}
}

async function handleApiRequest(request, env) {
async function syncAdmin(url, request, env) {
const doc = url.searchParams.get('doc');
if (!doc) {
return new Response('Bad', { status: 400 });
}

// eslint-disable-next-line no-console
console.log('Room name:', doc);
const id = env.rooms.idFromName(doc);
const roomObject = env.rooms.get(id);

return roomObject.fetch(new URL(`${doc}?api=syncAdmin`));
}

async function handleApiCall(url, request, env) {
switch (url.pathname) {
case '/api/v1/syncadmin':
return syncAdmin(url, request, env);
default:
return new Response('Bad Request', { status: 400 });
}
}

export async function handleApiRequest(request, env) {
// We've received at API request.
const auth = new URL(request.url).searchParams.get('Authorization');
const url = new URL(request.url);
if (url.pathname.startsWith('/api/')) {
return handleApiCall(url, request, env);
}

const auth = url.searchParams.get('Authorization');

// We need to massage the path somewhat because on connections from localhost safari sends
// a path with only one slash for some reason.
Expand Down Expand Up @@ -130,11 +158,33 @@ export class DocRoom {
this.env = env;
}

static async handleApiCall(url, request) {
const qidx = request.url.indexOf('?');
const baseURL = request.url.substring(0, qidx);

const api = url.searchParams.get('api');
switch (api) {
case 'syncAdmin':
if (await invalidateFromAdmin(baseURL)) {
return new Response('OK', { status: 200 });
} else {
return new Response('Not Found', { status: 404 });
}
default:
return new Response('Invalid API', { status: 400 });
}
}

// The system will call fetch() whenever an HTTP request is sent to this Object. Such requests
// can only be sent from other Worker code, such as the code above; these requests don't come
// directly from the internet. In the future, we will support other formats than HTTP for these
// communications, but we started with HTTP for its familiarity.
async fetch(request) {
const url = new URL(request.url);
if (url.search.startsWith('?api=')) {
return DocRoom.handleApiCall(url, request);
}

if (request.headers.get('Upgrade') !== 'websocket') {
return new Response('expected websocket', { status: 400 });
}
Expand Down
167 changes: 112 additions & 55 deletions src/shareddoc.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,6 @@ const wsReadyStateOpen = 1;
// disable gc when using snapshots!
const gcEnabled = false;

const persistence = {
bindState: async (docName, ydoc, conn) => {
const persistedYdoc = new Y.Doc();
const aemMap = persistedYdoc.getMap('aem');
const initalOpts = {};
if (conn.auth) {
initalOpts.headers = new Headers({ Authorization: conn.auth });
}
const initialReq = await fetch(docName, initalOpts);
if (initialReq.ok) {
aemMap.set('initial', await initialReq.text());
} else if (initialReq.status === 404) {
aemMap.set('initial', '');
} else {
// eslint-disable-next-line no-console
console.log(`unable to get resource: ${initialReq.status} - ${initialReq.statusText}`);
throw new Error(`unable to get resource - status: ${initialReq.status}`);
}

Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));
let last = aemMap.get('initial');
ydoc.on('update', debounce(async () => {
try {
const content = ydoc.getMap('aem').get('content');
if (last !== content) {
last = content;
const blob = new Blob([content], { type: 'text/html' });

const formData = new FormData();
formData.append('data', blob);

const opts = { method: 'PUT', body: formData };
const auth = Array.from(ydoc.conns.keys())
.map((con) => con.auth);

if (auth.length > 0) {
opts.headers = new Headers({ Authorization: [...new Set(auth)].join(',') });
}

const put = await fetch(docName, opts);
if (!put.ok) {
throw new Error(`${put.status} - ${put.statusText}`);
}
// eslint-disable-next-line no-console
console.log(content);
}
} catch (err) {
// eslint-disable-next-line no-console
console.error(err);
ydoc.emit('error', [err]);
}
}, 2000, 10000));
},
};

const docs = new Map();

const messageSync = 0;
Expand All @@ -103,6 +48,106 @@ const send = (doc, conn, m) => {
}
};

export const persistence = {
fetch: fetch.bind(this),
closeConn: closeConn.bind(this),
get: async (docName, auth) => {
const initalOpts = {};
if (auth) {
initalOpts.headers = new Headers({ Authorization: auth });
}
const initialReq = await persistence.fetch(docName, initalOpts);
if (initialReq.ok) {
return initialReq.text();
} else if (initialReq.status === 404) {
return '';
} else {
// eslint-disable-next-line no-console
console.log(`unable to get resource: ${initialReq.status} - ${initialReq.statusText}`);
throw new Error(`unable to get resource - status: ${initialReq.status}`);
}
},
put: async (ydoc, content) => {
const blob = new Blob([content], { type: 'text/html' });

const formData = new FormData();
formData.append('data', blob);

const opts = { method: 'PUT', body: formData };
const auth = Array.from(ydoc.conns.keys())
.map((con) => con.auth);

if (auth.length > 0) {
opts.headers = new Headers({
Authorization: [...new Set(auth)].join(','),
'X-Initiator': 'collab',
});
}

const { ok, status, statusText } = await persistence.fetch(ydoc.name, opts);

return {
ok,
status,
statusText,
};
},
invalidate: async (ydoc) => {
const auth = Array.from(ydoc.conns.keys())
.map((con) => con.auth);
const authHeader = auth.length > 0 ? [...new Set(auth)].join(',') : undefined;

const svrContent = await persistence.get(ydoc.name, authHeader);
const aemMap = ydoc.getMap('aem');
const cliContent = aemMap.get('content');
if (svrContent !== cliContent) {
// Only update the client if they're different
aemMap.set('svrinv', svrContent);
}
},
update: async (ydoc, current) => {
let closeAll = false;
try {
const content = ydoc.getMap('aem').get('content');
if (current !== content) {
const { ok, status, statusText } = await persistence.put(ydoc, content);

if (!ok) {
closeAll = status === 401;
throw new Error(`${status} - ${statusText}`);
}
// eslint-disable-next-line no-console
console.log(content);
return content;
}
} catch (err) {
// eslint-disable-next-line no-console
console.error(err);
ydoc.emit('error', [err]);
}
if (closeAll) {
// We had an unauthorized from da-admin - lets reset the connections
Array.from(ydoc.conns.keys())
.forEach((con) => persistence.closeConn(ydoc, con));
}
return current;
},
bindState: async (docName, ydoc, conn) => {
const persistedYdoc = new Y.Doc();
const aemMap = persistedYdoc.getMap('aem');

let current = await persistence.get(docName, conn.auth);

aemMap.set('initial', current);

Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));

ydoc.on('update', debounce(async () => {
current = await persistence.update(ydoc, current);
}, 2000, 10000));
},
};

export const updateHandler = (update, _origin, doc) => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
Expand Down Expand Up @@ -160,6 +205,9 @@ const getYDoc = async (docname, conn, gc = true) => {
return doc;
};

// For testing
export const setYDoc = (docname, ydoc) => docs.set(docname, ydoc);

const messageListener = (conn, doc, message) => {
try {
const encoder = encoding.createEncoder();
Expand Down Expand Up @@ -192,6 +240,15 @@ const messageListener = (conn, doc, message) => {
}
};

export const invalidateFromAdmin = async (docName) => {
const ydoc = docs.get(docName);
if (ydoc) {
await persistence.invalidate(ydoc);
return true;
}
return false;
};

export const setupWSConnection = async (conn, docName) => {
// eslint-disable-next-line no-param-reassign
conn.binaryType = 'arraybuffer';
Expand Down
Loading

0 comments on commit 525297e

Please sign in to comment.