
Commit 288e0b3 (Draft)
1 parent af01e50

File tree: 5 files changed (+104 -46 lines)

src/apify/storage_clients/_apify/_request_queue_client.py

Lines changed: 2 additions & 1 deletion
@@ -15,12 +15,13 @@
 from ._request_queue_shared_client import _ApifyRequestQueueSharedClient
 from ._request_queue_single_client import _ApifyRequestQueueSingleClient
 from ._utils import AliasResolver
+from crawlee import Request

 if TYPE_CHECKING:
     from collections.abc import Sequence

     from apify_client.clients import RequestQueueClientAsync
-    from crawlee import Request
+

     from apify import Configuration
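Note on this change: the `Request` import moves out of the `TYPE_CHECKING` block, presumably because the client now needs the class at runtime rather than only in annotations. A minimal sketch of why that distinction matters (illustrative names, not code from this commit):

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        from crawlee import Request  # bound for the type checker only, absent at runtime

    def describe(request: 'Request') -> str:  # fine: string annotation, resolved statically
        return request.url

    # Runtime uses such as Request.model_validate(data) or subclassing Request
    # would raise NameError without a real module-level import:
    from crawlee import Request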

src/apify/storage_clients/_apify/_request_queue_single_client.py

Lines changed: 62 additions & 44 deletions
@@ -10,7 +10,7 @@
 from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

 from apify import Request
-from apify.storage_clients._apify._utils import unique_key_to_request_id
+from apify.storage_clients._apify._utils import unique_key_to_request_id, _Request

 if TYPE_CHECKING:
     from collections.abc import Sequence
@@ -101,29 +101,36 @@ async def add_batch_of_requests(
             await self._init_caches()
             self._initialized_caches = True

+
+
         new_requests: list[Request] = []
         already_present_requests: list[ProcessedRequest] = []

         for request in requests:
+            # Calculate id for request
+            _request = _Request.model_validate(request.model_dump())
+
             # Check if request is known to be already handled (it has to be present as well.)
-            if request.unique_key in self._requests_already_handled:
+            if _request.id in self._requests_already_handled:
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'uniqueKey': request.unique_key,
+                            'id': _request.id,
+                            'uniqueKey': _request.unique_key,
                             'wasAlreadyPresent': True,
                             'wasAlreadyHandled': True,
                         }
                     )
                 )
             # Check if request is known to be already present, but unhandled
-            elif self._requests_cache.get(request.unique_key):
+            elif self._requests_cache.get(_request.id):
                 already_present_requests.append(
                     ProcessedRequest.model_validate(
                         {
-                            'uniqueKey': request.unique_key,
+                            'id': _request.id,
+                            'uniqueKey': _request.unique_key,
                             'wasAlreadyPresent': True,
-                            'wasAlreadyHandled': request.was_already_handled,
+                            'wasAlreadyHandled': _request.was_already_handled,
                         }
                     )
                 )
@@ -132,11 +139,11 @@ async def add_batch_of_requests(
                 new_requests.append(request)

                 # Update local caches
-                self._requests_cache[request.unique_key] = request
+                self._requests_cache[_request.id] = request
                 if forefront:
-                    self._head_requests.append(request.unique_key)
+                    self._head_requests.append(_request.id)
                 else:
-                    self._head_requests.appendleft(request.unique_key)
+                    self._head_requests.appendleft(_request.id)

         if new_requests:
             # Prepare requests for API by converting to dictionaries.
@@ -155,7 +162,7 @@ async def add_batch_of_requests(
             api_response.processed_requests.extend(already_present_requests)
             # Remove unprocessed requests from the cache
             for unprocessed_request in api_response.unprocessed_requests:
-                self._requests_cache.pop(unprocessed_request.unique_key, None)
+                self._requests_cache.pop(unprocessed_request.id, None)

         else:
             api_response = AddRequestsResponse.model_validate(
@@ -181,10 +188,22 @@ async def get_request(self, unique_key: str) -> Request | None:
         Returns:
             The request or None if not found.
         """
-        if unique_key in self._requests_cache:
-            return self._requests_cache[unique_key]
+        return await self._get_request(id=unique_key_to_request_id(unique_key))

-        response = await self._api_client.get_request(unique_key_to_request_id(unique_key))
+    async def _get_request(self, id: str) -> Request | None:
+        """Get a request by id.
+
+        Args:
+            id: Id of the request to get.
+
+        Returns:
+            The request or None if not found.
+        """
+
+        if id in self._requests_cache:
+            return self._requests_cache[id]
+
+        response = await self._api_client.get_request(id)

         if response is None:
             return None
@@ -205,13 +224,13 @@ async def fetch_next_request(self) -> Request | None:
         await self._ensure_head_is_non_empty()

         while self._head_requests:
-            request_unique_key = self._head_requests.pop()
+            request_id = self._head_requests.pop()
             if (
-                request_unique_key not in self._requests_in_progress
-                and request_unique_key not in self._requests_already_handled
+                request_id not in self._requests_in_progress
+                and request_id not in self._requests_already_handled
             ):
-                self._requests_in_progress.add(request_unique_key)
-                return await self.get_request(request_unique_key)
+                self._requests_in_progress.add(request_id)
+                return await self._get_request(request_id)
         # No request locally and the ones returned from the platform are already in progress.
         return None
@@ -236,31 +255,24 @@ async def _list_head(self) -> None:

         # Update the cached data
         for request_data in response.get('items', []):
-            request = Request.model_validate(request_data)
+            request = _Request.model_validate(request_data)

-            if request.unique_key in self._requests_in_progress:
+            if request.id in self._requests_in_progress:
                 # Ignore requests that are already in progress, we will not process them again.
                 continue
             if request.was_already_handled:
                 # Do not cache fully handled requests, we do not need them. Just cache their unique_key.
-                self._requests_already_handled.add(request.unique_key)
+                self._requests_already_handled.add(request.id)
             else:
                 # Only fetch the request if we do not know it yet.
-                if request.unique_key not in self._requests_cache:
-                    request_id = unique_key_to_request_id(request.unique_key)
-                    complete_request_data = await self._api_client.get_request(request_id)
-
-                    if complete_request_data is not None:
-                        request = Request.model_validate(complete_request_data)
-                        self._requests_cache[request.unique_key] = request
-                    else:
-                        logger.warning(
-                            f'Could not fetch request data for unique_key=`{request.unique_key}` (id=`{request_id}`)'
-                        )
+                if request.id not in self._requests_cache:
+                    complete_request_data = await self._api_client.get_request(request_data["id"])
+                    request = Request.model_validate(complete_request_data)
+                    self._requests_cache[request.id] = request

             # Add new requests to the end of the head, unless already present in head
-            if request.unique_key not in self._head_requests:
-                self._head_requests.appendleft(request.unique_key)
+            if request.id not in self._head_requests:
+                self._head_requests.appendleft(request.id)

     async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
         """Mark a request as handled after successful processing.
@@ -275,12 +287,14 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
         """
         # Set the handled_at timestamp if not already set

+        _request = _Request.model_validate(request.model_dump())
+
         if request.handled_at is None:
             request.handled_at = datetime.now(tz=timezone.utc)
             self.metadata.handled_request_count += 1
             self.metadata.pending_request_count -= 1

-        if cached_request := self._requests_cache.get(request.unique_key):
+        if cached_request := self._requests_cache.get(_request.id):
             cached_request.handled_at = request.handled_at

         try:
@@ -289,13 +303,13 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
             # adding to the queue.)
             processed_request = await self._update_request(request)
             # Remember that we handled this request, to optimize local deduplication.
-            self._requests_already_handled.add(request.unique_key)
+            self._requests_already_handled.add(_request.id)
             # Remove request from cache. It will most likely not be needed.
-            self._requests_cache.pop(request.unique_key)
-            self._requests_in_progress.discard(request.unique_key)
+            self._requests_cache.pop(_request.id)
+            self._requests_in_progress.discard(_request.id)

         except Exception as exc:
-            logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
+            logger.debug(f'Error marking request {_request.unique_key} as handled: {exc!s}')
             return None
         else:
             return processed_request
@@ -319,23 +333,27 @@ async def reclaim_request(
         """
         # Check if the request was marked as handled and clear it. When reclaiming,
        # we want to put the request back for processing.
+
+        _request = _Request.model_validate(request.model_dump())
+
         if request.was_already_handled:
             request.handled_at = None

         try:
             # Make sure request is in the local cache. We might need it.
-            self._requests_cache[request.unique_key] = request
+            self._requests_cache[_request.id] = request

             # No longer in progress
-            self._requests_in_progress.discard(request.unique_key)
+            self._requests_in_progress.discard(_request.id)
             # No longer handled
-            self._requests_already_handled.discard(request.unique_key)
+            self._requests_already_handled.discard(_request.id)

             if forefront:
                 # Append to top of the local head estimation
-                self._head_requests.append(request.unique_key)
+                self._head_requests.append(_request.id)

             processed_request = await self._update_request(request, forefront=forefront)
+            processed_request.id = _request.id
             processed_request.unique_key = request.unique_key
             # If the request was previously handled, decrement our handled count since
             # we're putting it back for processing.
@@ -396,7 +414,7 @@ async def _init_caches(self) -> None:
         """
         response = await self._api_client.list_requests(limit=10_000)
         for request_data in response.get('items', []):
-            request = Request.model_validate(request_data)
+            request = _Request.model_validate(request_data)
             if request.was_already_handled:
                 # Cache just unique_key for deduplication
                 self._requests_already_handled.add(request.unique_key)
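Taken together, these changes key every local structure in the single client (`_requests_cache`, `_requests_in_progress`, `_requests_already_handled`, `_head_requests`) by request id instead of unique key. A simplified sketch of that bookkeeping, assuming the semantics shown in the diff above (types narrowed for brevity; not the actual class):

    from collections import deque

    from crawlee import Request

    requests_cache: dict[str, Request] = {}     # id -> cached request
    requests_in_progress: set[str] = set()      # ids currently being processed
    requests_already_handled: set[str] = set()  # ids known to be fully handled
    head_requests: deque[str] = deque()         # local estimate of the queue head, ids only

    def cache_new_request(request_id: str, request: Request, *, forefront: bool) -> None:
        requests_cache[request_id] = request
        if forefront:
            head_requests.append(request_id)    # fetch_next_request pops from this end first
        else:
            head_requests.appendleft(request_id)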

src/apify/storage_clients/_apify/_utils.py

Lines changed: 14 additions & 1 deletion
@@ -6,10 +6,12 @@
 from base64 import b64encode
 from hashlib import sha256
 from logging import getLogger
-from typing import TYPE_CHECKING, ClassVar
+from typing import TYPE_CHECKING, ClassVar, Annotated

 from apify_client import ApifyClientAsync
+from crawlee import Request
 from crawlee._utils.crypto import compute_short_hash
+from pydantic import Field, model_validator

 from apify._configuration import Configuration

@@ -192,3 +194,14 @@ def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str:

     # Truncate the key to the desired length
     return url_safe_key[:request_id_length]
+
+
+class _Request(Request):
+
+    id: Annotated[str, Field(default="")]
+
+    @model_validator(mode='after')
+    def calculate_id(self) -> _Request:
+        if self.id == "":
+            self.id = unique_key_to_request_id(self.unique_key)
+        return self
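The `_Request` subclass back-fills `id` deterministically from `unique_key` whenever the field is absent, so locally built `Request` objects and API payloads (which already carry an `id`) validate to the same shape. A short usage sketch grounded in the code above (the URL is arbitrary):

    from crawlee import Request

    from apify.storage_clients._apify._utils import _Request, unique_key_to_request_id

    request = Request.from_url('https://example.com')

    # A plain Request carries no id; validation derives one from the unique key.
    derived = _Request.model_validate(request.model_dump())
    assert derived.id == unique_key_to_request_id(request.unique_key)

    # An explicit id, e.g. from an API response, is preserved as-is.
    explicit = _Request.model_validate({**request.model_dump(), 'id': 'abc123'})
    assert explicit.id == 'abc123'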

tests/integration/test_actor_lifecycle.py

Lines changed: 2 additions & 0 deletions
@@ -158,3 +158,5 @@ async def default_handler(context: BasicCrawlingContext) -> None:
     run_result = await run_actor(actor)

     assert run_result.status == 'SUCCEEDED'
+
+

tests/integration/test_request_queue.py

Lines changed: 24 additions & 0 deletions
@@ -10,6 +10,7 @@
 from crawlee import Request, service_locator
 from crawlee.crawlers import BasicCrawler

+from apify.storage_clients._apify._utils import unique_key_to_request_id
 from ._utils import generate_unique_resource_name
 from apify import Actor
 from apify.storage_clients import ApifyStorageClient
@@ -1189,3 +1190,26 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> None:
     assert hasattr(metadata, 'stats')
     apify_metadata = cast('ApifyRequestQueueMetadata', metadata)
     assert apify_metadata.stats.write_count == add_request_count
+
+
+async def test_long_request(request_queue_apify: RequestQueue, apify_client_async) -> None:
+
+    request = Request.from_url(
+        'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1',
+        use_extended_unique_key=True,
+        always_enqueue=True,
+    )
+
+    request_id = unique_key_to_request_id(request.unique_key)
+
+    processed_request = await request_queue_apify.add_request(request)
+    assert processed_request.id == request_id
+
+    request_obtained = await request_queue_apify.fetch_next_request()
+    assert request_obtained is not None
+
+    await request_queue_apify.mark_request_as_handled(request_obtained)
+
+    is_finished = await request_queue_apify.is_finished()
+    assert is_finished
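The long query string makes the unique key far longer than anything usable as a platform request id; the test verifies that `add_request` reports the same deterministic id the client computes locally, and that the request can then complete a full fetch/handle cycle. The derived id stays bounded regardless of key length, per the `request_id_length` default visible in the `_utils.py` hunk above (a quick illustrative check, not part of the commit):

    from apify.storage_clients._apify._utils import unique_key_to_request_id

    long_key = 'https://example.com/?' + 'param=value&' * 500
    assert len(unique_key_to_request_id(long_key)) <= 15  # request_id_length defaults to 15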
