Click here to Skip to main content
15,895,084 members
Articles / Web Development / Node.js

Node.Js And Stuff

Rate me:
Please Sign up or sign in to vote.
4.97/5 (55 votes)
11 Feb 2013 | CPOL | 23 min read | 361.2K | 2.3K | 172
Small demo app using Node.Js/Socket.IO/MongoDB/D3.Js and jQuery.
var Stream = require('stream').Stream,
  util = require('util');

/**
 * ReadStream
 *
 * Stream interface that reads a GridStore **file** one chunk at a time.
 *
 * Events
 *  - **data** {function(item) {}} fired with the contents of each chunk as it is read.
 *  - **end** {function() {}} fired once the last chunk has been processed.
 *  - **close** {function() {}} fired when the stream is closed or destroyed.
 *  - **error** {function(err) {}} fired when fetching a chunk or closing the store fails.
 *
 * Can be invoked with or without `new`. Reading is kicked off on the next
 * tick, so listeners attached synchronously after construction are in place
 * before the first event fires.
 *
 * @class Represents a GridFS File Stream.
 * @param {Boolean} autoclose when truthy, **close** is emitted after **end** (a store whose mode starts with "w" is closed first).
 * @param {GridStore} gstore the GridStore object that the stream wraps.
 * @return {ReadStream}
 */
function ReadStream(autoclose, gstore) {
  // Allow `ReadStream(...)` as well as `new ReadStream(...)`.
  if (!(this instanceof ReadStream)) {
    return new ReadStream(autoclose, gstore);
  }

  Stream.call(this);
  var stream = this;

  // Configuration supplied by the caller.
  stream.autoclose = Boolean(autoclose);
  stream.gstore = gstore;

  // Progress bookkeeping: bytes still to be read from the store's current
  // position, bytes emitted so far, and the chunk number expected next.
  stream.finalLength = gstore.length - gstore.position;
  stream.completedLength = 0;
  stream.currentChunkNumber = 0;

  // Flow-control state.
  stream.paused = false;
  stream.readable = true;
  stream.pendingChunk = null;
  stream.executing = false;

  // Total number of chunks in the file (a trailing partial chunk counts as one).
  stream.numberOfChunks = Math.ceil(gstore.length / gstore.chunkSize);

  // Defer the first read to the next tick.
  process.nextTick(function() {
    stream._execute();
  });
}

/**
 * Inherit from Stream so that ReadStream instances expose the Stream /
 * EventEmitter API (`emit`, `on`, `pipe`, ...).
 *
 * `util.inherits` is used instead of assigning the deprecated, non-standard
 * `__proto__` accessor. Keep this call ahead of every
 * `ReadStream.prototype.<method> = ...` assignment: older Node.js releases
 * implement `util.inherits` by replacing the prototype object outright.
 *
 * @ignore
 * @api private
 */
util.inherits(ReadStream, Stream);

/**
 * Flag stating whether or not this stream is readable.
 *
 * Initialised to `true` by the constructor and set to `false` once the stream
 * has reached its end, been destroyed, or failed to fetch a chunk.
 *
 * The bare property reference below has no runtime effect; presumably it is
 * here only so that documentation tooling picks the property up.
 */
ReadStream.prototype.readable;

/**
 * Flag stating whether or not this stream is paused.
 *
 * Initialised to `false` by the constructor, set by `pause()` and cleared by
 * `resume()`. While it is `true`, `_execute()` returns without reading.
 *
 * The bare property reference below has no runtime effect; presumably it is
 * here only so that documentation tooling picks the property up.
 */
ReadStream.prototype.paused;

/**
 * Read loop of the stream.
 *
 * Reads whatever is left of the store's current chunk and emits it as
 * **data** (only when that chunk is the one expected next). Then either:
 *  - finishes the stream when the chunk is the last one: emits **end** and,
 *    with `autoclose` set, **close** (closing the store first when its mode
 *    starts with "w"); or
 *  - asks the store for the following chunk and re-enters itself once the
 *    chunk arrives, unless the stream was paused in the meantime.
 *
 * Does nothing while the stream is paused or no longer readable.
 *
 * @ignore
 * @api private
 */
