Skip to content

Commit

Permalink
Merge pull request #64 from moleculerjs/context
Browse files Browse the repository at this point in the history
Context-based handlers
  • Loading branch information
icebob authored Feb 20, 2023
2 parents fcab05d + 0beefb0 commit edf022b
Show file tree
Hide file tree
Showing 14 changed files with 628 additions and 23 deletions.
3 changes: 2 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
"program": "examples/index.js",
"cwd": "${workspaceRoot}",
"args": [
"headers"
"context"
],
"console": "integratedTerminal",
"env": {
//"ADAPTER": "kafka://localhost:9093"
}
Expand Down
133 changes: 130 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

Reliable messages for Moleculer services via external queue/channel/topic. Unlike moleculer built-in events, this is **not** a fire-and-forget solution. It's a persistent, durable and reliable message sending solution. The module uses an external message queue/streaming server that stores messages until they are successfully processed. It supports consumer groups, which means that you can run multiple instances of consumer services, incoming messages will be balanced between them.

**This project is a work-in-progress. Don't use it in production.**
<!-- **This project is a work-in-progress. Don't use it in production.**
> **FAQ**: When it's going to be production safe? **Response**: `moleculer-channels` is a wrapper around well-known and battle tested tech (Redis Streams, Kafka, AMQP and NATS JetStream) so there shouldn't be any critical issues. Feature wise this module is pretty much complete. In order for us to remove the "work-in-progress" label we need people to test it and check if the wrapper is working properly and that it's stable.
> **FAQ**: When it's going to be production safe? **Response**: `moleculer-channels` is a wrapper around well-known and battle tested tech (Redis Streams, Kafka, AMQP and NATS JetStream) so there shouldn't be any critical issues. Feature wise this module is pretty much complete. In order for us to remove the "work-in-progress" label we need people to test it and check if the wrapper is working properly and that it's stable. -->

## Features

