Skip to content

Commit

Permalink
Fix DiskQueue bugs fix #1354
Browse files Browse the repository at this point in the history
  • Loading branch information
reidsunderland committed Dec 20, 2024
1 parent 9a8e040 commit c3e7c8b
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 34 deletions.
54 changes: 28 additions & 26 deletions sarracenia/diskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def _count_msgs(self, file_path) -> int:
for line in f:
if "{" in line:
count +=1
logger.debug(f"counted {count} msgs in {file_path}")

return count

Expand Down Expand Up @@ -231,35 +232,29 @@ def get(self, maximum_messages_to_get=1):
if no message (and new or state file there)
we wait for housekeeping to present retry messages
"""

if self.msg_count < 0:
return []
elif self.msg_count == 0:
try:
os.unlink(self.queue_file)
self.queue_fp.close()
except Exception as ex:
pass

self.queue_fp=None
self.msg_count=-1
if self.msg_count == 0 and self.queue_fp is None:
return []

ml = []
count = 0

# if the retry queue is empty, no sense looping.
mx = self.msg_count if self.msg_count < maximum_messages_to_get else maximum_messages_to_get

while count < mx:
while count < maximum_messages_to_get:
self.queue_fp, message = self.msg_get_from_file(
self.queue_fp, self.queue_file)

# FIXME MG as discussed with Peter
# no housekeeping in get ...
# if no message (and new or state file there)
# we wait for housekeeping to present retry messages
if not message:
self.msg_count=0
return
try:
os.unlink(self.queue_file)
except:
pass
self.queue_fp = None
self.msg_count = 0
#logger.debug("MG DEBUG retry get return None")
break

count += 1
if self.is_expired(message):
#logger.error("MG invalid %s" % message)
continue
Expand All @@ -269,9 +264,18 @@ def get(self, maximum_messages_to_get=1):
message['_deleteOnPost'].remove('ack_id')

ml.append(message)
count += 1

self.msg_count -= count

# after getting the last message from the file, close it
if self.msg_count == 0:
try:
os.unlink(self.queue_file)
except:
pass
self.queue_fp = None

return ml

def in_cache(self, message) -> bool:
Expand All @@ -291,7 +295,7 @@ def in_cache(self, message) -> bool:
elif 'pubTime' in message:
sumstr = jsonpickle.encode(message['pubTime'])
else:
logger.info('no key found for message, cannot add')
logger.warning('no key found for message, cannot add')
return False

cache_key = urlstr + ' ' + sumstr
Expand Down Expand Up @@ -374,14 +378,12 @@ def on_housekeeping(self):
remove .new
rename housekeeping to queue for next period.
"""
logger.debug("%s on_housekeeping" % self.name)
logger.debug(f"{self.name} on_housekeeping, {self.msg_count} msgs in queue file, {self.msg_count_new} in new file")

# finish retry before reshuffling all retries entries

if os.path.isfile(self.queue_file) and self.queue_fp != None:
logger.info(
"have not finished retry list. Resuming retries with %s" %
self.queue_file)
if (os.path.isfile(self.queue_file) and self.queue_fp != None) or self.msg_count != 0:
logger.info(f"still {self.msg_count} messages in {self.name} list. Resuming retries with {self.queue_file}")
return

self.now = sarracenia.nowflt()
Expand Down
129 changes: 121 additions & 8 deletions tests/sarracenia/diskqueue_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,25 +240,35 @@ def test_on_housekeeping__FinishRetry(tmp_path, caplog):
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
download_retry = DiskQueue(BaseOptions, 'test_on_housekeeping__FinishRetry')

message = make_message()

download_retry.queue_fp = open(download_retry.queue_file, 'a')
line = jsonpickle.encode(message) + '\n'
download_retry.queue_fp.write(line + line)
download_retry.queue_fp.flush()

hk_out = download_retry.on_housekeeping()

assert hk_out == None

# This should not be logged unless there is actually messages in the queue
log_found_notFinished = False
for record in caplog.records:
if "Resuming retries" in record.message:
log_found_notFinished = True

assert log_found_notFinished == False

m1 = make_message()
download_retry.put([m1])

