I have a RabbitMQ queue filled with thousands of messages. I need my consumer to consume 1 message per second, so I have implemented a rate-limit policy using Polly. My configuration is as follows:
public static IAsyncPolicy<HttpResponseMessage> GetPolicy(int mps)
{
    if (mps <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(mps));
    }

    return Policy
        .HandleResult<HttpResponseMessage>(result =>
            result.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
        .Or<Polly.RateLimit.RateLimitRejectedException>()
        .WaitAndRetryForeverAsync((retryNum, context) =>
        {
            Console.WriteLine($"Retrying. Num: {retryNum}");
            return TimeSpan.FromSeconds(1);
        })
        .WrapAsync(
            Policy.RateLimitAsync(mps, TimeSpan.FromSeconds(1)));
}
where mps is 1.
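For completeness, the policy is consumed per message roughly like this (a simplified sketch, not my exact consumer; HandleMessageAsync is a placeholder, stubbed here, for the real RabbitMQ handler that ends in an HTTP call):

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

// Simplified sketch of how the policy is applied per delivered message.
var policy = GetPolicy(mps: 1);

async Task OnMessageReceivedAsync(byte[] body)
{
    // The policy rate-limits the handler and retries on 429 responses.
    await policy.ExecuteAsync(() => HandleMessageAsync(body));
}

// Placeholder stub for the real message handler / downstream HTTP call.
Task<HttpResponseMessage> HandleMessageAsync(byte[] body) =>
    Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK));
```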
Now what I've noticed is the following: with mps set to 1, WaitAndRetryForeverAsync executes multiple (tens of) times before a message gets through. If I set mps to 50, I see the same behaviour.
Is there a bug with the Policy.RateLimitAsync call, or am I doing something wrong?
I have to emphasize that, at the time of writing, the rate limiter policy is quite new. It has been available since 7.2.3, which is the current stable version, so it is not as mature as other policies.
Based on its documentation I think it's unclear how it really works. Let me show you what I mean through a simple example:
var localQueue = new Queue<int>();
for (int i = 0; i < 1000; i++)
{
    localQueue.Enqueue(i);
}

RateLimitPolicy rateLimiter = Policy
    .RateLimit(20, TimeSpan.FromSeconds(1));

while (localQueue.TryPeek(out _))
{
    rateLimiter.Execute(() =>
    {
        Console.WriteLine(localQueue.Dequeue());
        Thread.Sleep(10);
    });
}
If you run this program it will print 0 and 1, then crash with a RateLimitRejectedException.
The answer is that the policy is defined in a way that prevents abuse. We wait only 10 milliseconds between two operations, which counts as abuse from the policy's perspective. Without this abuse prevention we would consume the whole allowed bandwidth (20 executions) in under 200 milliseconds and could not perform any further action in the remaining 800 milliseconds.
If you raise the sleep from 10 to 50 milliseconds, the result is quite different: the program happily consumes the whole queue without ever getting halted. Why? Because 1000 milliseconds / 20 allowed executions = one token every 50 milliseconds. This means that your allowed executions are distributed evenly over time.
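To see the even distribution in action, here is the same loop with the sleep raised to one token interval (my variation on the snippet above; in my runs it drains the queue without a rejection, though exact timing is scheduler-dependent):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Polly;
using Polly.RateLimit;

var localQueue = new Queue<int>();
for (int i = 0; i < 1000; i++)
{
    localQueue.Enqueue(i);
}

RateLimitPolicy rateLimiter = Policy
    .RateLimit(20, TimeSpan.FromSeconds(1));

while (localQueue.TryPeek(out _))
{
    rateLimiter.Execute(() =>
    {
        Console.WriteLine(localQueue.Dequeue());
        // 50 ms matches the token issuing rate (1000 ms / 20 executions),
        // so by the time we ask again a new token should be available.
        Thread.Sleep(50);
    });
}
```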
Let's play a little bit with the burst mode. Let's set the maxBurst parameter to 2 and the sleep to 49 milliseconds:
RateLimitPolicy rateLimiter = Policy
    .RateLimit(20, TimeSpan.FromSeconds(1), 2);

while (localQueue.TryPeek(out _))
{
    rateLimiter.Execute(() =>
    {
        Console.WriteLine(localQueue.Dequeue());
        Thread.Sleep(49);
    });
}
In this mode the exception will be thrown somewhere between items 10 and 20 (run it multiple times). If you change the maxBurst to 4, it will crash between 20 and 60. So, the rate limiter does not work the way you might expect: it enforces evenly distributed traffic.
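Coming back to the original 1-message-per-second scenario: instead of retrying on a fixed 1-second delay, one option is to honour the RetryAfter hint that RateLimitRejectedException carries. A sketch only, with ConsumeOneMessageAsync as a hypothetical stand-in for the real RabbitMQ handler:

```csharp
using System;
using System.Threading.Tasks;
using Polly;
using Polly.RateLimit;

public static class PacedConsumer
{
    // Shapes traffic to ~1 execution per second.
    private static readonly AsyncRateLimitPolicy RateLimiter =
        Policy.RateLimitAsync(1, TimeSpan.FromSeconds(1));

    public static async Task RunAsync(Func<Task> consumeOneMessageAsync)
    {
        while (true)
        {
            try
            {
                await RateLimiter.ExecuteAsync(consumeOneMessageAsync);
            }
            catch (RateLimitRejectedException ex)
            {
                // RetryAfter tells us how long until the next token is
                // available, so we wait exactly that long instead of a
                // fixed 1 second.
                await Task.Delay(ex.RetryAfter);
            }
        }
    }
}
```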