diff --git a/plugin_scripts/insert_rows.py b/plugin_scripts/insert_rows.py
index c155a7b..419e4bf 100755
--- a/plugin_scripts/insert_rows.py
+++ b/plugin_scripts/insert_rows.py
@@ -1,6 +1,7 @@
 import json
 import logging
 import sys
+from itertools import islice
 
 from google.cloud import bigquery
 
@@ -8,6 +9,17 @@
 sys.tracebacklimit = 0
 
 
+BATCH_SIZE = 20000
+
+
+def batched(iterable, n):
+    # batched('ABCDEFG', 3) → ABC DEF G
+    if n < 1:
+        raise ValueError("n must be at least one")
+    iterator = iter(iterable)
+    while batch := tuple(islice(iterator, n)):
+        yield batch
+
 
 def insert_rows(config: Config) -> None:
     """
@@ -31,14 +43,21 @@ def insert_rows(config: Config) -> None:
     with open(config.bq_rows_as_json_path, "r") as row_file:
         rows = json.load(row_file)
 
-    logging.info(f"Loaded {len(rows)} rows. Inserting...")
+    if not isinstance(rows, list):
+        raise ValueError(f"Expected JSON file to be a list of rows, was: {type(rows)}")
+
+    logging.info(f"Loaded {len(rows)} rows. Inserting in batches {BATCH_SIZE}...")
+
+    total_errors = []
+    for batch in batched(rows, BATCH_SIZE):
+        errors = client.insert_rows_json(table_ref, batch)
+        total_errors.extend(errors)
 
-    errors = client.insert_rows_json(table_ref, rows)
-    logging.info(f"Inserted rows with {len(errors)} errors")
-    for e in errors:
+    logging.info(f"Inserted rows with {len(total_errors)} errors")
+    for e in total_errors:
         logging.error(e)
 
-    if len(errors) > 0:
+    if len(total_errors) > 0:
         raise Exception("Got exceptions on returning rows, see above.")
 
 
diff --git a/tests/test_insert_rows.py b/tests/test_insert_rows.py
index c29d211..18e2670 100644
--- a/tests/test_insert_rows.py
+++ b/tests/test_insert_rows.py
@@ -8,7 +8,7 @@ def test__main_true(
 ):
     mocker.patch("json.loads")
     mocker.patch("plugin_scripts.insert_rows.bigquery")
-    mocker.patch("json.load")
+    mocker.patch("json.load", return_value=[{"a": 1}, {"b": 2}])
     mocker.patch("builtins.open")
 
     insert_rows.main()