Click here to Skip to main content
15,896,606 members
Articles / Web Development / Node.js

Node.Js And Stuff

Rate me:
Please Sign up or sign in to vote.
4.97/5 (55 votes)
11 Feb 2013CPOL23 min read 362.1K   2.3K   172  
Small demo app using Node.Js/Socket.IO/MongoDB/D3.Js and jQuery.
var utils = require('./connection_utils'),
  inherits = require('util').inherits,
  net = require('net'),
  EventEmitter = require('events').EventEmitter,
  inherits = require('util').inherits,
  MongoReply = require("../responses/mongo_reply").MongoReply,
  Connection = require("./connection").Connection;

var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, bson, socketOptions) {
  if(typeof host !== 'string' || typeof port !== 'number') throw "host and port must be specified [" + host + ":"  + port + "]";
  // Set up event emitter
  EventEmitter.call(this);
  // Keep all options for the socket in a specific collection allowing the user to specify the
  // Wished upon socket connection parameters
  this.socketOptions = typeof socketOptions === 'object' ? socketOptions : {};
  this.socketOptions.host = host;
  this.socketOptions.port = port;
  this.socketOptions.domainSocket = false;
  this.bson = bson;
  // PoolSize is always + 1 for special reserved "measurment" socket (like ping, stats etc)
  this.poolSize = poolSize;
  this.minPoolSize = Math.floor(this.poolSize / 2) + 1;

  // Check if the host is a socket
  if(host.match(/^\//)) this.socketOptions.domainSocket = true;

  // Set default settings for the socket options
  utils.setIntegerParameter(this.socketOptions, 'timeout', 0);
  // Delay before writing out the data to the server
  utils.setBooleanParameter(this.socketOptions, 'noDelay', true);
  // Delay before writing out the data to the server
  utils.setIntegerParameter(this.socketOptions, 'keepAlive', 0);
  // Set the encoding of the data read, default is binary == null
  utils.setStringParameter(this.socketOptions, 'encoding', null);
  // Allows you to set a throttling bufferSize if you need to stop overflows
  utils.setIntegerParameter(this.socketOptions, 'bufferSize', 0);

  // Internal structures
  this.openConnections = [];
  // Assign connection id's
  this.connectionId = 0;

  // Current index for selection of pool connection
  this.currentConnectionIndex = 0;
  // The pool state
  this._poolState = 'disconnected';
  // timeout control
  this._timeout = false;
}

inherits(ConnectionPool, EventEmitter);

ConnectionPool.prototype.setMaxBsonSize = function(maxBsonSize) {
  if(maxBsonSize == null){
    maxBsonSize = Connection.DEFAULT_MAX_BSON_SIZE;
  }

  for(var i = 0; i < this.openConnections.length; i++) {
    this.openConnections[i].maxBsonSize = maxBsonSize;
  }
}

// Start a function
var _connect = function(_self) {
  return new function() {
    // Create a new connection instance
    var connection = new Connection(_self.connectionId++, _self.socketOptions);
    // Set logger on pool
    connection.logger = _self.logger;
    // Connect handler
    connection.on("connect", function(err, connection) {
      // Add connection to list of open connections
      _self.openConnections.push(connection);
      // If the number of open connections is equal to the poolSize signal ready pool
      if(_self.openConnections.length === _self.poolSize && _self._poolState !== 'disconnected') {
        // Set connected
        _self._poolState = 'connected';
        // Emit pool ready
        _self.emit("poolReady");
      } else if(_self.openConnections.length < _self.poolSize) {
        // We need to open another connection, make sure it's in the next
        // tick so we don't get a cascade of errors
        process.nextTick(function() {
          _connect(_self);
        });
      }
    });

    var numberOfErrors = 0

    // Error handler
    connection.on("error", function(err, connection) {
      numberOfErrors++;
      // If we are already disconnected ignore the event
      if(_self._poolState != 'disconnected' && _self.listeners("error").length > 0) {
        _self.emit("error", err);
      }

      // Set disconnected
      _self._poolState = 'disconnected';
      // Stop
      _self.stop();
    });

    // Close handler
    connection.on("close", function() {
      // If we are already disconnected ignore the event
      if(_self._poolState !== 'disconnected' && _self.listeners("close").length > 0) {
        _self.emit("close");
      }
      // Set disconnected
      _self._poolState = 'disconnected';
      // Stop
      _self.stop();
    });

    // Timeout handler
    connection.on("timeout", function(err, connection) {
      // If we are already disconnected ignore the event
      if(_self._poolState !== 'disconnected' && _self.listeners("timeout").length > 0) {
        _self.emit("timeout", err);
      }
      // Set disconnected
      _self._poolState = 'disconnected';
      // Stop
      _self.stop();
    });

    // Parse error, needs a complete shutdown of the pool
    connection.on("parseError", function() {
      // If we are already disconnected ignore the event
      if(_self._poolState !== 'disconnected' && _self.listeners("parseError").length > 0) {
        _self.emit("parseError", new Error("parseError occured"));
      }

      _self.stop();
    });

    connection.on("message", function(message) {
      _self.emit("message", message);
    });

    // Start connection in the next tick
    connection.start();
  }();
}


// Start method, will throw error if no listeners are available
// Pass in an instance of the listener that contains the api for
// finding callbacks for a given message etc.
ConnectionPool.prototype.start = function() {
  var markerDate = new Date().getTime();
  var self = this;

  if(this.listeners("poolReady").length == 0) {
    throw "pool must have at least one listener ready that responds to the [poolReady] event";
  }

  // Set pool state to connecting
  this._poolState = 'connecting';
  this._timeout = false;

  _connect(self);
}

// Restart a connection pool (on a close the pool might be in a wrong state)
ConnectionPool.prototype.restart = function() {
  // Close all connections
  this.stop(false);
  // Now restart the pool
  this.start();
}

// Stop the connections in the pool
ConnectionPool.prototype.stop = function(removeListeners) {
  removeListeners = removeListeners == null ? true : removeListeners;
  // Set disconnected
  this._poolState = 'disconnected';

  // Clear all listeners if specified
  if(removeListeners) {
    this.removeAllEventListeners();
  }

  // Close all connections
  for(var i = 0; i < this.openConnections.length; i++) {
    this.openConnections[i].close();
  }

  // Clean up
  this.openConnections = [];
}

// Check the status of the connection
ConnectionPool.prototype.isConnected = function() {
  return this._poolState === 'connected';
}

// Checkout a connection from the pool for usage, or grab a specific pool instance
ConnectionPool.prototype.checkoutConnection = function(id) {
  var index = (this.currentConnectionIndex++ % (this.openConnections.length));
  var connection = this.openConnections[index];
  return connection;
}

ConnectionPool.prototype.getAllConnections = function() {
  return this.openConnections;
}

// Remove all non-needed event listeners
ConnectionPool.prototype.removeAllEventListeners = function() {
  this.removeAllListeners("close");
  this.removeAllListeners("error");
  this.removeAllListeners("timeout");
  this.removeAllListeners("connect");
  this.removeAllListeners("end");
  this.removeAllListeners("parseError");
  this.removeAllListeners("message");
  this.removeAllListeners("poolReady");
}






















By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United Kingdom United Kingdom
I currently hold the following qualifications (amongst others, I also studied Music Technology and Electronics, for my sins)

- MSc (Passed with distinctions), in Information Technology for E-Commerce
- BSc Hons (1st class) in Computer Science & Artificial Intelligence

Both of these at Sussex University UK.

Award(s)

I am lucky enough to have won a few awards for Zany Crazy code articles over the years

  • Microsoft C# MVP 2016
  • Codeproject MVP 2016
  • Microsoft C# MVP 2015
  • Codeproject MVP 2015
  • Microsoft C# MVP 2014
  • Codeproject MVP 2014
  • Microsoft C# MVP 2013
  • Codeproject MVP 2013
  • Microsoft C# MVP 2012
  • Codeproject MVP 2012
  • Microsoft C# MVP 2011
  • Codeproject MVP 2011
  • Microsoft C# MVP 2010
  • Codeproject MVP 2010
  • Microsoft C# MVP 2009
  • Codeproject MVP 2009
  • Microsoft C# MVP 2008
  • Codeproject MVP 2008
  • And numerous codeproject awards which you can see over at my blog

Comments and Discussions