Expand Down Expand Up @@ -188,6 +188,7 @@ module.exports = {
| `schemaProperty` | `String` | `"channels"` | Name of the property in service schema. |
| `sendMethodName` | `String` | `"sendToChannel"` | Name of the method in ServiceBroker to send message to the channels. |
| `adapterPropertyName` | `String` | `"channelAdapter"` | Name of the property in ServiceBroker to access the `Adapter` instance directly. |
| `context` | `boolean` | `false` | Using Moleculer context in channel handlers by default. |

**Examples**

Expand Down Expand Up @@ -218,6 +219,8 @@ module.exports = {
| `maxRetries` | `Number` | \* | Maximum number of retries before sending the message to dead-letter-queue or drop. |
| `deadLettering.enabled` | `Boolean` | \* | Enable "Dead-lettering" feature. |
| `deadLettering.queueName` | `String` | \* | Name of dead-letter queue. |
| `context` | `boolean` | \* | Using Moleculer context in channel handlers. |
| `tracing` | `Object` | \* | Tracing options same as [action tracing options](https://moleculer.services/docs/0.14/tracing.html#Customizing). It works only with `context: true`. |
| `handler` | `Function(payload: any, rawMessage: any)` | \* | Channel handler function. It receives the payload at first parameter. The second parameter is a raw message which depends on the adapter. |
| `redis.startID` | `String` | Redis | Starting point when consumers fetch data from the consumer group. By default equals to `$`, i.e., consumers will only see new elements arriving in the stream. More info [here](https://redis.io/commands/XGROUP) |
| `redis.minIdleTime` | `Number` | Redis | Time (in milliseconds) after which pending messages are considered NACKed and should be claimed. Defaults to 1 hour. |
Expand Down Expand Up @@ -311,6 +314,130 @@ module.exports = {
};
```

## Context-based messages

In order to use Moleculer Context in handlers (transferring `ctx.meta` and tracing information) you should set the `context: true` option in channel definition object or in middleware options to enable it for all channel handlers.

**Example to enable context for all handlers**

```js
// moleculer.config.js
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

module.exports = {
logger: true,

middlewares: [
ChannelsMiddleware({
adapter: "redis://localhost:6379",
// Enable context in all channel handlers
context: true
})
]
};
```

**Using `Context` in handlers**

```js
module.exports = {
name: "payments",

actions: {
/*...*/
},

channels: {
"default.options.topic": {
context: true, // Unless not enabled it globally
async handler(ctx/*, raw*/) {
// The `ctx` is a regular Moleculer Context
if (ctx.meta.loggedInUser) {
// The `ctx.params` contains the original payload of the message
await ctx.call("some.action", ctx.params);
}
}
}
}
};
```

**Send message with parent Context**

In this case the `ctx.meta` and other tracing information is transferred to the channel handler.

```js
module.exports = {
name: "payments",

actions: {
submitOrder: {
async handler(ctx) {
await broker.sendToChannel("order.created", {
id: 1234,
items: [/*...*/]
}, {
// Pass the `ctx` in options of `sendToChannel`
ctx
});

}
}
},
}
```

### Tracing
To enable tracing for context-based handlers, you should register `Tracing` middleware in broker options.

> The middleware works only with `context: true`.
**Register channel tracing middleware**
```js
//moleculer.config.js
const TracingMiddleware = require("@moleculer/channels").Tracing;

module.exports = {
logger: true,

middlewares: [
ChannelsMiddleware({
adapter: "redis://localhost:6379",
// Enable context in all channel handlers
context: true
}),
TracingMiddleware()
]
};
```

You can fine-tuning tracing tags and span name in `tracing` channel property similar to [actions](https://moleculer.services/docs/0.14/tracing.html#Customizing).

**Customize tags and span name**

```js
broker.createService({
name: "sub1",
channels: {
"my.topic": {
context: true,
tracing: {
spanName: ctx => `My custom span: ${ctx.params.id}`
tags: {
params: true,
meta: true
}
},
async handler(ctx, raw) {
// ...
}
}
}
});
```

> To disable tracing, set `tracing: false in channel definition.
## Adapters

### Adapter options
Expand Down Expand Up @@ -750,6 +877,6 @@ The project is available under the [MIT license](https://tldrlegal.com/license/m

## Contact

Copyright (c) 2022 MoleculerJS
Copyright (c) 2023 MoleculerJS

[![@MoleculerJS](https://img.shields.io/badge/github-moleculerjs-green.svg)](https://github.com/moleculerjs) [![@MoleculerJS](https://img.shields.io/badge/twitter-MoleculerJS-blue.svg)](https://twitter.com/MoleculerJS)
147 changes: 147 additions & 0 deletions examples/context/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"use strict";

const { ServiceBroker } = require("moleculer");
const ChannelsMiddleware = require("../..").Middleware;
const TracingMiddleware = require("../..").Tracing;

let c = 1;

// Create broker
const broker = new ServiceBroker({
logLevel: {
CHANNELS: "debug",
"**": "info"
},
tracing: {
enabled: true,
exporter: [{ type: "Console" }, { type: "Event" }]
},
middlewares: [
ChannelsMiddleware({
adapter: {
type: "Fake"
},
/*adapter: {
type: "Kafka",
options: { kafka: { brokers: ["localhost:9093"] } }
},*/
/*adapter: {
type: "AMQP"
},*/
/*adapter: {
type: "NATS"
},*/
/*
adapter: {
type: "Redis",
options: {
redis: "localhost:6379"
//serializer: "MsgPack"
}
},
*/
context: true
}),
TracingMiddleware()
],
replCommands: [
{
command: "publish",
alias: ["p"],
async action(broker, args) {
const payload = {
id: ++c,
name: "Jane Doe",
pid: process.pid
};

await broker.call(
"publisher.publish",
{ payload, headers: { a: "123" } },
{
meta: {
loggedInUser: {
id: 12345,
name: "John Doe",
roles: ["admin"],
status: true
}
}
}
);
}
}
]
});

broker.createService({
name: "publisher",
actions: {
async publish(ctx) {
await broker.sendToChannel("my.topic", ctx.params.payload, {
ctx,
headers: ctx.params.headers
});

await broker.Promise.delay(1000);
}
}
});

broker.createService({
name: "sub1",
channels: {
"my.topic": {
//context: true,
tracing: {
//spanName: ctx => `My custom span: ${ctx.params.id}`,
tags: {
params: true,
meta: true
}
},
async handler(ctx, raw) {
this.logger.info("Processing...", ctx);
this.logger.info("RAW:", raw);

await Promise.delay(100);

await ctx.call("test.demo");

this.logger.info("Processed!", ctx.params, ctx.meta);
}
}
}
});

broker.createService({
name: "test",
actions: {
async demo(ctx) {
this.logger.info("Demo service called");
}
}
});

broker.createService({
name: "event-handler",
events: {
"$tracing.spans": {
tracing: false,
handler(ctx) {
this.logger.info("Tracing event received");
ctx.params.forEach(span => this.logger.info(span));
}
}
}
});

broker
.start()
.then(async () => {
broker.repl();
})
.catch(err => {
broker.logger.error(err);
broker.stop();
});
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@

module.exports = {
Middleware: require("./src"),
Tracing: require("./src/tracing"),
Adapters: require("./src/adapters")
};
9 changes: 9 additions & 0 deletions src/adapters/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,15 @@ class AmqpAdapter extends BaseAdapter {
if (res === false) throw new MoleculerError("AMQP publish error. Write buffer is full.");
this.logger.debug(`Message was published at '${channelName}'`);
}

/**
* Parse the headers from incoming message to a POJO.
* @param {any} raw
* @returns {object}
*/
parseMessageHeaders(raw) {
return raw && raw.properties ? raw.properties.headers : null;
}
}

module.exports = AmqpAdapter;
9 changes: 9 additions & 0 deletions src/adapters/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,15 @@ class BaseAdapter {
/* istanbul ignore next */
throw new Error("This method is not implemented.");
}

/**
* Parse the headers from incoming message to a POJO.
* @param {any} raw
* @returns {object}
*/
parseMessageHeaders(raw) {
return raw ? raw.headers : null;
}
}

module.exports = BaseAdapter;
17 changes: 17 additions & 0 deletions src/adapters/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,23 @@ class KafkaAdapter extends BaseAdapter {
}
this.logger.debug(`Message was published at '${channelName}'`, res);
}

/**
* Parse the headers from incoming message to a POJO.
* @param {any} raw
* @returns {object}
*/
parseMessageHeaders(raw) {
if (raw.headers) {
const res = {};
for (const [key, value] of Object.entries(raw.headers)) {
res[key] = value != null ? value.toString() : null;
}

return res;
}
return null;
}
}

module.exports = KafkaAdapter;
Loading

0 comments on commit edf022b

Please sign in to comment.