
Limiting the outgoing message rate effectively (message throttling)

5 Jan 2014
An efficient message rate limiter using a circular buffer

Introduction 

Sometimes it becomes necessary to bound the rate of messages or requests being sent out. The destination could be a service on the web, a different process, or even a different thread within the same process space. Throttling avoids the heavy performance penalty that a high message rate can induce at the peer end.

I faced a similar problem where the peer process drops the connection if the rate exceeds a limit. The peer application counts the number of messages received in a logical time interval (as per its own system clock) and resets the count after every fixed interval. Because synchronizing the system clocks of the sending and receiving applications is not trivial, the sender cannot establish a reference point for its own throttle count without additional help from the peer. The peer could make life easier by sending a notification when it starts counting, but peer applications are not always implemented that way, so we need a design that restricts the outgoing message rate without any involvement of the peer application.

Background 

Because there is no reference point, maintaining a send count and resetting it after every fixed interval will definitely not work. One alternative is to make every message wait for interval/(allowed messages per interval). That approach is unacceptable for latency-sensitive flows, because messages wait even when the throttle limit is not being breached. The approach taken here is the following.

Suppose the requirement is to restrict the rate to at most M sends in an interval I. Compare the current time stamp (t_0) with the time stamp of the Mth message sent before the current one (t_m). If the difference in time stamps is more than I, the current message can be sent with zero wait; otherwise it must wait for I - (t_0 - t_m).

long now = TimeProvider.getCurrentTimeInNano();
long lastSent = buffer.getValueAtCurrentIndex();  // Mth-last send time

// Zero or negative means "send immediately"; in the non-blocking path,
// now is first translated into actual_now (see below).
long delta = interval - (now - lastSent);
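The circular buffer behind these calls is not shown in the snippets. A minimal sketch of the interface it needs could look like the following; the method names mirror the article's code, while the implementation details are assumptions:

```java
// Minimal circular buffer of long timestamps, sized M (max sends per interval).
// Method names mirror the article's snippets; the body is a sketch.
class CircularBuffer {
    private final long[] slots;
    private int index = 0;

    CircularBuffer(int size, long initVal) {
        slots = new long[size];
        java.util.Arrays.fill(slots, initVal);  // init_val marks "never sent"
    }

    // Timestamp of the Mth-last send (the slot about to be overwritten).
    long getValueAtCurrentIndex() { return slots[index]; }

    // Record the send time of the current message in the current slot.
    void updateValueAtCurrIndx(long value) { slots[index] = value; }

    // Advance to the next slot, wrapping around at the end.
    void updateNextIndx() { index = (index + 1) % slots.length; }
}
```

Because the buffer always overwrites the oldest slot, reading the current index before writing yields the Mth-last send time in O(1).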

Using the code 

A circular buffer maintains the time stamps of the last M sends and gives a fast lookup. The rest of the implementation lives in the class Throttler. It can be used in non-blocking mode (scheduleAsync) or blocking mode (scheduleBlocking). In non-blocking mode a single-thread scheduler executes the messages; in blocking mode the calling thread is blocked for the time computed by the Throttler.getTime function.

I have done the time calculation in nanoseconds, but it can easily be converted to milliseconds or seconds. The following getTime() function computes the time for which a message has to wait before being sent.

private static long getTime()
{
    long now = TimeProvider.getCurrentTimeInNano();
    long lastSent = buffer.getValueAtCurrentIndex();  // Mth-last send time

    // Slot still holds the initial value: fewer than M sends so far,
    // so the message can go out immediately.
    if (lastSent == init_val)
    {
        buffer.updateValueAtCurrIndx(now);
        buffer.updateNextIndx();
        return 0;
    }

    // M sends have happened within the last interval: wait out the remainder.
    if (now - lastSent < interval)
    {
        long wait_time = interval - (now - lastSent);
        buffer.updateValueAtCurrIndx(now + wait_time);  // record the actual send time
        buffer.updateNextIndx();
        return wait_time;
    }

    // The Mth-last send is older than one interval: no wait needed.
    buffer.updateValueAtCurrIndx(now);
    buffer.updateNextIndx();
    return 0;
}

This getTime function is used by the blocking call (scheduleBlocking()) to make the current thread wait for the required amount of time before dispatching.
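The blocking dispatch then reduces to sleeping for the computed wait and running the send task. The following is a sketch of that step; the method and class names here are assumptions, and only the idea of sleeping for getTime()'s result comes from the article:

```java
// Sketch of a blocking dispatch: sleep for the wait computed by the
// throttler (the article's getTime()), then run the send task.
class BlockingDispatch {
    static void dispatchAfter(long waitNanos, Runnable sendTask) {
        if (waitNanos > 0) {
            try {
                // Thread.sleep takes milliseconds plus an extra nanos component
                Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve interrupt status
                return;                              // skip the send if interrupted
            }
        }
        sendTask.run();  // dispatch after the mandated wait
    }
}
```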

The non-blocking call (scheduleAsync()) is a bit trickier than the blocking one. Because the calling thread returns almost immediately and is ready to schedule the next message, directly comparing the Mth-last sent value from the circular buffer with the current time would be wrong. To tackle this, I use a global_wait variable to translate the current time into an "actual_now" that can be compared directly with the Mth-last send time stored in the circular buffer. global_wait is essentially the non-negative value of (the aggregate of all wait times) minus (the aggregate of the gaps between consecutive messages made available for sending).

long delta = interval - (actual_now - lastSent);
global_wait = global_wait + delta;             // this message adds delta to the backlog

global_wait = global_wait - (now - previous);  // credit back the inter-arrival gap
if (global_wait < 0)
    global_wait = 0;                           // the backlog can never go negative
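One way to read this bookkeeping is as a pure function: given the raw arrival time, the previous arrival time, and the Mth-last send time from the buffer, it returns the delay to hand to the scheduler and carries the backlog forward. This is a sketch under that reading; only global_wait, delta, and actual_now come from the article, every other name is an assumption:

```java
// Sketch of the global_wait translation for the non-blocking path.
// globalWait is the accumulated backlog of scheduled-but-not-yet-sent waits.
class AsyncWait {
    long globalWait = 0;      // the article's global_wait
    private long previous = -1;  // arrival time of the previous message

    // Returns the delay (nanos) to pass to the scheduler for this message.
    long computeDelay(long now, long lastSent, long interval) {
        if (previous >= 0) {
            globalWait -= (now - previous);   // credit the elapsed inter-arrival gap
            if (globalWait < 0) globalWait = 0;
        }
        previous = now;
        long actualNow = now + globalWait;    // effective send time if scheduled now
        long delta = interval - (actualNow - lastSent);
        if (delta > 0) globalWait += delta;   // this send adds to the backlog
        return globalWait;                    // dispatch at now + globalWait
    }
}
```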

I have used newSingleThreadScheduledExecutor to schedule the tasks. 
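The scheduling itself is a one-liner against the executor; a sketch (the helper names are assumptions, only newSingleThreadScheduledExecutor is from the article):

```java
// Dispatch through a single-thread scheduler with a nanosecond delay,
// as the non-blocking mode does. A single thread preserves send order.
class SchedulerSketch {
    static final java.util.concurrent.ScheduledExecutorService scheduler =
            java.util.concurrent.Executors.newSingleThreadScheduledExecutor();

    // Queue the task to run after delayNanos on the scheduler thread.
    static void schedule(Runnable task, long delayNanos) {
        scheduler.schedule(task, delayNanos, java.util.concurrent.TimeUnit.NANOSECONDS);
    }
}
```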

Throttler is initialized as 

Throttler.init(51, 1000000000);

where 51 is the allowed number of sends per 1000000000 nanoseconds (1 second).
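The whole mechanism can be condensed into a small self-contained sketch: record the actual send time of each message in a fixed-size ring and compare now against the Mth-last entry. All names here are assumptions; the article's Throttler additionally reads the clock itself via TimeProvider:

```java
// Condensed sketch of the throttle check: at most M sends per interval,
// decided by comparing now with the Mth-last recorded send time.
class MiniThrottler {
    private final long[] stamps;   // last M send times
    private final long interval;
    private int idx = 0;

    MiniThrottler(int maxSends, long intervalNanos) {
        stamps = new long[maxSends];
        // far in the past, so the first M sends never wait
        java.util.Arrays.fill(stamps, Long.MIN_VALUE / 2);
        interval = intervalNanos;
    }

    // How long (nanos) the caller must wait before sending at time `now`.
    long waitTime(long now) {
        long mthLast = stamps[idx];
        long wait = (now - mthLast < interval) ? interval - (now - mthLast) : 0;
        stamps[idx] = now + wait;              // record the actual send time
        idx = (idx + 1) % stamps.length;
        return wait;
    }
}
```

With maxSends = 2 and interval = 100, two messages at t = 0 go out immediately, while a third at t = 10 must wait 90 so that no window of 100 contains more than two sends.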

Test

A RandomThread is created that samples the value of Throttler.counter once per interval I, starting from a randomly chosen reference point. In main(), a for loop makes messages available with a random wait, such that the number of available messages is always higher than the throttle rate.

I have created a DummyRunnable that represents the task to be executed by the executor in the non-blocking call. It can be replaced by the actual task to be executed in the application.

Points of Interest

Circular buffers are generally used to implement efficient queues. This article shows one more efficient application of circular buffers.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


About the Author

prashantaxe

Singapore Singapore
