Skip to content

Commit

Permalink
Merge pull request #157 from dlxeon/feat/idleduration-for-bucketratel…
Browse files Browse the repository at this point in the history
…imiter

Implement IdleDuration for RedisTokenBucketRateLimiter
  • Loading branch information
cristipufu authored Jun 13, 2024
2 parents c1522a8 + 24c8000 commit bb1dd3a
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 11 deletions.
24 changes: 21 additions & 3 deletions src/RedisRateLimiting/Concurrency/RedisConcurrencyRateLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
Expand All @@ -10,6 +11,8 @@ namespace RedisRateLimiting
{
public class RedisConcurrencyRateLimiter<TKey> : RateLimiter
{
private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;

Check warning on line 14 in src/RedisRateLimiting/Concurrency/RedisConcurrencyRateLimiter.cs

View workflow job for this annotation

GitHub Actions / Build and analyze

A static field in a generic type is not shared among instances of different close constructed types. (https://rules.sonarsource.com/csharp/RSPEC-2743)

private readonly RedisConcurrencyManager _redisManager;
private readonly RedisConcurrencyRateLimiterOptions _options;
private readonly ConcurrentQueue<Request> _queue = new();
Expand All @@ -20,7 +23,12 @@ public class RedisConcurrencyRateLimiter<TKey> : RateLimiter

private readonly ConcurrencyLease FailedLease = new(false, null, null);

public override TimeSpan? IdleDuration => TimeSpan.Zero;
private int _activeRequestsCount;
private long _idleSince = Stopwatch.GetTimestamp();

public override TimeSpan? IdleDuration => Interlocked.CompareExchange(ref _activeRequestsCount, 0, 0) > 0
? null
: new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));

public RedisConcurrencyRateLimiter(TKey partitionKey, RedisConcurrencyRateLimiterOptions options)
{
Expand Down Expand Up @@ -64,14 +72,24 @@ public RedisConcurrencyRateLimiter(TKey partitionKey, RedisConcurrencyRateLimite
return _redisManager.GetStatistics();
}

protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
{
_idleSince = Stopwatch.GetTimestamp();
if (permitCount > _options.PermitLimit)
{
throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, string.Format("{0} permit(s) exceeds the permit limit of {1}.", permitCount, _options.PermitLimit));
}

return AcquireAsyncCoreInternal(cancellationToken);
Interlocked.Increment(ref _activeRequestsCount);
try
{
return await AcquireAsyncCoreInternal(cancellationToken);
}
finally
{
Interlocked.Decrement(ref _activeRequestsCount);
_idleSince = Stopwatch.GetTimestamp();
}
}

protected override RateLimitLease AttemptAcquireCore(int permitCount)
Expand Down
22 changes: 20 additions & 2 deletions src/RedisRateLimiting/FixedWindow/RedisFixedWindowRateLimiter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using RedisRateLimiting.Concurrency;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
Expand All @@ -9,12 +10,19 @@ namespace RedisRateLimiting
{
public class RedisFixedWindowRateLimiter<TKey> : RateLimiter
{
private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;

Check warning on line 13 in src/RedisRateLimiting/FixedWindow/RedisFixedWindowRateLimiter.cs

View workflow job for this annotation

GitHub Actions / Build and analyze

A static field in a generic type is not shared among instances of different close constructed types. (https://rules.sonarsource.com/csharp/RSPEC-2743)

private readonly RedisFixedWindowManager _redisManager;
private readonly RedisFixedWindowRateLimiterOptions _options;

private readonly FixedWindowLease FailedLease = new(isAcquired: false, null);

public override TimeSpan? IdleDuration => TimeSpan.Zero;
private int _activeRequestsCount;
private long _idleSince = Stopwatch.GetTimestamp();

public override TimeSpan? IdleDuration => Interlocked.CompareExchange(ref _activeRequestsCount, 0, 0) > 0
? null
: new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));

public RedisFixedWindowRateLimiter(TKey partitionKey, RedisFixedWindowRateLimiterOptions options)
{
Expand Down Expand Up @@ -74,7 +82,17 @@ private async ValueTask<RateLimitLease> AcquireAsyncCoreInternal(int permitCount
Window = _options.Window,
};

var response = await _redisManager.TryAcquireLeaseAsync(permitCount);
RedisFixedWindowResponse response;
Interlocked.Increment(ref _activeRequestsCount);
try
{
response = await _redisManager.TryAcquireLeaseAsync(permitCount);
}
finally
{
Interlocked.Decrement(ref _activeRequestsCount);
_idleSince = Stopwatch.GetTimestamp();
}

leaseContext.Count = response.Count;
leaseContext.RetryAfter = response.RetryAfter;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using RedisRateLimiting.Concurrency;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
Expand All @@ -9,12 +10,19 @@ namespace RedisRateLimiting
{
public class RedisSlidingWindowRateLimiter<TKey> : RateLimiter
{
private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;

Check warning on line 13 in src/RedisRateLimiting/SlidingWindow/RedisSlidingWindowRateLimiter.cs

View workflow job for this annotation

GitHub Actions / Build and analyze

A static field in a generic type is not shared among instances of different close constructed types. (https://rules.sonarsource.com/csharp/RSPEC-2743)

private readonly RedisSlidingWindowManager _redisManager;
private readonly RedisSlidingWindowRateLimiterOptions _options;

private readonly SlidingWindowLease FailedLease = new(isAcquired: false, null);

public override TimeSpan? IdleDuration => TimeSpan.Zero;
private int _activeRequestsCount;
private long _idleSince = Stopwatch.GetTimestamp();

public override TimeSpan? IdleDuration => Interlocked.CompareExchange(ref _activeRequestsCount, 0, 0) > 0
? null
: new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));

public RedisSlidingWindowRateLimiter(TKey partitionKey, RedisSlidingWindowRateLimiterOptions options)
{
Expand Down Expand Up @@ -50,14 +58,24 @@ public RedisSlidingWindowRateLimiter(TKey partitionKey, RedisSlidingWindowRateLi
return _redisManager.GetStatistics();
}

protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
{
_idleSince = Stopwatch.GetTimestamp();
if (permitCount > _options.PermitLimit)
{
throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, string.Format("{0} permit(s) exceeds the permit limit of {1}.", permitCount, _options.PermitLimit));
}

return AcquireAsyncCoreInternal();
Interlocked.Increment(ref _activeRequestsCount);
try
{
return await AcquireAsyncCoreInternal();
}
finally
{
Interlocked.Decrement(ref _activeRequestsCount);
_idleSince = Stopwatch.GetTimestamp();
}
}

protected override RateLimitLease AttemptAcquireCore(int permitCount)
Expand Down
24 changes: 21 additions & 3 deletions src/RedisRateLimiting/TokenBucket/RedisTokenBucketRateLimiter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using RedisRateLimiting.Concurrency;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
Expand All @@ -9,12 +10,19 @@ namespace RedisRateLimiting
{
public class RedisTokenBucketRateLimiter<TKey> : RateLimiter
{
private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;

Check warning on line 13 in src/RedisRateLimiting/TokenBucket/RedisTokenBucketRateLimiter.cs

View workflow job for this annotation

GitHub Actions / Build and analyze

A static field in a generic type is not shared among instances of different close constructed types. (https://rules.sonarsource.com/csharp/RSPEC-2743)

private readonly RedisTokenBucketManager _redisManager;
private readonly RedisTokenBucketRateLimiterOptions _options;

private readonly TokenBucketLease FailedLease = new(isAcquired: false, null);

public override TimeSpan? IdleDuration => TimeSpan.Zero;
private int _activeRequestsCount;
private long _idleSince = Stopwatch.GetTimestamp();

public override TimeSpan? IdleDuration => Interlocked.CompareExchange(ref _activeRequestsCount, 0, 0) > 0
? null
: new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));

public RedisTokenBucketRateLimiter(TKey partitionKey, RedisTokenBucketRateLimiterOptions options)
{
Expand Down Expand Up @@ -55,14 +63,24 @@ public RedisTokenBucketRateLimiter(TKey partitionKey, RedisTokenBucketRateLimite
throw new NotImplementedException();
}

protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
{
_idleSince = Stopwatch.GetTimestamp();
if (permitCount > _options.TokenLimit)
{
throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, string.Format("{0} permit(s) exceeds the permit limit of {1}.", permitCount, _options.TokenLimit));
}

return AcquireAsyncCoreInternal(permitCount);
Interlocked.Increment(ref _activeRequestsCount);
try
{
return await AcquireAsyncCoreInternal(permitCount);
}
finally
{
Interlocked.Decrement(ref _activeRequestsCount);
_idleSince = Stopwatch.GetTimestamp();
}
}

protected override RateLimitLease AttemptAcquireCore(int permitCount)
Expand Down
20 changes: 20 additions & 0 deletions test/RedisRateLimiting.Tests/UnitTests/ConcurrencyUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,26 @@ public async Task GetPermitWhilePermitEmptyQueueNotEmptyGetsQueued()
using var lease3 = await wait3;
Assert.True(lease3.IsAcquired);
}

[Fact]
public async Task IdleDurationIsUpdated()
{
await using var limiter = new RedisConcurrencyRateLimiter<string>(
partitionKey: Guid.NewGuid().ToString(),
new RedisConcurrencyRateLimiterOptions
{
PermitLimit = 1,
QueueLimit = 1,
TryDequeuePeriod = TimeSpan.FromHours(1),
ConnectionMultiplexerFactory = Fixture.ConnectionMultiplexerFactory,
});
await Task.Delay(TimeSpan.FromMilliseconds(5));
Assert.NotEqual(TimeSpan.Zero, limiter.IdleDuration);

var previousIdleDuration = limiter.IdleDuration;
using var lease = await limiter.AcquireAsync();
Assert.True(limiter.IdleDuration < previousIdleDuration);
}

static internal void ForceDequeue(RedisConcurrencyRateLimiter<string> limiter)
{
Expand Down
19 changes: 19 additions & 0 deletions test/RedisRateLimiting.Tests/UnitTests/FixedWindowUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,24 @@ public async Task CanAcquireMultiplePermits()
using var lease3 = await limiter.AcquireAsync(permitCount: 2);
Assert.True(lease3.IsAcquired);
}

[Fact]
public async Task IdleDurationIsUpdated()
{
await using var limiter = new RedisFixedWindowRateLimiter<string>(
partitionKey: Guid.NewGuid().ToString(),
new RedisFixedWindowRateLimiterOptions
{
PermitLimit = 1,
Window = TimeSpan.FromMinutes(1),
ConnectionMultiplexerFactory = Fixture.ConnectionMultiplexerFactory,
});
await Task.Delay(TimeSpan.FromMilliseconds(5));
Assert.NotEqual(TimeSpan.Zero, limiter.IdleDuration);

var previousIdleDuration = limiter.IdleDuration;
using var lease = await limiter.AcquireAsync();
Assert.True(limiter.IdleDuration < previousIdleDuration);
}
}
}
19 changes: 19 additions & 0 deletions test/RedisRateLimiting.Tests/UnitTests/SlidingWindowUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,24 @@ public async Task CanAcquireAsyncResourceWithSmallWindow()
using var lease4 = await limiter.AcquireAsync();
Assert.False(lease4.IsAcquired);
}

[Fact]
public async Task IdleDurationIsUpdated()
{
await using var limiter = new RedisSlidingWindowRateLimiter<string>(
partitionKey: Guid.NewGuid().ToString(),
new RedisSlidingWindowRateLimiterOptions
{
PermitLimit = 1,
Window = TimeSpan.FromMilliseconds(600),
ConnectionMultiplexerFactory = Fixture.ConnectionMultiplexerFactory,
});
await Task.Delay(TimeSpan.FromMilliseconds(5));
Assert.NotEqual(TimeSpan.Zero, limiter.IdleDuration);

var previousIdleDuration = limiter.IdleDuration;
using var lease = await limiter.AcquireAsync();
Assert.True(limiter.IdleDuration < previousIdleDuration);
}
}
}
20 changes: 20 additions & 0 deletions test/RedisRateLimiting.Tests/UnitTests/TokenBucketUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,25 @@ public async Task CanAcquireMultiPermits()
using var lease3 = await limiter.AcquireAsync(1);
Assert.True(lease3.IsAcquired);
}

[Fact]
public async Task IdleDurationIsUpdated()
{
await using var limiter = new RedisTokenBucketRateLimiter<string>(
partitionKey: Guid.NewGuid().ToString(),
new RedisTokenBucketRateLimiterOptions
{
TokenLimit = 1,
TokensPerPeriod = 1,
ReplenishmentPeriod = TimeSpan.FromMinutes(1),
ConnectionMultiplexerFactory = Fixture.ConnectionMultiplexerFactory,
});
await Task.Delay(TimeSpan.FromMilliseconds(5));
Assert.NotEqual(TimeSpan.Zero, limiter.IdleDuration);

var previousIdleDuration = limiter.IdleDuration;
using var lease = await limiter.AcquireAsync();
Assert.True(limiter.IdleDuration < previousIdleDuration);
}
}
}

0 comments on commit bb1dd3a

Please sign in to comment.