
Thread Safe Quantized Temporal Frame Ring Buffer

6 Sep 2020 | CPOL | 3 min read
Ring buffer to track count of events within a temporal frame
In this post, you will learn about a ring buffer that tracks the count of events within a temporal frame. It is a simple but useful way of quantizing events to reduce memory usage and to create a sample frame size of a desired duration.

Introduction

I needed a ring buffer that tracks the count of events within a temporal frame, say one minute. Because there can be tens of thousands of events within the frame, it is ideal to quantize the counts to some sample size of the frame. For example, if the frame is one minute, quantizing the counts per second means that the ring buffer only needs to manage 60 samples. Being a ring buffer, samples that fall outside the frame are dropped as new ones are added: if the frame is one minute, any samples recorded more than one minute ago are zeroed. For our purposes, a running total is also maintained, being the sum of the quantized counts within the frame at any given point in time.
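As a rough illustration of the quantization idea only (not the ring buffer itself), the sketch below buckets a handful of made-up event times into 60 one-second slots. The `QuantizeSketch` class, the `CountPerSecond` helper and the sample timestamps are assumptions for this example and do not appear in the article's code.

```csharp
using System;
using System.Linq;

static class QuantizeSketch
{
    // Hypothetical helper: counts events per one-second slot of a one-minute frame.
    public static int[] CountPerSecond(DateTime[] events)
    {
        var slots = new int[60];

        foreach (var e in events)
        {
            // The slot index is simply the seconds component of the timestamp.
            ++slots[e.Second];
        }

        return slots;
    }

    static void Main()
    {
        var start = new DateTime(2020, 9, 6, 12, 0, 0);

        // Made-up event offsets in milliseconds from the start of the minute.
        var events = new[] { 100, 250, 900, 1200, 1999, 59500 }
            .Select(ms => start.AddMilliseconds(ms))
            .ToArray();

        int[] slots = CountPerSecond(events);

        Console.WriteLine(slots[0]);    // 3 events in second 0
        Console.WriteLine(slots[1]);    // 2 events in second 1
        Console.WriteLine(slots[59]);   // 1 event in second 59
        Console.WriteLine(slots.Sum()); // 6 events in the frame
    }
}
```

However many events arrive, the storage stays at 60 integers, which is the memory saving the quantization is meant to provide.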

The screenshot above shows a bar chart of a snapshot of event counts, in this case web API requests from different clients. The horizontal axis is the number of requests per minute and the vertical axis (with names redacted) lists the clients making the requests.

The Ring Buffer Code

In its entirety:

C#
using System;
using System.Collections.Generic;
using System.Linq;

namespace QuantizedTemporalFrameRingBuffer
{
  public class QBuffer
  {
    protected int runningCount = 0;
    protected int[] qFrameCounts;
    protected int sampleSize;
    protected Func<DateTime, int> calcQ;
    protected Func<DateTime, DateTime, int> resetPastSamples;
    protected DateTime lastEvent;

    /// <summary>
    /// Get the frame as copy of the internal frame, mainly for unit tests.
    /// </summary>
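    public int[] Frame
    {
      // Copy under the lock so the snapshot is consistent with concurrent writers.
      get { lock (locker) { return qFrameCounts.ToArray(); } }
    }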

    public int GetRunningCount()
    {
      lock (locker)
      {
        return runningCount;
      }
    }

    // Used to allow Capture to be run on a separate thread from CountEvent.
    // Particularly useful when counting HTTP events and
    // separately monitoring the sample set.
    protected object locker = new object();
    
    public QBuffer(int frameSize, int quantization, 
           Func<DateTime, int> calcQ, Func<DateTime, DateTime, int> resetPastSamples)
    {
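      // Assertion is a small helper class that is not shown in this article;
      // any argument-checking mechanism can be substituted.
      Assertion.That(frameSize > 0, "frameSize cannot be negative or 0.");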
      Assertion.That(quantization > 0, "quantization cannot be negative or 0.");
      Assertion.That(frameSize % quantization == 0, 
                "frameSize must be divisible by quantization without a remainder.");
      Assertion.NotNull(calcQ, "calculation of Q cannot be null.");
      Assertion.NotNull(resetPastSamples, "reset of past samples cannot be null.");

      lastEvent = DateTime.Now;
      sampleSize = frameSize / quantization;
      qFrameCounts = new int[sampleSize];
      this.calcQ = calcQ;
      this.resetPastSamples = resetPastSamples;
    }

    public void Reset(DateTime dt)
    {
      // Take the same lock as CountEvent and Capture, because Reset
      // modifies both the running count and the frame.
      lock (locker)
      {
        int q = calcQ(dt);

        // We only reset up to the sample size.
        // This handles situations where the time elapsed between events
        // is greater than the frame size for the specified quantization,
        // in which case the whole frame is cleared.
        int resetCount = Math.Min(sampleSize, resetPastSamples(lastEvent, dt));

        // At all times, the "past samples" are the very next samples.
        while (resetCount > 0)
        {
          int pastQ = (q + resetCount) % sampleSize;
          runningCount -= qFrameCounts[pastQ];
          qFrameCounts[pastQ] = 0;
          --resetCount;
        }
      }
    }

    public void CountEvent(DateTime dt)
    {
      lock (locker)
      {
        int q = calcQ(dt);
        ++runningCount;
        ++qFrameCounts[q];

        lastEvent = dt;
      }
    }

    public (int total, List<int> samples) Capture()
    {
      lock (locker)
      {
        var ret = (runningCount, qFrameCounts.ToList());

        return ret;
      }
    }
  }
}

Initialization

Usage requires two callbacks: one that determines the quantized index (Q) for an event time, and one that determines how many quantized slots have elapsed since the last event and therefore need to be reset. A simple example is a 60 second ring buffer with 1 second quantization:

C#
using System;

namespace QuantizedTemporalFrameRingBuffer
{
  class Program
  {
    static void Main(string[] args)
    {
      // Create a 1 minute ring buffer quantized at one second.
      QBuffer buffer = new QBuffer(60, 1, CalcQ, ResetQ);
    }

    static int CalcQ(DateTime dt)
    {
      // This is easy, because we're sampling for one minute, quantized at one second,
      // so Q is by its very nature simply the Second within the time component.
      return dt.Second;
    }

    static int ResetQ(DateTime lastEvent, DateTime now)
    {
      // Again, straightforward because we're quantizing per second.
      return (int)(now - lastEvent).TotalSeconds;
    }
  }
}

The reason for the callbacks is that the ring buffer doesn't need to know how you're calculating the quantization index and the number of "slots" that need to be reset. For example, you might want a ring buffer that tracks events over a 24 hour period with quantization at one hour. The first two parameters to the constructor:

C#
public QBuffer(int frameSize, int quantization, 

are not associated with a temporal "unit" but rather the frame size and the desired quantization. The callbacks for calculating Q and the number of slots that need to be reset determine the desired usage.
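For the 24 hour frame with one hour quantization mentioned above, the two callbacks might look like the following sketch. The `HourlyCallbacks` class and its method names are assumptions made for this example; they would be passed to the constructor as `new QBuffer(24, 1, HourlyCallbacks.CalcQ, HourlyCallbacks.ResetQ)`.

```csharp
using System;

static class HourlyCallbacks
{
    // Q is the hour of the day, giving 24 slots (0 through 23).
    public static int CalcQ(DateTime dt) => dt.Hour;

    // Number of whole hours elapsed since the last event.
    public static int ResetQ(DateTime lastEvent, DateTime now) =>
        (int)(now - lastEvent).TotalHours;

    static void Main()
    {
        var last = new DateTime(2020, 9, 6, 22, 15, 0);
        var now = new DateTime(2020, 9, 7, 1, 45, 0);

        Console.WriteLine(CalcQ(now));        // 1 (the 01:00 slot)
        Console.WriteLine(ResetQ(last, now)); // 3 (3.5 hours, truncated)
    }
}
```

The constructor arguments 24 and 1 carry no unit by themselves; it is these callbacks that make the frame "hours".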

A Real Life Example

In the WCF application I've wired this into, the client's web requests per minute are tracked like this:

C#
public static Dictionary<string, QBuffer> buffers = new Dictionary<string, QBuffer>();
...
QBuffer qbuffer;

if (!buffers.TryGetValue(client, out qbuffer))
{
  qbuffer = new QBuffer(60, 1, dt => dt.Second, 
            (lastEvent, eventTime) => (int)(eventTime - lastEvent).TotalSeconds);
  buffers[client] = qbuffer;
}

DateTime now = DateTime.Now;
qbuffer.Reset(now);
qbuffer.CountEvent(now);

The Reset call might look odd at first. What this is doing is zeroing past event slots outside of the frame, being one minute in this case. It executes the callback:

C#
(lastEvent, eventTime) => (int)(eventTime - lastEvent).TotalSeconds

Thus:

  1. If we are still in the current slot (the last event occurred within the same second as the current event), nothing is touched.
  2. If the number of elapsed seconds is greater than 0, that many slots are zeroed:
C#
while (resetCount > 0)
{
  int pastQ = (q + resetCount) % sampleSize;
  runningCount -= qFrameCounts[pastQ];
  qFrameCounts[pastQ] = 0;
  --resetCount;
}

and the running count is reduced by whatever the event counts were in each slot.
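To make the index arithmetic of that loop concrete, the sketch below reproduces only the slot calculation and lists which indices it visits. `ResetTrace` and `SlotsCleared` are hypothetical names for this illustration and are not part of `QBuffer`.

```csharp
using System;
using System.Collections.Generic;

static class ResetTrace
{
    // Mirrors the index arithmetic of the reset loop:
    // q is the current slot, elapsed is the value returned by the reset callback.
    public static List<int> SlotsCleared(int q, int elapsed, int sampleSize)
    {
        // Never visit more slots than the frame holds.
        int resetCount = Math.Min(sampleSize, elapsed);
        var cleared = new List<int>();

        while (resetCount > 0)
        {
            // The modulo wraps the index around the end of the ring.
            cleared.Add((q + resetCount) % sampleSize);
            --resetCount;
        }

        return cleared;
    }

    static void Main()
    {
        // Current slot 58, three seconds elapsed: the indices wrap past 59.
        Console.WriteLine(string.Join(", ", SlotsCleared(58, 3, 60))); // 1, 0, 59

        // Five minutes elapsed: capped at the sample size, so every slot is visited.
        Console.WriteLine(SlotsCleared(10, 300, 60).Count); // 60
    }
}
```

The second call shows why the count is capped: once a full frame has elapsed, one pass over all 60 slots clears everything, no matter how long the gap was.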

Similarly, when retrieving the current frame of events, "current" means at the time the request to get the events is made. Therefore, the API endpoint that retrieves the events also calls Reset first -- keep in mind that this is a WCF application I was using this for.

C#
public class ClientHitCount
{
  public string Client { get; set; }
  public int Count { get; set; }
}

public Stream GetQBufferRunningCounts()
{
  var clients = GetClientList();

  var samples = clients.Select(c =>
  {
    var buffer = buffers[c];
    buffer.Reset(DateTime.Now);

    return new ClientHitCount() { Client = c, Count = buffer.GetRunningCount() };
  });

  return Response.AsJson(samples);
}

Again, note that Reset is called first. Let's say the last event occurred two minutes ago -- when we make the call to get the number of events that occurred in the frame, we are two minutes "too late", so the purpose of the Reset call is to synchronize the ring buffer with the current time frame.

A Simple Page

A very simple rendering on a web page can be accomplished with AnyChart:

HTML
<!DOCTYPE html>
<html height="100%">
  <head>
    <title>Events</title>
    <script src="https://cdn.anychart.com/releases/8.0.0/js/anychart-base.min.js"></script>
  </head>
  <body height="100%">
    <div id="container" style="width: 100%; height: 90vh"></div>
    <script>
      function getData(callback) {
        var xhttp = new XMLHttpRequest();
        xhttp.onreadystatechange = function() {
          if (this.readyState == 4 && this.status == 200) {
            callback(this.responseText);
          }
        };

        xhttp.open("GET", "[your endpoint]", true);
        xhttp.send();
      } 

      function updateChart(chart, strJson) {
        let json = JSON.parse(strJson);
        let fjson = json.filter(r => r.Count > 0);
        // let fjson = json;       // use this line to include 0 counts
        let data = {rows: fjson.map(r => [r.Client, r.Count])};
        chart.data(data);
        chart.draw();
      }

      anychart.onDocumentReady(function() {
        var chart = anychart.bar();
        chart.container('container');
        chart.barsPadding(10);

        setInterval(() => {
          getData(json => updateChart(chart, json));
        }, 1000);
      });
    </script>
  </body>
</html>

Unit Tests

A few useful unit tests.

Events Within a Q Test

C#
[TestMethod]
public void EventsInOneQIndexTest()
{
  QBuffer buffer = new QBuffer(60, 1, dt => dt.Second, 
          (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);

  DateTime now = DateTime.Now;
  buffer.Reset(now);
  buffer.CountEvent(now);

  buffer.Reset(now);
  buffer.CountEvent(now);

  buffer.Reset(now);
  buffer.CountEvent(now);

  // Note that the count is in the current "second" slot.
  int index = now.Second; 

  Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3.");
  Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}

Events Within Two Consecutive Q Test

C#
[TestMethod]
public void EventsInTwoQIndicesTest()
{
  QBuffer buffer = new QBuffer(60, 1, dt => dt.Second, 
          (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);

  DateTime now = DateTime.Now;
  DateTime next = now.AddSeconds(1);

  buffer.Reset(now);
  buffer.CountEvent(now);

  buffer.Reset(next);
  buffer.CountEvent(next);

  buffer.Reset(next);
  buffer.CountEvent(next);

  int index = now.Second;

  Assert.IsTrue(buffer.Frame[index] == 1, "Expected a count of 1.");
  // Use next.Second rather than index + 1, which is out of range when now.Second is 59.
  Assert.IsTrue(buffer.Frame[next.Second] == 2, "Expected a count of 2.");
  Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}

Frame Cleared Test

C#
[TestMethod]
public void FrameClearedTest()
{
  QBuffer buffer = new QBuffer(60, 1, dt => dt.Second, 
          (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);

  DateTime now = DateTime.Now;
  DateTime next = now.AddSeconds(1);
  DateTime frameNext = next.AddSeconds(60);
  buffer.Reset(now);
  buffer.CountEvent(now);       // 1 at now

  buffer.Reset(next);
  buffer.CountEvent(next);      // 2 at now + 1
  
  buffer.Reset(next);
  buffer.CountEvent(next);

  buffer.Reset(frameNext);
  buffer.CountEvent(frameNext); // 3 at now + 61

  buffer.Reset(frameNext);
  buffer.CountEvent(frameNext);

  buffer.Reset(frameNext);
  buffer.CountEvent(frameNext);

  int index = frameNext.Second;

  Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3.");
  Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}

Conclusion

Not much to say here - it's a simple but useful way of quantizing events to reduce memory usage and to create a sample frame size of a desired duration. And the title "Thread Safe Quantized Temporal Frame Ring Buffer" is cool.

History

  • 6th September, 2020: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect Interacx
United States
Blog: https://marcclifton.wordpress.com/
Home Page: http://www.marcclifton.com
Research: http://www.higherorderprogramming.com/
GitHub: https://github.com/cliftonm

All my life I have been passionate about architecture / software design, as this is the cornerstone to a maintainable and extensible application. As such, I have enjoyed exploring some crazy ideas and discovering that they are not so crazy after all. I also love writing about my ideas and seeing the community response. As a consultant, I've enjoyed working in a wide range of industries such as aerospace, boatyard management, remote sensing, emergency services / data management, and casino operations. I've done a variety of pro-bono work for non-profit organizations related to nature conservancy, drug recovery and women's health.
