Skip to content

Commit

Permalink
Merge pull request #28 from healthvana/jimcooley/cache-lock
Browse files Browse the repository at this point in the history
jimcooley/cache_lock
  • Loading branch information
jmichalicek authored Jul 18, 2024
2 parents c2f3498 + 3e825d9 commit d8fe9b1
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 9 deletions.
63 changes: 54 additions & 9 deletions django_mail_viewer/backends/cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""
Backend for test environment.
"""
from contextlib import contextmanager
from os import getpid
from time import monotonic, sleep

from django.core import cache
from django.core.mail.backends.base import BaseEmailBackend

Expand All @@ -24,6 +28,7 @@ def __init__(self, *args, **kwargs):
# to retrieve them. Django does not have a built in way to get the keys
# which exist in the cache.
self.cache_keys_key = 'message_keys'
self.cache_keys_lock_key = 'message_keys_lock'

def send_messages(self, messages):
msg_count = 0
Expand All @@ -32,15 +37,27 @@ def send_messages(self, messages):
message_id = m.get('message-id')
self.cache.set(message_id, m)

# if multiple processes are updating this at the same time then
# things will get hung up. May introduce a lock key and spinlock
# to avoid clobbering the value stored in the list of keys.
current_cache_keys = self.cache.get(self.cache_keys_key)
if not current_cache_keys:
current_cache_keys = []
current_cache_keys.append(message_id)
self.cache.set(self.cache_keys_key, current_cache_keys)
msg_count += 1
# Use a lock key and spinlock
# to avoid clobbering the value stored in the list of keys
# if multiple processes are updating this at the same time.
is_stored = False
loop_count = 0
max_loop_count = 100
while not is_stored:
loop_count += 1
if loop_count > max_loop_count:
break
with self.cache_lock(self.cache_keys_lock_key, getpid()) as acquired:
if acquired:
current_cache_keys = self.cache.get(self.cache_keys_key)
if not current_cache_keys:
current_cache_keys = []
current_cache_keys.append(message_id)
self.cache.set(self.cache_keys_key, current_cache_keys)
msg_count += 1
is_stored = True
else:
sleep(.01)
return msg_count

def get_message(self, lookup_id):
Expand Down Expand Up @@ -71,3 +88,31 @@ def delete_message(self, message_id: str):
message_keys.remove(message_id)
self.cache.set(self.cache_keys_key, message_keys)
self.cache.delete(message_id)

DEFAULT_LOCK_EXPIRE = 60 * 3 # Lock expires in 3 minutes

@contextmanager
def cache_lock(self, lock_id, oid, lock_expires=DEFAULT_LOCK_EXPIRE):
"""
Lock an id for the given time using cache backend
:param lock_id string lock id
:param oid string thread/process identifier
:param lock_expires optional number of seconds to lock
NOTE That we use the lock_expires value without any cushion,
unlike the original version at
http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial
:return boolean if lock acquired
"""
timeout_at = monotonic() + lock_expires
# cache.add fails if the key already exists
acquired = self.cache.add(lock_id, oid, lock_expires)
try:
yield acquired
finally:
# we have to use delete to take
# advantage of using add() for atomic locking
if acquired and monotonic() < timeout_at:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock
# owned by someone else.
self.cache.delete(lock_id)
62 changes: 62 additions & 0 deletions tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"""
import json
import shutil
import threading
import time
from pathlib import Path

from django.conf import settings
Expand Down Expand Up @@ -147,6 +149,66 @@ def test_delete_message(self):
target_id, message.get('message-id'), f'Message with id {target_id} found in outbox after delete.'
)

def test_cache_lock(self):
"""
Test that the cache_lock() method works with multiple threads.
"""
results = []
concurrency = 5
with mail.get_connection(self.connection_backend) as connection:
def concurrent(results):
try:
myid = "123-%s" % threading.current_thread().ident
with connection.cache_lock("test-lock", myid, 10) as acquired:
if acquired:
results.append(True)
time.sleep(1)
else:
results.append(False)
except Exception:
results.append(False)
threads = []
for i in range(concurrency):
threads.append(threading.Thread(target=concurrent, args=(results,)))
for t in threads:
# start processing concurrent(results) in multiple threads
t.start()
for t in threads:
# wait for all threads to finish
t.join()

# Only one thread should have acquired the lock
self.assertEqual(concurrency, len(results))
self.assertEqual(1, len([val for val in results if val]))

def test_concurrent_send_messages_with_cache_lock(self):
"""
Test that multiple messages sent simultaneously are added to the cache.
"""
messages = []
for i in range(3, 8):
m = mail.EmailMultiAlternatives(
f'Email {i} subject', f'Email {i} text', '[email protected]', [f'to{i}@example.com']
)
messages.append(m)
with mail.get_connection(self.connection_backend) as connection:
self.mail_cache.delete(connection.cache_keys_key)
self.assertIsNone(self.mail_cache.get(connection.get_outbox()))
threads = []
for m in messages:
threads.append(threading.Thread(target=connection.send_messages, args=([m],)))
for t in threads:
t.start()
for t in threads:
# wait for all threads to finish
t.join()
cache_keys = self.mail_cache.get(connection.cache_keys_key)
self.assertEqual(5, len(cache_keys))
original_messages_before_message_id = [m.message().as_string().split('Message-ID:')[0] for m in messages]
for key in cache_keys:
sent_message_before_message_id = self.mail_cache.get(key).as_string().split('Message-ID:')[0]
self.assertIn(sent_message_before_message_id, original_messages_before_message_id)


class DatabaseBackendTest(TestCase):
"""
Expand Down

0 comments on commit d8fe9b1

Please sign in to comment.