/**
* @file {@link ProcessDispatcher} is simply a container of pre-forked {@link ModuleProcess} instances.
* It also a set of useful tools.
* @author Denis Ilguzin
*/
var async = require("async");
var utils = require("./utils"), defaultLogger = utils.logger, range = utils.range;
var ModuleProcess = require ("./moduleProcess");
/**
* <p>Callback to run on function finished. <i>Note: It might contain arbitrary number of arguments (required at
* least one - error)</i></p>
*
* @callback ProcessDispatcher~onReadyCallback
* @param error {Error} An Error object if the callback has raised an error.
*/
/**
* <p><i><strong>Note: before running dispatching workflow do not forget to init listener on the module.</strong>
* For that purpose, please invoke {@link ModuleProcess.listenIPCMessages} in the module root. It will engage module
* to listen to IPC messages and expose module API available via sub-processing.</i></p>
*
* @param moduleName {string} Module name (script file name). E.g. __filename
* @param moduleOpts {Object} Options object to pass to the module
* @param logger {Object}
* @class
*/
function ProcessDispatcher (moduleName, moduleOpts, logger) {
if (logger === undefined) { logger = defaultLogger; }
this.moduleName = moduleName;
this.moduleOpts = moduleOpts;
this.availableProcesses = [];
this.logger = logger;
}
/**
* <p>Create a module sub-process and configure its listener to accepts messages.</p>
*
* @param moduleName
* @param moduleOpts
* @param logger
* @returns {ProcessDispatcher~onReadyCallback}
*/
ProcessDispatcher.makeModuleProcess = function (moduleName, moduleOpts, logger) {
return function (callback) { new ModuleProcess(moduleName, moduleOpts, logger).init(callback); };
};
/**
* <p>Dispatch function invocation to existing moduleProcess.</p>
*
* @param moduleName {string} A js filename containing module function (!) (e.g. module.exports = function() { }).
* I.e. module that is defined by function.
* @param moduleOpts {Object} Will be passed to module-function to create a module instance.
* @param logger
* @param functionName {string} Function to be invoked (should be defined on the module).
* @param params {Object} Parameters mapping.
* @returns {ProcessDispatcher~onReadyCallback} On execution finished.
*/
ProcessDispatcher.dispatchToModule = function(moduleName, moduleOpts, logger, functionName, params) {
return function (callback) {
async.waterfall([
function (callback) {
ProcessDispatcher.makeModuleProcess(moduleName, moduleOpts, logger)(callback);
},
function (moduleProcess, callback) {
ProcessDispatcher.dispatchToModuleProcess(moduleProcess, functionName, params)(callback);
}
], callback);
};
};
/**
* <p>Dispatch function invocation to existing moduleProcess.</p>
*
* @param moduleProcess
* @param functionName {string} Function defined on the module.
* @param params {Object} Parameters mapping.
* @returns {ProcessDispatcher~onReadyCallback} On execution finished.
*/
ProcessDispatcher.dispatchToModuleProcess = function (moduleProcess, functionName, params) {
return function (callback) { moduleProcess.invoke(functionName, params)(callback); };
};
/**
* <p>Create listener to catch the IPC messages within the process the module is running in.</p>
*
* @deprecated This is no longer a part of {@link ProcessDispatcher}. Migrated to
* {@link ModuleProcess.listenIPCMessages}.
*/
ProcessDispatcher.listenIPCMessages = function (moduleFn, moduleFileName, logger) {
ModuleProcess.listenIPCMessages(moduleFn, moduleFileName, logger);
};
/**
* <p>Creates round robin ids for preforked subprocesses. See how {@link ProcessDispatcher.dispatch} works.</p>
*/
ProcessDispatcher.prototype.updateRRId = function () {
this._rrIds = range(this.availableProcesses.length);
};
/**
* <p>Implementation of FIFO queue for selecting of pre-forked subprocess ids.</p>
* <p>See how {@link ProcessDispatcher.dispatch} works in this regard.</p>
*
* @returns {Number} Number of sub-process ID.
*/
ProcessDispatcher.prototype.getNextProcId = function () {
var nextId = this._rrIds.shift();
this._rrIds.push(nextId);
return nextId;
};
/**
* <p>Pre-fork subprocess and update a list of available sub-process ids.</p>
*
* @param num
* @returns {ProcessDispatcher~onReadyCallback} On execution finished.
*/
ProcessDispatcher.prototype.preFork = function (num) {
var _this = this;
_this.logger.debug("preFork " + num);
return function (callback) {
async.series(
range(num).map( function () {
return function (callback) { ProcessDispatcher.makeModuleProcess(_this.moduleName, _this.moduleOpts, _this.logger)(callback); };
} ),
function (error, moduleProcesses) {
_this.logger.debug("preFork created " + moduleProcesses.length);
if (!error && moduleProcesses) {
_this.availableProcesses = moduleProcesses;
_this.updateRRId();
callback();
} else {
callback(error);
}
}
);
};
};
/**
* <p>Dispatch arbitrary function call to available {@link ModuleProcess} instance.</p>
* <p>The {@link ModuleProcess} selection algorithm is round robin based. See {@link getNextProcId} in this regard.</p>
*
* @param functionName Function name to be called. The desired module wrapped by {@link ModuleProcess} should expose
* API via {@link ProcessDispatcher.listenIPCMessages}. The functionName should be a part of that
* API.
* @param params The parameters to be passed to the functionName
* @returns {ProcessDispatcher~onReadyCallback} On execution finished.
*/
ProcessDispatcher.prototype.dispatch = function (functionName, params) {
//this.logger.debug("ProcessDispatcher.dispatch t: ", process.hrtime()[1]);
var _this = this;
var moduleProcess;
var availableProcessNum = this.availableProcesses.length;
if (availableProcessNum) {
var processNumber = _this.getNextProcId();
_this.logger.debug("Dispatching '" + functionName + "' to process number " + processNumber + ". Available processes number " + availableProcessNum);
moduleProcess = _this.availableProcesses[processNumber];
}
return function (callback) {
async.waterfall([
function (callback) {
// Make a descicion on create new process or use existing one.
if (moduleProcess) {
_this.logger.debug("Make use of existing process for '" + _this.moduleName + "' invocation of '" + functionName + "'");
callback(null, moduleProcess);
} else {
// var errorMessage = "No '" + _this.moduleName + "' processes available yet for invocation of '" + functionName + "'";
// _this.logger.debug(errorMessage);
// callback(new Error(errorMessage));
_this.logger.debug("Creating new process for '" + _this.moduleName + "' invocation of '" + functionName + "'");
ProcessDispatcher.makeModuleProcess(_this.moduleName, _this.moduleOpts, _this.logger)(callback);
}
},
function (moduleProcess, callback) {
ProcessDispatcher.dispatchToModuleProcess(moduleProcess, functionName, params)(callback);
}
], callback);
};
};
module.exports = ProcessDispatcher;