Refactor and improve documentation grammar (#814)
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Jan 9, 2024
1 parent 618659d commit 5ec87a6
Showing 4 changed files with 80 additions and 92 deletions.
4 changes: 2 additions & 2 deletions dlt/destinations/__init__.py
@@ -4,7 +4,7 @@
from dlt.destinations.impl.duckdb.factory import duckdb
from dlt.destinations.impl.dummy.factory import dummy
from dlt.destinations.impl.mssql.factory import mssql
from dlt.destinations.impl.bigquery.factory import bigquery
from dlt.destinations.impl.bigquery.factory import BigQuery
from dlt.destinations.impl.athena.factory import athena
from dlt.destinations.impl.redshift.factory import redshift
from dlt.destinations.impl.qdrant.factory import qdrant
@@ -19,7 +19,7 @@
"duckdb",
"dummy",
"mssql",
"bigquery",
"BigQuery",
"athena",
"redshift",
"qdrant",
4 changes: 2 additions & 2 deletions dlt/destinations/impl/bigquery/README.md
@@ -1,6 +1,6 @@
# Loader account setup

1. Create new services account, add private key to it and download the `services.json` file
1. Create a new services account, add private key to it and download the `services.json` file
2. Make sure that this newly created account has access to BigQuery API
3. You must add following roles to the account above: `BigQuery Data Editor`, `BigQuey Job User` and `BigQuery Read Session User` (storage API)
3. You must add the following roles to the account above: `BigQuery Data Editor`, `BigQuery Job User` and `BigQuery Read Session User` (storage API)
4. IAM to add roles is here https://console.cloud.google.com/iam-admin/iam?project=chat-analytics-rasa-ci
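
As a quick sanity check for the setup steps above, here is a minimal sketch (not part of this commit) that authenticates with the downloaded `services.json` and lists datasets; the file path and the assumption that the roles from step 3 have already propagated are both hypothetical.

```python
# Hypothetical verification script: confirm the new service account can reach
# the BigQuery API using the key file downloaded in step 1.
import google.cloud.bigquery as bigquery

# Build a client directly from the downloaded service account key file.
client = bigquery.Client.from_service_account_json("services.json")

# Listing datasets exercises the granted roles; a project without datasets
# simply yields nothing instead of raising.
for dataset in client.list_datasets():
    print(dataset.dataset_id)
```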
2 changes: 1 addition & 1 deletion dlt/destinations/impl/bigquery/factory.py
@@ -9,7 +9,7 @@
from dlt.destinations.impl.bigquery.bigquery import BigQueryClient


class bigquery(Destination[BigQueryClientConfiguration, "BigQueryClient"]):
class BigQuery(Destination[BigQueryClientConfiguration, "BigQueryClient"]):
spec = BigQueryClientConfiguration

def capabilities(self) -> DestinationCapabilitiesContext:
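
For context, a minimal usage sketch of the renamed factory, assuming it stays re-exported from `dlt.destinations` as shown in the `__init__.py` hunk above; the pipeline and dataset names are placeholders, and credentials are expected to come from the usual dlt secrets/config resolution.

```python
# Hypothetical pipeline wiring against the renamed BigQuery destination factory.
import dlt
from dlt.destinations import BigQuery

pipeline = dlt.pipeline(
    pipeline_name="bigquery_demo",
    destination=BigQuery(),  # credentials resolved from secrets.toml / env vars
    dataset_name="demo_dataset",
)

# Load a trivial payload to confirm the destination wiring resolves end to end.
load_info = pipeline.run([{"id": 1, "name": "example"}], table_name="items")
print(load_info)
```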
162 changes: 75 additions & 87 deletions dlt/destinations/impl/bigquery/sql_client.py
@@ -1,31 +1,30 @@
from contextlib import contextmanager
from typing import Any, AnyStr, ClassVar, Iterator, List, Optional, Sequence, Type
from typing import Any, AnyStr, ClassVar, Iterator, List, Optional, Sequence

import google.cloud.bigquery as bigquery # noqa: I250
from google.api_core import exceptions as api_core_exceptions
from google.cloud import exceptions as gcp_exceptions
from google.cloud.bigquery import dbapi as bq_dbapi
from google.cloud.bigquery.dbapi import Connection as DbApiConnection, Cursor as BQDbApiCursor
from google.cloud import exceptions as gcp_exceptions
from google.cloud.bigquery.dbapi import exceptions as dbapi_exceptions
from google.api_core import exceptions as api_core_exceptions

from dlt.common.configuration.specs import GcpServiceAccountCredentialsWithoutDefaults
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.typing import StrAny

from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame
from dlt.destinations.exceptions import (
DatabaseTerminalException,
DatabaseTransientException,
DatabaseUndefinedRelation,
)
from dlt.destinations.impl.bigquery import capabilities
from dlt.destinations.sql_client import (
DBApiCursorImpl,
SqlClientBase,
raise_database_error,
raise_open_connection_error,
)
from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame

from dlt.destinations.impl.bigquery import capabilities

# terminal reasons as returned in BQ gRPC error response
# https://cloud.google.com/bigquery/docs/error-messages
@@ -38,7 +37,7 @@
"stopped",
"tableUnavailable",
]
# invalidQuery is an transient error -> must be fixed by programmer
# invalidQuery is a transient error -> must be fixed by programmer


class BigQueryDBApiCursorImpl(DBApiCursorImpl):
@@ -47,16 +46,15 @@ class BigQueryDBApiCursorImpl(DBApiCursorImpl):
native_cursor: BQDbApiCursor # type: ignore

def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame:
if chunk_size is not None:
return super().df(chunk_size=chunk_size)
query_job: bigquery.QueryJob = self.native_cursor._query_job

if chunk_size is None:
try:
return query_job.to_dataframe(**kwargs)
except ValueError:
# no pyarrow/db-types, fallback to our implementation
return super().df()
else:
return super().df(chunk_size=chunk_size)
try:
return query_job.to_dataframe(**kwargs)
except ValueError:
# no pyarrow/db-types, fallback to our implementation
return super().df()


class BigQuerySqlClient(SqlClientBase[bigquery.Client], DBTransaction):
@@ -116,34 +114,32 @@ def close_connection(self) -> None:
@raise_database_error
def begin_transaction(self) -> Iterator[DBTransaction]:
try:
# start the transaction if not yet started
if not self._session_query:
job = self._client.query(
"BEGIN TRANSACTION;",
job_config=bigquery.QueryJobConfig(
create_session=True,
default_dataset=self.fully_qualified_dataset_name(escape=False),
),
)
self._session_query = bigquery.QueryJobConfig(
create_session=False,
default_dataset=self.fully_qualified_dataset_name(escape=False),
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=job.session_info.session_id
)
],
)
try:
job.result()
except Exception:
# if session creation fails
self._session_query = None
raise
else:
if self._session_query:
raise dbapi_exceptions.ProgrammingError(
"Nested transactions not supported on BigQuery"
)
job = self._client.query(
"BEGIN TRANSACTION;",
job_config=bigquery.QueryJobConfig(
create_session=True,
default_dataset=self.fully_qualified_dataset_name(escape=False),
),
)
self._session_query = bigquery.QueryJobConfig(
create_session=False,
default_dataset=self.fully_qualified_dataset_name(escape=False),
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=job.session_info.session_id
)
],
)
try:
job.result()
except Exception:
# if session creation fails
self._session_query = None
raise
yield self
self.commit_transaction()
except Exception:
@@ -152,7 +148,7 @@ def begin_transaction(self) -> Iterator[DBTransaction]:

def commit_transaction(self) -> None:
if not self._session_query:
# allow to commit without transaction
# allow committing without transaction
return
self.execute_sql("COMMIT TRANSACTION;CALL BQ.ABORT_SESSION();")
self._session_query = None
@@ -179,12 +175,7 @@ def has_dataset(self) -> bool:
return False

def create_dataset(self) -> None:
self._client.create_dataset(
self.fully_qualified_dataset_name(escape=False),
exists_ok=False,
retry=self._default_retry,
timeout=self.http_timeout,
)
self._client.create_dataset(self.fully_qualified_dataset_name(escape=False), retry=self._default_retry, timeout=self.http_timeout)

def drop_dataset(self) -> None:
self._client.delete_dataset(
@@ -201,21 +192,18 @@ def execute_sql(
with self.execute_query(sql, *args, **kwargs) as curr:
if not curr.description:
return None
else:
try:
f = curr.fetchall()
return f
except api_core_exceptions.InvalidArgument as ia_ex:
if "non-table entities cannot be read" in str(ia_ex):
return None
raise
try:
return curr.fetchall()
except api_core_exceptions.InvalidArgument as ia_ex:
if "non-table entities cannot be read" in str(ia_ex):
return None
raise

@contextmanager
@raise_database_error
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
conn: DbApiConnection = None
curr: DBApiCursor = None
db_args = args if args else kwargs if kwargs else None
db_args = args or (kwargs or None)
try:
conn = DbApiConnection(client=self._client)
curr = conn.cursor()
@@ -238,37 +226,37 @@ def fully_qualified_dataset_name(self, escape: bool = True) -> str:

@classmethod
def _make_database_exception(cls, ex: Exception) -> Exception:
if cls.is_dbapi_exception(ex):
# google cloud exception in first argument: https://github.com/googleapis/python-bigquery/blob/main/google/cloud/bigquery/dbapi/cursor.py#L205
cloud_ex = ex.args[0]
reason = cls._get_reason_from_errors(cloud_ex)
if reason is None:
if isinstance(ex, (dbapi_exceptions.DataError, dbapi_exceptions.IntegrityError)):
return DatabaseTerminalException(ex)
elif isinstance(ex, dbapi_exceptions.ProgrammingError):
return DatabaseTransientException(ex)
if reason == "notFound":
return DatabaseUndefinedRelation(ex)
if reason == "invalidQuery" and "was not found" in str(ex) and "Dataset" in str(ex):
return DatabaseUndefinedRelation(ex)
if (
reason == "invalidQuery"
and "Not found" in str(ex)
and ("Dataset" in str(ex) or "Table" in str(ex))
):
return DatabaseUndefinedRelation(ex)
if reason == "accessDenied" and "Dataset" in str(ex) and "not exist" in str(ex):
return DatabaseUndefinedRelation(ex)
if reason == "invalidQuery" and (
"Unrecognized name" in str(ex) or "cannot be null" in str(ex)
):
# unknown column, inserting NULL into required field
return DatabaseTerminalException(ex)
if reason in BQ_TERMINAL_REASONS:
if not cls.is_dbapi_exception(ex):
return ex
# google cloud exception in first argument: https://github.com/googleapis/python-bigquery/blob/main/google/cloud/bigquery/dbapi/cursor.py#L205
cloud_ex = ex.args[0]
reason = cls._get_reason_from_errors(cloud_ex)
if reason is None:
if isinstance(ex, (dbapi_exceptions.DataError, dbapi_exceptions.IntegrityError)):
return DatabaseTerminalException(ex)
# anything else is transient
return DatabaseTransientException(ex)
return ex
elif isinstance(ex, dbapi_exceptions.ProgrammingError):
return DatabaseTransientException(ex)
if reason == "notFound":
return DatabaseUndefinedRelation(ex)
if reason == "invalidQuery" and "was not found" in str(ex) and "Dataset" in str(ex):
return DatabaseUndefinedRelation(ex)
if (
reason == "invalidQuery"
and "Not found" in str(ex)
and ("Dataset" in str(ex) or "Table" in str(ex))
):
return DatabaseUndefinedRelation(ex)
if reason == "accessDenied" and "Dataset" in str(ex) and "not exist" in str(ex):
return DatabaseUndefinedRelation(ex)
if reason == "invalidQuery" and (
"Unrecognized name" in str(ex) or "cannot be null" in str(ex)
):
# unknown column, inserting NULL into required field
return DatabaseTerminalException(ex)
if reason in BQ_TERMINAL_REASONS:
return DatabaseTerminalException(ex)
# anything else is transient
return DatabaseTransientException(ex)

@staticmethod
def _get_reason_from_errors(gace: api_core_exceptions.GoogleAPICallError) -> Optional[str]:
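
For reference, a standalone sketch of the BigQuery session pattern that the reworked `begin_transaction` relies on, written against the plain `google-cloud-bigquery` client; default application credentials are assumed and the `default_dataset` handling from the diff is omitted.

```python
# Hypothetical, trimmed-down version of the session handshake used above:
# the first job opens a session, later jobs attach to it via its session_id.
import google.cloud.bigquery as bigquery

client = bigquery.Client()

# Opening job: BEGIN TRANSACTION inside a freshly created session.
begin_job = client.query(
    "BEGIN TRANSACTION;",
    job_config=bigquery.QueryJobConfig(create_session=True),
)
begin_job.result()
session_id = begin_job.session_info.session_id

# Follow-up statements join the same session through a connection property.
session_config = bigquery.QueryJobConfig(
    connection_properties=[
        bigquery.query.ConnectionProperty(key="session_id", value=session_id)
    ]
)
client.query("COMMIT TRANSACTION;", job_config=session_config).result()

# Release the session when done, mirroring commit_transaction above.
client.query("CALL BQ.ABORT_SESSION();", job_config=session_config).result()
```

The client in the diff additionally pins a `default_dataset` on every job config and rejects nested transactions with a `ProgrammingError`, which this sketch leaves out.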
