Skip to content

Commit

Permalink
fix: Resource watcher re-connects should reset currentVersion (#792)
Browse files Browse the repository at this point in the history
  • Loading branch information
rpearsondev authored Sep 10, 2024
1 parent e8de144 commit 108240e
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 67 deletions.
22 changes: 13 additions & 9 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,15 @@ private async Task WatchClientEventsAsync(CancellationToken stoppingToken)
{
await OnEventAsync(type, entity, stoppingToken);
}
catch (KubernetesException e)
catch (KubernetesException e) when (e.Status.Code is (int)HttpStatusCode.GatewayTimeout)
{
if (e.Status.Code is (int)HttpStatusCode.Gone or (int)HttpStatusCode.GatewayTimeout)
{
logger.LogDebug(e, "Watch restarting due to 410 HTTP Gone or 504 Gateway Timeout.");

break;
}

LogReconciliationFailed(e);
logger.LogDebug(e, "Watch restarting due to 504 Gateway Timeout.");
break;
}
catch (KubernetesException e) when (e.Status.Code is (int)HttpStatusCode.Gone)
{
// Special handling when our resource version is outdated.
throw;
}
catch (Exception e)
{
Expand All @@ -202,6 +201,11 @@ void LogReconciliationFailed(Exception exception)
// Don't throw if the cancellation was indeed requested.
break;
}
catch (KubernetesException e) when (e.Status.Code is (int)HttpStatusCode.Gone)
{
logger.LogDebug(e, "Watch restarting with reset bookmark due to 410 HTTP Gone.");
currentVersion = null;
}
catch (Exception e)
{
await OnWatchErrorAsync(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,58 +1,58 @@
using FluentAssertions;

using k8s.LeaderElection;

using KubeOps.Operator.LeaderElection;

using Microsoft.Extensions.Logging;

using Moq;

namespace KubeOps.Operator.Test.LeaderElector;

public sealed class LeaderElectionBackgroundServiceTest
{
[Fact]
public async Task Elector_Throws_Should_Retry()
{
// Arrange.
var logger = Mock.Of<ILogger<LeaderElectionBackgroundService>>();

var electionLock = Mock.Of<ILock>();

var electionLockSubsequentCallEvent = new AutoResetEvent(false);
bool hasElectionLockThrown = false;
Mock.Get(electionLock)
.Setup(electionLock => electionLock.GetAsync(It.IsAny<CancellationToken>()))
.Returns<CancellationToken>(
async cancellationToken =>
{
if (hasElectionLockThrown)
{
// Signal to the test that a subsequent call has been made.
electionLockSubsequentCallEvent.Set();

// Delay returning for a long time, allowing the test to stop the background service, in turn cancelling the cancellation token.
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
throw new InvalidOperationException();
}

hasElectionLockThrown = true;
throw new Exception("Unit test exception");
});

var leaderElectionConfig = new LeaderElectionConfig(electionLock);
var leaderElector = new k8s.LeaderElection.LeaderElector(leaderElectionConfig);

var leaderElectionBackgroundService = new LeaderElectionBackgroundService(logger, leaderElector);

// Act / Assert.
await leaderElectionBackgroundService.StartAsync(CancellationToken.None);

// Starting the background service should result in the lock attempt throwing, and then a subsequent attempt being made.
// Wait for the subsequent event to be signalled, if we time out the test fails. The retry delay requires us to wait at least 3 seconds.
electionLockSubsequentCallEvent.WaitOne(TimeSpan.FromMilliseconds(3100)).Should().BeTrue();

await leaderElectionBackgroundService.StopAsync(CancellationToken.None);
}
}
using FluentAssertions;

using k8s.LeaderElection;

using KubeOps.Operator.LeaderElection;

using Microsoft.Extensions.Logging;

using Moq;

namespace KubeOps.Operator.Test.LeaderElector;

public sealed class LeaderElectionBackgroundServiceTest
{
[Fact]
public async Task Elector_Throws_Should_Retry()
{
// Arrange.
var logger = Mock.Of<ILogger<LeaderElectionBackgroundService>>();

var electionLock = Mock.Of<ILock>();

var electionLockSubsequentCallEvent = new AutoResetEvent(false);
bool hasElectionLockThrown = false;
Mock.Get(electionLock)
.Setup(electionLock => electionLock.GetAsync(It.IsAny<CancellationToken>()))
.Returns<CancellationToken>(
async cancellationToken =>
{
if (hasElectionLockThrown)
{
// Signal to the test that a subsequent call has been made.
electionLockSubsequentCallEvent.Set();

// Delay returning for a long time, allowing the test to stop the background service, in turn cancelling the cancellation token.
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
throw new InvalidOperationException();
}

hasElectionLockThrown = true;
throw new Exception("Unit test exception");
});

var leaderElectionConfig = new LeaderElectionConfig(electionLock);
var leaderElector = new k8s.LeaderElection.LeaderElector(leaderElectionConfig);

var leaderElectionBackgroundService = new LeaderElectionBackgroundService(logger, leaderElector);

// Act / Assert.
await leaderElectionBackgroundService.StartAsync(CancellationToken.None);

// Starting the background service should result in the lock attempt throwing, and then a subsequent attempt being made.
// Wait for the subsequent event to be signalled, if we time out the test fails. The retry delay requires us to wait at least 3 seconds.
electionLockSubsequentCallEvent.WaitOne(TimeSpan.FromMilliseconds(3100)).Should().BeTrue();

await leaderElectionBackgroundService.StopAsync(CancellationToken.None);
}
}

0 comments on commit 108240e

Please sign in to comment.