diff --git a/src/Hangfire.MemoryStorage/MemoryStorage.cs b/src/Hangfire.MemoryStorage/MemoryStorage.cs index 69a3c70..0b5a479 100644 --- a/src/Hangfire.MemoryStorage/MemoryStorage.cs +++ b/src/Hangfire.MemoryStorage/MemoryStorage.cs @@ -9,6 +9,9 @@ public class MemoryStorage : JobStorage { private readonly MemoryStorageOptions _options; + private readonly object _connectionLock = new object(); + private MemoryStorageConnection _connection; + public Data Data { get; } public MemoryStorage() : this(new MemoryStorageOptions(), new Data()) @@ -27,7 +30,12 @@ public MemoryStorage(MemoryStorageOptions options, Data data) public override IStorageConnection GetConnection() { - return new MemoryStorageConnection(Data, _options.FetchNextJobTimeout); + lock (_connectionLock) + { + if (_connection == null) + _connection = new MemoryStorageConnection(Data, _options.FetchNextJobTimeout); + } + return _connection; } public override IMonitoringApi GetMonitoringApi() diff --git a/src/Hangfire.MemoryStorage/MemoryStorageConnection.cs b/src/Hangfire.MemoryStorage/MemoryStorageConnection.cs index 30e581c..989c0c5 100644 --- a/src/Hangfire.MemoryStorage/MemoryStorageConnection.cs +++ b/src/Hangfire.MemoryStorage/MemoryStorageConnection.cs @@ -15,10 +15,11 @@ namespace Hangfire.MemoryStorage { public class MemoryStorageConnection : JobStorageConnection { - private static readonly object FetchJobsLock = new object(); + private readonly object FetchJobsLock = new object(); private readonly TimeSpan _fetchNextJobTimeout; private readonly Data _data; - private static readonly ConcurrentDictionary _locks = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _locks = new ConcurrentDictionary(); + private readonly AutoResetEvent _newItemInQueueEvent = new AutoResetEvent(true); public MemoryStorageConnection(Data data, TimeSpan fetchNextJobTimeout) { @@ -26,8 +27,6 @@ public MemoryStorageConnection(Data data, TimeSpan fetchNextJobTimeout) _data = data; } - internal static readonly AutoResetEvent NewItemInQueueEvent = new AutoResetEvent(true); - public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout) { // try to acquire an existing lock or create a new one if none exists for the resource @@ -102,7 +101,7 @@ public override string CreateExpiredJob(Job job, IDictionary par public override IWriteOnlyTransaction CreateWriteTransaction() { - return new MemoryStorageWriteOnlyTransaction(_data, NewItemInQueueEvent); + return new MemoryStorageWriteOnlyTransaction(_data, _newItemInQueueEvent); } public override IFetchedJob FetchNextJob(string[] queues, CancellationToken cancellationToken) @@ -139,7 +138,7 @@ orderby q.AddedAt descending } } - WaitHandle.WaitAny(new[] { cancellationToken.WaitHandle, NewItemInQueueEvent }, TimeSpan.FromSeconds(15)); + WaitHandle.WaitAny(new[] { cancellationToken.WaitHandle, _newItemInQueueEvent }, TimeSpan.FromSeconds(15)); cancellationToken.ThrowIfCancellationRequested(); }