Skip to content

Commit

Permalink
add utilities for route generation
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Jul 25, 2024
1 parent bfc0dc5 commit 2fee597
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 110 deletions.
5 changes: 5 additions & 0 deletions .changeset/fair-planes-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': minor
---

Added utility functions for registering routes
5 changes: 3 additions & 2 deletions packages/rsocket-router/src/router/ReactiveSocketRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
* to expose reactive websocket stream in an interface similar to
* other Journey micro routers.
*/
import { errors, logger } from '@powersync/lib-services-framework';
import * as http from 'http';
import { Payload, RSocketServer } from 'rsocket-core';
import * as ws from 'ws';
import { SocketRouterObserver } from './SocketRouterListener.js';
import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js';
import {
CommonParams,
IReactiveStream,
Expand All @@ -15,8 +17,6 @@ import {
ReactiveSocketRouterOptions,
SocketResponder
} from './types.js';
import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js';
import { errors, logger } from '@powersync/lib-services-framework';

export class ReactiveSocketRouter<C> {
constructor(protected options?: ReactiveSocketRouterOptions<C>) {}
Expand Down Expand Up @@ -56,6 +56,7 @@ export class ReactiveSocketRouter<C> {
acceptor: {
accept: async (payload) => {
const { max_concurrent_connections } = this.options ?? {};
logger.info(`Currently have ${wss.clients.size} active WebSocket connection(s)`);
// wss.clients.size includes this connection, so we check for greater than
// TODO: Share connection limit between this and http stream connections
if (max_concurrent_connections && wss.clients.size > max_concurrent_connections) {
Expand Down
102 changes: 102 additions & 0 deletions packages/service-core/src/routes/configure-fastify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import type fastify from 'fastify';
import { registerFastifyRoutes } from './route-register.js';

import * as system from '../system/system-index.js';

import { ADMIN_ROUTES } from './endpoints/admin.js';
import { CHECKPOINT_ROUTES } from './endpoints/checkpointing.js';
import { DEV_ROUTES } from './endpoints/dev.js';
import { SYNC_RULES_ROUTES } from './endpoints/sync-rules.js';
import { SYNC_STREAM_ROUTES } from './endpoints/sync-stream.js';
import { createRequestQueueHook, CreateRequestQueueParams } from './hooks.js';
import { RouteDefinition } from './router.js';

/**
* A list of route definitions to be registered as endpoints.
* Supplied concurrency limits will be applied to the grouped routes.
*/
export type RouteRegistrationOptions = {
routes: RouteDefinition[];
queueOptions: CreateRequestQueueParams;
};

/**
* HTTP routes separated by API and Sync stream categories.
* This allows for separate concurrency limits.
*/
export type RouteDefinitions = {
api?: Partial<RouteRegistrationOptions>;
syncStream?: Partial<RouteRegistrationOptions>;
};

export type FastifyServerConfig = {
system: system.CorePowerSyncSystem;
routes?: RouteDefinitions;
};

export const DEFAULT_ROUTE_OPTIONS = {
api: {
routes: [...ADMIN_ROUTES, ...CHECKPOINT_ROUTES, ...DEV_ROUTES, ...SYNC_RULES_ROUTES],
queueOptions: {
concurrency: 10,
max_queue_depth: 20
}
},
syncStream: {
routes: [...SYNC_STREAM_ROUTES],
queueOptions: {
concurrency: 200,
max_queue_depth: 0
}
}
};

/**
* Registers default routes on a Fastify server. Consumers can optionally configure
* concurrency queue limits or override routes.
*/
export function configureFastifyServer(server: fastify.FastifyInstance, options: FastifyServerConfig) {
const { system, routes = DEFAULT_ROUTE_OPTIONS } = options;
/**
* Fastify creates an encapsulated context for each `.register` call.
* Creating a separate context here to separate the concurrency limits for Admin APIs
* and Sync Streaming routes.
* https://github.com/fastify/fastify/blob/main/docs/Reference/Encapsulation.md
*/
server.register(async function (childContext) {
registerFastifyRoutes(
childContext,
async () => {
return {
user_id: undefined,
system: system
};
},
routes.api?.routes ?? DEFAULT_ROUTE_OPTIONS.api.routes
);
// Limit the active concurrent requests
childContext.addHook(
'onRequest',
createRequestQueueHook(routes.api?.queueOptions ?? DEFAULT_ROUTE_OPTIONS.api.queueOptions)
);
});

// Create a separate context for concurrency queueing
server.register(async function (childContext) {
registerFastifyRoutes(
childContext,
async () => {
return {
user_id: undefined,
system: system
};
},
routes.syncStream?.routes ?? DEFAULT_ROUTE_OPTIONS.syncStream.routes
);
// Limit the active concurrent requests
childContext.addHook(
'onRequest',
createRequestQueueHook(routes.syncStream?.queueOptions ?? DEFAULT_ROUTE_OPTIONS.syncStream.queueOptions)
);
});
}
59 changes: 59 additions & 0 deletions packages/service-core/src/routes/configure-rsocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { deserialize } from 'bson';
import * as http from 'http';

import { errors, logger } from '@powersync/lib-services-framework';
import { ReactiveSocketRouter, RSocketRequestMeta } from '@powersync/service-rsocket-router';

import { CorePowerSyncSystem } from '../system/CorePowerSyncSystem.js';
import { generateContext, getTokenFromHeader } from './auth.js';
import { syncStreamReactive } from './endpoints/socket-route.js';
import { RSocketContextMeta, SocketRouteGenerator } from './router-socket.js';
import { Context } from './router.js';

export type RSockerRouterConfig = {
system: CorePowerSyncSystem;
server: http.Server;
routeGenerators?: SocketRouteGenerator[];
};

export const DEFAULT_SOCKET_ROUTES = [syncStreamReactive];

export function configureRSocket(router: ReactiveSocketRouter<Context>, options: RSockerRouterConfig) {
const { routeGenerators = DEFAULT_SOCKET_ROUTES, server, system } = options;

router.applyWebSocketEndpoints(server, {
contextProvider: async (data: Buffer) => {
const { token } = RSocketContextMeta.decode(deserialize(data) as any);

if (!token) {
throw new errors.AuthorizationError('No token provided');
}

try {
const extracted_token = getTokenFromHeader(token);
if (extracted_token != null) {
const { context, errors: token_errors } = await generateContext(system, extracted_token);
if (context?.token_payload == null) {
throw new errors.AuthorizationError(token_errors ?? 'Authentication required');
}
return {
token,
...context,
token_errors: token_errors,
system
};
} else {
throw new errors.AuthorizationError('No token provided');
}
} catch (ex) {
logger.error(ex);
throw ex;
}
},
endpoints: routeGenerators.map((generator) => generator(router)),
metaDecoder: async (meta: Buffer) => {
return RSocketRequestMeta.decode(deserialize(meta) as any);
},
payloadDecoder: async (rawData?: Buffer) => rawData && deserialize(rawData)
});
}
8 changes: 1 addition & 7 deletions packages/service-core/src/routes/endpoints/socket-route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,14 @@ import { RequestParameters } from '@powersync/service-sync-rules';
import { serialize } from 'bson';

import { Metrics } from '../../metrics/Metrics.js';
import { RequestTracker } from '../../sync/RequestTracker.js';
import { streamResponse } from '../../sync/sync.js';
import * as util from '../../util/util-index.js';
import { SocketRouteGenerator } from '../router-socket.js';
import { SyncRoutes } from './sync-stream.js';
import { RequestTracker } from '../../sync/RequestTracker.js';

export const syncStreamReactive: SocketRouteGenerator = (router) =>
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
authorize: ({ context }) => {
return {
authorized: !!context.token_payload,
errors: ['Authentication required'].concat(context.token_errors ?? [])
};
},
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
handler: async ({ context, params, responder, observer, initialN }) => {
const { system } = context;
Expand Down
4 changes: 2 additions & 2 deletions packages/service-core/src/routes/route-register.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import fastify from 'fastify';
import type fastify from 'fastify';

import { errors, router, HTTPMethod, logger } from '@powersync/lib-services-framework';
import { errors, HTTPMethod, logger, router } from '@powersync/lib-services-framework';
import { Context, ContextProvider, RequestEndpoint, RequestEndpointHandlerPayload } from './router.js';

export type FastifyEndpoint<I, O, C> = RequestEndpoint<I, O, C> & {
Expand Down
10 changes: 5 additions & 5 deletions packages/service-core/src/routes/router-socket.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { IReactiveStream, ReactiveSocketRouter } from '@powersync/service-rsocket-router';
import * as t from 'ts-codec';
import { ReactiveSocketRouter, IReactiveStream } from '@powersync/service-rsocket-router';

import { Context } from './router.js';

export const RSocketContextMeta = t.object({
token: t.string
});

/**
* Creates a socket route handler given a router instance
*/
export type SocketRouteGenerator = (router: ReactiveSocketRouter<Context>) => IReactiveStream;

export const RSocketContextMeta = t.object({
token: t.string
});
2 changes: 2 additions & 0 deletions packages/service-core/src/routes/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export type RequestEndpointHandlerPayload<
request: Request;
};

export type RouteDefinition<I = any, O = any> = RequestEndpoint<I, O>;

/**
* Helper function for making generics work well when defining routes
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/service-core/src/routes/routes-index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
export * as auth from './auth.js';
export * from './configure-fastify.js';
export * from './configure-rsocket.js';
export * as endpoints from './endpoints/route-endpoints-index.js';
export * as hooks from './hooks.js';
export * from './route-register.js';
Expand Down
1 change: 1 addition & 0 deletions packages/service-core/src/sync/sync-index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from './BroadcastIterable.js';
export * from './LastValueSink.js';
export * from './merge.js';
export * from './RequestTracker.js';
export * from './safeRace.js';
export * from './sync.js';
export * from './util.js';
99 changes: 5 additions & 94 deletions service/src/runners/server.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { deserialize } from 'bson';
import fastify from 'fastify';
import cors from '@fastify/cors';
import { container, logger } from '@powersync/lib-services-framework';
import * as core from '@powersync/service-core';
import { container, errors, logger } from '@powersync/lib-services-framework';
import { RSocketRequestMeta } from '@powersync/service-rsocket-router';
import fastify from 'fastify';

import { PowerSyncSystem } from '../system/PowerSyncSystem.js';
import { SocketRouter } from '../routes/router.js';
import { PowerSyncSystem } from '../system/PowerSyncSystem.js';
/**
* Starts an API server
*/
Expand All @@ -18,60 +16,6 @@ export async function startServer(runnerConfig: core.utils.RunnerConfig) {

const server = fastify.fastify();

/**
* Fastify creates an encapsulated context for each `.register` call.
* Creating a separate context here to separate the concurrency limits for Admin APIs
* and Sync Streaming routes.
* https://github.com/fastify/fastify/blob/main/docs/Reference/Encapsulation.md
*/
server.register(async function (childContext) {
core.routes.registerFastifyRoutes(
childContext,
async () => {
return {
user_id: undefined,
system: system
};
},
[
...core.routes.endpoints.ADMIN_ROUTES,
...core.routes.endpoints.CHECKPOINT_ROUTES,
...core.routes.endpoints.DEV_ROUTES,
...core.routes.endpoints.SYNC_RULES_ROUTES
]
);
// Limit the active concurrent requests
childContext.addHook(
'onRequest',
core.routes.hooks.createRequestQueueHook({
max_queue_depth: 20,
concurrency: 10
})
);
});

// Create a separate context for concurrency queueing
server.register(async function (childContext) {
core.routes.registerFastifyRoutes(
childContext,
async () => {
return {
user_id: undefined,
system: system
};
},
[...core.routes.endpoints.SYNC_STREAM_ROUTES]
);
// Limit the active concurrent requests
childContext.addHook(
'onRequest',
core.routes.hooks.createRequestQueueHook({
max_queue_depth: 0,
concurrency: 200
})
);
});

server.register(cors, {
origin: '*',
allowedHeaders: ['Content-Type', 'Authorization'],
Expand All @@ -80,41 +24,8 @@ export async function startServer(runnerConfig: core.utils.RunnerConfig) {
maxAge: 3600
});

SocketRouter.applyWebSocketEndpoints(server.server, {
contextProvider: async (data: Buffer) => {
const { token } = core.routes.RSocketContextMeta.decode(deserialize(data) as any);

if (!token) {
throw new errors.AuthorizationError('No token provided');
}

try {
const extracted_token = core.routes.auth.getTokenFromHeader(token);
if (extracted_token != null) {
const { context, errors: token_errors } = await core.routes.auth.generateContext(system, extracted_token);
if (context?.token_payload == null) {
throw new errors.AuthorizationError(token_errors ?? 'Authentication required');
}
return {
token,
...context,
token_errors: token_errors,
system
};
} else {
throw new errors.AuthorizationError('No token provided');
}
} catch (ex) {
logger.error(ex);
throw ex;
}
},
endpoints: [core.routes.endpoints.syncStreamReactive(SocketRouter)],
metaDecoder: async (meta: Buffer) => {
return RSocketRequestMeta.decode(deserialize(meta) as any);
},
payloadDecoder: async (rawData?: Buffer) => rawData && deserialize(rawData)
});
core.routes.configureFastifyServer(server, { system });
core.routes.configureRSocket(SocketRouter, { server: server.server, system });

logger.info('Starting system');
await system.start();
Expand Down

0 comments on commit 2fee597

Please sign in to comment.