Skip to content

Commit ea22463

Browse files
committed
wip: add OpenTelemetry metrics instrumentation
1 parent d7c6544 commit ea22463

File tree

11 files changed

+1374
-5
lines changed

11 files changed

+1374
-5
lines changed

package-lock.json

Lines changed: 92 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/client/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,4 @@ export { GEO_REPLY_WITH, GeoReplyWith } from './lib/commands/GEOSEARCH_WITH';
3535
export { SetOptions, CLIENT_KILL_FILTERS, FAILOVER_MODES, CLUSTER_SLOT_STATES, COMMAND_LIST_FILTER_BY, REDIS_FLUSH_MODES } from './lib/commands'
3636

3737
export { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';
38+
export { OpenTelemetry } from './lib/opentelemetry/opentelemetry';

packages/client/lib/client/index.ts

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import COMMANDS from '../commands';
2-
import RedisSocket, { RedisSocketOptions } from './socket';
2+
import RedisSocket, { RedisSocketOptions, RedisTcpSocketOptions } from './socket';
33
import { BasicAuth, CredentialsError, CredentialsProvider, StreamingCredentialsProvider, UnableToObtainNewCredentialsError, Disposable } from '../authx';
44
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
55
import { EventEmitter } from 'node:events';
@@ -21,6 +21,7 @@ import { BasicCommandParser, CommandParser } from './parser';
2121
import SingleEntryCache from '../single-entry-cache';
2222
import { version } from '../../package.json'
2323
import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType } from './enterprise-maintenance-manager';
24+
import { OTelMetrics } from '../opentelemetry/metrics';
2425

2526
export interface RedisClientOptions<
2627
M extends RedisModules = RedisModules,
@@ -1064,21 +1065,47 @@ export default class RedisClient<
10641065
args: ReadonlyArray<RedisArgument>,
10651066
options?: CommandOptions
10661067
): Promise<T> {
1068+
const recordOperation = OTelMetrics.createRecordOperationDuration(args, {
1069+
host: (this._self.#options.socket as RedisTcpSocketOptions)?.host || "",
1070+
port:
1071+
(
1072+
this._self.#options.socket as RedisTcpSocketOptions
1073+
)?.port?.toString() || "",
1074+
db: this._self.#selectedDB.toString(),
1075+
});
1076+
10671077
if (!this._self.#socket.isOpen) {
1078+
recordOperation(new ClientClosedError());
10681079
return Promise.reject(new ClientClosedError());
1069-
} else if (!this._self.#socket.isReady && this._self.#options.disableOfflineQueue) {
1080+
} else if (
1081+
!this._self.#socket.isReady &&
1082+
this._self.#options.disableOfflineQueue
1083+
) {
1084+
recordOperation(new ClientOfflineError());
10701085
return Promise.reject(new ClientOfflineError());
10711086
}
10721087

10731088
// Merge global options with provided options
10741089
const opts = {
10751090
...this._self._commandOptions,
1076-
...options
1077-
}
1091+
...options,
1092+
};
10781093

10791094
const promise = this._self.#queue.addCommand<T>(args, opts);
1095+
OTelMetrics.recordPendingRequests(1);
1096+
1097+
const trackedPromise = promise.then((reply) => {
1098+
recordOperation();
1099+
return reply;
1100+
}).catch((err) => {
1101+
recordOperation(err);
1102+
throw err;
1103+
}).finally(() => {
1104+
OTelMetrics.recordPendingRequests(-1);
1105+
});
1106+
10801107
this._self.#scheduleWrite();
1081-
return promise;
1108+
return trackedPromise;
10821109
}
10831110

10841111
async SELECT(db: number): Promise<void> {

packages/client/lib/client/socket.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyErro
55
import { setTimeout } from 'node:timers/promises';
66
import { RedisArgument } from '../RESP/types';
77
import { dbgMaintenance } from './enterprise-maintenance-manager';
8+
import { OTelMetrics } from '../opentelemetry/metrics';
89

910
type NetOptions = {
1011
tls?: false;
@@ -215,6 +216,7 @@ export default class RedisSocket extends EventEmitter {
215216
let retries = 0;
216217
do {
217218
try {
219+
const started = performance.now();
218220
this.#socket = await this.#createSocket();
219221
this.emit('connect');
220222

@@ -228,6 +230,8 @@ export default class RedisSocket extends EventEmitter {
228230
this.#isReady = true;
229231
this.#socketEpoch++;
230232
this.emit('ready');
233+
OTelMetrics.recordConnectionCount(1);
234+
OTelMetrics.recordConnectionCreateTime(performance.now() - started);
231235
} catch (err) {
232236
const retryIn = this.#shouldReconnect(retries++, err as Error);
233237
if (typeof retryIn !== 'number') {
@@ -304,6 +308,10 @@ export default class RedisSocket extends EventEmitter {
304308
this.#isReady = false;
305309
this.emit('error', err);
306310

311+
if (wasReady) {
312+
OTelMetrics.recordConnectionCount(-1);
313+
}
314+
307315
if (!wasReady || !this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') return;
308316

309317
this.emit('reconnecting');
@@ -362,6 +370,7 @@ export default class RedisSocket extends EventEmitter {
362370
this.#socket = undefined;
363371
}
364372

373+
OTelMetrics.recordConnectionCount(-1);
365374
this.emit('end');
366375
}
367376

0 commit comments

Comments
 (0)