
Commit 4238086

fix: Handle truncated unique_key in list_head by fetching full request data (#631)

- Closes #627 - cc @honzajavorek
- Temporary hot fix; the proper solution is tracked in the follow-up issue #630.
- Also reverts the #623 changes.
- Adds an integration test for a long URL (with an extended unique key and always enqueued).

1 parent af01e50 · commit 4238086
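
The core of the hot fix is easiest to see in isolation: when the `list_head` endpoint returns items whose `uniqueKey` or `url` carry the platform's `[truncated]` marker, the client re-fetches the full record by its ID before validating it. The sketch below only condenses that idea; `fetch_full_if_truncated` is a hypothetical helper name (the real change inlines this logic in `_list_head`), and the API client is assumed to expose the async `get_request(request_id=...)` call used in the diffs further down.

```python
# Condensed sketch of the workaround, not the shipped implementation.
from __future__ import annotations

from typing import Any

TRUNCATION_MARKER = '[truncated]'


async def fetch_full_if_truncated(api_client: Any, item: dict[str, Any]) -> dict[str, Any]:
    """Return full request data, re-fetching by ID when list_head truncated it."""
    if TRUNCATION_MARKER in item['uniqueKey'] or TRUNCATION_MARKER in item['url']:
        # The head listing shortened long fields; only the ID is reliable here.
        full = await api_client.get_request(request_id=item['id'])
        return full if full is not None else item
    return item
```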

4 files changed (+76 −41 lines)

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 5 additions & 5 deletions

@@ -12,8 +12,8 @@
 from crawlee.storages import RequestQueue

 from ._models import ApifyRequestQueueMetadata, RequestQueueStats
-from ._request_queue_shared_client import _ApifyRequestQueueSharedClient
-from ._request_queue_single_client import _ApifyRequestQueueSingleClient
+from ._request_queue_shared_client import ApifyRequestQueueSharedClient
+from ._request_queue_single_client import ApifyRequestQueueSingleClient
 from ._utils import AliasResolver

 if TYPE_CHECKING:

@@ -47,14 +47,14 @@ def __init__(
         self._api_client = api_client
         """The Apify request queue client for API operations."""

-        self._implementation: _ApifyRequestQueueSingleClient | _ApifyRequestQueueSharedClient
+        self._implementation: ApifyRequestQueueSingleClient | ApifyRequestQueueSharedClient
         """Internal implementation used to communicate with the Apify platform based Request Queue."""
         if access == 'single':
-            self._implementation = _ApifyRequestQueueSingleClient(
+            self._implementation = ApifyRequestQueueSingleClient(
                 api_client=self._api_client, metadata=metadata, cache_size=self._MAX_CACHED_REQUESTS
             )
         elif access == 'shared':
-            self._implementation = _ApifyRequestQueueSharedClient(
+            self._implementation = ApifyRequestQueueSharedClient(
                 api_client=self._api_client,
                 metadata=metadata,
                 cache_size=self._MAX_CACHED_REQUESTS,
src/apify/storage_clients/_apify/_request_queue_shared_client.py

Lines changed: 9 additions & 13 deletions

@@ -23,7 +23,7 @@
 logger = getLogger(__name__)


-class _ApifyRequestQueueSharedClient:
+class ApifyRequestQueueSharedClient:
     """An Apify platform implementation of the request queue client.

     This implementation supports multiple producers and multiple consumers scenario.

@@ -106,23 +106,19 @@ async def add_batch_of_requests(
                     # It could have been handled by another client in the meantime, so cached information about
                     # `request.was_already_handled` is not reliable.
                     already_present_requests.append(
-                        ProcessedRequest.model_validate(
-                            {
-                                'uniqueKey': request.unique_key,
-                                'wasAlreadyPresent': True,
-                                'wasAlreadyHandled': request.was_already_handled,
-                            }
+                        ProcessedRequest(
+                            unique_key=request.unique_key,
+                            was_already_present=True,
+                            was_already_handled=request.was_already_handled,
                         )
                     )

                 else:
                     # Add new request to the cache.
-                    processed_request = ProcessedRequest.model_validate(
-                        {
-                            'uniqueKey': request.unique_key,
-                            'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': request.was_already_handled,
-                        }
+                    processed_request = ProcessedRequest(
+                        unique_key=request.unique_key,
+                        was_already_present=True,
+                        was_already_handled=request.was_already_handled,
                     )
                     self._cache_request(
                         request.unique_key,
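
The `ProcessedRequest` changes in this file (and the analogous ones in the single client below) revert the #623 switch to `model_validate`, constructing the model directly by field name instead of through a camelCase dict. A minimal sketch of the equivalence, assuming `ProcessedRequest` is importable from `crawlee.storage_clients.models` and accepts population by field name, which the diff itself relies on:

```python
# Illustrative only: the import path and alias behaviour are assumptions taken
# from the diff above, not verified against a specific crawlee release.
from crawlee.storage_clients.models import ProcessedRequest

via_aliases = ProcessedRequest.model_validate(
    {
        'uniqueKey': 'https://example.com',
        'wasAlreadyPresent': True,
        'wasAlreadyHandled': False,
    }
)
via_fields = ProcessedRequest(
    unique_key='https://example.com',
    was_already_present=True,
    was_already_handled=False,
)

# Both forms should yield the same model; the direct form just skips the
# dict round-trip and alias lookup.
assert via_aliases == via_fields
```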

src/apify/storage_clients/_apify/_request_queue_single_client.py

Lines changed: 31 additions & 21 deletions

@@ -21,7 +21,7 @@
 logger = getLogger(__name__)


-class _ApifyRequestQueueSingleClient:
+class ApifyRequestQueueSingleClient:
     """An Apify platform implementation of the request queue client with limited capability.

     This client is designed to use as little resources as possible, but has to be used in constrained context.

@@ -108,23 +108,19 @@ async def add_batch_of_requests(
             # Check if request is known to be already handled (it has to be present as well.)
             if request.unique_key in self._requests_already_handled:
                 already_present_requests.append(
-                    ProcessedRequest.model_validate(
-                        {
-                            'uniqueKey': request.unique_key,
-                            'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': True,
-                        }
+                    ProcessedRequest(
+                        unique_key=request.unique_key,
+                        was_already_present=True,
+                        was_already_handled=True,
                     )
                 )
             # Check if request is known to be already present, but unhandled
             elif self._requests_cache.get(request.unique_key):
                 already_present_requests.append(
-                    ProcessedRequest.model_validate(
-                        {
-                            'uniqueKey': request.unique_key,
-                            'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': request.was_already_handled,
-                        }
+                    ProcessedRequest(
+                        unique_key=request.unique_key,
+                        was_already_present=True,
+                        was_already_handled=request.was_already_handled,
                     )
                 )
             else:

@@ -158,8 +154,9 @@ async def add_batch_of_requests(
                 self._requests_cache.pop(unprocessed_request.unique_key, None)

         else:
-            api_response = AddRequestsResponse.model_validate(
-                {'unprocessedRequests': [], 'processedRequests': already_present_requests}
+            api_response = AddRequestsResponse(
+                unprocessed_requests=[],
+                processed_requests=already_present_requests,
             )

         # Update assumed total count for newly added requests.

@@ -236,28 +233,41 @@ async def _list_head(self) -> None:

         # Update the cached data
         for request_data in response.get('items', []):
+            # Due to https://github.com/apify/apify-core/blob/v0.1377.0/src/api/src/lib/request_queues/request_queue.ts#L53,
+            # the list_head endpoint may return truncated fields for long requests (e.g., long URLs or unique keys).
+            # If truncation is detected, fetch the full request data by its ID from the API.
+            # This is a temporary workaround - the caching will be refactored to use request IDs instead of unique keys.
+            # See https://github.com/apify/apify-sdk-python/issues/630 for details.
+            if '[truncated]' in request_data['uniqueKey'] or '[truncated]' in request_data['url']:
+                request_data = await self._api_client.get_request(request_id=request_data['id'])  # noqa: PLW2901
+
             request = Request.model_validate(request_data)

             if request.unique_key in self._requests_in_progress:
                 # Ignore requests that are already in progress, we will not process them again.
                 continue
+
             if request.was_already_handled:
                 # Do not cache fully handled requests, we do not need them. Just cache their unique_key.
                 self._requests_already_handled.add(request.unique_key)
             else:
                 # Only fetch the request if we do not know it yet.
                 if request.unique_key not in self._requests_cache:
                     request_id = unique_key_to_request_id(request.unique_key)
-                    complete_request_data = await self._api_client.get_request(request_id)

-                    if complete_request_data is not None:
-                        request = Request.model_validate(complete_request_data)
-                        self._requests_cache[request.unique_key] = request
-                    else:
+                    if request_data is not None and request_id != request_data['id']:
                         logger.warning(
-                            f'Could not fetch request data for unique_key=`{request.unique_key}` (id=`{request_id}`)'
+                            f'Request ID mismatch: {request_id} != {request_data["id"]}, '
+                            'this may cause unexpected behavior.'
                         )

+                    # See https://github.com/apify/apify-sdk-python/issues/630 for details.
+                    if '[truncated]' not in request.unique_key:
+                        request_data = await self._api_client.get_request(request_id=request_id)  # noqa: PLW2901
+                        request = Request.model_validate(request_data)
+
+                    self._requests_cache[request.unique_key] = request
+
             # Add new requests to the end of the head, unless already present in head
             if request.unique_key not in self._head_requests:
                 self._head_requests.appendleft(request.unique_key)
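
The ID-mismatch warning in the hunk above makes sense because request IDs on the platform are derived deterministically from the unique key. That is also the root cause behind #627: the old code hashed the unique key it got back from `list_head`, and when that key was truncated the derived ID pointed at a request that does not exist, so the follow-up `get_request` call returned nothing. The sketch below approximates the scheme behind `unique_key_to_request_id`; the exact hash, alphabet, and length are assumptions, the point is only that the ID is a pure function of the (full) unique key.

```python
# Rough approximation of the unique-key -> request-ID mapping; not the exact
# SDK implementation. Hashing a *truncated* key yields an ID that matches no
# stored request, which is why re-fetching by the API-provided `id` is needed.
import base64
import hashlib
import re


def approx_unique_key_to_request_id(unique_key: str, length: int = 15) -> str:
    digest = hashlib.sha256(unique_key.encode('utf-8')).digest()
    encoded = base64.b64encode(digest).decode('ascii')
    return re.sub(r'[+/=]', '', encoded)[:length]


full_key = 'https://example.com/search?page=1&pageSize=100000&sortOrder=-1'  # hypothetical
truncated_key = full_key[:40] + '[truncated]'

# The two IDs differ, so a lookup keyed on the truncated-key hash cannot succeed.
print(approx_unique_key_to_request_id(full_key))
print(approx_unique_key_to_request_id(truncated_key))
```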

tests/integration/test_request_queue.py

Lines changed: 31 additions & 2 deletions

@@ -7,12 +7,14 @@
 import pytest

 from apify_shared.consts import ApifyEnvVars
-from crawlee import Request, service_locator
+from crawlee import service_locator
 from crawlee.crawlers import BasicCrawler

 from ._utils import generate_unique_resource_name
-from apify import Actor
+from apify import Actor, Request
 from apify.storage_clients import ApifyStorageClient
+from apify.storage_clients._apify._request_queue_shared_client import ApifyRequestQueueSharedClient
+from apify.storage_clients._apify._utils import unique_key_to_request_id
 from apify.storages import RequestQueue

 if TYPE_CHECKING:

@@ -1189,3 +1191,30 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> None:
     assert hasattr(metadata, 'stats')
     apify_metadata = cast('ApifyRequestQueueMetadata', metadata)
     assert apify_metadata.stats.write_count == add_request_count
+
+
+async def test_rq_long_url(request_queue_apify: RequestQueue) -> None:
+    # TODO: Remove the skip when issue #630 is resolved.
+    if isinstance(request_queue_apify._client._implementation, ApifyRequestQueueSharedClient):  # type: ignore[attr-defined]
+        pytest.skip('Skipping for the "shared" request queue - unskip after issue #630 is resolved.')
+
+    url = 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1'
+
+    req = Request.from_url(
+        url=url,
+        use_extended_unique_key=True,
+        always_enqueue=True,
+    )
+
+    request_id = unique_key_to_request_id(req.unique_key)
+
+    processed_request = await request_queue_apify.add_request(req)
+    assert processed_request.id == request_id
+
+    request_obtained = await request_queue_apify.fetch_next_request()
+    assert request_obtained is not None
+
+    await request_queue_apify.mark_request_as_handled(request_obtained)
+
+    is_finished = await request_queue_apify.is_finished()
+    assert is_finished
