@@ -25,6 +25,7 @@ def get_args():
    parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float")
    parser.add_argument("--path_output", type=str, help="Path output")
    parser.add_argument("--remove_null", action='store_true', default=False, help="Remove null values (kept by default)")
+   parser.add_argument("--is_json", action='store_true', default=False, help="Indicate if input file is a json")

    args = parser.parse_args()
    return args
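
For context, the new --is_json switch follows the same store_true pattern as the other boolean flags: it defaults to False (ljson input, one json per line) and only flips to True when passed. A minimal sketch of that behaviour, rebuilding just this flag on a throwaway parser:

import argparse

# Throwaway parser carrying only the new flag (illustration, not the script's get_args)
parser = argparse.ArgumentParser()
parser.add_argument("--is_json", action='store_true', default=False,
                    help="Indicate if input file is a json")

print(parser.parse_args([]).is_json)             # False: input treated as ljson
print(parser.parse_args(["--is_json"]).is_json)  # True: input treated as json / json array
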
@@ -190,7 +191,78 @@ def update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
    return columns_list


-def get_columns(list_data_paths, sep, logger, int_to_float, remove_null):
+def read_jsons_chunks(file_object, chunk_size=10000):
+    """Lazy function to read a json file chunk by chunk.
+    Default chunk size: 10k jsons"""
+
+    # Read the next block of raw characters (up to 1M at a time)
+    chunk = file_object.read(1000000)
+    data = []
+    i = 0
+    nb_bracket = 0
+    nb_quotes = 0
+    example = ""
+    count_escape_char = 0
+    k = 0  # guard so the post-loop checks work even on an empty file
+    while True:
+        # Read character by character
+        for k, c in enumerate(chunk):
+            # Check quoting
+            if c == '"':
+                # Count only when '"' delimits a field or value in the json, i.e. is not escaped
+                if count_escape_char % 2 == 0:
+                    nb_quotes += 1
+            # Check beginning of brackets
+            elif c == '{' and nb_quotes % 2 == 0:
+                # Count only when '{' is not escaped
+                if count_escape_char % 2 == 0:
+                    nb_bracket += 1
+            # Check ending of brackets
+            elif c == '}' and nb_quotes % 2 == 0:
+                # Count only when '}' is not escaped
+                if count_escape_char % 2 == 0:
+                    nb_bracket -= 1
+                    # This means we finished reading one json
+                    if nb_bracket == 0 and nb_quotes % 2 == 0:
+                        example += c
+                        data.append(json.loads(example))
+                        i += 1
+                        # When chunk_size jsons have been parsed, yield them
+                        if i % chunk_size == 0:
+                            yield data
+                            data = []
+
+                        # Reset the example buffer
+                        example = ""
+                        continue
+            # If we are in between 2 json examples or at the beginning
+            elif c in ['[', ',', '\n'] and nb_bracket == 0 and nb_quotes % 2 == 0:
+                continue
+            # If we are at the end of the file
+            if c in [']', ''] and nb_bracket == 0 and nb_quotes % 2 == 0:
+                # If EOF is reached or the json array is closed, send what's left of the data
+                if example == "" or example == "]":
+                    yield data
+                    return
+            if c == "\\":
+                count_escape_char += 1
+            else:
+                count_escape_char = 0
+            # Append character to the json example
+            example += c
+
+        # If at the end of the chunk, read a new chunk
+        if k == len(chunk) - 1:
+            chunk = file_object.read(1000000)
+        # Keep what's left of the chunk
+        elif len(chunk) != 0:
+            chunk = chunk[k:]
+        # An empty chunk means the whole file has been read
+        else:
+            # Yield the completed jsons that have not been sent yet
+            if data:
+                yield data
+            break
+
+
+def get_columns(list_data_paths, sep, logger, int_to_float, remove_null, is_json):
    """
    Get the columns created according to a list of files containing json
@@ -199,28 +271,48 @@ def get_columns(list_data_paths, sep, logger, int_to_float, remove_null):
    :param logger: logger (used to print)
    :param int_to_float: if set to true, int will be cast to float
    :param remove_null: if set to true, will remove null values from json arrays
+   :param is_json: if set to true, inputs are considered as valid json
    :return: Exhaustive list of columns
    """

    columns_list = []

    j = 0
+   chunk_size = 50000
    for data_file in list_data_paths:
        logger.info(data_file)
        json_list = []
-       with open(data_file) as f:
-           for i, line in enumerate(f):
-               j += 1
-               if (j % 500000 == 0):
+       # If we deal with a json (or json array) file
+       if is_json:
+           f = open(data_file)
+           # Read the json file chunk by chunk
+           for x in read_jsons_chunks(f, chunk_size=chunk_size):
+               if j != 0 and (j % chunk_size == 0):
                    columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
                    logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found')
                    json_list = []
                try:
-                   json_list.append(json.loads(line))
+                   json_list.extend(x)
+                   # At most chunk_size elements were added
+                   j += chunk_size
                except:
-                   logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
+                   logger.info("Json in line " + str(j) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
                    continue
+           # Close the input file once the generator is exhausted
+           f.close()
+       # If we deal with ljson (one json per line)
+       else:
+           with open(data_file) as f:
+               for i, line in enumerate(f):
+                   j += 1
+                   if (j % 50000 == 0):
+                       columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
+                       logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found')
+                       json_list = []
+                   try:
+                       json_list.append(json.loads(line))
+                   except:
+                       logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
+                       continue

    # A quicker solution would be to join directly to create a valid json
    if (len(json_list) > 0):
        columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
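
To make the generator's contract concrete, here is a minimal usage sketch (the sample data is invented): fed a json array, read_jsons_chunks yields lists of at most chunk_size parsed objects, which is exactly what the is_json branch above consumes.

import io
import json  # read_jsons_chunks relies on json being imported at module level

sample = io.StringIO('[{"a": 1}, {"a": 2, "b": {"c": "x"}}, {"a": 3}]')

# chunk_size=2 forces two yields: the first two objects, then the remainder
for chunk in read_jsons_chunks(sample, chunk_size=2):
    print(chunk)
# [{'a': 1}, {'a': 2, 'b': {'c': 'x'}}]
# [{'a': 3}]
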
@@ -231,7 +323,7 @@ def get_columns(list_data_paths, sep, logger, int_to_float, remove_null):
    return columns_list


-def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False):
+def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False, is_json=False):
    """
    Get dataframe from files containing one json per line
@@ -242,30 +334,52 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep
    :param sep: separator to use when creating columns' names
    :param int_to_float: if set to true, int will be cast to float
    :param remove_null: if set to true, will remove null values from json arrays
+   :param is_json: if set to true, inputs are considered as valid json
    :return: dataframe or nothing if the dataframe is generated while streaming the files
    """

    json_list = []
    j = 0
+   chunk_size = 50000
    for data_file in list_data_paths:
        logger.info(data_file)
-       with open(data_file) as f:
-           for i, line in enumerate(f):
-               j += 1
-               if (j % 500000 == 0):
+       json_list = []
+       # If we deal with a json (or json array) file
+       if is_json:
+           f = open(data_file)
+           # Read the json file chunk by chunk
+           for x in read_jsons_chunks(f, chunk_size=chunk_size):
+               if j != 0 and (j % chunk_size == 0):
                    logger.info('Iteration ' + str(j) + ': Creating sub dataframe')
                    if columns:
                        update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
-                   json_list.clear()
-
-               if (j % 100000 == 0):
-                   logger.info(str(i) + ' documents processed')
+                   json_list = []
                try:
-                   json_list.append(json.loads(line))
+                   json_list.extend(x)
+                   # At most chunk_size elements were added
+                   j += chunk_size
                except:
-                   logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
+                   logger.info("Json in line " + str(j) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
                    continue
+           # Close the input file once the generator is exhausted
+           f.close()
+       # If we deal with ljson (one json per line)
+       else:
+           with open(data_file) as f:
+               for i, line in enumerate(f):
+                   j += 1
+                   if (j % 50000 == 0):
+                       logger.info('Iteration ' + str(j) + ': Creating sub dataframe')
+                       if columns:
+                           update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
+                       json_list.clear()
+
+                   if (j % 100000 == 0):
+                       logger.info(str(i) + ' documents processed')
+                   try:
+                       json_list.append(json.loads(line))
+                   except:
+                       logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
+                       continue

    # A quicker solution would be to join directly to create a valid json
    logger.info('Convert to DataFrame')
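
A design note on the else branch: for the ljson case pandas ships its own chunked reader, so a rough functional equivalent can be sketched as below (file name invented; this is an alternative, not what the script does). The json-array case has no such streaming mode in pandas, which is what read_jsons_chunks is for.

import pandas as pd

# Rough equivalent of the ljson branch (hypothetical file name)
for sub_df in pd.read_json("data.ljson", lines=True, chunksize=50000):
    # Each sub_df holds at most 50000 documents, like one json_list batch
    print(sub_df.shape)

Note that read_json does not flatten nested objects into sep-joined column names the way update_csv does; pd.json_normalize(records, sep='.') would still be needed for that.
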
@@ -313,10 +427,10 @@ def main():
    logger.info("Reading " + opt.path_data_jsonperline)
    data = [opt.path_data_jsonperline]

-   # Get list of columns if not in streaming
+   # Get list of columns if in streaming
    columns_list = None
    if opt.streaming:
-       columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null)
+       columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null, opt.is_json)
        # Sort columns in alphabetical order
        columns_list.sort()
        df = pd.DataFrame(columns=columns_list)
@@ -326,7 +440,7 @@ def main():
        df.to_csv(opt.path_output, encoding="utf-8", index=None, quoting=1)

    # Get dataframe
-   df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null)
+   df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null, is_json=opt.is_json)

    if not opt.streaming:
        logger.info("saving data to " + opt.path_output)
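
Putting the pieces together, a minimal end-to-end sketch of the new json path (the file name, logger setup, and sample documents are invented for illustration):

import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("json_to_csv")

# Write a small json-array input file (hypothetical path)
with open("sample.json", "w") as f:
    json.dump([{"user": {"id": 1, "name": "a"}}, {"user": {"id": 2}}], f)

# With is_json=True the file is parsed as one json array;
# expected columns would be e.g. ['user.id', 'user.name'] with sep='.'
cols = get_columns(["sample.json"], sep='.', logger=logger,
                   int_to_float=False, remove_null=False, is_json=True)
print(sorted(cols))

At the command line the same run would presumably look like python json_to_csv.py --path_data_jsonperline sample.json --path_output out.csv --is_json, where the script name and the input flag are inferred from opt.path_data_jsonperline rather than shown in this diff.
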