Source: moduleProcess.js

/**
 * @file {@link ModuleProcess} represents a wrapper enabling interprocess communication functionality for
 * arbitrary module which engages it to be available as a sub-process invocation. {@link ModuleProcess} is a core part
 * of sub-processing dispatching.
 * @author Denis Ilguzin
 */

var childProcess = require("child_process");

var stringify  = require('json-stringify-safe');
var _  = require('underscore');

var COMMANDS = require("./commands");
var defaultLogger = require("./utils").logger;

/**
 * <p>Callback to run on function finished. <i>Note: It might contain arbitrary number of arguments (required at
 * least one - error)</i></p>
 *
 * @callback ModuleProcess~onReadyCallback
 * @param error {Error}   An Error object if the callback has raised an error.
 */

/**
 * <p>ModuleProcess</p>
 *
 * <p>A wrapper for module to make it running its functionality within subprocess accepting IPC messages
 * as commands.</p>
 *
 * <p>The main purpose of the {@link ModuleProcess} is to invoke within subprocess any module function by name
 * (available via module.exports of wrapped module) passing parameters to it (see {@link ModuleProcess.invoke}
 * for details). The function name to call and parameters are accepted via IPC, the callback invoked then on IPC
 * chanel in form of message back to parent.</p>
 *
 * @param moduleName {String}   Module name (script file name). E.g. __filename
 * @param moduleOpts {Object}   Options object to pass to the module
 * @param logger
 * @class
 */
function ModuleProcess (moduleName, moduleOpts, logger) {

  if (logger === undefined) { logger = defaultLogger; }

  this._initialized = false;

  this.moduleName = moduleName;
  this.moduleOpts = moduleOpts;

  this.invokeCallbacks = {};

  this.logger = logger;
}

/**
 * <p>Create listener to catch the IPC messages within the process the module is running in.</p>
 *
 * <p><i>Important: The module should be running within process otherwise registering listener makes no sense.
 * This will invoke process.on('message', function() {}) to listen for parent process commands.</i></p>
 *
 * <p>Use {@link ProcessDispatcher} to manage module processes.</p>
 *
 * @param moduleFn {Function}     Module function exposing the API for {@link ProcessDispatcher.dispatch} calls.
 * @param moduleFileName {String} File name (whole patch with file name) of the module defined by module function.
 * @param logger
 */
ModuleProcess.listenIPCMessages = function(moduleFn, moduleFileName, logger) {

  if (logger === undefined) { logger = defaultLogger; }

  /**
   * Only enable listener within sub-process. See {@link ModuleProcess.init} for process.env.PROC_NAME initialization.
   */
  if (process.env.PROC_NAME === moduleFileName) {

    logger.debug("Mimic module as process " + process.env.PROC_NAME);

    var module;

    /**
     * Listen to IPC messages, convert them to module functionality calls (adding callback), run this calls within
     * current process and once response has come invoke callback to send (on the same IPC chanel) a response to
     * parent process.
     */
    process.on('message', function(message) {

      logger.debug("Module '" + process.env.PROC_NAME + "' has gotten message: command: " + message.command + '; ' + (message.commandId ? "commandId: " + message.commandId + '; ' : '') + (message.functionName ? "functionName: " + message.functionName + '; ' : ''));

      switch (message.command) {
        // TODO : comment
        case COMMANDS.MODULE_PROCESS_COMMAND_INIT:
          var moduleOpts = message.moduleOpts;

          module = moduleFn(moduleOpts);

          if (module.init) {
            module.init(function () {
              process.send({command: message.command, callbackArguments: arguments});
            });
          } else {
            process.send({command: message.command});
          }

          break;

        // TODO : comment
        case COMMANDS.MODULE_PROCESS_COMMAND_INVOKE:
          var functionName = message.functionName;
          var params = message.params;

          var callback = function () {
            process.send({
              sendTime: message.sendTime,
              command: message.command,
              commandId: message.commandId,
              functionName: message.functionName,
              callbackArguments: arguments
            });
          };

          if (module && module[functionName]) {
            module[functionName].call(module, params)(callback);
          } else if (!module) {
            callback(new Error("Module '" + process.env.PROC_NAME + "' is not initialized"));
          } else {
            callback(new Error("'" + message.functionName + "' function does not exist in '" + process.env.PROC_NAME + "' module"));
          }

          break;
      }

    });

    /**
     * Listen to IPC chanel close-events and exit subprocess once chanel has been closed.
     * See https://nodejs.org/api/process.html#process_event_disconnect.
     *
     * Note: this is very important since the main process might have died making the module processes orphans forever.
     * The subprocess module should take care on stopping it-self in such scenarios.
     */
    process.on('disconnect', function () {
      process.exit();
    });
  }

};