ReadStream.prototype._execute = function() {
  var stream = this;

  // Nothing to do while paused or after the stream stopped being readable.
  if(stream.paused === true || stream.readable === false) return;

  var store = stream.gstore;

  // Mark the read loop as running.
  stream.executing = true;

  // Decided before any event is emitted; the final pass clears the
  // executing flag straight away.
  var isLastChunk = store.currentChunk.chunkNumber >= (stream.numberOfChunks - 1);
  if(isLastChunk) {
    stream.executing = false;
  }

  // Emit the remainder of the current chunk, but only if it is the chunk
  // expected next, so a chunk is never delivered twice.
  var slice = store.currentChunk.readSlice(store.currentChunk.length());
  if(slice != null && store.currentChunk.chunkNumber == stream.currentChunkNumber) {
    stream.currentChunkNumber += 1;
    stream.completedLength += slice.length;
    stream.pendingChunk = null;
    stream.emit("data", slice);
  }

  if(!isLastChunk) {
    store._nthChunk(store.currentChunk.chunkNumber + 1, onNextChunk);
    return;
  }

  // Last chunk handled: finish the stream.
  stream.readable = false;
  stream.emit("end");

  if(stream.autoclose !== true) return;

  if(store.mode[0] == "w") {
    store.close(onStoreClosed);
  } else {
    stream.readable = false;
    stream.emit("close");
  }

  // Continues the loop once the next chunk has been fetched.
  function onNextChunk(err, chunk) {
    if(err) {
      stream.readable = false;
      stream.emit("error", err);
      stream.executing = false;
      return;
    }

    // Remember the chunk; if the stream was paused while the fetch was in
    // flight, stop here without advancing the store.
    stream.pendingChunk = chunk;
    if(stream.paused === true) {
      stream.executing = false;
      return;
    }

    store.currentChunk = stream.pendingChunk;
    stream._execute();
  }

  // Reports the outcome of closing the store.
  function onStoreClosed(err, doc) {
    if(err) {
      stream.emit("error", err);
      return;
    }
    stream.readable = false;
    stream.emit("close", doc);
  }
};

/**
 * Pauses this stream so that the read loop stops on its next pass.
 *
 * The request is honoured only while the read loop is idle: if the stream is
 * currently executing, the call is a no-op and `paused` is left unchanged.
 *
 * @ignore
 * @api public
 */
ReadStream.prototype.pause = function() {
  // A running read loop cannot be interrupted.
  if(this.executing) return;

  this.paused = true;
};

/**
 * Destroys the stream: marks it as no longer readable, which stops the read
 * loop on its next pass, and emits **close** straight away.
 *
 * @ignore
 * @api public
 */
ReadStream.prototype.destroy = function() {
  var stream = this;

  stream.readable = false;
  // Let listeners know the stream is finished.
  stream.emit("close");
};

/**
 * Resumes a paused stream.
 *
 * Clears the `paused` flag and restarts the read loop on the next tick.
 * Calling it on a stream that is not paused, or that is no longer readable,
 * does nothing.
 *
 * @ignore
 * @api public
 */
ReadStream.prototype.resume = function() {
  var stream = this;

  if(stream.paused !== false && stream.readable) {
    stream.paused = false;
    // Restart asynchronously so the caller's current turn finishes first.
    process.nextTick(function() {
      stream._execute();
    });
  }
};

// Expose the ReadStream constructor on this module's exports object.
exports.ReadStream = ReadStream;

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United Kingdom
I currently hold the following qualifications (amongst others; I also studied Music Technology and Electronics, for my sins):

- MSc (Passed with distinctions), in Information Technology for E-Commerce
- BSc Hons (1st class) in Computer Science & Artificial Intelligence

Both of these at Sussex University UK.

Award(s)

I am lucky enough to have won a few awards for Zany Crazy code articles over the years:

  • Microsoft C# MVP 2016
  • Codeproject MVP 2016
  • Microsoft C# MVP 2015
  • Codeproject MVP 2015
  • Microsoft C# MVP 2014
  • Codeproject MVP 2014
  • Microsoft C# MVP 2013
  • Codeproject MVP 2013
  • Microsoft C# MVP 2012
  • Codeproject MVP 2012
  • Microsoft C# MVP 2011
  • Codeproject MVP 2011
  • Microsoft C# MVP 2010
  • Codeproject MVP 2010
  • Microsoft C# MVP 2009
  • Codeproject MVP 2009
  • Microsoft C# MVP 2008
  • Codeproject MVP 2008
  • And numerous codeproject awards which you can see over at my blog

Comments and Discussions