Skip to content

Commit

Permalink
Fixed logging
Browse files Browse the repository at this point in the history
  • Loading branch information
niemyjski committed Jan 31, 2025
1 parent c8a9832 commit 3965982
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
62 changes: 31 additions & 31 deletions src/Foundatio.Redis/Queues/RedisQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public RedisQueue(RedisQueueOptions<T> options) : base(options)
var interval = _options.WorkItemTimeout > TimeSpan.FromSeconds(1) ? _options.WorkItemTimeout.Min(TimeSpan.FromMinutes(1)) : TimeSpan.FromSeconds(1);
_maintenanceLockProvider = new ThrottlingLockProvider(_cache, 1, interval);

_logger.LogInformation("Queue {QueueId} created. Retries: {Retries} Retry Delay: {RetryDelay:g}, Maintenance Interval: {MaintenanceInterval:g}", QueueId, _options.Retries, _options.RetryDelay, interval);
_logger.LogInformation("Queue {QueueName}:{QueueId} created. Retries: {Retries} Retry Delay: {RetryDelay:g}, Maintenance Interval: {MaintenanceInterval:g}", _options.Name, QueueId, _options.Retries, _options.RetryDelay, interval);
}

public RedisQueue(Builder<RedisQueueOptionsBuilder<T>, RedisQueueOptions<T>> config)
Expand All @@ -80,7 +80,7 @@ private async Task EnsureMaintenanceRunningAsync()
if (_queueDisposedCancellationTokenSource.IsCancellationRequested || _maintenanceTask != null)
return;

_logger.LogTrace("Starting maintenance for {Name}", _options.Name);
_logger.LogTrace("Starting maintenance for {QueueName}", _options.Name);
_maintenanceTask = Task.Run(() => DoMaintenanceWorkLoopAsync());
}
}
Expand All @@ -95,10 +95,10 @@ private async Task EnsureTopicSubscriptionAsync()
if (_queueDisposedCancellationTokenSource.IsCancellationRequested || _isSubscribed)
return;

_logger.LogTrace("Subscribing to enqueue messages for {Name}", _options.Name);
_logger.LogTrace("Subscribing to enqueue messages for {QueueName}", _options.Name);
await _subscriber.SubscribeAsync(RedisChannel.Literal(GetTopicName()), OnTopicMessage).AnyContext();
_isSubscribed = true;
_logger.LogTrace("Subscribed to enqueue messages for {Name}", _options.Name);
_logger.LogTrace("Subscribed to enqueue messages for {QueueName}", _options.Name);
}
}

Expand Down Expand Up @@ -213,15 +213,15 @@ private string GetTopicName()
protected override async Task<string> EnqueueImplAsync(T data, QueueEntryOptions options)
{
string id = Guid.NewGuid().ToString("N");
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {Name} enqueue item: {EntryId}", _options.Name, id);
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {QueueName} enqueue item: {QueueEntryId}", _options.Name, id);

if (options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero)
throw new NotSupportedException("DeliveryDelay is not supported in the Redis queue implementation.");

bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
if (!await OnEnqueuingAsync(data, options).AnyContext())
{
if (isTraceLogLevelEnabled) _logger.LogTrace("Aborting enqueue item: {EntryId}", id);
if (isTraceLogLevelEnabled) _logger.LogTrace("Aborting enqueue item: {QueueEntryId}", id);
return null;
}

Expand Down Expand Up @@ -266,16 +266,16 @@ protected override void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken,
if (handler == null)
throw new ArgumentNullException(nameof(handler));

_logger.LogTrace("Queue {Name} start working", _options.Name);
_logger.LogTrace("Queue {QueueName} start working", _options.Name);

_workers.Add(Task.Run(async () =>
{
using var linkedCancellationToken = GetLinkedDisposableCancellationTokenSource(cancellationToken);
_logger.LogTrace("WorkerLoop Start {Name}", _options.Name);
_logger.LogTrace("WorkerLoop Start {QueueName}", _options.Name);

while (!linkedCancellationToken.IsCancellationRequested)
{
_logger.LogTrace("WorkerLoop Signaled {Name}", _options.Name);
_logger.LogTrace("WorkerLoop Signaled {QueueName}", _options.Name);

IQueueEntry<T> queueEntry = null;
try
Expand Down Expand Up @@ -325,13 +325,13 @@ protected override void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken,
}
}

_logger.LogTrace("Worker exiting: {Name} Cancel Requested: {IsCancellationRequested}", _options.Name, linkedCancellationToken.IsCancellationRequested);
_logger.LogTrace("Worker exiting: {QueueName} Cancel Requested: {IsCancellationRequested}", _options.Name, linkedCancellationToken.IsCancellationRequested);
}, GetLinkedDisposableCancellationTokenSource(cancellationToken).Token));
}

protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken linkedCancellationToken)
{
_logger.LogTrace("Queue {Name} dequeuing item...", _options.Name);
_logger.LogTrace("Queue {QueueName} dequeuing item...", _options.Name);

if (!IsMaintenanceRunning)
await EnsureMaintenanceRunningAsync().AnyContext();
Expand Down Expand Up @@ -385,10 +385,10 @@ protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken

public override async Task RenewLockAsync(IQueueEntry<T> entry)
{
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {Name} renew lock item: {EntryId}", _options.Name, entry.Id);
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {QueueName} renew lock item: {QueueEntryId}", _options.Name, entry.Id);
await Run.WithRetriesAsync(() => _cache.SetAsync(GetRenewedTimeKey(entry.Id), _timeProvider.GetUtcNow().Ticks, GetWorkItemTimeoutTimeTtl()), logger: _logger).AnyContext();
await OnLockRenewedAsync(entry).AnyContext();
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Renew lock done: {EntryId}", entry.Id);
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Renew lock done: {QueueEntryId}", entry.Id);
}

private async Task<QueueEntry<T>> GetQueueEntryAsync(string workId)
Expand Down Expand Up @@ -439,24 +439,24 @@ private async Task<RedisValue> DequeueIdAsync(CancellationToken linkedCancellati
}
catch (Exception ex)
{
_logger.LogError(ex, "Queue {Name} dequeue id async error: {Error}", _options.Name, ex.Message);
_logger.LogError(ex, "Queue {QueueName} dequeue id async error: {Error}", _options.Name, ex.Message);
return RedisValue.Null;
}
}