/**
 * <p>Make unique command token to use in send/receive model to distinguish calls for the child process.</p>
 *
 * @param command {COMMANDS}  A {@link COMMANDS} definition for possible commands.
 * @returns {string}          Token
 */
ModuleProcess.makeCommandId = function(command) {
  return command + "-" + (new Date().getTime()).toString() + "-" + process.hrtime()[1];
};

/**
 * <p>Bind callback to unique command id (or token) to be called once function finished.</p>
 *
 * @param commandId {String}                        Unique string Token. See. {@link ModuleProcess.makeCommandId}
 * @param callback {ModuleProcess~onReadyCallback} Callback to run on function finished.
 */
ModuleProcess.prototype.addCallback = function(commandId, callback) {
  this.invokeCallbacks[commandId] = callback;
};

/**
 * <p>Unbind callback from Token. See. {@link ModuleProcess.addCallback}.</p>
 *
 * @param commandId {String}  Unique string token. See. {@link ModuleProcess.makeCommandId}
 */
ModuleProcess.prototype.removeCallback = function(commandId) {
  delete this.invokeCallbacks[commandId];
};

/**
 * <p>Get callback by token. See. {@link ModuleProcess.addCallback}, {@link ModuleProcess.makeCommandId}.</p>
 *
 * @param commandId {String}  Unique string token. See. {@link ModuleProcess.makeCommandId}.
 * @return callback {ModuleProcess~onInvokeCallback}  Callback function that has been stored for this commandId.
 */
ModuleProcess.prototype.getCallback = function(commandId) {
  return this.invokeCallbacks[commandId];
};

/**
 * <p>Initialize child process and incoming commands listener.</p>
 *
 * <p>This will do the following steps:
 *
 * <ul>
 *   <li>- Prepare options to be passed for module (functionality options) and process (environment mostly) initialization</li>
 *   <li>- Fork new subprocess from {@link moduleName}</li>
 *   <li>- Start listening incoming IPC messages from forked sub-process to current process:
 *     <ul>
 *       <li>- <i>init command response</i>: will invoke the {@link ModuleProcess.init} function callback, notifying
 *       the module has been initialized in subprocess</li>
 *       <li>- <i>other command response</i>: will invoke a callback registered with {@link addCallback} on incoming
 *       command call via {@link ModuleProcess.invoke}.</li>
 *     </ul>
 *   </li>
 * </ul>
 *
 * @param callback {ModuleProcess~onReadyCallback} Callback to run on initialization has finished.
 */
ModuleProcess.prototype.init = function(callback) {

  this.logger.debug("Init module process on " + this.moduleName);

  //
  if (this._initialized) {
    this.logger.warn("Already initialized");
    return callback(null, this);
  }
  // var debugPort = parseInt(process.pid);

  /**
   * Prepare the options object to be used for forking child process.
   * See. https://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options
   */
  var processOpts = _.extend({}, this.moduleOpts.process);

  /**
   * Extend and inherit from current process (main process) environment in sub-process
   */
  processOpts.env = _.extend(
    {},                             // To create new env object instead mutating current
    process.env,                    // Inherit from current process env object
    processOpts.env,                // Inherit from config modulesprocdisp.XXX.process.env if any
    {PROC_NAME: this.moduleName}    // Env settings required by the ModuleProcess functionality
  );

  this.childProcess = childProcess.fork(this.moduleName, processOpts);

  /**
   * Install per {@link ModuleProcess}-instance IPC messages listener.
   *
   * For arbitrary function call (see {@link ModuleProcess.invoke}) will get responses from module from within
   * subprocess and execute previously registered (see invocation of {@link ModuleProcess.addCallback} inside
   * {@link ModuleProcess.invoke}) callback.
   *
   * For first initialization function call
   * (see "this.childProcess.send({command: COMMANDS.MODULE_PROCESS_COMMAND_INIT, ...})" invocation later) will
   * run callback function passed to {@link ModuleProcess.init}
   */
  this.childProcess.on('message', (function(cpResponseMessage) {

    this.logger.debug("Incoming message " + cpResponseMessage);

    var timeElapsed =
      cpResponseMessage.sendTime ? (new Date().getTime() - cpResponseMessage.sendTime) : null;

    this.logger.debug("Module '" + this.moduleName + "' process responded: " +
      "command: " + cpResponseMessage.command +
      (cpResponseMessage.commandId ? "; commandId: " + cpResponseMessage.commandId : '') +
      (cpResponseMessage.functionName ? "; functionName: " + cpResponseMessage.functionName : '') +
      (timeElapsed ? "; time: +" + timeElapsed + "ms" : '')
    );

    var args = cpResponseMessage.callbackArguments ? _.values(cpResponseMessage.callbackArguments) : [];

    switch (cpResponseMessage.command) {
      case COMMANDS.MODULE_PROCESS_COMMAND_INIT:
        var error = args[0];
        if (!error) {
          this._initialized = true;
        }
        callback(error, !error ? this : null);
        break;
      case COMMANDS.MODULE_PROCESS_COMMAND_INVOKE:
        // Get a callback stored for the response command id
        var invokeCallback = this.getCallback(cpResponseMessage.commandId);
        if (invokeCallback) {
          invokeCallback.apply(this, args);                   // Invoke callback and
          this.removeCallback(cpResponseMessage.commandId);  // remove callback from the queue afterwards
        }
        break;
    }

  }).bind(this));

  this.moduleOpts = JSON.parse(stringify(this.moduleOpts, null, 2, null));
  // Send first initializing call
  this.childProcess.send({command: COMMANDS.MODULE_PROCESS_COMMAND_INIT, moduleOpts: this.moduleOpts});


};

