diff --git a/ingestion-edge/ingestion_edge/publish.py b/ingestion-edge/ingestion_edge/publish.py index 23ebf3693..b165f4252 100644 --- a/ingestion-edge/ingestion_edge/publish.py +++ b/ingestion-edge/ingestion_edge/publish.py @@ -106,7 +106,11 @@ def get_queue(config: dict) -> SQLiteAckQueue: for key, value in config.items() if key.startswith("QUEUE_") } - return SQLiteAckQueue(**queue_config) + q: SQLiteAckQueue = SQLiteAckQueue(**queue_config, auto_resume=False) + q.resume_unack_tasks() + # work around https://github.com/peter-wangxu/persist-queue/pull/154 + q.total = q._count() + return q def init_app(app: Sanic) -> Tuple[PublisherClient, SQLiteAckQueue]: diff --git a/ingestion-edge/tests/unit/publish/test_init_app.py b/ingestion-edge/tests/unit/publish/test_init_app.py index 271703de0..0d15d2234 100644 --- a/ingestion-edge/tests/unit/publish/test_init_app.py +++ b/ingestion-edge/tests/unit/publish/test_init_app.py @@ -2,6 +2,7 @@ from ingestion_edge.config import Route from sanic import Sanic from sanic.request import Request +from unittest import mock import pytest ROUTE_TABLE = [ @@ -43,7 +44,7 @@ def app(): ], ) async def test_endpoint(app, kwargs, method, mocker, uri_bytes): - mocker.patch("ingestion_edge.publish.SQLiteAckQueue", dict) + Q = mocker.patch("ingestion_edge.publish.SQLiteAckQueue") client = mocker.patch("ingestion_edge.publish.PublisherClient").return_value mocker.patch("ingestion_edge.publish.submit", lambda _, **kw: kw) app.config["ROUTE_TABLE"] = ROUTE_TABLE @@ -52,7 +53,12 @@ async def test_endpoint(app, kwargs, method, mocker, uri_bytes): request = Request(uri_bytes, {}, "1.1", method, None, app) await app.handle_request(request, lambda r: responses.append(r), None) assert responses == [ - dict(client=client, q={"path": ":memory:"}, metadata_headers={}, **kwargs) + dict(client=client, q=Q.return_value, metadata_headers={}, **kwargs) + ] + assert Q.mock_calls == [ + mock.call(path=':memory:', auto_resume=False), + mock.call().resume_unack_tasks(), + mock.call()._count() ]