public override async Task CompleteAsync(IQueueEntry<T> entry)
{
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {Name} complete item: {EntryId}", _options.Name, entry.Id);
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {QueueName} complete item: {QueueEntryId}", _options.Name, entry.Id);
if (entry.IsAbandoned || entry.IsCompleted)
{
//_logger.LogDebug("Queue {Name} item already abandoned or completed: {EntryId}", _options.Name, entry.Id);
//_logger.LogDebug("Queue {QueueName} item already abandoned or completed: {QueueEntryId}", _options.Name, entry.Id);
throw new InvalidOperationException("Queue entry has already been completed or abandoned.");
}

long result = await Run.WithRetriesAsync(() => Database.ListRemoveAsync(_workListName, entry.Id), logger: _logger).AnyContext();
if (result == 0)
{
_logger.LogDebug("Queue {Name} item not in work list: {EntryId}", _options.Name, entry.Id);
_logger.LogDebug("Queue {QueueName} item not in work list: {QueueEntryId}", _options.Name, entry.Id);
throw new InvalidOperationException("Queue entry not in work list, it may have been auto abandoned.");
}

Expand All @@ -472,15 +472,15 @@ await Run.WithRetriesAsync(() => Task.WhenAll(
Interlocked.Increment(ref _completedCount);
entry.MarkCompleted();
await OnCompletedAsync(entry).AnyContext();
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Complete done: {EntryId}", entry.Id);
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Complete done: {QueueEntryId}", entry.Id);
}

public override async Task AbandonAsync(IQueueEntry<T> entry)
{
_logger.LogDebug("Queue {Name}:{QueueId} abandon item: {EntryId}", _options.Name, QueueId, entry.Id);
_logger.LogDebug("Queue {QueueName}:{QueueId} abandon item: {QueueEntryId}", _options.Name, QueueId, entry.Id);
if (entry.IsAbandoned || entry.IsCompleted)
{
_logger.LogError("Queue {Name}:{QueueId} unable to abandon item because already abandoned or completed: {EntryId}", _options.Name, QueueId, entry.Id);
_logger.LogError("Queue {QueueName}:{QueueId} unable to abandon item because already abandoned or completed: {QueueEntryId}", _options.Name, QueueId, entry.Id);
throw new InvalidOperationException("Queue entry has already been completed or abandoned.");
}

Expand All @@ -491,11 +491,11 @@ public override async Task AbandonAsync(IQueueEntry<T> entry)
attempts = attemptsCachedValue.Value + 1;

var retryDelay = GetRetryDelay(attempts);
_logger.LogInformation("Item: {EntryId}, Retry attempts: {RetryAttempts}, Retries Allowed: {Retries}, Retry Delay: {RetryDelay:g}", entry.Id, attempts - 1, _options.Retries, retryDelay);
_logger.LogInformation("Item: {QueueEntryId}, Retry attempts: {QueueEntryAttempts}, Retries Allowed: {Retries}, Retry Delay: {RetryDelay:g}", entry.Id, attempts - 1, _options.Retries, retryDelay);

if (attempts > _options.Retries)
{
_logger.LogInformation("Exceeded retry limit moving to deadletter: {EntryId}", entry.Id);
_logger.LogInformation("Exceeded retry limit moving to deadletter: {QueueEntryId}", entry.Id);

var tx = Database.CreateTransaction();
tx.AddCondition(Condition.KeyExists(GetRenewedTimeKey(entry.Id)));
Expand All @@ -515,7 +515,7 @@ await Run.WithRetriesAsync(() => Task.WhenAll(
}
else if (retryDelay > TimeSpan.Zero)
{
_logger.LogInformation("Adding item to wait list for future retry: {EntryId}", entry.Id);
_logger.LogInformation("Adding item to wait list for future retry: {QueueEntryId}", entry.Id);

await Run.WithRetriesAsync(() => Task.WhenAll(
_cache.SetAsync(GetWaitTimeKey(entry.Id), _timeProvider.GetUtcNow().Add(retryDelay).Ticks, GetWaitTimeTtl()),
Expand All @@ -535,7 +535,7 @@ await Run.WithRetriesAsync(() => Task.WhenAll(
}
else
{
_logger.LogInformation("Adding item back to queue for retry: {EntryId}", entry.Id);
_logger.LogInformation("Adding item back to queue for retry: {QueueEntryId}", entry.Id);

await Run.WithRetriesAsync(() => _cache.IncrementAsync(attemptsCacheKey, 1, GetAttemptsTtl()), logger: _logger).AnyContext();

Expand All @@ -558,7 +558,7 @@ await Run.WithRetriesAsync(() => Task.WhenAll(
Interlocked.Increment(ref _abandonedCount);
entry.MarkAbandoned();
await OnAbandonedAsync(entry).AnyContext();
_logger.LogInformation("Abandon complete: {EntryId}", entry.Id);
_logger.LogInformation("Abandon complete: {QueueEntryId}", entry.Id);
}

private TimeSpan GetRetryDelay(int attempts)
Expand All @@ -580,7 +580,7 @@ protected override Task<IEnumerable<T>> GetDeadletterItemsImplAsync(Cancellation

public override async Task DeleteQueueAsync()
{
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Deleting queue: {Name}", _options.Name);
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Deleting queue: {QueueName}", _options.Name);
await Task.WhenAll(
DeleteListAsync(_queueListName),
DeleteListAsync(_workListName),
Expand Down Expand Up @@ -640,7 +640,7 @@ private async Task TrimDeadletterItemsAsync(int maxItems)

private void OnTopicMessage(RedisChannel redisChannel, RedisValue redisValue)
{
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Queue OnMessage {Name}: {Value}", _options.Name, redisValue);
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Queue OnMessage {QueueName}: {Value}", _options.Name, redisValue);
_autoResetEvent.Set();
}

Expand All @@ -656,7 +656,7 @@ public async Task DoMaintenanceWorkAsync()
if (_queueDisposedCancellationTokenSource.IsCancellationRequested)
return;

_logger.LogTrace("Starting DoMaintenance: Name: {Name} Id: {Id}", _options.Name, QueueId);
_logger.LogTrace("Starting DoMaintenance: Name: {QueueName} Id: {QueueId}", _options.Name, QueueId);
var utcNow = _timeProvider.GetUtcNow();

try
Expand Down Expand Up @@ -746,20 +746,20 @@ public async Task DoMaintenanceWorkAsync()
_logger.LogError(ex, "Error trimming deadletter items: {0}", ex.Message);
}

_logger.LogTrace("Finished DoMaintenance: Name: {Name} Id: {Id} Duration: {Duration:g}", _options.Name, QueueId, _timeProvider.GetUtcNow().Subtract(utcNow));
_logger.LogTrace("Finished DoMaintenance: Name: {QueueName} Id: {QueueId} Duration: {Duration:g}", _options.Name, QueueId, _timeProvider.GetUtcNow().Subtract(utcNow));
}

private async Task DoMaintenanceWorkLoopAsync()
{
while (!_queueDisposedCancellationTokenSource.IsCancellationRequested)
{
_logger.LogTrace("Requesting Maintenance Lock. Name: {Name} Id: {Id}", _options.Name, QueueId);
_logger.LogTrace("Requesting Maintenance Lock. Name: {QueueName} Id: {QueueId}", _options.Name, QueueId);

var utcNow = _timeProvider.GetUtcNow();
using var linkedCancellationToken = GetLinkedDisposableCancellationTokenSource(new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token);
bool gotLock = await _maintenanceLockProvider.TryUsingAsync($"{_options.Name}-maintenance", DoMaintenanceWorkAsync, cancellationToken: linkedCancellationToken.Token).AnyContext();

_logger.LogTrace("{Status} Maintenance Lock. Name: {Name} Id: {Id} Time To Acquire: {AcquireDuration:g}", gotLock ? "Acquired" : "Failed to acquire", _options.Name, QueueId, _timeProvider.GetUtcNow().Subtract(utcNow));
_logger.LogTrace("{Status} Maintenance Lock. Name: {QueueName} Id: {QueueId} Time To Acquire: {AcquireDuration:g}", gotLock ? "Acquired" : "Failed to acquire", _options.Name, QueueId, _timeProvider.GetUtcNow().Subtract(utcNow));
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ public async Task DatabaseTimeoutDuringDequeueHandledCorectly_Issue64()
{
// not using GetQueue() here because I need to change the ops timeout in the redis connection string
const int OPS_TIMEOUT_MS = 100;
string connectionString = Configuration.GetConnectionString("RedisConnectionString") + $",syncTimeout={OPS_TIMEOUT_MS},asyncTimeout={OPS_TIMEOUT_MS}"; ;
string connectionString = Configuration.GetConnectionString("RedisConnectionString") + $",syncTimeout={OPS_TIMEOUT_MS},asyncTimeout={OPS_TIMEOUT_MS}";
var muxer = await ConnectionMultiplexer.ConnectAsync(connectionString);

const string QUEUE_NAME = "Test";
Expand Down

0 comments on commit 3965982

Please sign in to comment.