diff --git a/src/service.js b/src/service.js index 947c25b..a542a2f 100644 --- a/src/service.js +++ b/src/service.js @@ -47,6 +47,44 @@ module.exports = function(mixinOptions) { }; }, }, + context: { + visibility: "private", + tracing: false, + handler(ctx) { + const { req, connection } = ctx.params; + let context = { + dataLoaders: new Map(), // create an empty map to load DataLoader instances into + }; + + if (req) { + context = { + ...context, + ctx: req.$ctx, + service: req.$service, + params: req.$params, + }; + } else if (connection) { + context = { + ...context, + ctx, + connectionCtx: connection.context.$ctx, + service: connection.context.$service, + params: connection.context.$params, + }; + } else { + throw new Error("Unrecognized request type for context action"); + } + + return context; + }, + }, + actionOptions: { + visibility: "private", + tracing: false, + handler() { + return {}; + }, + }, }, events: { @@ -95,7 +133,7 @@ module.exports = function(mixinOptions) { if (service.version != null) return ( - (typeof service.version == "number" + (typeof service.version === "number" ? "v" + service.version : service.version) + "." + @@ -155,10 +193,18 @@ module.exports = function(mixinOptions) { params: staticParams = {}, rootParams = {}, fileUploadArg = null, + fieldName = "", + typeName = "", } = def; const rootKeys = Object.keys(rootParams); return async (root, args, context) => { + // Record the span if possible + let operationSpan; + if (context.ctx) + operationSpan = context.ctx.startSpan(`GQL ${typeName} ${fieldName}`); + let result; + try { if (useDataLoader) { const dataLoaderMapKey = this.getDataLoaderMapKey( @@ -205,33 +251,41 @@ module.exports = function(mixinOptions) { } if (dataLoaderKey == null) { - return null; + result = null; + } else { + result = Array.isArray(dataLoaderKey) + ? await dataLoader.loadMany(dataLoaderKey) + : await dataLoader.load(dataLoaderKey); } - - return Array.isArray(dataLoaderKey) - ? await dataLoader.loadMany(dataLoaderKey) - : await dataLoader.load(dataLoaderKey); } else if (fileUploadArg != null && args[fileUploadArg] != null) { if (Array.isArray(args[fileUploadArg])) { - return await Promise.all( + result = await Promise.all( args[fileUploadArg].map(async uploadPromise => { const { createReadStream, ...$fileInfo } = await uploadPromise; const stream = createReadStream(); - return context.ctx.call(actionName, stream, { - meta: { $fileInfo }, - }); + return context.ctx.call( + actionName, + stream, + { meta: { $fileInfo } }, + await this.actions.actionOptions(root, args, context) + ); }) ); + } else { + const { createReadStream, ...$fileInfo } = await args[ + fileUploadArg + ]; + const stream = createReadStream(); + result = await context.ctx.call( + actionName, + stream, + { meta: { $fileInfo } }, + await this.actions.actionOptions(root, args, context) + ); } - - const { createReadStream, ...$fileInfo } = await args[fileUploadArg]; - const stream = createReadStream(); - return await context.ctx.call(actionName, stream, { - meta: { $fileInfo }, - }); } else { const params = {}; if (root && rootKeys) { @@ -240,12 +294,17 @@ module.exports = function(mixinOptions) { }); } - return await context.ctx.call( + result = await context.ctx.call( actionName, - _.defaultsDeep({}, args, params, staticParams) + _.defaultsDeep({}, args, params, staticParams), + {}, + await this.actions.actionOptions(root, args, context) ); } + if (operationSpan) operationSpan.finish(); + return result; } catch (err) { + if (operationSpan) operationSpan.finish(); if (nullIfError) { return null; } @@ -324,17 +383,29 @@ module.exports = function(mixinOptions) { subscribe: filter ? withFilter( () => this.pubsub.asyncIterator(tags), - async (payload, params, { ctx }) => - payload !== undefined - ? ctx.call(filter, { ...params, payload }) - : false + async (payload, params) => { + return payload !== undefined + ? this.createAsyncIteratorContext().call(filter, { + ...params, + payload, + }) + : false; + } ) : () => this.pubsub.asyncIterator(tags), - resolve: (payload, params, { ctx }) => - ctx.call(actionName, { ...params, payload }), + resolve: (payload, params) => { + return this.createAsyncIteratorContext().call(actionName, { + ...params, + payload, + }); + }, }; }, + createAsyncIteratorContext() { + return this.broker.ContextFactory.create(this.broker, null, {}, {}); + }, + /** * Generate GraphQL Schema * @@ -442,7 +513,11 @@ module.exports = function(mixinOptions) { const name = this.getFieldName(query); queries.push(query); resolver.Query[name] = this.createActionResolver( - action.name + action.name, + { + typeName: "query", + fieldName: name, + } ); }); } @@ -457,6 +532,8 @@ module.exports = function(mixinOptions) { action.name, { fileUploadArg: def.fileUploadArg, + typeName: "mutation", + fieldName: name, } ); }); @@ -607,20 +684,7 @@ module.exports = function(mixinOptions) { this.apolloServer = new ApolloServer({ schema, ..._.defaultsDeep({}, mixinOptions.serverOptions, { - context: ({ req, connection }) => ({ - ...(req - ? { - ctx: req.$ctx, - service: req.$service, - params: req.$params, - } - : { - ctx: connection.context.$ctx, - service: connection.context.$service, - params: connection.context.$params, - }), - dataLoaders: new Map(), // create an empty map to load DataLoader instances into - }), + context: integrationContext => this.actions.context(integrationContext), subscriptions: { onConnect: (connectionParams, socket) => this.actions.ws({ connectionParams, socket }), diff --git a/test/unit/service.spec.js b/test/unit/service.spec.js index 78aad45..b213ab7 100644 --- a/test/unit/service.spec.js +++ b/test/unit/service.spec.js @@ -473,11 +473,16 @@ describe("Test Service", () => { expect(res).toBe("response from action"); expect(ctx.call).toBeCalledTimes(1); - expect(ctx.call).toBeCalledWith("posts.find", { - a: 5, - id: 12345, - repl: false, - }); + expect(ctx.call).toBeCalledWith( + "posts.find", + { + a: 5, + id: 12345, + repl: false, + }, + {}, + {} + ); }); it("should throw error", async () => { @@ -494,13 +499,20 @@ describe("Test Service", () => { const fakeRoot = { author: 12345 }; - expect(resolver(fakeRoot, { a: 5 }, { ctx })).rejects.toThrow("Something happened"); + await expect(resolver(fakeRoot, { a: 5 }, { ctx })).rejects.toThrow( + "Something happened" + ); expect(ctx.call).toBeCalledTimes(1); - expect(ctx.call).toBeCalledWith("posts.find", { - limit: 5, - a: 5, - }); + expect(ctx.call).toBeCalledWith( + "posts.find", + { + limit: 5, + a: 5, + }, + {}, + {} + ); }); it("should not throw error if nullIfError is true", async () => { @@ -524,13 +536,18 @@ describe("Test Service", () => { expect(res).toBeNull(); expect(ctx.call).toBeCalledTimes(1); - expect(ctx.call).toBeCalledWith("posts.find", { - id: 12345, - company: { - code: "Moleculer", + expect(ctx.call).toBeCalledWith( + "posts.find", + { + id: 12345, + company: { + code: "Moleculer", + }, + a: 5, }, - a: 5, - }); + {}, + {} + ); }); }); @@ -568,15 +585,20 @@ describe("Test Service", () => { expect(res).toBe("response from action"); expect(ctx.call).toBeCalledTimes(1); - expect(ctx.call).toBeCalledWith("posts.uploadSingle", "fake read stream", { - meta: { - $fileInfo: { - filename: "filename.txt", - encoding: "7bit", - mimetype: "text/plain", + expect(ctx.call).toBeCalledWith( + "posts.uploadSingle", + "fake read stream", + { + meta: { + $fileInfo: { + filename: "filename.txt", + encoding: "7bit", + mimetype: "text/plain", + }, }, }, - }); + {} + ); }); it("should invoke call once per file when handling an array of file uploads", async () => { @@ -612,24 +634,34 @@ describe("Test Service", () => { ]); expect(ctx.call).toBeCalledTimes(2); - expect(ctx.call).toBeCalledWith("posts.uploadMulti", "fake read stream 1", { - meta: { - $fileInfo: { - filename: "filename1.txt", - encoding: "7bit", - mimetype: "text/plain", + expect(ctx.call).toBeCalledWith( + "posts.uploadMulti", + "fake read stream 1", + { + meta: { + $fileInfo: { + filename: "filename1.txt", + encoding: "7bit", + mimetype: "text/plain", + }, }, }, - }); - expect(ctx.call).toBeCalledWith("posts.uploadMulti", "fake read stream 2", { - meta: { - $fileInfo: { - filename: "filename2.txt", - encoding: "7bit", - mimetype: "text/plain", + {} + ); + expect(ctx.call).toBeCalledWith( + "posts.uploadMulti", + "fake read stream 2", + { + meta: { + $fileInfo: { + filename: "filename2.txt", + encoding: "7bit", + mimetype: "text/plain", + }, }, }, - }); + {} + ); }); }); @@ -966,7 +998,8 @@ describe("Test Service", () => { // Test resolve const ctx = new Context(broker); ctx.call = jest.fn(async () => "action response"); - const res3 = await res.resolve({ a: 5 }, { b: "John" }, { ctx }); + svc.createAsyncIteratorContext = jest.fn(() => ctx); + const res3 = await res.resolve({ a: 5 }, { b: "John" }); expect(res3).toBe("action response"); expect(ctx.call).toBeCalledTimes(1); @@ -1004,24 +1037,25 @@ describe("Test Service", () => { }); // Test first function - const ctx = new Context(broker); - expect(res.subscribe[0](undefined, undefined, { ctx })).toBe("iterator-result"); + expect(res.subscribe[0](undefined, undefined)).toBe("iterator-result"); expect(svc.pubsub.asyncIterator).toBeCalledTimes(1); expect(svc.pubsub.asyncIterator).toBeCalledWith(["a", "b"]); // Test second function without payload - expect(await res.subscribe[1](undefined, undefined, { ctx })).toBe(false); + expect(await res.subscribe[1](undefined, undefined)).toBe(false); // Test second function with payload + const ctx = svc.createAsyncIteratorContext(); ctx.call = jest.fn(async () => "action response"); - expect(await res.subscribe[1]({ a: 5 }, { b: "John" }, { ctx })).toBe( - "action response" - ); + svc.createAsyncIteratorContext = jest.fn(() => ctx); + expect(await res.subscribe[1]({ a: 5 }, { b: "John" })).toBe("action response"); expect(ctx.call).toBeCalledTimes(1); expect(ctx.call).toBeCalledWith("posts.filter", { b: "John", payload: { a: 5 } }); }); + + it("should create new context on new event", () => {}); }); describe("Test 'generateGraphQLSchema'", () => { @@ -1449,7 +1483,7 @@ describe("Test Service", () => { const contextFn = ApolloServer.mock.calls[0][0].context; expect( - contextFn({ + await contextFn({ connection: { context: { $service: "service", @@ -1459,7 +1493,11 @@ describe("Test Service", () => { }, }) ).toEqual({ - ctx: "context", + // We expect the context to be replaced + ctx: expect.objectContaining({ + action: expect.objectContaining({ name: "api.context" }), + }), + connectionCtx: "context", dataLoaders: new Map(), params: { a: 5, @@ -1473,10 +1511,10 @@ describe("Test Service", () => { $params: { a: 5 }, }; expect( - contextFn({ + await contextFn({ req, connection: { - $service: "service", + $params: { b: 6 }, }, }) ).toEqual({ @@ -1535,5 +1573,217 @@ describe("Test Service", () => { await stop(); }); + + it("should have a new context for each unfiltered subscription event", async () => { + const { broker, svc, stop } = await startService({ + serverOptions: { + path: "/my-graphql", + }, + }); + broker.call = jest.fn(async () => "action response"); + + const { resolve } = svc.createAsyncIteratorResolver("myAction"); + jest.spyOn(svc, "createAsyncIteratorContext"); + const originalCtx = { call: jest.fn() }; + const resCall = await resolve("test", {}, { ctx: originalCtx }); + expect(svc.createAsyncIteratorContext).toBeCalledTimes(1); + expect(originalCtx.call).not.toBeCalled(); + expect(resCall).toBe("action response"); + + await stop(); + }); + + it("should have a new context for each filtered subscription event", async () => { + const { broker, svc, stop } = await startService({ + serverOptions: { + path: "/my-graphql", + }, + }); + + broker.call = jest.fn(async () => "action response"); + + withFilter.mockImplementation((fn1, fn2) => [fn1, fn2]); + jest.spyOn(svc, "createAsyncIteratorContext"); + + const res = svc.createAsyncIteratorResolver("myAction", [], "myFilter"); + + const originalCtx = { call: jest.fn() }; + + expect(await res.subscribe[1]("test", {}, { ctx: originalCtx })).toBe( + "action response" + ); + + expect(svc.createAsyncIteratorContext).toBeCalledTimes(1); + expect(originalCtx.call).not.toBeCalled(); + + await stop(); + }); + }); + + describe("Test hooks", () => { + it("should call the `context` hook on every GQL operation", async () => { + const contextSpy = jest.fn(); + const { svc, stop } = await startService( + { serverOptions: { path: "/my-graphql" } }, + { + name: "api", + settings: { routes: [] }, + hooks: { after: { context: "contextTest" } }, + methods: { contextTest: contextSpy }, + } + ); + + // Non-Websocket + await svc.actions.context({ req: {} }); + expect(contextSpy).toHaveBeenCalledTimes(1); + + // Websocket + await svc.actions.context({ connection: { context: {} } }); + expect(contextSpy).toHaveBeenCalledTimes(2); + + await stop(); + }); + + it("should call the `ws` hook on initial websocket connection", async () => { + const wsSpy = jest.fn(); + const { svc, stop } = await startService( + { serverOptions: { path: "/my-graphql" } }, + { + name: "api", + settings: { routes: [] }, + hooks: { after: { ws: "ws" } }, + methods: { ws: wsSpy }, + } + ); + + await svc.actions.ws({ socket: { upgradeReq: {} } }); + expect(wsSpy).toHaveBeenCalledTimes(1); + + await stop(); + }); + + it("should call the `actionOptions` hook to get options for multi file upload operations", async () => { + const actionOptionsSpy = jest + .fn() + .mockName("actionOptionsSpy") + .mockImplementation(() => ({ my: "opt" })); + const { broker, svc, stop } = await startService( + { serverOptions: { path: "/my-graphql" } }, + { + name: "api", + settings: { routes: [] }, + hooks: { after: { actionOptions: "actionOptions" } }, + methods: { actionOptions: actionOptionsSpy }, + } + ); + const ctx = new Context(broker); + ctx.call = jest + .fn() + .mockResolvedValue(["response from action"]) + .mockName("context call mock"); + + const resolver = svc.createActionResolver("api", { fileUploadArg: "files" }); + + const createReadStream = jest.fn().mockName("createReadStream"); + + await resolver( + {}, + { + files: [ + (async () => ({ createReadStream, other: "stuff" }))(), + (async () => ({ createReadStream, other: "stuff2" }))(), + ], + }, + { ctx } + ); + expect(ctx.call).toHaveBeenCalledTimes(2); + expect(ctx.call).toHaveBeenCalledWith( + "api", + expect.objectContaining({}), + expect.objectContaining({ meta: expect.anything() }), + expect.objectContaining({ my: "opt" }) + ); + expect(createReadStream).toHaveBeenCalledTimes(2); + expect(actionOptionsSpy).toHaveBeenCalledTimes(2); + + await stop(); + }); + + it("should call the `actionOptions` hook to get options for single file upload operations", async () => { + const actionOptionsSpy = jest + .fn() + .mockName("actionOptionsSpy") + .mockImplementation(() => ({ my: "opt" })); + const { broker, svc, stop } = await startService( + { serverOptions: { path: "/my-graphql" } }, + { + name: "api", + settings: { routes: [] }, + hooks: { after: { actionOptions: "actionOptions" } }, + methods: { actionOptions: actionOptionsSpy }, + } + ); + const ctx = new Context(broker); + ctx.call = jest + .fn() + .mockResolvedValue(["response from action"]) + .mockName("context call mock"); + + const resolver = svc.createActionResolver("api", { fileUploadArg: "files" }); + + const createReadStream = jest.fn().mockName("createReadStream"); + + await resolver( + {}, + { files: (async () => ({ createReadStream, other: "stuff" }))() }, + { ctx } + ); + expect(ctx.call).toHaveBeenCalledTimes(1); + expect(ctx.call).toHaveBeenCalledWith( + "api", + expect.objectContaining({}), + expect.objectContaining({ meta: expect.anything() }), + expect.objectContaining({ my: "opt" }) + ); + expect(createReadStream).toHaveBeenCalledTimes(1); + expect(actionOptionsSpy).toHaveBeenCalledTimes(1); + + await stop(); + }); + + it("should call the `actionOptions` hook to get options for normal GQL operations", async () => { + const actionOptionsSpy = jest + .fn() + .mockName("actionOptionsSpy") + .mockImplementation(() => ({ my: "opt" })); + const { broker, svc, stop } = await startService( + { serverOptions: { path: "/my-graphql" } }, + { + name: "api", + settings: { routes: [] }, + hooks: { after: { actionOptions: "actionOptions" } }, + methods: { actionOptions: actionOptionsSpy }, + } + ); + const ctx = new Context(broker); + ctx.call = jest + .fn() + .mockResolvedValue(["response from action"]) + .mockName("context call mock"); + + const resolver = svc.createActionResolver("api", { fileUploadArg: "files" }); + + await resolver({}, {}, { ctx }); + expect(ctx.call).toHaveBeenCalledTimes(1); + expect(ctx.call).toHaveBeenCalledWith( + "api", + expect.objectContaining({}), + expect.objectContaining({}), + expect.objectContaining({ my: "opt" }) + ); + expect(actionOptionsSpy).toHaveBeenCalledTimes(1); + + await stop(); + }); }); });