From 855270b7260d02f9d032171b1ea2ad8788ff5654 Mon Sep 17 00:00:00 2001 From: Yusuke Sakurai Date: Sat, 19 Oct 2019 01:37:49 +0900 Subject: [PATCH] fix: #23 Added mux system when sending commands (#24) --- modules-lock.json | 3 +- modules.json | 1 + pipeline.ts | 71 ++++++++++++++++-------- pipeline_test.ts | 30 +++++++++- redis.ts | 40 ++++++++++++- redis_test.ts | 51 +++++++++-------- vendor/https/deno.land/std/util/async.ts | 1 + 7 files changed, 148 insertions(+), 49 deletions(-) create mode 100644 vendor/https/deno.land/std/util/async.ts diff --git a/modules-lock.json b/modules-lock.json index 46122c13..bc594150 100644 --- a/modules-lock.json +++ b/modules-lock.json @@ -2,10 +2,11 @@ "https://deno.land/std": { "version": "@v0.20.0", "modules": [ + "/util/async.ts", "/testing/mod.ts", "/testing/asserts.ts", "/io/bufio.ts", "/fmt/colors.ts" ] } -} \ No newline at end of file +} diff --git a/modules.json b/modules.json index de8c3562..bc594150 100644 --- a/modules.json +++ b/modules.json @@ -2,6 +2,7 @@ "https://deno.land/std": { "version": "@v0.20.0", "modules": [ + "/util/async.ts", "/testing/mod.ts", "/testing/asserts.ts", "/io/bufio.ts", diff --git a/pipeline.ts b/pipeline.ts index 9a32fdcf..514eed7c 100644 --- a/pipeline.ts +++ b/pipeline.ts @@ -1,7 +1,8 @@ import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts"; import { createRequest, readReply, RedisRawReply } from "./io.ts"; import { ErrorReplyError } from "./errors.ts"; -import { create, Redis } from "./redis.ts"; +import { create, muxExecutor, Redis } from "./redis.ts"; +import { deferred, Deferred } from "./vendor/https/deno.land/std/util/async.ts"; const encoder = new TextEncoder(); export type RedisPipeline = { @@ -14,36 +15,62 @@ export function createRedisPipeline( reader: BufReader, opts?: { tx: true } ): RedisPipeline { - let queue: string[] = []; + let commands: string[] = []; + let queue: { + commands: string[]; + d: Deferred; + }[] = []; + + function dequeue() { + const [e] = queue; + if (!e) return; + exec(e.commands) + .then(e.d.resolve) + .catch(e.d.reject) + .finally(() => { + queue.shift(); + dequeue(); + }); + } + + async function exec(cmds: string[]) { + const msg = cmds.join(""); + await writer.write(encoder.encode(msg)); + await writer.flush(); + const ret: RedisRawReply[] = []; + for (let i = 0; i < cmds.length; i++) { + try { + const rep = await readReply(reader); + ret.push(rep); + } catch (e) { + if (e.constructor === ErrorReplyError) { + ret.push(e); + } else { + throw e; + } + } + } + return ret; + } + const executor = { enqueue(command: string, ...args) { const msg = createRequest(command, ...args); - queue.push(msg); + commands.push(msg); }, async flush() { // wrap pipelined commands with MULTI/EXEC if (opts && opts.tx) { - queue.splice(0, 0, createRequest("MULTI")); - queue.push(createRequest("EXEC")); + commands.splice(0, 0, createRequest("MULTI")); + commands.push(createRequest("EXEC")); } - const msg = queue.join(""); - await writer.write(encoder.encode(msg)); - await writer.flush(); - const ret: RedisRawReply[] = []; - for (let i = 0; i < queue.length; i++) { - try { - const rep = await readReply(reader); - ret.push(rep); - } catch (e) { - if (e.constructor === ErrorReplyError) { - ret.push(e); - } else { - throw e; - } - } + const d = deferred(); + queue.push({ commands, d }); + if (queue.length === 1) { + dequeue(); } - queue = []; - return ret; + commands = []; + return d; }, async execRawReply( command: string, diff --git a/pipeline_test.ts b/pipeline_test.ts index 15bacaa4..4fe50c2e 100644 --- a/pipeline_test.ts +++ b/pipeline_test.ts @@ -1,4 +1,4 @@ -import { test } from "./vendor/https/deno.land/std/testing/mod.ts"; +import { runIfMain, test } from "./vendor/https/deno.land/std/testing/mod.ts"; import { assertEquals } from "./vendor/https/deno.land/std/testing/asserts.ts"; import { connect } from "./redis.ts"; @@ -72,3 +72,31 @@ test(async function testTx() { parseInt(rep3[0][1] as string) + 3 ); }); + +test("pipeline in concurrent", async () => { + const redis = await connect(addr); + const tx = redis.pipeline(); + let promises: Promise[] = []; + await redis.del("a", "b", "c"); + for (const key of ["a", "b", "c"]) { + promises.push(tx.set(key, key)); + } + promises.push(tx.flush()); + for (const key of ["a", "b", "c"]) { + promises.push(tx.get(key)); + } + promises.push(tx.flush()); + const res = await Promise.all(promises); + assertEquals(res, [ + "OK", //set(a) + "OK", //set(b) + "OK", //set(c) + [["status", "OK"], ["status", "OK"], ["status", "OK"]], //flush() + "OK", // get(a) + "OK", // get(b) + "OK", //get(c) + [["bulk", "a"], ["bulk", "b"], ["bulk", "c"]] //flush() + ]); +}); + +runIfMain(import.meta); diff --git a/redis.ts b/redis.ts index be347a76..5b41d4b0 100644 --- a/redis.ts +++ b/redis.ts @@ -9,6 +9,7 @@ import { ConnectionClosedError } from "./errors.ts"; import { psubscribe, RedisSubscription, subscribe } from "./pubsub.ts"; import { RedisRawReply, sendCommand } from "./io.ts"; import { createRedisPipeline, RedisPipeline } from "./pipeline.ts"; +import { deferred, Deferred } from "./vendor/https/deno.land/std/util/async.ts"; export type Redis = { // Connection @@ -443,11 +444,46 @@ export interface CommandExecutor { ): Promise; } -class RedisImpl implements Redis, CommandExecutor { +export function muxExecutor(r: BufReader, w: BufWriter): CommandExecutor { + let queue: { + command: string; + args: (string | number)[]; + d: Deferred; + }[] = []; + + function dequeue(): void { + const [e] = queue; + if (!e) return; + sendCommand(w, r, e.command, ...e.args) + .then(v => e.d.resolve(v)) + .catch(err => e.d.reject(err)) + .finally(() => { + queue.shift(); + dequeue(); + }); + } + + return { + async execRawReply( + command: string, + ...args: (string | number)[] + ): Promise { + const d = deferred(); + queue.push({ command, args, d }); + if (queue.length === 1) { + dequeue(); + } + return d; + } + }; +} + +class RedisImpl implements Redis { _isClosed = false; get isClosed() { return this._isClosed; } + private executor: CommandExecutor; constructor( private closer: Closer, @@ -455,7 +491,7 @@ class RedisImpl implements Redis, CommandExecutor { private reader: BufReader, executor?: CommandExecutor ) { - this.executor = executor || this; + this.executor = executor || muxExecutor(reader, writer); } async execRawReply( diff --git a/redis_test.ts b/redis_test.ts index f1518fb7..2605c9c9 100644 --- a/redis_test.ts +++ b/redis_test.ts @@ -1,5 +1,9 @@ -import { connect } from "./redis.ts"; -import { runIfMain, test } from "./vendor/https/deno.land/std/testing/mod.ts"; +import { connect, Redis } from "./redis.ts"; +import { + runIfMain, + setFilter, + test +} from "./vendor/https/deno.land/std/testing/mod.ts"; import { assertEquals, assertThrowsAsync @@ -10,8 +14,9 @@ const addr = { port: 6379 }; +let redis:Redis; test(async function beforeAll() { - const redis = await connect(addr); + redis = await connect(addr); await redis.del( "incr", "incrby", @@ -25,87 +30,87 @@ test(async function beforeAll() { }); test(async function testExists() { - const redis = await connect(addr); const none = await redis.exists("none", "none2"); assertEquals(none, 0); await redis.set("exists", "aaa"); const exists = await redis.exists("exists", "none"); assertEquals(exists, 1); - redis.close(); }); test(async function testGetWhenNil() { - const redis = await connect(addr); const hoge = await redis.get("none"); assertEquals(hoge, void 0); - redis.close(); }); + test(async function testSet() { - const redis = await connect(addr); const s = await redis.set("get", "fuga你好こんにちは"); assertEquals(s, "OK"); const fuga = await redis.get("get"); assertEquals(fuga, "fuga你好こんにちは"); - redis.close(); }); + test(async function testGetSet() { - const redis = await connect(addr); await redis.set("getset", "val"); const v = await redis.getset("getset", "lav"); assertEquals(v, "val"); assertEquals(await redis.get("getset"), "lav"); - redis.close(); }); + test(async function testMget() { - const redis = await connect(addr); await redis.set("mget1", "val1"); await redis.set("mget2", "val2"); await redis.set("mget3", "val3"); const v = await redis.mget("mget1", "mget2", "mget3"); assertEquals(v, ["val1", "val2", "val3"]); - redis.close(); }); + test(async function testDel() { - const redis = await connect(addr); let s = await redis.set("del1", "fuga"); assertEquals(s, "OK"); s = await redis.set("del2", "fugaaa"); assertEquals(s, "OK"); const deleted = await redis.del("del1", "del2"); assertEquals(deleted, 2); - redis.close(); }); test(async function testIncr() { - const redis = await connect(addr); const rep = await redis.incr("incr"); assertEquals(rep, 1); assertEquals(await redis.get("incr"), "1"); - redis.close(); }); test(async function testIncrby() { - const redis = await connect(addr); const rep = await redis.incrby("incrby", 101); assertEquals(rep, 101); assertEquals(await redis.get("incrby"), "101"); - redis.close(); }); test(async function testDecr() { - const redis = await connect(addr); const rep = await redis.decr("decr"); assertEquals(rep, -1); assertEquals(await redis.get("decr"), "-1"); - redis.close(); }); test(async function testDecrby() { - const redis = await connect(addr); const rep = await redis.decrby("decryby", 101); assertEquals(rep, -101); assertEquals(await redis.get("decryby"), "-101"); - redis.close(); +}); + +test(async function testConcurrent() { + let promises: Promise[] = []; + for (const key of ["a", "b", "c"]) { + promises.push(redis.set(key, key)); + } + await Promise.all(promises); + promises = []; + for (const key of ["a", "b", "c"]) { + promises.push(redis.get(key)); + } + const [a, b, c] = await Promise.all(promises); + assertEquals(a, "a"); + assertEquals(b, "b"); + assertEquals(c, "c"); }); [Infinity, NaN, "", "port"].forEach(v => { diff --git a/vendor/https/deno.land/std/util/async.ts b/vendor/https/deno.land/std/util/async.ts new file mode 100644 index 00000000..27e039f8 --- /dev/null +++ b/vendor/https/deno.land/std/util/async.ts @@ -0,0 +1 @@ +export * from "https://deno.land/std@v0.20.0/util/async.ts";