Click here to Skip to main content
15,886,724 members
Articles / Web Development / Node.js

Node.Js And Stuff

Rate me:
Please Sign up or sign in to vote.
4.97/5 (55 votes)
11 Feb 2013 · CPOL · 23 min read · 357.5K · 2.3K · 172
Small demo app using Node.Js/Socket.IO/MongoDB/D3.Js and jQuery.
var Server = require("../server").Server;

// The ping strategy uses pings each server and records the
// elapsed time for the server so it can pick a server based on lowest
// return time for the db command {ping:true}
var PingStrategy = exports.PingStrategy = function(replicaset, secondaryAcceptableLatencyMS) {
  this.replicaset = replicaset;
  this.secondaryAcceptableLatencyMS = secondaryAcceptableLatencyMS;
  this.state = 'disconnected';
  this.pingInterval = 5000;
  // Class instance
  this.Db = require("../../db").Db;
}

// Starts the periodic pinging of the replica set servers.
//
// callback - optional function, called as callback(null).
//
// Calling start() while the strategy is already running does not start a
// second ping loop, but the callback is still invoked so that a caller waiting
// on it is never left hanging.
PingStrategy.prototype.start = function(callback) {
  // already running?
  if('connected' == this.state) {
    callback && callback(null);
    return;
  }

  this.state = 'connected';

  // Start ping server, it takes care of invoking the callback
  this._pingServer(callback);
};

// Stops the ping process.
//
// callback - optional function, called as callback(null, null)
PingStrategy.prototype.stop = function(callback) {
  // The ping loop checks this flag before every round and before
  // rescheduling itself, so flipping it is enough to end the loop
  this.state = 'disconnected';

  if(callback) {
    callback(null, null);
  }
};

// Picks a server to read from and checks out a reader connection on it.
//
// The candidates are optionally narrowed down by tags. Every remaining server
// whose recorded ping time is no more than secondaryAcceptableLatencyMS above
// the fastest one is eligible, and one eligible server is picked at random
// with equal probability.
//
// tags                - optional tag object, or array of tag objects in order
//                       of preference. The first tag object matched by at
//                       least one candidate decides the selection; servers
//                       without tags never match.
// secondaryCandidates - optional array of servers to choose from. When it is
//                       not an array, the primary (if available) and all
//                       secondaries of the replica set are used. The array is
//                       never modified.
//
// Returns the result of checkoutReader() on the picked server, or an Error
// instance (returned, not thrown) when no server is available.
PingStrategy.prototype.checkoutSecondary = function(tags, secondaryCandidates) {
  // Recorded ping time of a server; a server without a recorded ping counts as 0
  var pingOf = function(server) {
    var stats = server.runtimeStats;
    var ping = stats != null ? stats['pingMs'] : undefined;
    return (typeof ping == 'number' && !isNaN(ping)) ? ping : 0;
  };

  // True when the server carries every key/value pair of the tag object
  var matchesTags = function(server, tagObject) {
    if(server.tags == null) return false;

    var matchingKeys = Object.keys(tagObject);
    for(var j = 0; j < matchingKeys.length; j++) {
      // Loose comparison on purpose: tag values are compared as before
      if(server.tags[matchingKeys[j]] != tagObject[matchingKeys[j]]) return false;
    }

    return true;
  };

  // Create a list of candidate servers
  var candidateServers;

  if(Array.isArray(secondaryCandidates)) {
    // Work on a copy, the sorting and filtering below must not reorder or
    // shrink the array owned by the caller
    candidateServers = secondaryCandidates.slice();
  } else {
    // No list provided, use the primary if available plus all the secondaries
    var state = this.replicaset._state;
    candidateServers = state.master != null ? [state.master] : [];

    var keys = Object.keys(state.secondaries);
    for(var i = 0; i < keys.length; i++) {
      candidateServers.push(state.secondaries[keys[i]]);
    }
  }

  // Final list of eligible servers
  var finalCandidates = candidateServers;

  // If we have tags filter by tags
  if(tags != null && typeof tags == 'object') {
    // If we have an array or single tag selection
    var tagObjects = Array.isArray(tags) ? tags : [tags];
    finalCandidates = [];

    // Iterate over the tag objects until one of them yields candidates.
    // Stopping there also keeps a server that matches several tag objects
    // from being listed (and therefore picked) more than once.
    for(var t = 0; t < tagObjects.length && finalCandidates.length == 0; t++) {
      var tagObject = tagObjects[t];

      for(var s = 0; s < candidateServers.length; s++) {
        if(matchesTags(candidateServers[s], tagObject)) {
          finalCandidates.push(candidateServers[s]);
        }
      }
    }
  }

  if(0 === finalCandidates.length)
    return new Error("No replica set members available for query");

  // Sort by ping time, fastest first. The comparator has to return a number,
  // a boolean result does not sort the array.
  finalCandidates.sort(function(a, b) {
    return pingOf(a) - pingOf(b);
  });

  // Determine acceptable latency
  var acceptable = pingOf(finalCandidates[0]) + this.secondaryAcceptableLatencyMS;

  // Remove any server responding slower than acceptable
  var acceptableServers = finalCandidates.filter(function(server) {
    return !(pingOf(server) > acceptable);
  });

  // If no candidates available return an error
  if(acceptableServers.length == 0)
    return new Error("No replica set members available for query");

  // Pick a random acceptable server. Math.random() is in [0, 1), so flooring
  // gives every index the same probability.
  var index = Math.floor(Math.random() * acceptableServers.length);
  return acceptableServers[index].checkoutReader();
};

