Click here to Skip to main content
14,304,886 members

In Search of Streaming... Part 1 of 2

Rate this:
5.00 (4 votes)
Please Sign up or sign in to vote.
5.00 (4 votes)
2 Mar 2019CPOL
Discussing about streams... about streaming... about functional streaming... and nothing else.

Before we begin talking about streaming or any associated details, we would like to make a commentary on the the vocabulary used in this article, in order to avoid the state of confusion.

  • We will use the word "Stream" or "Streams" to talk about the underlying code-implementation (interfaces, associated classes etc) capable of:
  1. forming a pipeline by repetitive use in tandem.
  2. mutating and forwarding byte-sequences to next such stream in chain.

As a matter of fact, as we talk about those Stream implementation, we remain indifferent to the implementation complexity and source of such implementation (i.e. part of framework, an open source library or home-made recipe) as long as the desired results can be obtained. In effect, this assumption is so important, because, otherwise it would be impossible for us to obtain a truly context agnostic API that is capable to support virtually any operation on the underlying byte-streams (as we will see below).

  • We will use the word "Streaming", to associate the flow of byte-sequences as they goes through such a stream based pipeline.

In fact, as a special case of such declaration, when one of such streams in the pipeline is actually a network (HTTP) stream and the flowing byte contents are multimedia contents; we obtain the implementation of mutlimedia-streaming (or just streaming as it is widely known). Thus, it is important to remember that the we do NOT restrict the scope of the word "streaming" to mutimedia-streaming, during our discussion.

Relevant Trivia

Streams are, undoubtably, one of its kind and special-purpose breeded beasts. Though a stream can be thought as generator of byte sequences; it unfolds its true power through its dynamics. Let us explain. In fact, at runtime, any other object be that string, array, data in a custom class/struct objects, even the source code itself; are all sequence of bytes in memory residing somewhere. Even, a simple interger value can be thought as a sequence of 4 bytes (for int32 in .Net). In addition, it is possible to extract a sub-sequence from these byte representation of these objects to perform some delicate operations; yet such byte sequences lack dynamics which streamimg exhibit out of the box. And, for our discussion, the notion of such flow, associated to bytes and runtime processing, comes handy as we start talking about our work on data-streaming. Going forward we will NOT ONLY make effort to explain this phenomenon in details, BUT ALSO, will propose a completely novel APIs to deal with data streaming requirements.

As soon as we hear the word "Streaming", many pictures comes to the mind, like watching a videos online, watching live telecast of an event, listening to a favorite song online etc. More or less, we almost immediately associate Multimedia contents like Audio & Video with this word. Thus, it is important for us to switch the gear and set a platform. In order to do so , first, we would like to personalize the definition of Streaming, by expanding the Streaming universe in our definition, that can be written as plainly as:

"Sending/transfering data1, potentially as varying sized chunks of binary data, continuously; at the same time, permitting the receiving-end2 to continuously process those chunks, whenever possible, in independent fashion (i.e. without buffering data1)."

1The term "Data" is contextual here. For us, it is whatever defines as whole dataset, i.e. whole video or just a 1 second clip of that video or simply a "Hello World!" string or a never ending data series.
2The term "receiving-end" is used to identify the next Stream in tandem.

With such a definition at hand:

  • We are more interested in BYTE format of data instead of mediatype. Hence, we want to deal with any kind of data that is either promptly available as bytes or convertible (irrespective of complexity of conversion) to bytes.
  • We want to transfer data continuously, i.e. as it becomes available, and, potentially as chunks. Thus, we strive to not to buffer whole dataset, in memory, at any point during the streaming.
  • We are agnostic to underlying protocols/APIs, as long as we are able to send those data-chunks continuously.
  • We want to design a scheme/framework/mechanism that can support any such arbitrary data processing end-to-end.
  • And, we are receiver agnostic as long as it is able to accept such data-chunks (i.e. irrespective of its data-processing capabilities).

Implementation Notes

  • From a theoritical point of view, the article is generic in nature and may remain valid for several languages/frameworks; however, we have implemented our thoughts in C# .Net and we would be pitching some .Net code snippets throughout the discussion.
  • Readers who wants to compile the attached source code, as it is, in Visual studio should make sure that they have .Net Framework 4.7.2 SDKs installed and have C# language version 7.1 or above installed (as mentioned in MSDN blog)

NOTE: Statistics, presented in this article, are obtained with following system configuration:

Hardware_Config

Software_Config

VS_CONFIG

Reasons First

