From 1e5e9e5a7f55757f9d0fddd35b8480e0af8e682e Mon Sep 17 00:00:00 2001
From: Wojciech Chmiel
Date: Tue, 23 Jul 2024 12:53:52 +0200
Subject: [PATCH] Add batching to stay under the 10MB insert size limit

---
 plugin_scripts/insert_rows.py | 28 +++++++++++++++++++++++-----
 1 file changed, 23 insertions(+), 5 deletions(-)

diff --git a/plugin_scripts/insert_rows.py b/plugin_scripts/insert_rows.py
index c155a7b..57f883c 100755
--- a/plugin_scripts/insert_rows.py
+++ b/plugin_scripts/insert_rows.py
@@ -1,13 +1,25 @@
 import json
 import logging
 import sys
+from itertools import islice
 
 from google.cloud import bigquery
+
 from .config import Config, read_config
 
 sys.tracebacklimit = 0
 
+BATCH_SIZE = 20000
+
+
+def batched(iterable, n):
+    # batched('ABCDEFG', 3) → ABC DEF G (itertools recipe; stdlib itertools.batched on 3.12+)
+    if n < 1:
+        raise ValueError("n must be at least one")
+    iterator = iter(iterable)
+    while batch := tuple(islice(iterator, n)):
+        yield batch
 
 
 def insert_rows(config: Config) -> None:
     """
@@ -31,14 +43,20 @@ def insert_rows(config: Config) -> None:
     with open(config.bq_rows_as_json_path, "r") as row_file:
         rows = json.load(row_file)
 
-    logging.info(f"Loaded {len(rows)} rows. Inserting...")
+    if not isinstance(rows, list):
+        raise ValueError(f"Expected JSON file to be a list of rows, was: {type(rows)}")
+
+    logging.info(f"Loaded {len(rows)} rows. Inserting in batches of {BATCH_SIZE}...")
 
-    errors = client.insert_rows_json(table_ref, rows)
+    total_errors = []
+    for batch in batched(rows, BATCH_SIZE):
+        errors = client.insert_rows_json(table_ref, batch)
+        total_errors.extend(errors)
 
-    logging.info(f"Inserted rows with {len(errors)} errors")
+    logging.info(f"Inserted rows with {len(total_errors)} errors")
 
-    for e in errors:
+    for e in total_errors:
         logging.error(e)
 
-    if len(errors) > 0:
+    if len(total_errors) > 0:
         raise Exception("Got exceptions on returning rows, see above.")
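
---

Note, not part of the patch: BATCH_SIZE = 20000 caps the number of rows per
request, but the 10MB limit applies to the serialized payload, so a fixed row
count only approximates it. Below is a minimal sketch of size-aware batching,
assuming rows are JSON-serializable dicts and that per-row json.dumps length
approximates the request payload; the names MAX_BATCH_BYTES and
batched_by_size are hypothetical, not part of the plugin:

    import json
    from typing import Any, Iterable, Iterator

    # Hypothetical cap, leaving headroom under the 10MB request limit.
    MAX_BATCH_BYTES = 9_000_000

    def batched_by_size(
        rows: Iterable[dict[str, Any]],
        max_bytes: int = MAX_BATCH_BYTES,
    ) -> Iterator[list[dict[str, Any]]]:
        """Yield lists of rows whose combined JSON size stays under max_bytes."""
        batch: list[dict[str, Any]] = []
        batch_bytes = 0
        for row in rows:
            row_bytes = len(json.dumps(row).encode("utf-8"))
            # Flush the current batch before adding this row would exceed the cap.
            if batch and batch_bytes + row_bytes > max_bytes:
                yield batch
                batch, batch_bytes = [], 0
            batch.append(row)
            batch_bytes += row_bytes
        if batch:
            yield batch

Each batch would then go through the same client.insert_rows_json(table_ref,
batch) call the patch already uses, with errors collected into total_errors
exactly as above.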