Skip to content

Commit 84aec09

Browse files
committed
improve expiry performance (#2800)
- use elastic to get expired items - only search for broadcast if enabled - introduce `archive_internal` resource to avoid additional filtering
1 parent eeab185 commit 84aec09

File tree

5 files changed

+75
-23
lines changed

5 files changed

+75
-23
lines changed

apps/archive/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
from superdesk.celery_app import celery
1919

2020
from .archive import (
21+
ArchiveInternalResource,
2122
ArchiveResource,
2223
ArchiveService,
2324
ArchiveVersionsResource,
2425
ArchiveVersionsService,
2526
AutoSaveResource,
2627
ArchiveSaveService,
28+
archive_internal_service,
2729
)
2830
from .commands import RemoveExpiredContent, LOCK_EXPIRY
2931
from .ingest import IngestResource, AppIngestService
@@ -97,6 +99,9 @@ def init_app(app) -> None:
9799
service = NewsService(endpoint_name, backend=superdesk.get_backend())
98100
NewsResource(endpoint_name, app=app, service=service)
99101

102+
endpoint_name = "archive_internal"
103+
ArchiveInternalResource(endpoint_name, app=app, service=archive_internal_service)
104+
100105
from apps.item_autosave.components.item_autosave import ItemAutosave
101106
from apps.item_autosave.models.item_autosave import ItemAutosaveModel
102107

apps/archive/archive.py

+50-14
Original file line numberDiff line numberDiff line change
@@ -1178,33 +1178,47 @@ def get_expired_items(self, expiry_datetime, last_id=None, invalid_only=False):
11781178
"""
11791179
for i in range(app.config["MAX_EXPIRY_LOOPS"]): # avoid blocking forever just in case
11801180
query = {
1181-
"$and": [
1182-
{"expiry": {"$lte": expiry_datetime}},
1183-
{"$or": [{"task.desk": {"$ne": None}}, {ITEM_STATE: CONTENT_STATE.SPIKED, "task.desk": None}]},
1184-
]
1181+
"bool": {
1182+
"must": [
1183+
{"range": {"expiry": {"lte": expiry_datetime}}},
1184+
{
1185+
"bool": {
1186+
"should": [
1187+
{"exists": {"field": "task.desk"}},
1188+
{"term": {ITEM_STATE: CONTENT_STATE.SPIKED}},
1189+
],
1190+
}
1191+
},
1192+
],
1193+
"must_not": [],
1194+
}
11851195
}
11861196

11871197
if invalid_only:
1188-
query["$and"].append({"expiry_status": "invalid"})
1198+
query["bool"]["must"].append({"term": {"expiry_status": "invalid"}})
11891199
else:
1190-
query["$and"].append({"expiry_status": {"$ne": "invalid"}})
1200+
query["bool"]["must_not"].append({"term": {"expiry_status": "invalid"}})
11911201

1192-
if last_id:
1193-
query["$and"].append({"_id": {"$gt": last_id}})
1202+
if last_id: # elastic does not support range query on _id, so using guid
1203+
query["bool"]["must"].append({"range": {"guid": {"gt": last_id}}})
11941204

1195-
req = ParsedRequest()
1196-
req.sort = "_id"
1197-
req.max_results = app.config["MAX_EXPIRY_QUERY_LIMIT"]
1198-
req.where = json.dumps(query)
1205+
source = {
1206+
"query": query,
1207+
"sort": [{"guid": "asc"}, {"versioncreated": "asc"}],
1208+
"size": app.config["MAX_EXPIRY_QUERY_LIMIT"],
1209+
}
11991210

1200-
items = list(self.get_from_mongo(req=req, lookup={}))
1211+
items = list(archive_internal_service.search(source))
12011212

12021213
yield items # we need to yield the empty list too to signal it's the end
12031214

12041215
if not len(items):
12051216
break
12061217
else:
1207-
last_id = items[-1]["_id"]
1218+
try:
1219+
last_id = items[-1]["guid"]
1220+
except KeyError:
1221+
pass
12081222

12091223
else:
12101224
logger.warning("get_expired_items did not finish in %d loops", app.config["MAX_EXPIRY_LOOPS"])
@@ -1444,6 +1458,28 @@ def on_fetched_item(self, item):
14441458
return item
14451459

14461460

1461+
class ArchiveInternalResource(Resource):
1462+
"""Archive Internal Resource without additional filtering."""
1463+
1464+
schema = item_schema()
1465+
datasource = {
1466+
"source": "archive",
1467+
"search_backend": "elastic",
1468+
}
1469+
resource_methods = []
1470+
item_methods = []
1471+
versioning = False
1472+
collation = False
1473+
internal_resource = True
1474+
1475+
1476+
class ArchiveInternalService(BaseService):
1477+
pass
1478+
1479+
1480+
archive_internal_service = ArchiveInternalService("archive_internal", backend=superdesk.get_backend())
1481+
1482+
14471483
superdesk.workflow_state("in_progress")
14481484
superdesk.workflow_action(
14491485
name="save",

apps/archive/commands.py

+12-7
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,12 @@ def _remove_expired_items(self, expiry_datetime):
125125
config_service = get_resource_service("config")
126126
archive_service = get_resource_service(ARCHIVE)
127127
published_service = get_resource_service("published")
128-
preserve_published_desks = {
129-
desk.get(config.ID_FIELD): 1
130-
for desk in get_resource_service("desks").find(where={"preserve_published_content": True})
131-
}
128+
preserve_published_desks = set(
129+
[
130+
str(desk.get(config.ID_FIELD))
131+
for desk in get_resource_service("desks").find(where={"preserve_published_content": True})
132+
]
133+
)
132134

133135
last_id = config_service.get(LAST_ID_CONFIG)
134136
if last_id:
@@ -310,20 +312,23 @@ def _can_remove_item(self, item, now, processed_item=None, preserve_published_de
310312
# Get the item references for is package
311313
item_refs = package_service.get_residrefs(item)
312314

313-
if item.get(ITEM_TYPE) in [CONTENT_TYPE.TEXT, CONTENT_TYPE.PREFORMATTED]:
315+
if item.get(ITEM_TYPE) in [CONTENT_TYPE.TEXT, CONTENT_TYPE.PREFORMATTED] and app.config.get(
316+
"BROADCAST_ENABLED", True
317+
):
314318
broadcast_items = get_resource_service("archive_broadcast").get_broadcast_items_from_master_story(item)
315319
# If master story expires then check if broadcast item is included in a package.
316320
# If included in a package then check the package expiry.
317321
item_refs.extend([broadcast_item.get(config.ID_FIELD) for broadcast_item in broadcast_items])
318322

323+
if item.get(ITEM_TYPE) in [CONTENT_TYPE.TEXT, CONTENT_TYPE.PREFORMATTED]:
319324
if item.get("rewrite_of"):
320325
item_refs.append(item.get("rewrite_of"))
321326

322327
if item.get("rewritten_by"):
323328
item_refs.append(item.get("rewritten_by"))
324329

325330
# get the list of associated item ids
326-
if item.get(ITEM_TYPE) in MEDIA_TYPES:
331+
elif item.get(ITEM_TYPE) in MEDIA_TYPES:
327332
item_refs.extend(self._get_associated_items(item))
328333

329334
# get item reference where this referred
@@ -337,7 +342,7 @@ def _can_remove_item(self, item, now, processed_item=None, preserve_published_de
337342
if (
338343
preserve_published_desks
339344
and item.get(ITEM_STATE) in {CONTENT_STATE.PUBLISHED, CONTENT_STATE.CORRECTED}
340-
and item.get("task").get("desk") in preserve_published_desks
345+
and str(item.get("task").get("desk")) in preserve_published_desks
341346
):
342347
is_expired = False
343348
reason = "Desk config"

superdesk/default_settings.py

+6
Original file line numberDiff line numberDiff line change
@@ -1140,3 +1140,9 @@ def local_to_utc_hour(hour):
11401140
#: .. versionadded:: 2.8
11411141
#:
11421142
PICTURE_METADATA_MAPPING = {}
1143+
1144+
#: Enable broadcast feature
1145+
#:
1146+
#: .. versionadded:: 2.8
1147+
#:
1148+
BROADCAST_ENABLED = strtobool(env("BROADCAST_ENABLED", "true"))

tests/archive/archive_test.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ def test_get_expired_items(self):
459459
for i in range(1000):
460460
items.append(
461461
{
462-
"_id": generate_guid(type=GUID_TAG),
462+
"guid": generate_guid(type=GUID_TAG),
463463
"type": "text",
464464
"expiry": now - timedelta(days=1),
465465
"task": {"desk": "foo"},
@@ -478,7 +478,7 @@ def test_get_expired_items(self):
478478
assert 1000 == counter
479479

480480
counter = 0
481-
ids = sorted([item["_id"] for item in items])
481+
ids = sorted([item["guid"] for item in items])
482482
last_id = ids[499]
483483
for _items in service.get_expired_items(now, last_id=last_id):
484484
for item in _items:

0 commit comments

Comments
 (0)