Skip to content

Commit

Permalink
Add support for Zstandard compression
Browse files Browse the repository at this point in the history
  • Loading branch information
DonovanDMC committed Jul 26, 2024
1 parent 0a2070f commit 88c8ad4
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 76 deletions.
3 changes: 2 additions & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@
],
"unicorn/prefer-event-target": "off",
"@typescript-eslint/no-unsafe-declaration-merging": "off",
"@typescript-eslint/no-unsafe-enum-comparison": "off"
"@typescript-eslint/no-unsafe-enum-comparison": "off",
"unicorn/prefer-ternary": "off"
}
}
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ The documentation under `dev` is always for the latest commit. If something isn'
<hr>

### Optional Dependencies
* `pako` - Compression (gateway)
* `zlib-sync` - Compression (gateway, faster than pako)
All compression options are mutually exclusive.
* `pako` - zlib Compression (gateway)
* `zlib-sync` - zlib Compression (gateway, faster than pako)
* `fzstd` - Zstandard Compression (gateway)
* `erlpack` - Encoding (gateway, alternative to JSON)

## Links
Expand Down
117 changes: 49 additions & 68 deletions lib/gateway/Shard.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/** @module Shard */
import type ShardManager from "./ShardManager";
import type Compression from "./compression/base";
import type Client from "../Client";
import TypedEmitter from "../util/TypedEmitter";
import Bucket from "../rest/Bucket";
Expand All @@ -23,43 +24,26 @@ import type { ShardEvents } from "../types/events";
import GatewayError, { DependencyError } from "../util/Errors";
import ClientApplication from "../structures/ClientApplication";
import WebSocket, { type Data } from "ws";
import type Pako from "pako";
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import type { Inflate } from "zlib-sync";
import { randomBytes } from "node:crypto";
import { inspect } from "node:util";
import assert from "node:assert";

/* eslint-disable @typescript-eslint/ban-ts-comment, @typescript-eslint/no-redundant-type-constituents, @typescript-eslint/no-var-requires, @typescript-eslint/no-unsafe-assignment, unicorn/prefer-module, @typescript-eslint/no-unsafe-member-access */
// @ts-ignore
let Erlpack: typeof import("erlpack") | undefined;
try {
Erlpack = require("erlpack");
} catch {}
// @ts-ignore
let ZlibSync: typeof import("pako") | typeof import("zlib-sync") | undefined, zlibConstants: typeof import("pako").constants | typeof import("zlib-sync") | undefined;
try {
ZlibSync = require("zlib-sync");
zlibConstants = require("zlib-sync");
} catch {
try {
ZlibSync = require("pako");
zlibConstants = require("pako").constants;
} catch {}
}
/* eslint-enable @typescript-eslint/ban-ts-comment, @typescript-eslint/no-redundant-type-constituents, @typescript-eslint/no-var-requires, @typescript-eslint/no-unsafe-assignment, unicorn/prefer-module */

