|
|
'use strict';
var Queue = require('denque'); var utils = require('./utils'); var Command = require('./command');
function Multi (client, args) { this._client = client; this.queue = new Queue(); var command, tmp_args; if (args) { // Either undefined or an array. Fail hard if it's not an array
for (var i = 0; i < args.length; i++) { command = args[i][0]; tmp_args = args[i].slice(1); if (Array.isArray(command)) { this[command[0]].apply(this, command.slice(1).concat(tmp_args)); } else { this[command].apply(this, tmp_args); } } } }
function pipeline_transaction_command (self, command_obj, index) { // Queueing is done first, then the commands are executed
var tmp = command_obj.callback; command_obj.callback = function (err, reply) { // Ignore the multi command. This is applied by node_redis and the user does not benefit by it
if (err && index !== -1) { if (tmp) { tmp(err); } err.position = index; self.errors.push(err); } // Keep track of who wants buffer responses:
// By the time the callback is called the command_obj got the buffer_args attribute attached
self.wants_buffers[index] = command_obj.buffer_args; command_obj.callback = tmp; }; self._client.internal_send_command(command_obj); }
Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.execAtomic = function exec_atomic (callback) { if (this.queue.length < 2) { return this.exec_batch(callback); } return this.exec(callback); };
function multi_callback (self, err, replies) { var i = 0, command_obj;
if (err) { err.errors = self.errors; if (self.callback) { self.callback(err); // Exclude connection errors so that those errors won't be emitted twice
} else if (err.code !== 'CONNECTION_BROKEN') { self._client.emit('error', err); } return; }
if (replies) { while (command_obj = self.queue.shift()) { if (replies[i] instanceof Error) { var match = replies[i].message.match(utils.err_code); // LUA script could return user errors that don't behave like all other errors!
if (match) { replies[i].code = match[1]; } replies[i].command = command_obj.command.toUpperCase(); if (typeof command_obj.callback === 'function') { command_obj.callback(replies[i]); } } else { // If we asked for strings, even in detect_buffers mode, then return strings:
replies[i] = self._client.handle_reply(replies[i], command_obj.command, self.wants_buffers[i]); if (typeof command_obj.callback === 'function') { command_obj.callback(null, replies[i]); } } i++; } }
if (self.callback) { self.callback(null, replies); } }
Multi.prototype.exec_transaction = function exec_transaction (callback) { if (this.monitoring || this._client.monitoring) { var err = new RangeError( 'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.' ); err.command = 'EXEC'; err.code = 'EXECABORT'; return utils.reply_in_order(this._client, callback, err); } var self = this; var len = self.queue.length; self.errors = []; self.callback = callback; self._client.cork(); self.wants_buffers = new Array(len); pipeline_transaction_command(self, new Command('multi', []), -1); // Drain queue, callback will catch 'QUEUED' or error
for (var index = 0; index < len; index++) { // The commands may not be shifted off, since they are needed in the result handler
pipeline_transaction_command(self, self.queue.get(index), index); }
self._client.internal_send_command(new Command('exec', [], function (err, replies) { multi_callback(self, err, replies); })); self._client.uncork(); return !self._client.should_buffer; };
function batch_callback (self, cb, i) { return function batch_callback (err, res) { if (err) { self.results[i] = err; // Add the position to the error
self.results[i].position = i; } else { self.results[i] = res; } cb(err, res); }; }
Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) { var self = this; var len = self.queue.length; var index = 0; var command_obj; if (len === 0) { utils.reply_in_order(self._client, callback, null, []); return !self._client.should_buffer; } self._client.cork(); if (!callback) { while (command_obj = self.queue.shift()) { self._client.internal_send_command(command_obj); } self._client.uncork(); return !self._client.should_buffer; } var callback_without_own_cb = function (err, res) { if (err) { self.results.push(err); // Add the position to the error
var i = self.results.length - 1; self.results[i].position = i; } else { self.results.push(res); } // Do not emit an error here. Otherwise each error would result in one emit.
// The errors will be returned in the result anyway
}; var last_callback = function (cb) { return function (err, res) { cb(err, res); callback(null, self.results); }; }; self.results = []; while (command_obj = self.queue.shift()) { if (typeof command_obj.callback === 'function') { command_obj.callback = batch_callback(self, command_obj.callback, index); } else { command_obj.callback = callback_without_own_cb; } if (typeof callback === 'function' && index === len - 1) { command_obj.callback = last_callback(command_obj.callback); } this._client.internal_send_command(command_obj); index++; } self._client.uncork(); return !self._client.should_buffer; };
module.exports = Multi;
|