Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stream management requesting ACKs #1005

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 74 additions & 3 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";

const xml = require("@xmpp/xml");
const time = require("@xmpp/time");

// https://xmpp.org/extensions/xep-0198.html

Expand Down Expand Up @@ -46,24 +47,34 @@ module.exports = function streamManagement({
middleware,
}) {
let address = null;
let timeoutTimeout = null;

const sm = {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
outbound_q: [],
outbound: 0,
inbound: 0,
max: null,
timeout: 60000,
};

entity.on("online", (jid) => {
address = jid;
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty during online";
}
sm.outbound = 0;
sm.inbound = 0;
});

entity.on("offline", () => {
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
}
sm.outbound = 0;
sm.inbound = 0;
sm.enabled = false;
Expand All @@ -72,36 +83,93 @@ module.exports = function streamManagement({

middleware.use((context, next) => {
const { stanza } = context;
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
sm.outbound = stanza.attrs.h;
const oldOutbound = sm.outbound;
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
}
}

return next();
});

let requestAckTimeout = null;
function requestAck() {
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (sm.timeout) {
timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout);
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
requestAckTimeout = setTimeout(requestAck, 300000);
}

middleware.filter((context, next) => {
const { stanza } = context;
if (sm.enabled && ["presence", "message", "iq"].includes(stanza.name)) {
let qStanza = stanza;
if (
qStanza.name === "message" &&
!qStanza.getChild("delay", "urn:xmpp:delay")
) {
qStanza = xml.clone(stanza);
qStanza.c("delay", {
xmlns: "urn:xmpp:delay",
from: entity.jid.toString(),
stamp: time.datetime(),
});
}
sm.outbound_q.push(qStanza);
// Debounce requests so we send only one after a big run of stanza together
if (requestAckTimeout) clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, 100);
}
return next();
});

// https://xmpp.org/extensions/xep-0198.html#enable
// For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session

streamFeatures.use("sm", NS, async (context, next) => {
// Resuming
if (sm.id) {
try {
await resume(entity, sm.inbound, sm.id);
let resumed = await resume(entity, sm.inbound, sm.id);
sm.enabled = true;
entity.jid = address;
if (address) entity.jid = address;
entity.status = "online";
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
}
let q = sm.outbound_q;
sm.outbound_q = [];
for (const item of q) {
entity.send(item); // This will trigger the middleware and re-add to the queue
}
entity.emit("stream-management/resumed");
return true;
// If resumption fails, continue with session establishment
// eslint-disable-next-line no-unused-vars
} catch {
sm.id = "";
sm.enabled = false;
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
}
sm.outbound = 0;
}
}
Expand All @@ -114,6 +182,9 @@ module.exports = function streamManagement({
const promiseEnable = enable(entity, sm.allowResume, sm.preferredMaximum);

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty after enable";
}
sm.outbound = 0;

try {
Expand Down
3 changes: 2 additions & 1 deletion packages/stream-management/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"management"
],
"dependencies": {
"@xmpp/xml": "^0.13.0"
"@xmpp/xml": "^0.13.0",
"@xmpp/time": "^0.13.0"
},
"engines": {
"node": ">= 14"
Expand Down
2 changes: 2 additions & 0 deletions packages/xml/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Element = require("ltx/lib/Element");
const createElement = require("ltx/lib/createElement");
const clone = require("ltx/lib/clone");
const Parser = require("./lib/Parser");
const {
escapeXML,
Expand All @@ -19,6 +20,7 @@ module.exports = xml;

Object.assign(module.exports, {
Element,
clone,
createElement,
Parser,
escapeXML,
Expand Down
Loading