Skip to content

Commit

Permalink
fix: Make subscribe and psubscribe methods generic (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
uki00a authored Feb 21, 2021
1 parent 79dbece commit bb046fb
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ await mainClient.clientTracking({
mode: "ON",
redirect: cacheClientID,
});
const sub = await cacheClient.subscribe("__redis__:invalidate");
const sub = await cacheClient.subscribe<string[]>("__redis__:invalidate");

(async () => {
for await (const { channel, message } of sub.receive()) {
Expand Down
8 changes: 6 additions & 2 deletions command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,16 @@ export interface RedisCommands {
pfmerge(destkey: string, ...sourcekeys: string[]): Promise<Status>;

// PubSub
psubscribe(...patterns: string[]): Promise<RedisSubscription>;
psubscribe<TMessage extends string | string[] = string>(
...patterns: string[]
): Promise<RedisSubscription<TMessage>>;
pubsubChannels(pattern?: string): Promise<BulkString[]>;
pubsubNumsub(...channels: string[]): Promise<(BulkString | Integer)[]>;
pubsubNumpat(): Promise<Integer>;
publish(channel: string, message: string): Promise<Integer>;
subscribe(...channels: string[]): Promise<RedisSubscription>;
subscribe<TMessage extends string | string[] = string>(
...channels: string[]
): Promise<RedisSubscription<TMessage>>;

// Set
sadd(key: string, ...members: string[]): Promise<Integer>;
Expand Down
48 changes: 34 additions & 14 deletions pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@ import type { Connection } from "./connection.ts";
import { InvalidStateError } from "./errors.ts";
import { readArrayReply, sendCommand } from "./io.ts";

export interface RedisSubscription {
type DefaultMessageType = string;
type ValidMessageType = string | string[];

export interface RedisSubscription<
TMessage extends ValidMessageType = DefaultMessageType,
> {
readonly isClosed: boolean;
receive(): AsyncIterableIterator<RedisPubSubMessage>;
receive(): AsyncIterableIterator<RedisPubSubMessage<TMessage>>;
psubscribe(...patterns: string[]): Promise<void>;
subscribe(...channels: string[]): Promise<void>;
punsubscribe(...patterns: string[]): Promise<void>;
unsubscribe(...channels: string[]): Promise<void>;
close(): Promise<void>;
}

export interface RedisPubSubMessage {
export interface RedisPubSubMessage<TMessage = DefaultMessageType> {
pattern?: string;
channel: string;
message: string;
message: TMessage;
}

class RedisSubscriptionImpl implements RedisSubscription {
class RedisSubscriptionImpl<
TMessage extends ValidMessageType = DefaultMessageType,
> implements RedisSubscription<TMessage> {
get isConnected(): boolean {
return this.connection.isConnected;
}
Expand Down Expand Up @@ -83,13 +90,22 @@ class RedisSubscriptionImpl implements RedisSubscription {
}
}

async *receive(): AsyncIterableIterator<RedisPubSubMessage> {
async *receive(): AsyncIterableIterator<RedisPubSubMessage<TMessage>> {
let forceReconnect = false;
while (this.isConnected) {
try {
let rep: string[];
let rep: [string, string, TMessage] | [
string,
string,
string,
TMessage,
];
try {
rep = (await readArrayReply(this.connection.reader)) as string[];
rep = (await readArrayReply(this.connection.reader)) as [
string,
string,
TMessage,
] | [string, string, string, TMessage];
} catch (err) {
if (err instanceof Deno.errors.BadResource) {
// Connection already closed.
Expand Down Expand Up @@ -145,20 +161,24 @@ class RedisSubscriptionImpl implements RedisSubscription {
}
}

export async function subscribe(
export async function subscribe<
TMessage extends ValidMessageType = DefaultMessageType,
>(
connection: Connection,
...channels: string[]
): Promise<RedisSubscription> {
const sub = new RedisSubscriptionImpl(connection);
): Promise<RedisSubscription<TMessage>> {
const sub = new RedisSubscriptionImpl<TMessage>(connection);
await sub.subscribe(...channels);
return sub;
}

export async function psubscribe(
export async function psubscribe<
TMessage extends ValidMessageType = DefaultMessageType,
>(
connection: Connection,
...patterns: string[]
): Promise<RedisSubscription> {
const sub = new RedisSubscriptionImpl(connection);
): Promise<RedisSubscription<TMessage>> {
const sub = new RedisSubscriptionImpl<TMessage>(connection);
await sub.psubscribe(...patterns);
return sub;
}
12 changes: 8 additions & 4 deletions redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1054,12 +1054,16 @@ export class RedisImpl implements Redis {
return this.execIntegerReply("PUBLISH", channel, message);
}

subscribe(...channels: string[]) {
return subscribe(this.connection, ...channels);
subscribe<TMessage extends string | string[] = string>(
...channels: string[]
) {
return subscribe<TMessage>(this.connection, ...channels);
}

psubscribe(...patterns: string[]) {
return psubscribe(this.connection, ...patterns);
psubscribe<TMessage extends string | string[] = string>(
...patterns: string[]
) {
return psubscribe<TMessage>(this.connection, ...patterns);
}

pubsubChannels(pattern?: string) {
Expand Down

0 comments on commit bb046fb

Please sign in to comment.