diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index ab47cadce..64f239654 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -361,6 +361,7 @@ def copy_from_gcs( compression_type: str = "gzip", new_file_extension: str = "csv", template_table: Optional[str] = None, + max_timeout: int = 30, **load_kwargs, ): """ @@ -421,6 +422,8 @@ def copy_from_gcs( template_table: str Table name to be used as the load schema. Load operation wil use the same columns and data types as the template table. + max_timeout: int + The maximum number of seconds to wait for a request before the job fails. **load_kwargs: kwargs Other arguments to pass to the underlying load_table_from_uri call on the BigQuery client. @@ -463,12 +466,14 @@ def copy_from_gcs( job_config=job_config, compression_type=compression_type, new_file_extension=new_file_extension, + max_timeout=max_timeout, ) else: return self._load_table_from_uri( source_uris=gcs_blob_uri, destination=table_ref, job_config=job_config, + max_timeout=max_timeout, **load_kwargs, ) except exceptions.BadRequest as e: @@ -493,6 +498,7 @@ def copy_from_gcs( job_config=job_config, compression_type=compression_type, new_file_extension=new_file_extension, + max_timeout=max_timeout, ) elif "Schema has no field" in str(e): logger.debug(f"{gcs_blob_uri.split('/')[-1]} is empty, skipping file") @@ -501,6 +507,10 @@ def copy_from_gcs( else: raise e + except exceptions.DeadlineExceeded as e: + logger.error(f"Max timeout exceeded for {gcs_blob_uri.split('/')[-1]}") + raise e + def copy_large_compressed_file_from_gcs( self, gcs_blob_uri: str, @@ -519,6 +529,7 @@ def copy_large_compressed_file_from_gcs( compression_type: str = "gzip", new_file_extension: str = "csv", template_table: Optional[str] = None, + max_timeout: int = 30, **load_kwargs, ): """ @@ -577,6 +588,8 @@ def copy_large_compressed_file_from_gcs( template_table: str Table name to be used as the load schema. Load operation wil use the same columns and data types as the template table. + max_timeout: int + The maximum number of seconds to wait for a request before the job fails. **load_kwargs: kwargs Other arguments to pass to the underlying load_table_from_uri call on the BigQuery client. @@ -621,6 +634,7 @@ def copy_large_compressed_file_from_gcs( source_uris=uncompressed_gcs_uri, destination=table_ref, job_config=job_config, + max_timeout=max_timeout, **load_kwargs, ) @@ -647,6 +661,7 @@ def copy_s3( tmp_gcs_bucket: Optional[str] = None, template_table: Optional[str] = None, job_config: Optional[LoadJobConfig] = None, + max_timeout: int = 30, **load_kwargs, ): """ @@ -692,6 +707,8 @@ def copy_s3( on the BigQuery client. The function will create its own if not provided. Note if there are any conflicts between the job_config and other parameters, the job_config values are preferred. + max_timeout: int + The maximum number of seconds to wait for a request before the job fails. `Returns` Parsons Table or ``None`` @@ -724,6 +741,7 @@ def copy_s3( nullas=nullas, job_config=job_config, template_table=template_table, + max_timeout=max_timeout, **load_kwargs, ) finally: @@ -745,6 +763,7 @@ def copy( allow_jagged_rows: bool = True, quote: Optional[str] = None, schema: Optional[List[dict]] = None, + max_timeout: int = 30, **load_kwargs, ): """ @@ -774,6 +793,8 @@ def copy( template_table: str Table name to be used as the load schema. Load operation wil use the same columns and data types as the template table. + max_timeout: int + The maximum number of seconds to wait for a request before the job fails. **load_kwargs: kwargs Arguments to pass to the underlying load_table_from_uri call on the BigQuery client. @@ -823,6 +844,7 @@ def copy( source_uris=temp_blob_uri, destination=self.get_table_ref(table_name=table_name), job_config=job_config, + max_timeout=max_timeout, **load_kwargs, ) finally: @@ -1339,7 +1361,8 @@ def _validate_copy_inputs(self, if_exists: str, data_type: str): if data_type not in ["csv", "json"]: raise ValueError(f"Only supports csv or json files [data_type = {data_type}]") - def _load_table_from_uri(self, source_uris, destination, job_config, **load_kwargs): + def _load_table_from_uri(self, source_uris, destination, job_config, max_timeout, + **load_kwargs): load_job = self.client.load_table_from_uri( source_uris=source_uris, destination=destination, @@ -1348,7 +1371,7 @@ def _load_table_from_uri(self, source_uris, destination, job_config, **load_kwar ) try: - load_job.result() + load_job.result(timeout=max_timeout) return load_job except exceptions.BadRequest as e: for idx, error_ in enumerate(load_job.errors):