In this post, you will learn about a ring buffer that tracks the count of events within a temporal frame. It is a simple but useful way of quantizing events to reduce memory usage and to create a sample frame size of a desired duration.
Introduction
I needed a ring buffer that tracks the count of events within a temporal frame, say one minute. Because there can be tens of thousands of events within the frame, it is ideal to quantize the counts to some sample size of the frame. For example, if the frame is one minute, quantizing the counts per second means that the ring buffer only needs to manage 60 samples. Being a ring buffer, samples that fall outside of the frame are dropped as new ones are added: if the frame is one minute, any samples older than one minute are zeroed. For our purposes, the total sample count is also maintained, being the sum of the quantized counts within the frame at any given point in time.
The screenshot above shows a bar chart of a snapshot of event counts, in this case web API requests from different clients. The length of each horizontal bar is the number of requests per minute, and the vertical axis (with names redacted) lists the clients making the requests.
The Ring Buffer Code
In its entirety:
using System;
using System.Collections.Generic;
using System.Linq;

namespace QuantizedTemporalFrameRingBuffer
{
    public class QBuffer
    {
        protected int runningCount = 0;
        protected int[] qFrameCounts;
        protected int sampleSize;
        protected Func<DateTime, int> calcQ;
        protected Func<DateTime, DateTime, int> resetPastSamples;
        protected DateTime lastEvent;
        protected object locker = new object();

        // Copy of the slot counts, taken under the lock so that a
        // concurrent CountEvent cannot change the array mid-copy.
        public int[] Frame
        {
            get
            {
                lock (locker)
                {
                    return qFrameCounts.ToArray();
                }
            }
        }

        public int GetRunningCount()
        {
            lock (locker)
            {
                return runningCount;
            }
        }

        public QBuffer(int frameSize, int quantization,
            Func<DateTime, int> calcQ, Func<DateTime, DateTime, int> resetPastSamples)
        {
            Assertion.That(frameSize > 0, "frameSize cannot be negative or 0.");
            Assertion.That(quantization > 0, "quantization cannot be negative or 0.");
            Assertion.That(frameSize % quantization == 0,
                "frameSize must be divisible by quantization without a remainder.");
            Assertion.NotNull(calcQ, "calculation of Q cannot be null.");
            Assertion.NotNull(resetPastSamples, "reset of past samples cannot be null.");

            lastEvent = DateTime.Now;
            sampleSize = frameSize / quantization;
            qFrameCounts = new int[sampleSize];
            this.calcQ = calcQ;
            this.resetPastSamples = resetPastSamples;
        }
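        public void Reset(DateTime dt)
        {
            // Take the same lock as CountEvent; Reset changes both the
            // slots and the running count.
            lock (locker)
            {
                int q = calcQ(dt);
                int resetCount = Math.Min(sampleSize, resetPastSamples(lastEvent, dt));

                while (resetCount > 0)
                {
                    // The slots entered since the last event hold counts from
                    // the previous pass around the ring, so walk back from the
                    // current slot: q - resetCount + 1 through q.
                    int pastQ = (q - resetCount + 1 + sampleSize) % sampleSize;
                    runningCount -= qFrameCounts[pastQ];
                    qFrameCounts[pastQ] = 0;
                    --resetCount;
                }
            }
        }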

        public void CountEvent(DateTime dt)
        {
            lock (locker)
            {
                int q = calcQ(dt);
                ++runningCount;
                ++qFrameCounts[q];
                lastEvent = dt;
            }
        }

        public (int total, List<int> samples) Capture()
        {
            lock (locker)
            {
                var ret = (runningCount, qFrameCounts.ToList());
                return ret;
            }
        }
    }
}
Initialization
The usage requires defining a callback for determining the quantized index (Q) and determining how many past events are outside of the ring buffer quantized sample size. A simple example is a 60 second ring buffer with 1 second quantization:
using System;

namespace QuantizedTemporalFrameRingBuffer
{
    class Program
    {
        static void Main(string[] args)
        {
            QBuffer buffer = new QBuffer(60, 1, CalcQ, ResetQ);
        }

        // Slot index: the second within the minute (0-59).
        static int CalcQ(DateTime dt)
        {
            return dt.Second;
        }

        // Number of slots to clear: whole seconds since the last event.
        static int ResetQ(DateTime lastEvent, DateTime now)
        {
            return (int)(now - lastEvent).TotalSeconds;
        }
    }
}
The reason for the callbacks is that the ring buffer doesn't need to know how you're calculating the quantization index and the number of "slots" that need to be reset. For example, you might want a ring buffer that tracks events over a 24 hour period with quantization at one hour. The first two parameters to the constructor:
public QBuffer(int frameSize, int quantization,
are not associated with a temporal "unit"; they are simply the frame size and the desired quantization. The callbacks for calculating Q and the number of slots that need to be reset determine the desired usage.
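For the 24-hour case mentioned above, the callbacks might look something like the following. This is a minimal sketch rather than code from the application: the names CalcHourQ and ResetHours are hypothetical, and the dates are made up for illustration.

```csharp
using System;

public static class HourlyCallbacksSketch
{
    // Hypothetical slot-index callback: one slot per hour of the day (0-23).
    public static readonly Func<DateTime, int> CalcHourQ = dt => dt.Hour;

    // Hypothetical reset callback: whole hours elapsed since the last event.
    public static readonly Func<DateTime, DateTime, int> ResetHours =
        (lastEvent, now) => (int)(now - lastEvent).TotalHours;

    public static void Main()
    {
        var last = new DateTime(2020, 9, 6, 22, 15, 0);
        var now = new DateTime(2020, 9, 7, 1, 45, 0);

        Console.WriteLine(CalcHourQ(now));        // 1 (slot for 01:45)
        Console.WriteLine(ResetHours(last, now)); // 3 (3.5 hours, truncated)

        // These would be passed to the constructor as:
        // new QBuffer(24, 1, CalcHourQ, ResetHours)
    }
}
```

The constructor arguments stay unitless (24 and 1); only the callbacks know that a slot is an hour.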
A Real Life Example
In the WCF application I've wired this into, the client's web requests per minute are tracked like this:
public static Dictionary<string, QBuffer> buffers = new Dictionary<string, QBuffer>();
...
QBuffer qbuffer;

if (!buffers.TryGetValue(client, out qbuffer))
{
    qbuffer = new QBuffer(60, 1, dt => dt.Second,
        (lastEvent, eventTime) => (int)(eventTime - lastEvent).TotalSeconds);
    buffers[client] = qbuffer;
}

DateTime now = DateTime.Now;
qbuffer.Reset(now);
qbuffer.CountEvent(now);
The Reset call might look odd at first. What this is doing is zeroing past event slots outside of the frame, being one minute in this case. It executes the callback:
(lastEvent, eventTime) => (int)(eventTime - lastEvent).TotalSeconds
Thus:
- If we are still in the current slot (the last event occurred within the same second as the current event), nothing is touched.
- If the elapsed seconds is greater than 0, that many slots are zeroed:
while (resetCount > 0)
{
    // Walk back from the current slot q over the slots that went stale.
    int pastQ = (q - resetCount + 1 + sampleSize) % sampleSize;
    runningCount -= qFrameCounts[pastQ];
    qFrameCounts[pastQ] = 0;
    --resetCount;
}
and the running count is reduced by whatever the event counts were in each slot.
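To make the slot arithmetic concrete, here is a small stand-alone sketch. It assumes the stale slots are the ones entered since the last event, walking back from the current slot and wrapping around the ring. ClearStale is a hypothetical helper written for this illustration, not part of QBuffer.

```csharp
using System;
using System.Linq;

public static class ResetSketch
{
    // Hypothetical helper: zeroes up to `elapsed` slots ending at slot q
    // (wrapping around the ring) and returns the total count removed.
    public static int ClearStale(int[] slots, int q, int elapsed)
    {
        int removed = 0;
        int n = Math.Min(slots.Length, elapsed);

        for (int i = 0; i < n; i++)
        {
            // Double modulo keeps the index non-negative when q - i < 0.
            int idx = ((q - i) % slots.Length + slots.Length) % slots.Length;
            removed += slots[idx];
            slots[idx] = 0;
        }

        return removed;
    }

    public static void Main()
    {
        // One event recorded in every second of the previous minute.
        int[] slots = Enumerable.Repeat(1, 60).ToArray();

        // Last event at :58, current time :01, so 3 seconds have elapsed.
        int removed = ClearStale(slots, 1, 3);

        Console.WriteLine(removed);                          // 3
        Console.WriteLine(slots.Sum());                      // 57
        Console.WriteLine(slots[59] + slots[0] + slots[1]);  // 0
    }
}
```

Slots 59, 0 and 1 are cleared because whatever they held was recorded a minute or more ago; the other 57 slots are still inside the frame.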
Similarly, when retrieving the current frame of events, "current" means at the time the request to get the events is made. The API endpoint that retrieves the counts therefore looks like this (keep in mind that this was a WCF application):
public class ClientHitCount
{
    public string Client { get; set; }
    public int Count { get; set; }
}

public Stream GetQBufferRunningCounts()
{
    var clients = GetClientList();

    var samples = clients.Select(c =>
    {
        var buffer = buffers[c];
        buffer.Reset(DateTime.Now);

        return new ClientHitCount() { Client = c, Count = buffer.GetRunningCount() };
    });

    return Response.AsJson(samples);
}
Again, note that Reset is called first. Let's say the last event occurred two minutes ago: when we make the call to get the number of events that occurred in the frame, we are two minutes "too late", so the purpose of the Reset call is to synchronize the ring buffer with the current time frame.
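The two-minute case can be checked by hand. A minimal sketch, assuming the same 60-slot buffer and seconds-based callback used above; ResetCount is a hypothetical helper that mirrors the clamp at the top of Reset:

```csharp
using System;

public static class LateResetSketch
{
    // Hypothetical helper mirroring the clamp in Reset: never clear more
    // slots than the ring buffer has.
    public static int ResetCount(int sampleSize, DateTime lastEvent, DateTime now)
    {
        int elapsed = (int)(now - lastEvent).TotalSeconds;
        return Math.Min(sampleSize, elapsed);
    }

    public static void Main()
    {
        var lastEvent = new DateTime(2020, 9, 6, 12, 0, 0);

        // Two minutes late: 120 seconds elapsed, clamped to all 60 slots.
        Console.WriteLine(ResetCount(60, lastEvent, lastEvent.AddMinutes(2)));  // 60

        // Five seconds late: only 5 slots need clearing.
        Console.WriteLine(ResetCount(60, lastEvent, lastEvent.AddSeconds(5)));  // 5
    }
}
```

With every slot cleared, the running count returned to the caller is 0, which is what a client that has been silent for two minutes should report.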
A Simple Page
A very simple rendering on a web page can be accomplished with AnyChart:
<!DOCTYPE html>
<html height="100%">
<head>
  <title>Events</title>
  <script src="https://cdn.anychart.com/releases/8.0.0/js/anychart-base.min.js"></script>
</head>
<body height="100%">
  <div id="container" style="width: 100%; height: 90vh"></div>
  <script>
    // Fetch the JSON from the endpoint and hand the raw text to the callback.
    function getData(callback) {
      var xhttp = new XMLHttpRequest();
      xhttp.onreadystatechange = function() {
        if (this.readyState == 4 && this.status == 200) {
          callback(this.responseText);
        }
      };
      xhttp.open("GET", "[your endpoint]", true);
      xhttp.send();
    }

    // Show only clients with at least one event in the current frame.
    function updateChart(chart, strJson) {
      let json = JSON.parse(strJson);
      let fjson = json.filter(r => r.Count > 0);
      let data = {rows: fjson.map(r => [r.Client, r.Count])};
      chart.data(data);
      chart.draw();
    }

    anychart.onDocumentReady(function() {
      var chart = anychart.bar();
      chart.container('container');
      chart.barsPadding(10);

      // Poll once per second.
      setInterval(() => {
        getData(json => updateChart(chart, json));
      }, 1000);
    });
  </script>
</body>
</html>
Unit Tests
A few useful unit tests.
Events Within a Q Test
[TestMethod]
public void EventsInOneQIndexTest()
{
    QBuffer buffer = new QBuffer(60, 1, dt => dt.Second,
        (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);

    DateTime now = DateTime.Now;
    buffer.Reset(now);
    buffer.CountEvent(now);
    buffer.Reset(now);
    buffer.CountEvent(now);
    buffer.Reset(now);
    buffer.CountEvent(now);

    int index = now.Second;
    Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3.");
    Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}
Events Within Two Consecutive Q Test
[TestMethod]
public void EventsInTwoQIndicesTest()
{
    QBuffer buffer = new QBuffer(60, 1, dt => dt.Second,
        (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);

    DateTime now = DateTime.Now;
    DateTime next = now.AddSeconds(1);
    buffer.Reset(now);
    buffer.CountEvent(now);
    buffer.Reset(next);
    buffer.CountEvent(next);
    buffer.Reset(next);
    buffer.CountEvent(next);

    int index = now.Second;
    // Use next.Second rather than index + 1, which would run past the end
    // of the frame when the test starts at second 59.
    int nextIndex = next.Second;
    Assert.IsTrue(buffer.Frame[index] == 1, "Expected a count of 1.");
    Assert.IsTrue(buffer.Frame[nextIndex] == 2, "Expected a count of 2.");
    Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}
Frame Cleared Test
[TestMethod]
public void FrameClearedTest()
{
    QBuffer buffer = new QBuffer(60, 1, dt => dt.Second,
        (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);

    DateTime now = DateTime.Now;
    DateTime next = now.AddSeconds(1);
    DateTime frameNext = next.AddSeconds(60);
    buffer.Reset(now);
    buffer.CountEvent(now);
    buffer.Reset(next);
    buffer.CountEvent(next);
    buffer.Reset(next);
    buffer.CountEvent(next);
    buffer.Reset(frameNext);
    buffer.CountEvent(frameNext);
    buffer.Reset(frameNext);
    buffer.CountEvent(frameNext);
    buffer.Reset(frameNext);
    buffer.CountEvent(frameNext);

    int index = frameNext.Second;
    Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3.");
    Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}
Conclusion
Not much to say here - it's a simple but useful way of quantizing events to reduce memory usage and to create a sample frame size of a desired duration. And the title "Thread Safe Quantized Temporal Frame Ring Buffer" is cool.
History
- 6th September, 2020: Initial version