Before we try to understand why we thought of such an implementation, we should first understand the existing tools we have. Considering we have two Stream instances _readableStream and _writableStream; as the name suggests we can read from _readableStream and write to _writableStream. Further assume, we have a trivial task at hand which warrants us to copy data from _readableStream to _writableStream. Most of the languages/framework provides following implementation (more or less) to achieve it:

/////////////////////
//// PSUEDO CODE ////
/////////////////////

//define some temporary byte array as buffer
byte[] buffer = new byte[buffer_size];

//continuously read from readable stream 
while ((readLength = _readableStream.read(buffer)) > 0) 
{
   //write on writable stream as long as we read at least 1 byte
   _writableStream.write(buffer, 0, readLength);
}

From the above code snippet, we notice that by using a fixed size buffer (normally of a few KB in size), we achieve such stream-to-stream copy. Complexity is linear to the stream length and we dont consume much space; fair enough.

But wait a moment, we made an assumption here that streams are associated to I/O devices (especially _writableStream) like file, network etc. But, what would happens when our _writableStream turns out to be an in-memory stream (MemoryStream in C# .Net), then we immediately increases the space complexity. And what if both (_readableStream and _writableStream) are in-memory streams. Then space requirement is doubled.

But why we care so much about it? Simplistically speaking, It's sufficient to say that Memory is Cheap BUT not FREE and neither LIMITLess, nonetheless, the reason is NOT that simple. Thus, without adding any further verbosity; author invites readers to read an excellent article, titled "To Heap or not to Heap; That’s the Large Object Question?", written by , to understand those details related to increasing space complexity associated with large objects (such as strings, list or arrays in general). 

In general, reducing runtime memory is our first reason. On the same lines, our next reason is latency which can be reduced by re-using the same buffer (allocated once) during copy operations, without the need to spend precious CPU time in re-sizing/copying byte arrays (in memory) to buffer entire data.

Though, normally less talked, our next reason is code organization (e.g. readability,testability, separation of concerns etc.); our goal is to prepare an API to perform streaming operations which is intuitive and expressive. Furthermore, we want to embed some sort of artificial intelligence in our APIs to allow us to bring runtime malleability (i.e. conditional plumbing of pipeline) in our chain of streams. In the end, we want to have a liberty to build pipelines to perform arbitrary operations (WILDCARDs) on the running chunks of byte without losing the associated benefits. As a matter of fact, we will build some specific streaming operations to demostrate such wildcard capabilities.

Being Pragmatic

If you have followed us until here, you might argue that streaming is NOT that significantly used in a regular application and even most of the applications do NOT go beyond file reading/writing. We cannot argue about that as it is experience based argument. However, following non-exhaustive list does provide usage of streaming:

  • WebAPIs
  • Base64 conversion
  • Object Seriailization
  • Data Encryption
  • Data Compression
  • Hash computing
  • File handling... so on and so forth...

Measuring performance of a trivial task

Before going into details, lets start with a simple example. Assume we have a following task at hand:

Definition:

Give a path of a binary file, read all its bytes. First, decompress it using GZip compression algorithm, then deserialize data as a well-defined Object array (i.e. List<T> where T is known) using JSON serializer.

From above statement, we can identify three (3) distinct operations, namely:

  1. Read all bytes from the given file
  2. Use GZip algorithm to decompress those bytes
  3. With Json serializer create List<T> (T is known or it is a generic place holder it hardly matters) from decompressed bytes

To maintain code readability and by neglecting any performance/code optimization (just for the moment), we consider implementation of following three (3) functions:

public byte[] PullAllBytesFrom(FileInfo file)
{
     return File.ReadAllBytes(file.FullName);
}

public byte[] DecompressUsingGzip(byte[] compressedBytes)
{
     var unzippedData = new MemoryStream();
     using (var unzipper = new GZipStream(new MemoryStream(compressedBytes), CompressionMode.Decompress, false))
     {
          unzipper.CopyTo(unzippedData);
     }
     return unzippedData.ToArray();
}

public List<T> DeserializeAs<T>(byte[] data)
{
     // ===> Using Newtonsoft.Json (we will call it with T = List<T>)
     return JsonConvert.DeserializeObject<T>(new UTF8Encoding().GetString(data));
}

We could have written the code in other way, however, the reason that we created all the three (3) operations separately is a subject for later discussion and we will speak of those in details there. For the moment, we just want to focus on the performance of following code:

////////////////
//// CODE ID 1
//// We will use this ID as reference below during our discussion
/////////////// 

public List<T> DeserializeListFrom<T>(FileInfo compressedJsonFile)
{
     var fileBytes = PullAllBytesFrom(compressedJsonFile);
     var uncompressedBytes = DecompressUsingGzip(fileBytes);
     return DeserializeAs<List<T>>(uncompressedBytes);
}

If you run similarly written code of "DeserializeListFrom" (if you have downloaded the attached source code from this article, you can run PerfCompareNonStreamingWithStreamingAsync method), you will see following similar performance graphs from the Visual Studio Diagnostic Tools (NOTE: API Method is our implementation and subject of this discussion and DeserializeListFrom is similarly written method as shown in above snippet):

Image1_Perf_Visual

Looking at this image, we see there is a costly memory consuming operation going on during code execution and perhaps the byte array was re-allocated several time (hence, recopied). Overall, it's evident that we have an opportunity here to win big on memory and significantly on CPU time too. Thus, knowing the issue we can drill down further.

Defining Goals

Based on our discussion so far, we want to:

  • Avoid the usage of in-Memory buffers to improve on runtime memory
  • Work only with necessary fixed size buffers
  • be able to create efficient pipeline (chain of operations) end-to-end (source to target)
  • Create an API that offers:
    • Composability: composition of operations
    • Readability: composition are declarative
    • Maintainability: promotes single responcibility principle for each underlying composed operation
    • Elasticity: open to any exotic and/or regular data processing requirement
    • Reusability: permits run-time mutation in a composed chain in a deterministic manner

Rest of the article is going to present the work we have done to achieve above listed goals.

Streams In General

On surface all streams looks alike and it's hard to put those in different bins. However, to exploit streaming capabilities we do need to understand different characteristics of those stream implementations.

Unidirectional Vs Bidirectional

Fortunately, in .Net there exists a well-defined interface for Streams (inside System.IO namespace, Stream is defined as Abstract class) and all stream implementations are inherited from it. We take a closer look at some of it's capabilities as shown below:

// from https://referencesource.microsoft.com/#mscorlib/system/io/stream.cs

public abstract class Stream : MarshalByRefObject, IDisposable
{
        public abstract bool CanRead { get; }
        public abstract bool CanWrite { get; }

        public abstract int Read(byte[] buffer, int offset, int count);
        public abstract void Write(byte[] buffer, int offset, int count);

        /* ...
         * Other methods and properties
         * ...
         */
}

Thus, apart from Read and Write methods, Stream exposes CanRead and CanWrite truth values; thus, if a stream supports Read operation it shall return true for CanRead and similarly if it supports Write operation it should be truthy for CanWrite. In fact, NOT all stream implementations return True for both of these properties. Thus, we can say when stream is either Readable or Writable (not both) it's unidirectional (e.g. FileStream with read access); similarly, when its both Readable & writable at the same time it is bidirectional (e.g. MemoryStream with writable=true).

Open-Ended Vs Closed-Ended

Some stream implementations are in fact closed in the sense that they are bound to target device; for e.g. FileStream is bound to physical location on a disk. On the other hand, some stream implementation are open (agnostic) to the target involved in reading or writing operations, i.e., they operates on abstraction (e.g. abstract Stream class in .Net). Such streams, often, requires an instance of Stream at construction time (i.e. constrctor call); for e.g. GZipStream constructor accepts another Stream's instance for reading/writing operation during decompression/compression respectively, yet, agnostic to whether the given Stream is MemoryStream or FileStream. Though, given explanation (and stream classification) looks trivial in nature, it enable us to make a chain (pipeline) during streaming.

In fact, as we will see later, based on this distinct characterstics of Streams, our proposed API is able to create a chain of streaming operations in tandem without relying on intermediate full data-buffering between two independent streaming operations.

Specifics of MemoryStream

NOTE: Below listed concerns equally, more or less, apply to Byte[] (byte arrays) and List<Byte> (list of bytes)

MemoryStream is unique in its own way. Under the hood, it is a simple Byte array whose capacity is adjusted during write operations (in a similar way as if it is a List<Byte>, i.e. allocating bigger array and recopying bytes from existing array) and the array is traversed during read operations. Though, current implementation works just fine, nonetheless, those array (buffer) resizing operations do adds some pressure on CPU (memory allocation/data copying). Such operations can affect performance significantly if involved data (total bytes) size is large. Though, it would be hard to point out the data size limit as a single number; however, once the array reached 85000 bytes in size we would be touching Large Object Heap (LOH) and any new call (during write operation) to resize this array to a bigger capacity will only end up dealing with LOH. In short, once MemoryStream is involved in any streaming related operation one should be careful.

We have already seen above (in pseudo code under the title "Reasons First"), stream to stream copy employs a fixed size buffer and reuses buffer during its iterative copying operations (byte chunks limited to buffer capacity); instead of using MemoryStream (read everything in memory from source stream and then write everything to target stream). Furthermore, we know the MemoryStream is NOT thread-safe and it is not possible to write and read from it at the same time. Though, it is all the way possible to create a new thread-safe version of in-memory stream, yet such a painstaking effort might not necessarily bring fruits; especially in the case when writer that is writing on such memory stream is way too fast than the associated reader on the same stream (the internal array will eventually grow and may create a performance hit). Thus, we identify that any in-memory buffering of data (beyond fixed size buffers required for regular streaming operations) is not helpful for a streaming oriented API. And, down the lane we'll discuss, our approach avoids such usage of in-memory byte arrays.

Flow of Data

Roughly speaking, streaming data flow, based on the nature of source/target of the data, can be listed as:

  • One kind of byte representation to another kind of byte representation (e.g. text data in a file to a compressed file)
  • Memory data-structure to a byte representation, i.e. serialization + some additional stream processing (e.g. json serialization of a .Net class instance to an encrypted file on hard-disk)
  • From a byte-representation to a memory data-structure, i.e. deserialization + some additional stream processing

Based on these flows, we identify data-structure which are most commonly encountered during streaming, and, potentially are responcible of those unwanted performance hits:

  • string: Normally obtained during serializations, file reading, string concatenations, Base64 operations etc
  • byte[]: Obtained normally from string encoding, use of in-memory stream, File reading etc
  • MemoryStream: Normally appears due to misaligned stream pipeline
  • T[] or List<T> or any similar collection of object, where T is a known serializable object: Normally targets of serialization/Deserialization operations.

Furthermore, we identify most common streaming operations (also available as a part of the framework):

  • File handling
  • Byte encoding
  • Compression
  • Hash Computing
  • Base64 Conversion
  • Encryption/Decryption

Finally, we also recognize a few following frequent requirements:

  • Stream Fan-out: When a given stream needs to be inputted to multiple targets, for e.g., in order to maintain data availability by mean of redundency, same data is copied to several files streams and/or send to remote services etc.
  • Stream Length: When interest is to obtain the data byte count based on choosen encoding and treatment.

Overall, if we accumulate all these thoughts to prepare a mind-map, we come up with following naive illustration:

flow_of_data

Academic interests aside, the presence of streaming makes sense when:

  • Data is persisted (e.g. as a file on hard disk) and when persisted data is consumed
  • Data is transferred to another entity/process (e.g. server to client)

Irrespective of the use-case, the data-flow can be modeled as if the data publisher side (i.e. data producer) pushes the data at one end and the consumer side of the data pulls the data at another end. Depending upon the nature of data exchange, consumer either can run concurrently or sequentially. For e.g., in case of http communication, while sender is writing data on network during chunked transfer-encoding, receiver may recover payload data simultenously; whereas when sender writes the data to a file, receiver can consume (in absence of synchronization) the file only after file is persisted. Secondly, during these data push(es), whenever sender applies any data transformation, consumer, normally, requires to apply inverse transformations in reverse order to obtain the original data. These are the common streaming scenarios where performance can be optimized. Following diagram illustrates same idea.

Sd_Rx_Flow_Of_Data

Thus, we see above, that all data transformation operations (shown by OP-) at sender side have corresponding inverse transformation (shown by INV-OP-) in reverse order (i.e. if sender applies OP-1 before OP-2 then receiver applies INV-OP-2 before INV-OP-1). Thus, lets say if sender first serialized data as json and then applied GZip-compression, then, in order to get back the equivalent original data in-memory representation, receiver first applies GZip decompression and then deserializes the JSON data.

NOTE:

Some data transformations are inherently non-reversable, i.e., once the data is transformed it is theoritically not possible to obtain the original data; for e.g., Cryptographic HASH compting. But of course, if the intend is to just send the HASH of the data to the receiver then it is already assumed that original data is NOT required at receiver's end. Thus, for these similar cases, above shown reverse chain won't be present at receiver side, nonetheless, streaming can still be used with all its benefits; for e.g. to obtain the hash of the data in its target byte representation.

Towards Implementation

We noticed above, in order to implement our trivial task (under the title "Measuring performance of a trivial task"), we wrote three (3) distinct functions. The idea behind having those dedicated function's was to achieve composibility, for e.g., if we have a new feature requirement that demands to read from json file and obtain a known object (not necessarily list), the resultant code may look like:

public T DeserializeObjectFrom<T>(FileInfo uncompressedJsonFile)
{
   var uncompressedBytes = PullAllBytesFrom(uncompressedJsonFile);
   return DeserializeAs<T>(uncompressedBytes);
}

So without rewriting/modifying/breaking existing code, we immediately (almost) delivered a feature. Such innocent implementation honestly speaking fits well to SOLID (Single responcibility). However, if we notice, at surface, these implementations look benign, however, as we start increasing the file byte size, we discover associate issues. We realize that the "File.ReadAllBytes" allocates byte array proportional to the size of the file, but what is less remarked is that, internally, "File.ReadAllBytes" still using a similar buffer copy loop, as shown in Psuedo Code in the very beginning. Once we realize that, using those "fixed size buffer copying loop" is one of the strengths (and perhaps the least understood/appreciated) of streaming, we can appreciate all that comes here onwards.

Aware of this issue, we might be tempted to change the code in following way (in order to improve on the performance):

public T DeserializeObjectFrom<T>(FileInfo uncompressedJsonFile)
{
   using(var fileStream = new FileStream(uncompressedJsonFile, ...))
   {
       using(var textReader = new TextReader(fileStream, ...))
       {
            //JsonReader of Newtonsoft.Json
            using(var jsonReader = new JsonReader(textReader, ...))
            {
                 //... your serialization code ...//
            }
       }
   }
}

But, with this code we again realize we complicate the code. Not only we reduced readability and punctured maintainability (in a way), but also, added unwanted code redundency; i.e. with such nested calls (of "using") we need to re-write some (major) part of the code twice (one as listed above and another with GZip Compression). In fact, in long run, we will notice that we create redundancy everytime we write any stream related code (inside same project and across multiple projects). Aboveall, such implementations are neither composable nor elastic nor reusable (Note: reusable in broad sense).

Visualizing Implementation

Reading/interpreting such a verbose text is by no mean easy, thus, here we make an effort to provide some illustrations to have understanding of above written literature. Lets first try to understand what happens, at different moment in time, while our "Code Id 1" (from above "Measuring performance of a trivial task" title) runs.

For simplicity, let's assume following time-scale:

  • At T = 0, we call it Initiation point where we assume memory usage is Nil (zero) and code execution just waiting to execute "PullAllBytesFrom" line.
  • At T = t1, code "var fileBytes = PullAllBytesFrom(compressedJsonFile);" has been successfully executed and our fileBytes variable holds the reference to Byte array.
  • At T = t2, code "var uncompressedBytes = DecompressUsingGzip(fileBytes);" has been successfully executed and our uncompressedBytes variable holds the reference to uncompressed Byte array.  
  • At T = t3, call to "DeserializeAs<List<T>>(uncompressedBytes);" is finished

We make following assumptions to avoid complexity in visualization:

  • GC (Garbage collector) is NOT running until t3
  • Buffers used internally, by framework, are compatetively insignificant in size, thus, can be dropped from the visualization
  • We can use linear approximation for memory allocation/re-allocation
  • File size is 1 MBytes while decompressed data and DeserializedList both individually requires 2 MBytes each, hypothetically

NOTE: At T=t3, we assume final return statement will execute, GC will occur and Memory reduced to 2 Mbytes (hold by list)

Based on above listed assumption, following is an approximate visualization:

runtime-mem-viz-no-optim

Even in this simple image, which ignores memory wastage (due to reallocation/copy), we clearly observe that we have unnecessarily consumed memory upto 5 Mbytes (peak at t3); as target state consumes only 2 MBytes of memory. Having realized this fact, it makes it easy to envision ideal target state which is consistent with following visualization:

target_mem_viz_optim

Comparing Image 4 and Image 5, immediately gives us insights on the gains we would like to make. Now, we are in position to actually discuss implementation details that helps us achieve our goals.

So Far, Not Far...

Until here, we have discussed associated issues related to stream operations and code implementations, and have identified the goals. In general, we have gathered material based on which we would like to assert the choices we have made during our implementation.

We decided to split the whole article into 2 parts we did not want to just show the implementation but wanted to elaborate the "why" behind it. We are still working on the Part 2 of the article to complete our discussion and we would love to hear comments from our readers in order to improve on the quality of the material.

We also invite our readers to check attached example code source and the implementation ahead in time:

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

D Sarthi Maheshwari
Architect
France France
An open-minded, passionate, adaptive and resourceful software solution developer. He daydreams of code and spend nights coding his thoughts.

Comments and Discussions

 
-- There are no messages in this forum --
Article
Posted 24 Feb 2019

Stats

5.2K views
75 downloads
9 bookmarked