/** Represents a gateway connection to Discord. See {@link ShardEvents | Shard Events} for a list of events. */
export default class Shard extends TypedEmitter<ShardEvents> {
private _compressor: Compression | undefined;
private _connectTimeout: NodeJS.Timeout | null;
private _getAllUsersCount: Record<string, true>;
private _getAllUsersQueue: Array<string>;
private _guildCreateTimeout: NodeJS.Timeout | null;
private _heartbeatInterval: NodeJS.Timeout | null;
private _requestMembersPromise: Record<string, { members: Array<Member>; received: number; timeout: NodeJS.Timeout; reject(reason?: unknown): void; resolve(value: unknown): void; }>;
// eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents
private _sharedZLib!: Pako.Inflate | Inflate;
client!: Client;
connectAttempts: number;
connecting: boolean;
Expand Down Expand Up @@ -156,7 +140,8 @@ export default class Shard extends TypedEmitter<ShardEvents> {
}
this.resumeURL = `${url}?v=${GATEWAY_VERSION}&encoding=${Erlpack ? "etf" : "json"}`;
if (this.client.shards.options.compress) {
this.resumeURL += "&compress=zlib-stream";
const type = this.client.shards.options.compress === "zstd-stream" ? "zstd-stream" : "zlib-stream";
this.resumeURL += `&compress=${type}`;
}
this.sessionID = data.session_id;

Expand Down Expand Up @@ -221,12 +206,34 @@ export default class Shard extends TypedEmitter<ShardEvents> {
}
this.status = "connecting";
if (this.client.shards.options.compress) {
if (!ZlibSync) {
throw new DependencyError("Cannot use compression without pako or zlib-sync.");
const type = this.client.shards.options.compress;
/* eslint-disable @typescript-eslint/no-var-requires, unicorn/prefer-module */
if (type === "zstd-stream") {
if (!this.client.util._isModuleInstalled("fzstd")) {
throw new DependencyError("Cannot use zstd based compression without fzstd.");
}
this.client.emit("debug", "Initializing zstd-based compression with fzstd.");
const ZstdCompression = (require(`${__dirname}/compression/zstd`) as { default: new(shard: Shard) => Compression; }).default;
this._compressor = new ZstdCompression(this);
} else if (type === "zlib-stream") {
const hasZlibSync = this.client.util._isModuleInstalled("zlib-sync");
const hasPako = this.client.util._isModuleInstalled("pako");

if (hasZlibSync) {
this.client.emit("debug", "Initializing zlib-based compression with zlib-sync.");
const ZlibSyncCompression = (require(`${__dirname}/compression/zlib-sync`) as { default: new(shard: Shard) => Compression; }).default;
this._compressor = new ZlibSyncCompression(this);
} else if (hasPako) {
this.client.emit("debug", "Initializing zlib-based compression with pako.");
const PakoCompression = (require(`${__dirname}/compression/pako`) as { default: new(shard: Shard) => Compression; }).default;
this._compressor = new PakoCompression(this);
} else {
throw new DependencyError("Cannot use zlib based compression without pako or zlib-sync.");
}
} else {
throw new TypeError(`Invalid compression type "${type as string}".`);
}
this.client.emit("debug", "Initializing zlib-sync-based compression.");
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-call
this._sharedZLib = new ZlibSync.Inflate({ chunkSize: 128 * 1024 });
/* eslint-enable @typescript-eslint/no-var-requires, unicorn/prefer-module */
}
if (!this.client.shards.options.override.gatewayURLIsResumeURL && this.sessionID) {
if (this.resumeURL === null) {
Expand Down Expand Up @@ -443,7 +450,7 @@ export default class Shard extends TypedEmitter<ShardEvents> {
}

/* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call, @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-argument */
private onWSMessage(data: Data): void {
private async onWSMessage(data: Data): Promise<void> {
if (typeof data === "string") {
data = Buffer.from(data);
}
Expand All @@ -457,54 +464,28 @@ export default class Shard extends TypedEmitter<ShardEvents> {
data = Buffer.concat(data);
}

const is = <T>(input: unknown): input is T => true;
assert(is<Buffer>(data));
const buf = data as Buffer;
if (this.client.shards.options.compress) {
if (data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) {
// store the current pointer for slicing buffers after pushing.
const currentPointer: number | undefined = this._sharedZLib.strm?.next_out;
this._sharedZLib.push(data, zlibConstants!.Z_SYNC_FLUSH);
if (this._sharedZLib.err) {
this.client.emit("error", new GatewayError(`zlib error ${this._sharedZLib.err}: ${this._sharedZLib.msg ?? ""}`, 0));
return;
}

if (currentPointer === undefined) {
// decompression support by zlib-sync
data = Buffer.from(this._sharedZLib.result ?? "");
} else if (this._sharedZLib.chunks.length === 0) {
// decompression support by pako. The current buffer hasn't been flushed
data = Buffer.from(this._sharedZLib.strm!.output.slice(currentPointer));
} else {
// decompression support by pako. Buffers have been flushed once or more times.
data = Buffer.concat([
this._sharedZLib.chunks[0].slice(currentPointer),
...this._sharedZLib.chunks.slice(1),
this._sharedZLib.strm.output
]);
this._sharedZLib.chunks = [];
}

assert(is<Buffer>(data));
let result = await this._compressor!.decompress(buf);
if (result === null) {
return;
}

if (Erlpack) {
return this.onPacket(Erlpack.unpack(data as Buffer) as AnyReceivePacket);
} else {
// After the valid data, all the remaining octets are filled with zero, so remove them.
let last = data.length - 1;
if (data[last] === 0) {
while (data[last - 1] === 0 && last > 0) last--;
data = data.subarray(0, last);
}
return this.onPacket(JSON.parse(String(data)) as AnyReceivePacket);
}
if (Erlpack) {
return this.onPacket(Erlpack.unpack(result) as AnyReceivePacket);
} else {
this._sharedZLib.push(data, false);
// After the valid data, all the remaining octets are filled with zero, so remove them.
let last = result.length - 1;
if (result[last] === 0) {
while (result[last - 1] === 0 && last > 0) last--;
result = result.subarray(0, last);
}
return this.onPacket(JSON.parse(String(result)) as AnyReceivePacket);
}
} else if (Erlpack) {
return this.onPacket(Erlpack.unpack(data) as AnyReceivePacket);
return this.onPacket(Erlpack.unpack(buf) as AnyReceivePacket);
} else {
return this.onPacket(JSON.parse(data.toString()) as AnyReceivePacket);
return this.onPacket(JSON.parse(String(buf)) as AnyReceivePacket);
}
} catch (err) {
this.client.emit("error", err as Error, this.id);
Expand Down Expand Up @@ -776,7 +757,7 @@ export default class Shard extends TypedEmitter<ShardEvents> {
const func = (): void => {
if (++i >= waitFor && this.ws && this.ws.readyState === WebSocket.OPEN) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-call
const d: string = Erlpack ? Erlpack.pack({ op, d: data }) : JSON.stringify({ op, d: data });
const d: string | Buffer = Erlpack ? Erlpack.pack({ op, d: data }) : JSON.stringify({ op, d: data });
this.ws.send(d);
if (typeof data === "object" && data && "token" in data) {
(data as { token: string; }).token = "[REMOVED]";
Expand Down
11 changes: 9 additions & 2 deletions lib/gateway/ShardManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ try {
/* eslint-enable @typescript-eslint/ban-ts-comment, @typescript-eslint/no-redundant-type-constituents, @typescript-eslint/no-var-requires, @typescript-eslint/no-unsafe-assignment, unicorn/prefer-module */


let __compressionTrueDeprecationWarning = 0;
/** A manager for all the client's shards. */
export default class ShardManager extends Collection<number, Shard> {
private _buckets: Record<number, number>;
Expand All @@ -39,7 +40,7 @@ export default class ShardManager extends Collection<number, Shard> {
this._connectTimeout = null;
this.options = {
autoReconnect: options.autoReconnect ?? true,
compress: options.compress ?? false,
compress: options.compress === true ? (__compressionTrueDeprecationWarning++, "zlib-stream") : options.compress ?? false,
connectionProperties: {
browser: options.connectionProperties?.browser ?? "Oceanic",
device: options.connectionProperties?.device ?? "Oceanic",
Expand Down Expand Up @@ -77,6 +78,11 @@ export default class ShardManager extends Collection<number, Shard> {
shardIDs: options.shardIDs ?? [],
ws: options.ws ?? {}
};
if (__compressionTrueDeprecationWarning === 1) {
process.emitWarning("Using `compress: true` is deprecated and will be removed in a future version. Please use `compress: \"zlib-stream\"` instead.", {
code: "OCEANIC_GATEWAY_COMPRESSION_TRUE_DEPRECATION"
});
}
this.options.override.appendQuery ??= (this.options.override.getBot === undefined && this.options.override.url === undefined);
this.options.override.gatewayURLIsResumeURL ??= (this.options.override.getBot !== undefined || this.options.override.url !== undefined);
this.options.override.timeBetweenShardConnects ??= 5000;
Expand Down Expand Up @@ -214,7 +220,8 @@ export default class ShardManager extends Collection<number, Shard> {
if (url && this.options.override.appendQuery) {
url += `?v=${GATEWAY_VERSION}&encoding=${Erlpack ? "etf" : "json"}`;
if (this.options.compress) {
url += "&compress=zlib-stream";
const type = this.options.compress === "zstd-stream" ? "zstd-stream" : "zlib-stream";
url += `&compress=${type}`;
}
this._gatewayURL = url;
}
Expand Down
14 changes: 14 additions & 0 deletions lib/gateway/compression/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type Shard from "../Shard";

export default abstract class Compression {
shard!: Shard;
abstract decompress(data: Buffer): Promise<Buffer | null>;
constructor(shard: Shard) {
Object.defineProperty(this, "shard", {
value: shard,
configurable: false,
enumerable: false,
writable: false
});
}
}
50 changes: 50 additions & 0 deletions lib/gateway/compression/pako.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import Compression from "./base";
import type Shard from "../Shard";
import GatewayError from "../../util/Errors";
import { Inflate, constants } from "pako";

interface PakoExtra {
chunks: Array<Buffer>;
strm: {
next_out: number;
output: Buffer;
};
}

export default class PakoCompression extends Compression {
_sharedZLib: Inflate & PakoExtra;
constructor(shard: Shard) {
super(shard);
this._sharedZLib = new Inflate({ chunkSize: 128 * 1024 }) as Inflate & PakoExtra;
}

async decompress(data: Buffer): Promise<Buffer | null> {
if (data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) {
// store the current pointer for slicing buffers after pushing.
const currentPointer: number | undefined = this._sharedZLib.strm?.next_out;
this._sharedZLib.push(data, constants.Z_SYNC_FLUSH);
if (this._sharedZLib.err) {
this.shard.client.emit("error", new GatewayError(`zlib error ${this._sharedZLib.err}: ${this._sharedZLib.msg ?? ""}`, 0));
return null;
}

if (this._sharedZLib.chunks.length === 0) {
// The current buffer hasn't been flushed
data = Buffer.from(this._sharedZLib.strm!.output.slice(currentPointer));
} else {
// Buffers have been flushed one or more times
data = Buffer.concat([
this._sharedZLib.chunks[0].slice(currentPointer),
...this._sharedZLib.chunks.slice(1),
this._sharedZLib.strm.output
]);
this._sharedZLib.chunks = [];
}

return data;
} else {
this._sharedZLib.push(data, false);
return null;
}
}
}
30 changes: 30 additions & 0 deletions lib/gateway/compression/zlib-sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import Compression from "./base";
import type Shard from "../Shard";
import GatewayError from "../../util/Errors";
import ZlibSync from "zlib-sync";

export default class ZlibSyncCompression extends Compression {
_sharedZLib: ZlibSync.Inflate;
constructor(shard: Shard) {
super(shard);
this._sharedZLib = new ZlibSync.Inflate({ chunkSize: 128 * 1024 });
}

async decompress(data: Buffer): Promise<Buffer | null> {
if (data.length >= 4 && data.readUInt32BE(data.length - 4) === 0xFFFF) {
// store the current pointer for slicing buffers after pushing.
this._sharedZLib.push(data, ZlibSync.Z_SYNC_FLUSH);
if (this._sharedZLib.err) {
this.shard.client.emit("error", new GatewayError(`zlib error ${this._sharedZLib.err}: ${this._sharedZLib.msg ?? ""}`, 0));
return null;
}

data = Buffer.from(this._sharedZLib.result ?? "");

return data;
} else {
this._sharedZLib.push(data, false);
return null;
}
}
}
27 changes: 27 additions & 0 deletions lib/gateway/compression/zstd.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import Compression from "./base";
import type Shard from "../Shard";
import fzstd from "fzstd";

export default class ZstdCompression extends Compression {
_resolvePromise?: (data: Uint8Array) => void;
_resultPromise?: Promise<Uint8Array>;
stream: fzstd.Decompress;
constructor(shard: Shard) {
super(shard);
this.stream = new fzstd.Decompress(data => {
this._resolvePromise!(data);
});
}

async decompress(data: Buffer): Promise<Buffer> {
if (this._resultPromise) {
await this._resultPromise;
}
this._resultPromise = new Promise(resolve => {
this._resolvePromise = resolve;
});
this.stream.push(data);
const result = await this._resultPromise;
return Buffer.from(result);
}
}
Loading

0 comments on commit 88c8ad4

Please sign in to comment.