11import asyncio
22import json
3- from pathlib import Path
3+ import logging
44
5- from typing_extensions import override
5+ from more_itertools import flatten
6+ from typing_extensions import Self , override
67
78from crawlee ._consts import METADATA_FILENAME
9+ from crawlee .configuration import Configuration as CrawleeConfiguration
810from crawlee .storage_clients ._file_system import FileSystemKeyValueStoreClient
11+ from crawlee .storage_clients .models import KeyValueStoreRecord
912
1013from apify ._configuration import Configuration
1114
15+ logger = logging .getLogger (__name__ )
16+
1217
1318class ApifyFileSystemKeyValueStoreClient (FileSystemKeyValueStoreClient ):
1419 """Apify-specific implementation of the `FileSystemKeyValueStoreClient`.
@@ -17,23 +22,39 @@ class ApifyFileSystemKeyValueStoreClient(FileSystemKeyValueStoreClient):
1722 directory, except for the metadata file and the `INPUT.json` file.
1823 """
1924
25+ @override
26+ @classmethod
27+ async def open (
28+ cls ,
29+ * ,
30+ id : str | None ,
31+ name : str | None ,
32+ alias : str | None ,
33+ configuration : CrawleeConfiguration ,
34+ ) -> Self :
35+ client = await super ().open (id = id , name = name , alias = alias , configuration = configuration )
36+
37+ await client ._sanitize_input_json_files () # noqa: SLF001 - it's okay, this is a factory method
38+
39+ return client
40+
2041 @override
2142 async def purge (self ) -> None :
2243 """Purges the key-value store by deleting all its contents.
2344
2445 It deletes all files in the key-value store directory, except for the metadata file and
2546 the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged.
2647 """
27- kvs_input_key = Configuration .get_global_configuration ().input_key
28-
29- # First try to find the alternative format of the input file and process it if it exists.
30- for file_path in self .path_to_kvs .glob ('*' ):
31- if file_path .name == f'{ kvs_input_key } .json' :
32- await self ._process_input_json (file_path )
48+ configuration = Configuration .get_global_configuration ()
3349
3450 async with self ._lock :
51+ files_to_keep = set (
52+ flatten ([key , f'{ key } .{ METADATA_FILENAME } ' ] for key in configuration .input_key_candidates )
53+ )
54+ files_to_keep .add (METADATA_FILENAME )
55+
3556 for file_path in self .path_to_kvs .glob ('*' ):
36- if file_path .name in { METADATA_FILENAME , kvs_input_key , f' { kvs_input_key } . { METADATA_FILENAME } ' } :
57+ if file_path .name in files_to_keep :
3758 continue
3859 if file_path .is_file ():
3960 await asyncio .to_thread (file_path .unlink , missing_ok = True )
@@ -43,15 +64,40 @@ async def purge(self) -> None:
4364 update_modified_at = True ,
4465 )
4566
46- async def _process_input_json (self , path : Path ) -> None :
47- """Process simple input json file to format expected by the FileSystemKeyValueStoreClient.
67+ async def _sanitize_input_json_files (self ) -> None :
68+ """Handle missing metadata for input files."""
69+ configuration = Configuration .get_global_configuration ()
70+ alternative_keys = configuration .input_key_candidates - {configuration .canonical_input_key }
4871
49- For example: INPUT.json -> INPUT, INPUT.json.metadata
50- """
51- try :
52- f = await asyncio .to_thread (path .open )
53- input_data = json .load (f )
54- finally :
55- f .close ()
56- await asyncio .to_thread (path .unlink , missing_ok = True )
57- await self .set_value (key = path .stem , value = input_data )
72+ if (self .path_to_kvs / configuration .canonical_input_key ).exists ():
73+ # Refresh metadata to prevent inconsistencies
74+ input_data = await asyncio .to_thread (
75+ lambda : json .loads ((self .path_to_kvs / configuration .canonical_input_key ).read_text ())
76+ )
77+ await self .set_value (key = configuration .canonical_input_key , value = input_data )
78+
79+ for alternative_key in alternative_keys :
80+ if (alternative_input_file := self .path_to_kvs / alternative_key ).exists ():
81+ logger .warning (f'Redundant input file found: { alternative_input_file } ' )
82+ else :
83+ for alternative_key in alternative_keys :
84+ alternative_input_file = self .path_to_kvs / alternative_key
85+
86+ # Only process files that actually exist
87+ if alternative_input_file .exists ():
88+ # Refresh metadata to prevent inconsistencies
89+ with alternative_input_file .open () as f :
90+ input_data = await asyncio .to_thread (lambda : json .load (f ))
91+ await self .set_value (key = alternative_key , value = input_data )
92+
93+ @override
94+ async def get_value (self , * , key : str ) -> KeyValueStoreRecord | None :
95+ configuration = Configuration .get_global_configuration ()
96+
97+ if key in configuration .input_key_candidates :
98+ for candidate in configuration .input_key_candidates :
99+ value = await super ().get_value (key = candidate )
100+ if value is not None :
101+ return value
102+
103+ return await super ().get_value (key = key )
0 commit comments