/**
 * <p>Request a functionName to be called within subprocess with supplied parameters.
 * This call return a callback-style function wrapping the invoked function. See invokeExample.js below.</p>
 *
 * <p>Behind the scenes...</p>
 *
 * <p>The function invocation workflow is based on the following:</p>
 *
 * <p>1. On {@link ModuleProcess} init the module is wrapped by subprocess. Since that the {@link ModuleProcess} object
 *  API contains module members that are exposed via {@link ProcessDispatcher.listenIPCMessages}.
 *  First parameter in {@link ProcessDispatcher.listenIPCMessages} is 'moduleFn' (function(moduleOpts) { ... }).
 *  {@link moduleOpts} is used eventually as 'moduleOpts' parameter for that function.
 *  Then the functionName passed to the {@link ModuleProcess.invoke} is one of that functions installed via
 *  {@link ProcessDispatcher.listenIPCMessages} invocation.</p>
 *
 *  <p>2. Once functionName invocation requested the callback is registered for that particular function call (i.e.
 *  unique token created for the function call)</p>
 *
 *  <p>3. Later under the hood the callback will be called when the subprocess that wraps the module will respond via
 *  IPC message to its parent process. The response will be caught by {@link ModuleProcess} functionality.
 *  To understand that see how initialization goes in {@link ModuleProcess.init}, the
 *  "case COMMANDS.MODULE_PROCESS_COMMAND_INVOKE" block is what catching command result responses and calling callback
 *  that is registered when {@link ModuleProcess.invoke} has been called.</p>
 *
 * @example <caption>invokeExample.js</caption>
 * var modProc = ...; // modProc is instance of ModuleProcess
 * modProc.invoke('foo', {param1: "test"})(function(error, result) {
 *   console.log("result is ", result);
 * });
 *
 * @param functionName {string}   Function name to be invoked. It is a wrapped module module.export member.
 * @param params {object}         Params to be passed to function.
 * @returns {ModuleProcess~onReadyCallback}  A callback function to be run on command invocation has finished.
 */
ModuleProcess.prototype.invoke = function(functionName, params) {
  // TODO : check if this.childProcess is not null

  var _this = this;

  var commandId = ModuleProcess.makeCommandId(COMMANDS.MODULE_PROCESS_COMMAND_INVOKE);

  /**
   * The {@param callback} will be invoked by {@link getCallback} from within {@link init} function
   * within {@link COMMANDS.MODULE_PROCESS_COMMAND_INVOKE} command branch.
   */
  return function(callback) {

    _this.addCallback(commandId, callback);

    _this.childProcess.send({
      sendTime: new Date().getTime(),
      command: COMMANDS.MODULE_PROCESS_COMMAND_INVOKE,
      commandId: commandId,
      functionName: functionName,
      params: params
    });

  };
};

module.exports = ModuleProcess;