// Runs the ping loop: every round opens a short-lived connection to each
// server in the replica set's address table, times a {ping:1} command on it
// and stores the elapsed milliseconds in that server's runtimeStats['pingMs'].
// When all servers of a round have reported back, the next round is scheduled
// pingInterval ms later, for as long as the state stays 'connected'.
//
// callback - optional function, called as callback(null) right after the
//            first round has been scheduled (not after it has run).
PingStrategy.prototype._pingServer = function(callback) {
  var self = this;

  // Schedules the next round, unless the strategy was stopped in the meantime
  var scheduleNextRound = function() {
    if('connected' == self.state) {
      setTimeout(pingFunction, self.pingInterval);
    }
  };

  // Pings a single server and calls finished() exactly once when it is done,
  // whether the ping worked or not
  var pingSingleServer = function(serverInstance, finished) {
    // Create a new server object, avoid using internal connections as they might
    // be in an illegal state
    var options = { poolSize: 1, timeout: 500, auto_reconnect: false };
    var server = new Server(serverInstance.host, serverInstance.port, options);
    var db = new self.Db(self.replicaset.db.databaseName, server, { safe: true });
    var opened = false;
    var completed = false;

    // The error event, a failed open and the command callback can all fire for
    // the same server. Only the first one may count, otherwise the round's
    // pending counter is decremented more than once for this server.
    var done = function(_db) {
      if(completed) return;
      completed = true;

      // Close connection: the instance handed to us, or our own db once it has
      // been opened (the error event does not hand one over)
      var connection = _db || (opened ? db : null);
      if(connection) connection.close(true);

      finished();
    };

    db.on("error", function() {
      done();
    });

    // Open the db instance
    db.open(function(err, _db) {
      if(err) return done(_db);
      opened = true;

      // Startup time of the command
      var startTime = Date.now();

      // Execute ping on this connection
      db.executeDbCommand({ping:1}, {failFast:true}, function() {
        // Ignore a result arriving after this server was already given up on
        if(!completed && null != serverInstance.runtimeStats && serverInstance.isConnected()) {
          serverInstance.runtimeStats['pingMs'] = Date.now() - startTime;
        }

        done();
      });
    });
  };

  // Ping server function, one call is one round over all servers
  var pingFunction = function() {
    if(self.state == 'disconnected') return;
    var addresses = self.replicaset._state.addresses;

    // Grab all servers
    var serverKeys = Object.keys(addresses);

    // Number of servers that still have to report back
    var numberOfEntries = serverKeys.length;

    // No servers known right now: nothing will ever report back, so the next
    // round has to be scheduled here or pinging would stop for good
    if(0 === numberOfEntries) return scheduleNextRound();

    var serverFinished = function() {
      numberOfEntries--;

      // If we are done with all results coming back trigger ping again
      if(0 === numberOfEntries) scheduleNextRound();
    };

    for(var i = 0; i < serverKeys.length; i++) {
      pingSingleServer(addresses[serverKeys[i]], serverFinished);
    }
  };

  // Start pingFunction
  setTimeout(pingFunction, 1000);

  callback && callback(null);
};

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United Kingdom
I currently hold the following qualifications (amongst others, I also studied Music Technology and Electronics, for my sins)

- MSc (Passed with distinctions), in Information Technology for E-Commerce
- BSc Hons (1st class) in Computer Science & Artificial Intelligence

Both of these at Sussex University UK.

Award(s)

I am lucky enough to have won a few awards for Zany Crazy code articles over the years

  • Microsoft C# MVP 2016
  • Codeproject MVP 2016
  • Microsoft C# MVP 2015
  • Codeproject MVP 2015
  • Microsoft C# MVP 2014
  • Codeproject MVP 2014
  • Microsoft C# MVP 2013
  • Codeproject MVP 2013
  • Microsoft C# MVP 2012
  • Codeproject MVP 2012
  • Microsoft C# MVP 2011
  • Codeproject MVP 2011
  • Microsoft C# MVP 2010
  • Codeproject MVP 2010
  • Microsoft C# MVP 2009
  • Codeproject MVP 2009
  • Microsoft C# MVP 2008
  • Codeproject MVP 2008
  • And numerous codeproject awards which you can see over at my blog

Comments and Discussions