diff --git a/lib/protocol/Connection.js b/lib/protocol/Connection.js index 38643d2..70453fd 100644 --- a/lib/protocol/Connection.js +++ b/lib/protocol/Connection.js @@ -46,10 +46,8 @@ var MAX_AVAILABLE_SIZE = MAX_PACKET_SIZE - module.exports = Connection; util.inherits(Connection, EventEmitter); - function Connection(settings) { EventEmitter.call(this); - var self = this; // public this.connectOptions = new part.ConnectOptions(); @@ -57,11 +55,11 @@ function Connection(settings) { this.protocolVersion = undefined; // private this._clientInfo = new ClientInfo(); - for(var key in settings) { - if(key.toUpperCase().startsWith("SESSIONVARIABLE:")) { + for (var key in settings) { + if (key.toUpperCase().startsWith("SESSIONVARIABLE:")) { var sv_key = key.substring(key.indexOf(":") + 1); var sv_value = settings[key]; - if(sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) { + if (sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) { this._clientInfo.setProperty(sv_key, sv_value); } delete settings[key]; @@ -69,7 +67,7 @@ function Connection(settings) { } this._settings = settings || {}; this._socket = undefined; - this._queue = new util.Queue().pause(); + this._socketWriteQueue = []; this._state = new ConnectionState(); this._statementContext = undefined; this._transaction = new Transaction(); @@ -149,7 +147,7 @@ Object.defineProperties(Connection.prototype, { case MessageType.DISCONNECT: return 'disconnecting'; default: - // do nothing + // do nothing } if (this._state.sessionId === -1) { return 'disconnected'; @@ -229,28 +227,41 @@ Connection.prototype._addListeners = function _addListeners(socket) { // register listerners on socket function ondata(chunk) { + if (!packet.data) { + self._socketWriteQueue.shift() + self._socketWriteNext() + } + packet.push(chunk); if (packet.isReady()) { + const packetCount = packet.header.packetCount; if (self._state.sessionId !== packet.header.sessionId) { self._state.sessionId = packet.header.sessionId; self._state.packetCount = -1; } var buffer = packet.getData(); packet.clear(); - var cb = self._state.receive; + var cb = self._state.receives[packetCount]; self._state.receive = undefined; self._state.messageType = undefined; + self._state.receives[packetCount] = undefined; self.receive(buffer, cb); } } socket.on('data', ondata); function onerror(err) { - var cb = self._state && self._state.receive; - if (cb) { - self._state.receive = null; // a callback should be called only once - cb(err); - } else if (self.listeners('error').length) { + let called = false + for (const packetCount in self._state.receives) { + const cb = self._state.receives[packetCount] + if (cb) { + self._state.receives[packetCount] = undefined + called = true + cb(err) + } + } + if (called) { return } + if (self.listeners('error').length) { self.emit('error', err); } else { debug('onerror', err); @@ -293,6 +304,7 @@ Connection.prototype._clearQueue = function _clearQueue(err) { Connection.prototype.send = function send(message, receive) { if (this._statementContext) { message.unshift(PartKind.STATEMENT_CONTEXT, this._statementContext.getOptions()); + this._statementContext = undefined } if (this._clientInfo.shouldSend(message.type)) { message.add(PartKind.CLIENT_INFO, this._clientInfo.getUpdatedProperties()); @@ -301,35 +313,57 @@ Connection.prototype.send = function send(message, receive) { debug('send', message); trace('REQUEST', message); - var size = MAX_PACKET_SIZE - PACKET_HEADER_LENGTH; - var buffer = message.toBuffer(size); - var packet = new Buffer(PACKET_HEADER_LENGTH + buffer.length); - buffer.copy(packet, PACKET_HEADER_LENGTH); + const buffers = message.toBuffer(); + var bufferLength = PACKET_HEADER_LENGTH + buffers.byteLength + + var packet = Buffer.allocUnsafe(bufferLength); var state = this._state; - state.messageType = message.type; + + // state.messageType = message.type; state.receive = receive; + // Increase packet count state.packetCount++; + const packetCount = state.packetCount; + + state.receives[packetCount] = receive; + // Session identifier bignum.writeUInt64LE(packet, state.sessionId, 0); // Packet sequence number in this session // Packets with the same sequence number belong to one request / reply pair - packet.writeUInt32LE(state.packetCount, 8); + packet.writeUInt32LE(packetCount, 8); // Used space in this packet - packet.writeUInt32LE(buffer.length, 12); + packet.writeUInt32LE(bufferLength - PACKET_HEADER_LENGTH, 12); // Total space in this packet - packet.writeUInt32LE(size, 16); + packet.writeUInt32LE(bufferLength, 16); // Number of segments in this packet packet.writeUInt16LE(1, 20); // Filler packet.fill(0x00, 22, PACKET_HEADER_LENGTH); - // Write request packet to socket - if (this._socket) { - this._socket.write(packet); + + let offset = PACKET_HEADER_LENGTH + let remaining = bufferLength - PACKET_HEADER_LENGTH + + message.updateBuffer(buffers, remaining) + for (let i = 0; i < buffers.length; i++) { + const buffer = buffers[i] + buffer.copy(packet, offset) + offset += buffer.length + } + + this._socketWriteQueue.push(packet) + if (this._socketWriteQueue.length === 1) { + this._socketWriteNext() } }; +Connection.prototype._socketWriteNext = function _socketWriteNext() { + if (this._socketWriteQueue.length === 0) return + const drain = this._socketWriteQueue[0] + this._socket.write(drain) +} Connection.prototype.getClientInfo = function getClientInfo() { return this._clientInfo; @@ -390,15 +424,13 @@ Connection.prototype.receive = function receive(buffer, cb) { if (error && error.fatal) { this.destroy(error); } - if(cb) { + if (cb) { cb(error, reply); } }; Connection.prototype.enqueue = function enqueue(task, cb) { - var queueable; - - if (!this._socket || !this._queue || this.readyState === 'closed') { + if (!this._socket || this.readyState === 'closed') { var err = new Error('Connection closed'); err.code = 'EHDBCLOSE'; if (cb) { @@ -408,18 +440,15 @@ Connection.prototype.enqueue = function enqueue(task, cb) { } } if (util.isFunction(task)) { - queueable = this._queue.createTask(task, cb); - queueable.name = task.name; - } else if (util.isObject(task)) { + return task(cb) + } + if (util.isObject(task)) { if (task instanceof request.Segment) { - queueable = this._queue.createTask(this.send.bind(this, task), cb); - queueable.name = MessageTypeName[task.type]; - } else if (util.isFunction(task.run)) { - queueable = task; + return this.send(task, cb) + } + if (util.isFunction(task.run)) { + return task.run() } - } - if (queueable) { - this._queue.push(queueable); } }; @@ -428,26 +457,26 @@ Connection.prototype._createAuthenticationManager = auth.createManager; Connection.prototype.connect = function connect(options, cb) { var self = this; var manager; - for(var key in options) { - if(key.toUpperCase().startsWith("SESSIONVARIABLE:")) { + for (var key in options) { + if (key.toUpperCase().startsWith("SESSIONVARIABLE:")) { var sv_key = key.substring(key.indexOf(":") + 1); var sv_value = options[key]; - if(sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) { + if (sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) { this._clientInfo.setProperty(sv_key, sv_value); } delete options[key]; } } this.connectOptions.setOptions([{ - name : common.ConnectOption.OS_USER, - value : this._clientInfo.getUser() + name: common.ConnectOption.OS_USER, + value: this._clientInfo.getUser() }]); this.clientContextOptions.setOptions([{ - name : common.ClientContextOption.CLIENT_APPLICATION_PROGRAM, - value : this._clientInfo.getApplication() + name: common.ClientContextOption.CLIENT_APPLICATION_PROGRAM, + value: this._clientInfo.getApplication() }]); - if(options["disableCloudRedirect"] == true) { - this._redirectType = common.RedirectType.REDIRECTION_DISABLED; + if (options["disableCloudRedirect"] == true) { + this._redirectType = common.RedirectType.REDIRECTION_DISABLED; } try { manager = this._createAuthenticationManager(options); @@ -469,7 +498,6 @@ Connection.prototype.connect = function connect(options, cb) { if (manager.sessionCookie) { self._settings.sessionCookie = manager.sessionCookie; } - self._queue.resume(); cb(null, reply); } @@ -477,7 +505,7 @@ Connection.prototype.connect = function connect(options, cb) { if (err) { return cb(err, reply); } - manager.initialize(reply.authentication, function(err) { + manager.initialize(reply.authentication, function (err) { if (err) return cb(err); var redirectOptions = [] if (typeof self._initialHost === 'undefined') { @@ -536,7 +564,7 @@ Connection.prototype.connect = function connect(options, cb) { useCesu8: self.useCesu8 } - if(this._redirectType == common.RedirectType.REDIRECTION_NONE) { + if (this._redirectType == common.RedirectType.REDIRECTION_NONE) { authOptions.dbConnectInfo = true; } @@ -551,14 +579,7 @@ Connection.prototype.disconnect = function disconnect(cb) { cb(err, reply); } - function enqueueDisconnect() { - self.enqueue(request.disconnect(), done); - } - - if (this.isIdle()) { - return enqueueDisconnect(); - } - this._queue.once('drain', enqueueDisconnect); + return this.send(request.disconnect(), done) }; Connection.prototype.executeDirect = function executeDirect(options, cb) { @@ -634,14 +655,14 @@ Connection.prototype.rollback = function rollback(options, cb) { this.enqueue(request.rollback(options), cb); }; - // The function doesn't use the queue. It's used before the queue starts running +// The function doesn't use the queue. It's used before the queue starts running Connection.prototype.fetchDbConnectInfo = function (options, cb) { if (this.readyState == 'closed') { var err = new Error('Connection unexpectedly closed'); err.code = 'EHDBCLOSE'; return cb(err) } - this.send(request.dbConnectInfo(options), function(err, reply) { + this.send(request.dbConnectInfo(options), function (err, reply) { if (err) { return cb(err); } @@ -659,19 +680,11 @@ Connection.prototype._closeSilently = function _closeSilently() { }; Connection.prototype.close = function close() { - var self = this; - - function closeConnection() { - debug('close'); - self.destroy(); - } if (this.readyState === 'closed') { return; } - if (this.isIdle()) { - return closeConnection(); - } - this._queue.once('drain', closeConnection); + debug('close'); + this.destroy(); }; Connection.prototype.destroy = function destroy(err) { @@ -680,10 +693,6 @@ Connection.prototype.destroy = function destroy(err) { } }; -Connection.prototype.isIdle = function isIdle() { - return this._queue.empty && !this._queue.busy; -}; - Connection.prototype.setAutoCommit = function setAutoCommit(autoCommit) { this._transaction.autoCommit = autoCommit; }; @@ -711,6 +720,7 @@ function ConnectionState() { this.packetCount = -1; this.messageType = undefined; this.receive = undefined; + this.receives = {}; } function Version(major, minor) { @@ -738,6 +748,6 @@ InitializationReply.read = function readInitializationReply(buffer) { return new InitializationReply(productVersion, protocolVersion); }; -var initializationRequestBuffer = new Buffer([ +var initializationRequestBuffer = Buffer.from([ 0xff, 0xff, 0xff, 0xff, 4, 20, 0, 4, 1, 0, 0, 1, 1, 1 ]); diff --git a/lib/protocol/ExecuteTask.js b/lib/protocol/ExecuteTask.js index 7eaff8e..fca4121 100644 --- a/lib/protocol/ExecuteTask.js +++ b/lib/protocol/ExecuteTask.js @@ -45,12 +45,11 @@ ExecuteTask.create = function createExecuteTask(connection, options, cb) { return new ExecuteTask(connection, options, cb); }; -ExecuteTask.prototype.run = function run(next) { +ExecuteTask.prototype.run = function run() { var self = this; function done(err) { self.end(err); - next(); } function finalize(err) { diff --git a/lib/protocol/request/Part.js b/lib/protocol/request/Part.js index 92756d8..f7192d5 100644 --- a/lib/protocol/request/Part.js +++ b/lib/protocol/request/Part.js @@ -28,10 +28,13 @@ function Part(options) { this.buffer = undefined; } +Part.prototype.getLength = function () { + return PART_HEADER_LENGTH + util.alignLength(this.buffer.length, 8) +} -Part.prototype.toBuffer = function toBuffer(size) { +Part.prototype.toBuffer = function toBuffer(remaining) { var byteLength = util.alignLength(this.buffer.length, 8); - var buffer = new Buffer(PART_HEADER_LENGTH + byteLength); + var buffer = Buffer.allocUnsafe(PART_HEADER_LENGTH + byteLength); // Part kind, specifies nature of part data buffer.writeInt8(this.kind, 0); // Further attributes of part @@ -43,7 +46,7 @@ Part.prototype.toBuffer = function toBuffer(size) { // Length of part buffer in bytes buffer.writeInt32LE(this.buffer.length, 8); // Length in packet remaining without this part. - buffer.writeInt32LE(size, 12); + buffer.writeInt32LE(remaining, 12); this.buffer.copy(buffer, PART_HEADER_LENGTH); if (this.buffer.length < byteLength) { buffer.fill(0x00, PART_HEADER_LENGTH + this.buffer.length); diff --git a/lib/protocol/request/Segment.js b/lib/protocol/request/Segment.js index d5162ac..6941453 100644 --- a/lib/protocol/request/Segment.js +++ b/lib/protocol/request/Segment.js @@ -72,19 +72,16 @@ Segment.prototype.add = function add(kind, args) { } }; -Segment.prototype.toBuffer = function toBuffer(size) { - size = size || MAX_SEGMENT_SIZE; - var remainingSize = size - SEGMENT_HEADER_LENGTH; +Segment.prototype.toBuffer = function toBuffer() { var length = SEGMENT_HEADER_LENGTH; var buffers = []; for (var i = 0; i < this.parts.length; i++) { - var buffer = partToBuffer(this.parts[i], remainingSize, this.useCesu8); - remainingSize -= buffer.length; - length += buffer.length; - buffers.push(buffer); + var part = partToBuffer(this.parts[i], this.useCesu8); + length += part.getLength(); + buffers.push(part); } - var header = new Buffer(SEGMENT_HEADER_LENGTH); + var header = Buffer.allocUnsafe(SEGMENT_HEADER_LENGTH); // Length of the segment, including the header header.writeInt32LE(length, 0); // Offset of the segment within the message buffer @@ -105,16 +102,28 @@ Segment.prototype.toBuffer = function toBuffer(size) { header.fill(0x00, 16, SEGMENT_HEADER_LENGTH); buffers.unshift(header); - return Buffer.concat(buffers, length); + buffers.byteLength = length; + // return Buffer.concat(buffers, length); + return buffers }; -function partToBuffer(pd, remainingSize, useCesu8) { +Segment.prototype.updateBuffer = function updateBuffer(buffers, remaining) { + remaining -= buffers[0].length + for (var i = 1; i < buffers.length; i++) { + const part = buffers[i] + remaining -= part.getLength() + buffers[i] = part.toBuffer(remaining) + } + return buffers +} + +function partToBuffer(pd, useCesu8) { var m = pd.module || data[pd.kind]; var part = new Part({ kind: pd.kind, useCesu8: useCesu8 }); part.argumentCount = m.getArgumentCount(pd.args); - m.write(part, pd.args, remainingSize); - return part.toBuffer(remainingSize); + m.write(part, pd.args); + return part; } diff --git a/lib/util/Queue.js b/lib/util/Queue.js deleted file mode 100644 index 85cb7dc..0000000 --- a/lib/util/Queue.js +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2013 SAP AG. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http: //www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, -// either express or implied. See the License for the specific -// language governing permissions and limitations under the License. -'use strict'; - -var util = require('util'); -var EventEmitter = require('events').EventEmitter; - -module.exports = Queue; - -util.inherits(Queue, EventEmitter); - -function Queue(immediate) { - EventEmitter.call(this); - - this.queue = []; - this.busy = false; - this.running = !!immediate; -} - -Object.defineProperty(Queue.prototype, 'empty', { - get: function isEmpty() { - return this.queue.length === 0; - } -}); - -Queue.prototype.unshift = function unshift(task) { - this.queue.unshift(task); - if (this.running) { - this.dequeue(); - } - return this; -}; - -Queue.prototype.push = function push(task) { - this.queue.push(task); - if (this.running) { - this.dequeue(); - } - return this; -}; - -Queue.prototype.resume = function resume() { - this.running = true; - if (this.queue.length) { - this.dequeue(); - } - return this; -}; - -Queue.prototype.pause = function pause() { - this.running = false; - return this; -}; - -Queue.prototype.abort = function abort(err) { - this.queue.forEach(t => t.receive(err)) - this.queue = []; - this.busy = false; - this.running = false; - this.removeAllListeners(); - return this; -}; - -Queue.prototype.createTask = function createTask(send, receive, name) { - return new Task(send, receive, name); -}; - -Queue.prototype.dequeue = function dequeue() { - var self = this; - - function next(err, name) { - /* jshint unused:false */ - self.busy = false; - if (self.queue.length) { - run(); - } else { - self.emit('drain'); - } - } - - function run() { - if (self.running && !self.busy) { - // Queue is running and not busy - self.busy = true; - var task = self.queue.shift(); - task.run(next); - } - } - run(); -}; - -function Task(send, receive, name) { - this.send = send; - this.receive = receive; - this.name = name; -} - -Task.prototype.run = function run(next) { - var self = this; - - function receive() { - /* jshint validthis:true */ - self.receive.apply(null, arguments); - next(null, self.name); - } - try { - this.send(receive); - } catch (err) { - process.nextTick(function () { - receive(err); - }); - } -}; diff --git a/lib/util/index.js b/lib/util/index.js index b534596..fedb1fe 100644 --- a/lib/util/index.js +++ b/lib/util/index.js @@ -102,7 +102,6 @@ function exportNativeUtil(fn) { exports.bignum = require('./bignum'); exports.calendar = require('./calendar'); exports.convert = require('./convert'); -exports.Queue = require('./Queue'); extend(exports, require('./zeropad')); function extend(obj) { diff --git a/test/util.Queue.js b/test/util.Queue.js deleted file mode 100644 index f8681df..0000000 --- a/test/util.Queue.js +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2013 SAP AG. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http: //www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, -// either express or implied. See the License for the specific -// language governing permissions and limitations under the License. -'use strict'; -/* jshint expr: true */ - -var lib = require('../lib'); -var Queue = lib.util.Queue; - -function createTask(reply, cb) { - function send(cb) { - setTimeout(function () { - cb(null, reply); - }, 1); - } - return Queue.prototype.createTask(send, cb, 'standard'); -} - -function createErrorTask(message, cb) { - function send(cb) { - setTimeout(function () { - cb(new Error(message)); - }, 1); - } - return Queue.prototype.createTask(send, cb, 'error'); -} - -function createThrowTask(message, cb) { - function send() { - throw new Error(message); - } - return Queue.prototype.createTask(send, cb, 'throw'); -} - -describe('Util', function () { - - describe('#Queue', function () { - - it('should create a standard queue', function (done) { - var replies = []; - var q = new Queue(); - q.empty.should.be.true; - q.busy.should.be.false; - q.running.should.be.false; - q.push(createTask('foo', function (err, reply) { - replies.push(reply); - })); - q.push(createErrorTask('abc', function (err) { - replies.push(err.message); - })); - q.unshift(createThrowTask('def', function (err) { - replies.push(err.message); - })); - q.push(createTask('bar', function (err, reply) { - replies.push(reply); - })); - q.on('drain', function () { - replies.should.eql(['def', 'foo', 'abc', 'bar']); - done(); - }); - q.resume(); - }); - - it('should create a running queue', function (done) { - var replies = []; - var q = new Queue(true); - q.empty.should.be.true; - q.busy.should.be.false; - q.running.should.be.true; - q.push(createTask('foo', function (err, reply) { - replies.push(reply); - })); - q.unshift(createTask('bar', function (err, reply) { - replies.push(reply); - })); - q.on('drain', function () { - replies.should.eql(['foo', 'bar']); - done(); - }); - }); - - }); - -}); \ No newline at end of file