# put message into Queue from new
download_retry.on_housekeeping()

# run housekeeping again and now it should say it's not done
download_retry.on_housekeeping()
# This should not be logged unless there is actually messages in the queue
log_found_notFinished = False
for record in caplog.records:
if "have not finished retry list" in record.message:
if "Resuming retries" in record.message:
log_found_notFinished = True

assert log_found_notFinished == True


def test_on_housekeeping(tmp_path, caplog):
BaseOptions = Options()
BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
Expand Down Expand Up @@ -290,3 +300,106 @@ def test_on_housekeeping(tmp_path, caplog):
assert log_found_HasQueue == True
assert log_found_NumMessages == True
assert log_found_Elapsed == True

def test_diskqueue(tmp_path, caplog):
    """ DiskQueue integration test, tests the behaviour of the class, mimicking how it's actually used in sr3.

    Lifecycle exercised here: put() appends to the "new" file (msg_count_new),
    on_housekeeping() moves new messages into the regular queue file
    (msg_count), and get() drains the queue file. Housekeeping must refuse to
    reshuffle while the queue file still holds messages, and state must
    survive a close()/re-open cycle.
    """
    BaseOptions = Options()
    BaseOptions.pid_filename = str(tmp_path) + os.sep + "pidfilename.txt"
    dq = DiskQueue(BaseOptions, 'DiskQueue_Integration')
    m1 = make_message()
    m2 = make_message()
    m2['pubTime'] = "20200118151049.356378078"
    m3 = make_message()
    m3['pubTime'] = "20240118151049.356378078"

    dq.put([m1])

    assert len(dq) == 1
    assert dq.msg_count_new == 1
    assert dq.msg_count == 0

    dq.put([m2, m3])
    assert len(dq) == 3
    assert dq.msg_count_new == 3
    assert dq.msg_count == 0

    # should not be possible to get a message until after housekeeping
    got = dq.get(2)
    assert len(got) == 0
    assert len(dq) == 3
    assert dq.msg_count_new == 3
    assert dq.msg_count == 0

    # now all messages should be moved from new file to normal file
    dq.on_housekeeping()
    assert len(dq) == 3
    assert dq.msg_count_new == 0
    assert dq.msg_count == 3

    # now we can get
    got = dq.get(2)
    assert len(got) == 2
    assert len(dq) == 1
    assert dq.msg_count_new == 0
    assert dq.msg_count == 1

    # try running housekeeping again; queue file is non-empty, so it must
    # refuse to reshuffle and log that it is resuming retries
    dq.on_housekeeping()
    assert len(dq) == 1
    assert dq.msg_count_new == 0
    assert dq.msg_count == 1

    assert any("Resuming retries" in record.message for record in caplog.records)

    # add messages back
    dq.put([m1, m2])
    assert len(dq) == 3
    assert dq.msg_count_new == 2
    assert dq.msg_count == 1

    # caplog accumulates records across the whole test, so clear it first;
    # otherwise this check would pass vacuously on the earlier log line
    caplog.clear()
    dq.on_housekeeping()
    assert len(dq) == 3
    assert dq.msg_count_new == 2
    assert dq.msg_count == 1

    assert any("Resuming retries" in record.message for record in caplog.records)

    # get 1, now the queue is empty
    got = dq.get()
    assert len(got) == 1
    assert len(dq) == 2
    assert dq.msg_count_new == 2
    assert dq.msg_count == 0

    # now housekeeping can move new msgs to regular file
    dq.on_housekeeping()
    assert len(dq) == 2
    assert dq.msg_count_new == 0
    assert dq.msg_count == 2

    # add message back before closing, 1 in new, 2 in regular
    dq.put([m1])
    assert len(dq) == 3
    assert dq.msg_count_new == 1
    assert dq.msg_count == 2

    # close and re-open, messages in both new and regular file
    dq.close()
    dq = DiskQueue(BaseOptions, 'DiskQueue_Integration')
    assert len(dq) == 3
    assert dq.msg_count_new == 1
    assert dq.msg_count == 2





0 comments on commit c3e7c8b

Please sign in to comment.