diff --git a/dlt/common/runtime/collector.py b/dlt/common/runtime/collector.py index eec379564c..e478d713b2 100644 --- a/dlt/common/runtime/collector.py +++ b/dlt/common/runtime/collector.py @@ -194,8 +194,8 @@ def dump_counters(self) -> None: elapsed_time = current_time - info.start_time items_per_second = (count / elapsed_time) if elapsed_time > 0 else 0 - progress = f"{count}/{info.total}" if info.total is not None else f"{count}" - percentage = f"({count / info.total * 100:.1f}%)" if info.total is not None else "" + progress = f"{count}/{info.total}" if info.total else f"{count}" + percentage = f"({count / info.total * 100:.1f}%)" if info.total else "" elapsed_time_str = f"{elapsed_time:.2f}s" items_per_second_str = f"{items_per_second:.2f}/s" message = f"[{self.messages[name]}]" if self.messages[name] is not None else "" diff --git a/dlt/destinations/impl/bigquery/README.md b/dlt/destinations/impl/bigquery/README.md index 47c54a690a..d949323a5b 100644 --- a/dlt/destinations/impl/bigquery/README.md +++ b/dlt/destinations/impl/bigquery/README.md @@ -1,6 +1,6 @@ # Loader account setup -1. Create new services account, add private key to it and download the `services.json` file +1. Create a new services account, add private key to it and download the `services.json` file 2. Make sure that this newly created account has access to BigQuery API -3. You must add following roles to the account above: `BigQuery Data Editor`, `BigQuey Job User` and `BigQuery Read Session User` (storage API) +3. You must add the following roles to the account above: `BigQuery Data Editor`, `BigQuey Job User` and `BigQuery Read Session User` (storage API) 4. IAM to add roles is here https://console.cloud.google.com/iam-admin/iam?project=chat-analytics-rasa-ci \ No newline at end of file diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index fa4f5f0419..254184b96d 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -1,9 +1,10 @@ import os from pathlib import Path -from typing import ClassVar, Dict, Optional, Sequence, Tuple, List, cast, Type, Any +from typing import ClassVar, Optional, Sequence, Tuple, List, cast, Any + import google.cloud.bigquery as bigquery # noqa: I250 -from google.cloud import exceptions as gcp_exceptions from google.api_core import exceptions as api_core_exceptions +from google.cloud import exceptions as gcp_exceptions from dlt.common import json, logger from dlt.common.destination import DestinationCapabilitiesContext @@ -14,30 +15,26 @@ LoadJob, SupportsStagingDestination, ) -from dlt.common.data_types import TDataType -from dlt.common.storages.file_storage import FileStorage from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns -from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat from dlt.common.schema.exceptions import UnknownTableException - -from dlt.destinations.job_client_impl import SqlJobClientWithStaging +from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat +from dlt.common.schema.utils import table_schema_has_type +from dlt.common.storages.file_storage import FileStorage from dlt.destinations.exceptions import ( DestinationSchemaWillNotUpdate, DestinationTransientException, LoadJobNotExistsException, LoadJobTerminalException, ) - from dlt.destinations.impl.bigquery import capabilities from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration from 
dlt.destinations.impl.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS -from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob, SqlJobParams +from dlt.destinations.job_client_impl import SqlJobClientWithStaging from dlt.destinations.job_impl import NewReferenceJob from dlt.destinations.sql_client import SqlClientBase +from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob, SqlJobParams from dlt.destinations.type_mapping import TypeMapper -from dlt.common.schema.utils import table_schema_has_type - class BigQueryTypeMapper(TypeMapper): sct_to_unbound_dbt = { @@ -49,7 +46,7 @@ class BigQueryTypeMapper(TypeMapper): "timestamp": "TIMESTAMP", "bigint": "INTEGER", "binary": "BYTES", - "wei": "BIGNUMERIC", # non parametrized should hold wei values + "wei": "BIGNUMERIC", # non-parametrized should hold wei values "time": "TIME", } @@ -73,12 +70,12 @@ class BigQueryTypeMapper(TypeMapper): "TIME": "time", } + # noinspection PyTypeChecker,PydanticTypeChecker def from_db_type( self, db_type: str, precision: Optional[int], scale: Optional[int] ) -> TColumnType: - if db_type == "BIGNUMERIC": - if precision is None: # biggest numeric possible - return dict(data_type="wei") + if db_type == "BIGNUMERIC" and precision is None: + return dict(data_type="wei") return super().from_db_type(db_type, precision, scale) @@ -96,29 +93,24 @@ def __init__( super().__init__(file_name) def state(self) -> TLoadJobState: - # check server if done - done = self.bq_load_job.done(retry=self.default_retry, timeout=self.http_timeout) - if done: - # rows processed - if self.bq_load_job.output_rows is not None and self.bq_load_job.error_result is None: - return "completed" - else: - reason = self.bq_load_job.error_result.get("reason") - if reason in BQ_TERMINAL_REASONS: - # the job permanently failed for the reason above - return "failed" - elif reason in ["internalError"]: - logger.warning( - f"Got reason {reason} for job {self.file_name}, job considered still" - f" running. ({self.bq_load_job.error_result})" - ) - # status of the job could not be obtained, job still running - return "running" - else: - # retry on all other reasons, including `backendError` which requires retry when the job is done - return "retry" - else: + if not self.bq_load_job.done(retry=self.default_retry, timeout=self.http_timeout): + return "running" + if self.bq_load_job.output_rows is not None and self.bq_load_job.error_result is None: + return "completed" + reason = self.bq_load_job.error_result.get("reason") + if reason in BQ_TERMINAL_REASONS: + # the job permanently failed for the reason above + return "failed" + elif reason in ["internalError"]: + logger.warning( + f"Got reason {reason} for job {self.file_name}, job considered still" + f" running. 
({self.bq_load_job.error_result})" + ) + # the status of the job couldn't be obtained, job still running return "running" + else: + # retry on all other reasons, including `backendError` which requires retry when the job is done + return "retry" def bigquery_job_id(self) -> str: return BigQueryLoadJob.get_job_id_from_file_path(super().file_name()) @@ -149,13 +141,11 @@ def gen_key_table_clauses( key_clauses: Sequence[str], for_delete: bool, ) -> List[str]: - # generate several clauses: BigQuery does not support OR nor unions - sql: List[str] = [] - for clause in key_clauses: - sql.append( - f"FROM {root_table_name} AS d WHERE EXISTS (SELECT 1 FROM" - f" {staging_root_table_name} AS s WHERE {clause.format(d='d', s='s')})" - ) + sql: List[str] = [ + f"FROM {root_table_name} AS d WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} AS" + f" s WHERE {clause.format(d='d', s='s')})" + for clause in key_clauses + ] return sql @@ -172,10 +162,12 @@ def generate_sql( with sql_client.with_staging_dataset(staging=True): staging_table_name = sql_client.make_qualified_table_name(table["name"]) table_name = sql_client.make_qualified_table_name(table["name"]) - # drop destination table - sql.append(f"DROP TABLE IF EXISTS {table_name};") - # recreate destination table with data cloned from staging table - sql.append(f"CREATE TABLE {table_name} CLONE {staging_table_name};") + sql.extend( + ( + f"DROP TABLE IF EXISTS {table_name};", + f"CREATE TABLE {table_name} CLONE {staging_table_name};", + ) + ) return sql @@ -208,7 +200,8 @@ def _create_replace_followup_jobs( def restore_file_load(self, file_path: str) -> LoadJob: """Returns a completed SqlLoadJob or restored BigQueryLoadJob - See base class for details on SqlLoadJob. BigQueryLoadJob is restored with job id derived from `file_path` + See base class for details on SqlLoadJob. 
+ BigQueryLoadJob is restored with a job id derived from `file_path` Args: file_path (str): a path to a job file @@ -228,11 +221,13 @@ def restore_file_load(self, file_path: str) -> LoadJob: except api_core_exceptions.GoogleAPICallError as gace: reason = BigQuerySqlClient._get_reason_from_errors(gace) if reason == "notFound": - raise LoadJobNotExistsException(file_path) + raise LoadJobNotExistsException(file_path) from gace elif reason in BQ_TERMINAL_REASONS: - raise LoadJobTerminalException(file_path, f"The server reason was: {reason}") + raise LoadJobTerminalException( + file_path, f"The server reason was: {reason}" + ) from gace else: - raise DestinationTransientException(gace) + raise DestinationTransientException(gace) from gace return job def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: @@ -250,15 +245,17 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> reason = BigQuerySqlClient._get_reason_from_errors(gace) if reason == "notFound": # google.api_core.exceptions.NotFound: 404 - table not found - raise UnknownTableException(table["name"]) + raise UnknownTableException(table["name"]) from gace elif reason == "duplicate": # google.api_core.exceptions.Conflict: 409 PUT - already exists return self.restore_file_load(file_path) elif reason in BQ_TERMINAL_REASONS: # google.api_core.exceptions.BadRequest - will not be processed ie bad job name - raise LoadJobTerminalException(file_path, f"The server reason was: {reason}") + raise LoadJobTerminalException( + file_path, f"The server reason was: {reason}" + ) from gace else: - raise DestinationTransientException(gace) + raise DestinationTransientException(gace) from gace return job def _get_table_update_sql( @@ -274,23 +271,31 @@ def _get_table_update_sql( cluster_list = [ self.capabilities.escape_identifier(c["name"]) for c in new_columns if c.get("cluster") ] - partition_list = [ - self.capabilities.escape_identifier(c["name"]) - for c in new_columns - if c.get("partition") - ] - - # partition by must be added first - if len(partition_list) > 0: + if partition_list := [c for c in new_columns if c.get("partition")]: if len(partition_list) > 1: + col_names = [self.capabilities.escape_identifier(c["name"]) for c in partition_list] raise DestinationSchemaWillNotUpdate( - canonical_name, partition_list, "Partition requested for more than one column" + canonical_name, col_names, "Partition requested for more than one column" ) - else: - sql[0] = sql[0] + f"\nPARTITION BY DATE({partition_list[0]})" - if len(cluster_list) > 0: + elif (c := partition_list[0])["data_type"] == "date": + sql[0] = f"{sql[0]}\nPARTITION BY {self.capabilities.escape_identifier(c['name'])}" + elif (c := partition_list[0])["data_type"] == "timestamp": + sql[0] = ( + f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})" + ) + # Automatic partitioning of an INT64 type requires us to be prescriptive - we treat the column as a UNIX timestamp. + # This is due to the bounds requirement of GENERATE_ARRAY function for partitioning. + # The 10,000 partitions limit makes it infeasible to cover the entire `bigint` range. + # The array bounds, with daily partitions (86400 seconds in a day), are somewhat arbitrarily chosen. 
+ # See: https://dlthub.com/devel/dlt-ecosystem/destinations/bigquery#supported-column-hints + elif (c := partition_list[0])["data_type"] == "bigint": + sql[0] = ( + f"{sql[0]}\nPARTITION BY" + f" RANGE_BUCKET({self.capabilities.escape_identifier(c['name'])}," + " GENERATE_ARRAY(-172800000, 691200000, 86400))" + ) + if cluster_list: sql[0] = sql[0] + "\nCLUSTER BY " + ",".join(cluster_list) - return sql def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: @@ -329,14 +334,14 @@ def _create_load_job(self, table: TTableSchema, file_path: str) -> bigquery.Load # append to table for merge loads (append to stage) and regular appends table_name = table["name"] - # determine wether we load from local or uri + # determine whether we load from local or uri bucket_path = None ext: str = os.path.splitext(file_path)[1][1:] if NewReferenceJob.is_reference_job(file_path): bucket_path = NewReferenceJob.resolve_reference(file_path) ext = os.path.splitext(bucket_path)[1][1:] - # choose correct source format + # choose a correct source format source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON decimal_target_types: List[str] = None if ext == "parquet": @@ -347,7 +352,7 @@ def _create_load_job(self, table: TTableSchema, file_path: str) -> bigquery.Load "Bigquery cannot load into JSON data type from parquet. Use jsonl instead.", ) source_format = bigquery.SourceFormat.PARQUET - # parquet needs NUMERIC type autodetection + # parquet needs NUMERIC type auto-detection decimal_target_types = ["NUMERIC", "BIGNUMERIC"] job_id = BigQueryLoadJob.get_job_id_from_file_path(file_path) diff --git a/dlt/destinations/impl/bigquery/configuration.py b/dlt/destinations/impl/bigquery/configuration.py index bf41d38aff..0dbc8959c2 100644 --- a/dlt/destinations/impl/bigquery/configuration.py +++ b/dlt/destinations/impl/bigquery/configuration.py @@ -52,5 +52,17 @@ def __init__( file_upload_timeout: float = 30 * 60.0, retry_deadline: float = 60.0, destination_name: str = None, - environment: str = None, - ) -> None: ... + environment: str = None + ) -> None: + super().__init__( + credentials=credentials, + dataset_name=dataset_name, + default_schema_name=default_schema_name, + destination_name=destination_name, + environment=environment, + ) + self.retry_deadline = retry_deadline + self.file_upload_timeout = file_upload_timeout + self.http_timeout = http_timeout + self.location = location + ... 
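A quick sketch of what the new partition branches added to `BigQueryClient._get_table_update_sql` earlier in this diff actually emit, one clause per supported data type of the single `partition`-hinted column. This is illustration only, not part of the change; the helper name and the column names (`partition_clause`, `created_at`, `event_ts`) are made up for the example.

```python
# Sketch (not part of this diff): the three PARTITION BY suffixes appended to the
# generated CREATE TABLE statement, keyed on the hinted column's data type.

def partition_clause(column_name: str, data_type: str) -> str:
    """Return the clause appended after the CREATE TABLE statement."""
    if data_type == "date":
        # DATE columns are partitioned directly.
        return f"PARTITION BY {column_name}"
    if data_type == "timestamp":
        # TIMESTAMP columns get day granularity via DATE().
        return f"PARTITION BY DATE({column_name})"
    if data_type == "bigint":
        # INT64 columns are treated as UNIX timestamps and bucketed into daily
        # ranges; rows outside the generated bounds land in BigQuery's
        # __UNPARTITIONED__ partition.
        return (
            f"PARTITION BY RANGE_BUCKET({column_name},"
            " GENERATE_ARRAY(-172800000, 691200000, 86400))"
        )
    raise ValueError(f"`partition` hint is not supported for {data_type}")


print(partition_clause("created_at", "timestamp"))  # PARTITION BY DATE(created_at)
print(partition_clause("event_ts", "bigint"))
```

Only the `bigint` case needs fixed RANGE_BUCKET bounds: GENERATE_ARRAY plus BigQuery's 10,000-partition limit rules out covering the full INT64 range, so the bounds are chosen to give daily buckets (86,400-second steps), as the comments in the hunk above explain.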
diff --git a/dlt/destinations/impl/bigquery/factory.py b/dlt/destinations/impl/bigquery/factory.py index fc92c3c087..bee55fa164 100644 --- a/dlt/destinations/impl/bigquery/factory.py +++ b/dlt/destinations/impl/bigquery/factory.py @@ -9,6 +9,7 @@ from dlt.destinations.impl.bigquery.bigquery import BigQueryClient +# noinspection PyPep8Naming class bigquery(Destination[BigQueryClientConfiguration, "BigQueryClient"]): spec = BigQueryClientConfiguration diff --git a/dlt/destinations/impl/bigquery/sql_client.py b/dlt/destinations/impl/bigquery/sql_client.py index cf5d2ecbd4..ce38e3fe29 100644 --- a/dlt/destinations/impl/bigquery/sql_client.py +++ b/dlt/destinations/impl/bigquery/sql_client.py @@ -1,31 +1,30 @@ from contextlib import contextmanager -from typing import Any, AnyStr, ClassVar, Iterator, List, Optional, Sequence, Type +from typing import Any, AnyStr, ClassVar, Iterator, List, Optional, Sequence import google.cloud.bigquery as bigquery # noqa: I250 +from google.api_core import exceptions as api_core_exceptions +from google.cloud import exceptions as gcp_exceptions from google.cloud.bigquery import dbapi as bq_dbapi from google.cloud.bigquery.dbapi import Connection as DbApiConnection, Cursor as BQDbApiCursor -from google.cloud import exceptions as gcp_exceptions from google.cloud.bigquery.dbapi import exceptions as dbapi_exceptions -from google.api_core import exceptions as api_core_exceptions from dlt.common.configuration.specs import GcpServiceAccountCredentialsWithoutDefaults from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.typing import StrAny - -from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame from dlt.destinations.exceptions import ( DatabaseTerminalException, DatabaseTransientException, DatabaseUndefinedRelation, ) +from dlt.destinations.impl.bigquery import capabilities from dlt.destinations.sql_client import ( DBApiCursorImpl, SqlClientBase, raise_database_error, raise_open_connection_error, ) +from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame -from dlt.destinations.impl.bigquery import capabilities # terminal reasons as returned in BQ gRPC error response # https://cloud.google.com/bigquery/docs/error-messages @@ -38,7 +37,7 @@ "stopped", "tableUnavailable", ] -# invalidQuery is an transient error -> must be fixed by programmer +# invalidQuery is a transient error -> must be fixed by programmer class BigQueryDBApiCursorImpl(DBApiCursorImpl): @@ -47,16 +46,15 @@ class BigQueryDBApiCursorImpl(DBApiCursorImpl): native_cursor: BQDbApiCursor # type: ignore def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame: + if chunk_size is not None: + return super().df(chunk_size=chunk_size) query_job: bigquery.QueryJob = self.native_cursor._query_job - if chunk_size is None: - try: - return query_job.to_dataframe(**kwargs) - except ValueError: - # no pyarrow/db-types, fallback to our implementation - return super().df() - else: - return super().df(chunk_size=chunk_size) + try: + return query_job.to_dataframe(**kwargs) + except ValueError: + # no pyarrow/db-types, fallback to our implementation + return super().df() class BigQuerySqlClient(SqlClientBase[bigquery.Client], DBTransaction): @@ -116,34 +114,32 @@ def close_connection(self) -> None: @raise_database_error def begin_transaction(self) -> Iterator[DBTransaction]: try: - # start the transaction if not yet started - if not self._session_query: - job = self._client.query( - "BEGIN TRANSACTION;", - 
job_config=bigquery.QueryJobConfig( - create_session=True, - default_dataset=self.fully_qualified_dataset_name(escape=False), - ), - ) - self._session_query = bigquery.QueryJobConfig( - create_session=False, - default_dataset=self.fully_qualified_dataset_name(escape=False), - connection_properties=[ - bigquery.query.ConnectionProperty( - key="session_id", value=job.session_info.session_id - ) - ], - ) - try: - job.result() - except Exception: - # if session creation fails - self._session_query = None - raise - else: + if self._session_query: raise dbapi_exceptions.ProgrammingError( "Nested transactions not supported on BigQuery" ) + job = self._client.query( + "BEGIN TRANSACTION;", + job_config=bigquery.QueryJobConfig( + create_session=True, + default_dataset=self.fully_qualified_dataset_name(escape=False), + ), + ) + self._session_query = bigquery.QueryJobConfig( + create_session=False, + default_dataset=self.fully_qualified_dataset_name(escape=False), + connection_properties=[ + bigquery.query.ConnectionProperty( + key="session_id", value=job.session_info.session_id + ) + ], + ) + try: + job.result() + except Exception: + # if session creation fails + self._session_query = None + raise yield self self.commit_transaction() except Exception: @@ -152,7 +148,7 @@ def begin_transaction(self) -> Iterator[DBTransaction]: def commit_transaction(self) -> None: if not self._session_query: - # allow to commit without transaction + # allow committing without transaction return self.execute_sql("COMMIT TRANSACTION;CALL BQ.ABORT_SESSION();") self._session_query = None @@ -181,7 +177,6 @@ def has_dataset(self) -> bool: def create_dataset(self) -> None: self._client.create_dataset( self.fully_qualified_dataset_name(escape=False), - exists_ok=False, retry=self._default_retry, timeout=self.http_timeout, ) @@ -201,21 +196,18 @@ def execute_sql( with self.execute_query(sql, *args, **kwargs) as curr: if not curr.description: return None - else: - try: - f = curr.fetchall() - return f - except api_core_exceptions.InvalidArgument as ia_ex: - if "non-table entities cannot be read" in str(ia_ex): - return None - raise + try: + return curr.fetchall() + except api_core_exceptions.InvalidArgument as ia_ex: + if "non-table entities cannot be read" in str(ia_ex): + return None + raise @contextmanager @raise_database_error def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]: conn: DbApiConnection = None - curr: DBApiCursor = None - db_args = args if args else kwargs if kwargs else None + db_args = args or (kwargs or None) try: conn = DbApiConnection(client=self._client) curr = conn.cursor() @@ -238,37 +230,37 @@ def fully_qualified_dataset_name(self, escape: bool = True) -> str: @classmethod def _make_database_exception(cls, ex: Exception) -> Exception: - if cls.is_dbapi_exception(ex): - # google cloud exception in first argument: https://github.com/googleapis/python-bigquery/blob/main/google/cloud/bigquery/dbapi/cursor.py#L205 - cloud_ex = ex.args[0] - reason = cls._get_reason_from_errors(cloud_ex) - if reason is None: - if isinstance(ex, (dbapi_exceptions.DataError, dbapi_exceptions.IntegrityError)): - return DatabaseTerminalException(ex) - elif isinstance(ex, dbapi_exceptions.ProgrammingError): - return DatabaseTransientException(ex) - if reason == "notFound": - return DatabaseUndefinedRelation(ex) - if reason == "invalidQuery" and "was not found" in str(ex) and "Dataset" in str(ex): - return DatabaseUndefinedRelation(ex) - if ( - reason == "invalidQuery" - and "Not found" in 
str(ex) - and ("Dataset" in str(ex) or "Table" in str(ex)) - ): - return DatabaseUndefinedRelation(ex) - if reason == "accessDenied" and "Dataset" in str(ex) and "not exist" in str(ex): - return DatabaseUndefinedRelation(ex) - if reason == "invalidQuery" and ( - "Unrecognized name" in str(ex) or "cannot be null" in str(ex) - ): - # unknown column, inserting NULL into required field - return DatabaseTerminalException(ex) - if reason in BQ_TERMINAL_REASONS: + if not cls.is_dbapi_exception(ex): + return ex + # google cloud exception in first argument: https://github.com/googleapis/python-bigquery/blob/main/google/cloud/bigquery/dbapi/cursor.py#L205 + cloud_ex = ex.args[0] + reason = cls._get_reason_from_errors(cloud_ex) + if reason is None: + if isinstance(ex, (dbapi_exceptions.DataError, dbapi_exceptions.IntegrityError)): return DatabaseTerminalException(ex) - # anything else is transient - return DatabaseTransientException(ex) - return ex + elif isinstance(ex, dbapi_exceptions.ProgrammingError): + return DatabaseTransientException(ex) + if reason == "notFound": + return DatabaseUndefinedRelation(ex) + if reason == "invalidQuery" and "was not found" in str(ex) and "Dataset" in str(ex): + return DatabaseUndefinedRelation(ex) + if ( + reason == "invalidQuery" + and "Not found" in str(ex) + and ("Dataset" in str(ex) or "Table" in str(ex)) + ): + return DatabaseUndefinedRelation(ex) + if reason == "accessDenied" and "Dataset" in str(ex) and "not exist" in str(ex): + return DatabaseUndefinedRelation(ex) + if reason == "invalidQuery" and ( + "Unrecognized name" in str(ex) or "cannot be null" in str(ex) + ): + # unknown column, inserting NULL into required field + return DatabaseTerminalException(ex) + if reason in BQ_TERMINAL_REASONS: + return DatabaseTerminalException(ex) + # anything else is transient + return DatabaseTransientException(ex) @staticmethod def _get_reason_from_errors(gace: api_core_exceptions.GoogleAPICallError) -> Optional[str]: diff --git a/docs/examples/google_sheets/google_sheets.py b/docs/examples/google_sheets/google_sheets.py index 8a93df9970..1ba330e4ca 100644 --- a/docs/examples/google_sheets/google_sheets.py +++ b/docs/examples/google_sheets/google_sheets.py @@ -9,6 +9,7 @@ ) from dlt.common.typing import DictStrAny, StrAny + def _initialize_sheets( credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials] ) -> Any: @@ -16,6 +17,7 @@ def _initialize_sheets( service = build("sheets", "v4", credentials=credentials.to_native_credentials()) return service + @dlt.source def google_spreadsheet( spreadsheet_id: str, @@ -55,6 +57,7 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: for name in sheet_names ] + if __name__ == "__main__": pipeline = dlt.pipeline(destination="duckdb") # see example.secrets.toml to where to put credentials @@ -67,4 +70,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: sheet_names=range_names, ) ) - print(info) \ No newline at end of file + print(info) diff --git a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py index d1ba3537ea..fecd842214 100644 --- a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py +++ b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py @@ -10,11 +10,7 @@ def list_files(folder_path: str): folder_path = os.path.abspath(folder_path) for filename in os.listdir(folder_path): file_path = os.path.join(folder_path, filename) - yield { - "file_name": filename, - "file_path": file_path, - "mtime": os.path.getmtime(file_path) - } + yield {"file_name": filename, 
"file_path": file_path, "mtime": os.path.getmtime(file_path)} @dlt.transformer(primary_key="page_id", write_disposition="merge") @@ -30,10 +26,8 @@ def pdf_to_text(file_item, separate_pages: bool = False): page_item["page_id"] = file_item["file_name"] + "_" + str(page_no) yield page_item -pipeline = dlt.pipeline( - pipeline_name='pdf_to_text', - destination='weaviate' -) + +pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate") # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf" # (3) sends them to pdf_to_text transformer with pipe (|) operator @@ -46,9 +40,7 @@ def pdf_to_text(file_item, separate_pages: bool = False): pdf_pipeline.table_name = "InvoiceText" # use weaviate_adapter to tell destination to vectorize "text" column -load_info = pipeline.run( - weaviate_adapter(pdf_pipeline, vectorize="text") -) +load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text")) row_counts = pipeline.last_trace.last_normalize_info print(row_counts) print("------") @@ -58,4 +50,4 @@ def pdf_to_text(file_item, separate_pages: bool = False): client = weaviate.Client("http://localhost:8080") # get text of all the invoices in InvoiceText class we just created above -print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) \ No newline at end of file +print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) diff --git a/docs/website/docs/.dlt/.gitignore b/docs/website/docs/.dlt/.gitignore new file mode 100644 index 0000000000..da95bc542a --- /dev/null +++ b/docs/website/docs/.dlt/.gitignore @@ -0,0 +1 @@ +/secrets.toml diff --git a/docs/website/docs/dlt-ecosystem/destinations/bigquery.md b/docs/website/docs/dlt-ecosystem/destinations/bigquery.md index 9b34450c12..f44aab20b7 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/bigquery.md +++ b/docs/website/docs/dlt-ecosystem/destinations/bigquery.md @@ -118,15 +118,25 @@ When staging is enabled: > ❗ **Bigquery cannot load JSON columns from `parquet` files**. `dlt` will fail such jobs permanently. Switch to `jsonl` to load and parse JSON properly. ## Supported column hints + BigQuery supports the following [column hints](https://dlthub.com/docs/general-usage/schema#tables-and-columns): -* `partition` - creates a partition with a day granularity on decorated column (`PARTITION BY DATE`). May be used with `datetime`, `date` data types and `bigint` and `double` if they contain valid UNIX timestamps. Only one column per table is supported and only when a new table is created. -* `cluster` - creates a cluster column(s). Many column per table are supported and only when a new table is created. + +* `partition` - creates a partition with a day granularity on decorated column (`PARTITION BY DATE`). + May be used with `datetime`, `date` and `bigint` data types. + Only one column per table is supported and only when a new table is created. + For more information on BigQuery partitioning, read the [official docs](https://cloud.google.com/bigquery/docs/partitioned-tables). + + > ❗ `bigint` maps to BigQuery's **INT64** data type. + > Automatic partitioning requires converting an INT64 column to a UNIX timestamp, which `GENERATE_ARRAY` doesn't natively support. + > With a 10,000 partition limit, we can’t cover the full INT64 range. + > Instead, we set 86,400 second boundaries to enable daily partitioning. 
+ > This captures typical values, but extremely large/small outliers go to an `__UNPARTITIONED__` catch-all partition. + +* `cluster` - creates a cluster column(s). Many columns per table are supported and only when a new table is created. ## Staging Support BigQuery supports gcs as a file staging destination. dlt will upload files in the parquet format to gcs and ask BigQuery to copy their data directly into the db. Please refer to the [Google Storage filesystem documentation](./filesystem.md#google-storage) to learn how to set up your gcs bucket with the bucket_url and credentials. If you use the same service account for gcs and your redshift deployment, you do not need to provide additional authentication for BigQuery to be able to read from your bucket. -```toml -``` Alternatively to parquet files, you can also specify jsonl as the staging file format. For this set the `loader_file_format` argument of the `run` command of the pipeline to `jsonl`. diff --git a/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py b/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py index db96efab86..1a6b77da1b 100644 --- a/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py +++ b/docs/website/docs/examples/connector_x_arrow/code/load_arrow-snippets.py @@ -20,13 +20,13 @@ def read_sql_x( def genome_resource(): # create genome resource with merge on `upid` primary key genome = dlt.resource( - name="genome", + name="acanthochromis_polyacanthus", write_disposition="merge", - primary_key="upid", + primary_key="analysis_id", standalone=True, )(read_sql_x)( - "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", # type: ignore[arg-type] - "SELECT * FROM genome ORDER BY created LIMIT 1000", + "mysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1", # type: ignore[arg-type] + "SELECT * FROM analysis LIMIT 20", ) # add incremental on created at genome.apply_hints(incremental=dlt.sources.incremental("created")) @@ -47,6 +47,6 @@ def genome_resource(): # check that stuff was loaded # @@@DLT_REMOVE row_counts = pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE - assert row_counts["genome"] == 1000 # @@@DLT_REMOVE + assert row_counts["acanthochromis_polyacanthus"] == 20 # @@@DLT_REMOVE # @@@DLT_SNIPPET_END example diff --git a/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py b/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py index 4f32f65370..f56861e9e9 100644 --- a/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py +++ b/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py @@ -80,8 +80,8 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: ) ) print(info) - # @@@DLT_SNIPPET_END google_sheets_run - # @@@DLT_SNIPPET_END example + # @@@DLT_SNIPPET_END google_sheets_run + # @@@DLT_SNIPPET_END example row_counts = pipeline.last_trace.last_normalize_info.row_counts print(row_counts.keys()) assert row_counts["hidden_columns_merged_cells"] == 7 diff --git a/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py b/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py index fddae74ddf..1ad7cc8159 100644 --- a/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py +++ b/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py @@ -1,5 +1,6 @@ from tests.pipeline.utils import assert_load_info + def pdf_to_weaviate_snippet() -> None: # 
@@@DLT_SNIPPET_START example # @@@DLT_SNIPPET_START pdf_to_weaviate @@ -9,7 +10,6 @@ def pdf_to_weaviate_snippet() -> None: from dlt.destinations.impl.weaviate import weaviate_adapter from PyPDF2 import PdfReader - @dlt.resource(selected=False) def list_files(folder_path: str): folder_path = os.path.abspath(folder_path) @@ -18,10 +18,9 @@ def list_files(folder_path: str): yield { "file_name": filename, "file_path": file_path, - "mtime": os.path.getmtime(file_path) + "mtime": os.path.getmtime(file_path), } - @dlt.transformer(primary_key="page_id", write_disposition="merge") def pdf_to_text(file_item, separate_pages: bool = False): if not separate_pages: @@ -35,10 +34,7 @@ def pdf_to_text(file_item, separate_pages: bool = False): page_item["page_id"] = file_item["file_name"] + "_" + str(page_no) yield page_item - pipeline = dlt.pipeline( - pipeline_name='pdf_to_text', - destination='weaviate' - ) + pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate") # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf" # (3) sends them to pdf_to_text transformer with pipe (|) operator @@ -51,9 +47,7 @@ def pdf_to_text(file_item, separate_pages: bool = False): pdf_pipeline.table_name = "InvoiceText" # use weaviate_adapter to tell destination to vectorize "text" column - load_info = pipeline.run( - weaviate_adapter(pdf_pipeline, vectorize="text") - ) + load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text")) row_counts = pipeline.last_trace.last_normalize_info print(row_counts) print("------") diff --git a/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md b/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md new file mode 100644 index 0000000000..8b33a852a8 --- /dev/null +++ b/docs/website/docs/general-usage/data-enrichments/user_agent_device_data_enrichment.md @@ -0,0 +1,305 @@ +--- +title: User-agent device data enrichment +description: Enriching the user-agent device data with average device price. +keywords: [data enrichment, user-agent data, device enrichment] +--- + +# Data enrichment part one: User-agent device data enrichment + +Data enrichment enhances raw data with valuable information from multiple sources, increasing its +analytical and decision-making value. + +This part covers enriching sample data with device price. Understanding the price segment +of the device that the user used to access your service can be helpful in personalized marketing, +customer segmentation, and many more. + +This documentation will discuss how to enrich the user device information with the average market +price. + +## Setup Guide + +We use SerpAPI to retrieve device prices using Google Shopping, but alternative services or APIs are +viable. + +:::note +SerpAPI free tier offers 100 free calls monthly. For production, consider upgrading to a higher +plan. +::: + + +## Creating data enrichment pipeline +You can either follow the example in the linked Colab notebook or follow this documentation to +create the user-agent device data enrichment pipeline. + +### A. Colab notebook +The Colab notebook combines three data enrichment processes for a sample dataset, starting with "Data +enrichment part one: User-agent device data". + +Here's the link to the notebook: +**[Colab Notebook](https://colab.research.google.com/drive/1ZKEkf1LRSld7CWQFS36fUXjhJKPAon7P?usp=sharing).** + +### B. 
Create a pipeline +Alternatively, to create a data enrichment pipeline, you can start by creating the following directory structure: + +```python +user_device_enrichment/ +├── .dlt/ +│ └── secrets.toml +└── device_enrichment_pipeline.py +``` +### 1. Creating resource + + `dlt` works on the principle of [sources](https://dlthub.com/docs/general-usage/source) + and [resources.](https://dlthub.com/docs/general-usage/resource) + + This data resource yields data typical of what many web analytics and + tracking tools can collect. However, the specifics of what data is collected + and how it's used can vary significantly among different tracking services. + + Let's examine a synthetic dataset created for this article. It includes: + + `user_id`: Web trackers typically assign unique ID to users for + tracking their journeys and interactions over time. + + `device_name`: User device information helps in understanding the user base's device. + + `page_refer`: The referer URL is tracked to analyze traffic sources and user navigation behavior. + + Here's the resource that yields the sample data as discussed above: + + ```python + import dlt + + @dlt.resource(write_disposition="append") + def tracked_data(): + """ + A generator function that yields a series of dictionaries, each representing + user tracking data. + + This function is decorated with `dlt.resource` to integrate into the DLT (Data + Loading Tool) pipeline. The `write_disposition` parameter is set to "append" to + ensure that data from this generator is appended to the existing data in the + destination table. + + Yields: + dict: A dictionary with keys 'user_id', 'device_name', and 'page_referer', + representing the user's tracking data including their device and the page + they were referred from. + """ + + # Sample data representing tracked user data + sample_data = [ + {"user_id": 1, "device_name": "Sony Experia XZ", "page_referer": + "https://b2venture.lightning.force.com/"}, + {"user_id": 2, "device_name": "Samsung Galaxy S23 Ultra 5G", + "page_referer": "https://techcrunch.com/2023/07/20/can-dlthub-solve-the-python-library-problem-for-ai-dig-ventures-thinks-so/"}, + {"user_id": 3, "device_name": "Apple iPhone 14 Pro Max", + "page_referer": "https://dlthub.com/success-stories/freelancers-perspective/"}, + {"user_id": 4, "device_name": "OnePlus 11R", + "page_referer": "https://www.reddit.com/r/dataengineering/comments/173kp9o/ideas_for_data_validation_on_data_ingestion/"}, + {"user_id": 5, "device_name": "Google Pixel 7 Pro", "page_referer": "https://pypi.org/"}, + ] + + # Yielding each user's data as a dictionary + for user_data in sample_data: + yield user_data + ``` + +### 2. Create `fetch_average_price` function + +This particular function retrieves the average price of a device by utilizing SerpAPI and Google +shopping listings. To filter the data, the function uses `dlt` state, and only fetches prices +from SerpAPI for devices that have not been updated in the most recent run or for those that were +loaded more than 180 days in the past. + +The first step is to register on [SerpAPI](https://serpapi.com/) and obtain the API token key. + +1. In the `.dlt`folder, there's a file called `secrets.toml`. It's where you store sensitive + information securely, like access tokens. Keep this file safe. Here's its format for service + account authentication: + + ```python + [sources] + api_key= "Please set me up!" #Serp Api key. + ``` + +1. Replace the value of the `api_key`. + +1. 
Create `fetch_average_price()` function as follows: + ```python + import datetime + import requests + + # Uncomment transformer function if it is to be used as a transformer, + # otherwise, it is being used with the `add_map` functionality. + + # @dlt.transformer(data_from=tracked_data) + def fetch_average_price(user_tracked_data): + """ + Fetches the average price of a device from an external API and + updates the user_data dictionary. + + This function retrieves the average price of a device specified in the + user_data dictionary by making an API request. The price data is cached + in the device_info state to reduce API calls. If the data for the device + is older than 180 days, a new API request is made. + + Args: + user_tracked_data (dict): A dictionary containing user data, including + the device name. + + Returns: + dict: The updated user_data dictionary with added device price and + updated timestamp. + """ + + # Retrieve the API key from dlt secrets + api_key = dlt.secrets.get("sources.api_key") + + # Get the current resource state for device information + device_info = dlt.current.resource_state().setdefault("devices", {}) + + # Current timestamp for checking the last update + current_timestamp = datetime.datetime.now() + + # Print the current device information + # print(device_info) # if you need to check state + + # Extract the device name from user data + device = user_tracked_data['device_name'] + device_data = device_info.get(device, {}) + + # Calculate the time since the last update + last_updated = ( + current_timestamp - + device_data.get('timestamp', datetime.datetime.min) + ) + # Check if the device is not in state or data is older than 180 days + if device not in device_info or last_updated > datetime.timedelta(days=180): + try: + # Make an API request to fetch device prices + response = requests.get("https://serpapi.com/search", params={ + "engine": "google_shopping", "q": device, + "api_key": api_key, "num": 10 + }) + except requests.RequestException as e: + print(f"Request failed: {e}") + return None + + if response.status_code != 200: + print(f"Failed to retrieve data: {response.status_code}") + return None + + # Process the response to extract prices + results = response.json().get("shopping_results", []) + prices = [] + for r in results: + if r.get("price"): + # Split the price string and convert each part to float + price = r.get("price") + price_parts = price.replace('$', '').replace(',', '').split() + for part in price_parts: + try: + prices.append(float(part)) + except ValueError: + pass # Ignore parts that can't be converted to float + + # Calculate the average price and update the device_info + device_price = round(sum(prices) / len(prices), 2) if prices else None + device_info[device] = { + 'timestamp': current_timestamp, + 'price': device_price + } + + # Add the device price and timestamp to the user data + user_tracked_data['device_price_USD'] = device_price + user_tracked_data['price_updated_at'] = current_timestamp + + else: + # Use cached price data if available and not outdated + user_tracked_data['device_price_USD'] = device_data.get('price') + user_tracked_data['price_updated_at'] = device_data.get('timestamp') + + return user_tracked_data + ``` + +### 3. Create your pipeline + +1. In creating the pipeline, the `fetch_average_price` can be used in the following ways: + - Add map function + - Transformer function + + + The `dlt` library's `transformer` and `add_map` functions serve distinct purposes in data + processing. 
+ + `Transformers` used to process a resource and are ideal for post-load data transformations in a + pipeline, compatible with tools like `dbt`, the `dlt SQL client`, or Pandas for intricate data + manipulation. To read more: + [Click here.](../../general-usage/resource#process-resources-with-dlttransformer) + + Conversely, `add_map` used to customize a resource applies transformations at an item level + within a resource. It's useful for tasks like anonymizing individual data records. More on this + can be found under + [Customize resources](../../general-usage/resource#customize-resources) in the + documentation. + + +1. Here, we create the pipeline and use the `add_map` functionality: + + ```python + # Create the pipeline + pipeline = dlt.pipeline( + pipeline_name="data_enrichment_one", + destination="duckdb", + dataset_name="user_device_enrichment", + ) + + # Run the pipeline with the transformed source + load_info = pipeline.run(tracked_data.add_map(fetch_average_price)) + + print(load_info) + ``` + + :::info + Please note that the same outcome can be achieved by using the transformer function. To + do so, you need to add the transformer decorator at the top of the `fetch_average_price` function. + For `pipeline.run`, you can use the following code: + + ```python + # using fetch_average_price as a transformer function + load_info = pipeline.run( + tracked_data | fetch_average_price, + table_name="tracked_data" + ) + ``` + + This will execute the `fetch_average_price` function with the tracked data and return the average + price. + ::: + +### Run the pipeline + +1. Install necessary dependencies for the preferred + [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/), For example, duckdb: + + ``` + pip install dlt[duckdb] + ``` + +1. Run the pipeline with the following command: + + ``` + python device_enrichment_pipeline.py + ``` + +1. To ensure that everything loads as expected, use the command: + + ``` + dlt pipeline show + ``` + + For example, the "pipeline_name" for the above pipeline example is `data_enrichment_one`; you can use + any custom name instead. 
+ + diff --git a/docs/website/docs/getting-started-snippets.py b/docs/website/docs/getting-started-snippets.py index eb00df9986..8b7a01e192 100644 --- a/docs/website/docs/getting-started-snippets.py +++ b/docs/website/docs/getting-started-snippets.py @@ -99,21 +99,25 @@ def db_snippet() -> None: # use any sql database supported by SQLAlchemy, below we use a public mysql instance to get data # NOTE: you'll need to install pymysql with "pip install pymysql" # NOTE: loading data from public mysql instance may take several seconds - engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam") + engine = create_engine( + "mysql+pymysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1" + ) with engine.connect() as conn: # select genome table, stream data in batches of 100 elements rows = conn.execution_options(yield_per=100).exec_driver_sql( - "SELECT * FROM genome LIMIT 1000" + "SELECT * FROM analysis LIMIT 1000" ) pipeline = dlt.pipeline( pipeline_name="from_database", destination="duckdb", - dataset_name="genome_data", + dataset_name="acanthochromis_polyacanthus_data", ) # here we convert the rows into dictionaries on the fly with a map function - load_info = pipeline.run(map(lambda row: dict(row._mapping), rows), table_name="genome") + load_info = pipeline.run( + map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus" + ) print(load_info) # @@@DLT_SNIPPET_END db diff --git a/docs/website/docs/intro-snippets.py b/docs/website/docs/intro-snippets.py index 340a6ff262..f1edfb0d9e 100644 --- a/docs/website/docs/intro-snippets.py +++ b/docs/website/docs/intro-snippets.py @@ -18,14 +18,13 @@ def intro_snippet() -> None: response.raise_for_status() data.append(response.json()) # Extract, normalize, and load the data - load_info = pipeline.run(data, table_name='player') + load_info = pipeline.run(data, table_name="player") # @@@DLT_SNIPPET_END api assert_load_info(load_info) def csv_snippet() -> None: - # @@@DLT_SNIPPET_START csv import dlt import pandas as pd @@ -50,8 +49,8 @@ def csv_snippet() -> None: assert_load_info(load_info) -def db_snippet() -> None: +def db_snippet() -> None: # @@@DLT_SNIPPET_START db import dlt from sqlalchemy import create_engine @@ -60,27 +59,27 @@ def db_snippet() -> None: # MySQL instance to get data. 
# NOTE: you'll need to install pymysql with `pip install pymysql` # NOTE: loading data from public mysql instance may take several seconds - engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam") + engine = create_engine( + "mysql+pymysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1" + ) with engine.connect() as conn: # Select genome table, stream data in batches of 100 elements - query = "SELECT * FROM genome LIMIT 1000" + query = "SELECT * FROM analysis LIMIT 1000" rows = conn.execution_options(yield_per=100).exec_driver_sql(query) pipeline = dlt.pipeline( pipeline_name="from_database", destination="duckdb", - dataset_name="genome_data", + dataset_name="acanthochromis_polyacanthus_data", ) # Convert the rows into dictionaries on the fly with a map function load_info = pipeline.run( - map(lambda row: dict(row._mapping), rows), - table_name="genome" + map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus" ) print(load_info) # @@@DLT_SNIPPET_END db assert_load_info(load_info) - diff --git a/docs/website/docs/tutorial/__init__.py b/docs/website/docs/tutorial/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py index cd7004bdbe..d53af9e3d9 100644 --- a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py +++ b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py @@ -3,7 +3,6 @@ def basic_api_snippet() -> None: - # @@@DLT_SNIPPET_START basic_api import dlt from dlt.sources.helpers import requests @@ -15,9 +14,9 @@ def basic_api_snippet() -> None: response.raise_for_status() pipeline = dlt.pipeline( - pipeline_name='github_issues', - destination='duckdb', - dataset_name='github_data', + pipeline_name="github_issues", + destination="duckdb", + dataset_name="github_data", ) # The response contains a list of issues load_info = pipeline.run(response.json(), table_name="issues") @@ -29,19 +28,15 @@ def basic_api_snippet() -> None: def replace_snippet() -> None: - # @@@DLT_SNIPPET_START replace import dlt - data = [ - {'id': 1, 'name': 'Alice'}, - {'id': 2, 'name': 'Bob'} - ] + data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}] pipeline = dlt.pipeline( - pipeline_name='replace_data', - destination='duckdb', - dataset_name='mydata', + pipeline_name="replace_data", + destination="duckdb", + dataset_name="mydata", ) load_info = pipeline.run(data, table_name="users", write_disposition="replace") @@ -52,7 +47,6 @@ def replace_snippet() -> None: def incremental_snippet() -> None: - # @@@DLT_SNIPPET_START incremental import dlt from dlt.sources.helpers import requests @@ -86,9 +80,9 @@ def get_issues( url = response.links["next"]["url"] pipeline = dlt.pipeline( - pipeline_name='github_issues_incremental', - destination='duckdb', - dataset_name='github_data_append', + pipeline_name="github_issues_incremental", + destination="duckdb", + dataset_name="github_data_append", ) load_info = pipeline.run(get_issues) @@ -103,7 +97,6 @@ def get_issues( def incremental_merge_snippet() -> None: - # @@@DLT_SNIPPET_START incremental_merge import dlt from dlt.sources.helpers import requests @@ -114,15 +107,15 @@ def incremental_merge_snippet() -> None: primary_key="id", ) def get_issues( - updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") + updated_at=dlt.sources.incremental("updated_at", 
initial_value="1970-01-01T00:00:00Z") ): # NOTE: we read only open issues to minimize number of calls to # the API. There's a limit of ~50 calls for not authenticated # Github users url = ( - f"https://api.github.com/repos/dlt-hub/dlt/issues" + "https://api.github.com/repos/dlt-hub/dlt/issues" f"?since={updated_at.last_value}&per_page=100&sort=updated" - f"&directions=desc&state=open" + "&directions=desc&state=open" ) while True: @@ -136,9 +129,9 @@ def get_issues( url = response.links["next"]["url"] pipeline = dlt.pipeline( - pipeline_name='github_issues_merge', - destination='duckdb', - dataset_name='github_data_merge', + pipeline_name="github_issues_merge", + destination="duckdb", + dataset_name="github_data_merge", ) load_info = pipeline.run(get_issues) row_counts = pipeline.last_trace.last_normalize_info @@ -152,15 +145,12 @@ def get_issues( def table_dispatch_snippet() -> None: - # @@@DLT_SNIPPET_START table_dispatch import dlt from dlt.sources.helpers import requests @dlt.resource(primary_key="id", table_name=lambda i: i["type"], write_disposition="append") - def repo_events( - last_created_at = dlt.sources.incremental("created_at") - ): + def repo_events(last_created_at=dlt.sources.incremental("created_at")): url = "https://api.github.com/repos/dlt-hub/dlt/events?per_page=100" while True: @@ -179,9 +169,9 @@ def repo_events( url = response.links["next"]["url"] pipeline = dlt.pipeline( - pipeline_name='github_events', - destination='duckdb', - dataset_name='github_events_data', + pipeline_name="github_events", + destination="duckdb", + dataset_name="github_events_data", ) load_info = pipeline.run(repo_events) row_counts = pipeline.last_trace.last_normalize_info diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index 5939ec134e..2c9b55e6da 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -221,6 +221,13 @@ const sidebars = { 'general-usage/customising-pipelines/pseudonymizing_columns', ] }, + { + type: 'category', + label: 'Data enrichments', + items: [ + 'general-usage/data-enrichments/user_agent_device_data_enrichment', + ] + }, { type: 'category', label: 'Run in production', diff --git a/tests/common/storages/utils.py b/tests/common/storages/utils.py index 91d8c3c77f..3319480c4f 100644 --- a/tests/common/storages/utils.py +++ b/tests/common/storages/utils.py @@ -67,8 +67,8 @@ def assert_sample_files( assert len(lines) >= 1 assert isinstance(lines[0], str) - assert len(all_file_items) == 10 - assert set([item["file_name"] for item in all_file_items]) == { + assert len(all_file_items) >= 10 + assert set([item["file_name"] for item in all_file_items]) >= { "csv/freshman_kgs.csv", "csv/freshman_lbs.csv", "csv/mlb_players.csv", diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py index 2e7930c339..ac17bb8316 100644 --- a/tests/load/bigquery/test_bigquery_client.py +++ b/tests/load/bigquery/test_bigquery_client.py @@ -69,15 +69,20 @@ def test_service_credentials_with_default(environment: Any) -> None: # now set the env with custom_environ({"GOOGLE_APPLICATION_CREDENTIALS": dest_path}): - gcpc = GcpServiceAccountCredentials() - resolve_configuration(gcpc) - # project id recovered from credentials - assert gcpc.project_id == "level-dragon-333019" - # check if credentials can be created - assert gcpc.to_native_credentials() is not None - # the default credentials are available - assert gcpc.has_default_credentials() is True - assert gcpc.default_credentials() is not None + 
_extracted_from_test_service_credentials_with_default_22() + + +# TODO Rename this here and in `test_service_credentials_with_default` +def _extracted_from_test_service_credentials_with_default_22(): + gcpc = GcpServiceAccountCredentials() + resolve_configuration(gcpc) + # project id recovered from credentials + assert gcpc.project_id == "level-dragon-333019" + # check if credentials can be created + assert gcpc.to_native_credentials() is not None + # the default credentials are available + assert gcpc.has_default_credentials() is True + assert gcpc.default_credentials() is not None def test_service_credentials_native_credentials_object(environment: Any) -> None: @@ -103,7 +108,7 @@ def _assert_credentials(gcp_credentials): gcpc.parse_native_representation(credentials) _assert_credentials(gcpc) - # oauth credentials should fail on invalid type + # oauth credentials should fail on an invalid type with pytest.raises(InvalidGoogleNativeCredentialsType): gcoauth = GcpOAuthCredentialsWithoutDefaults() gcoauth.parse_native_representation(credentials) @@ -133,15 +138,20 @@ def test_oauth_credentials_with_default(environment: Any) -> None: # now set the env _, dest_path = prepare_service_json() with custom_environ({"GOOGLE_APPLICATION_CREDENTIALS": dest_path}): - gcoauth = GcpOAuthCredentials() - resolve_configuration(gcoauth) - # project id recovered from credentials - assert gcoauth.project_id == "level-dragon-333019" - # check if credentials can be created - assert gcoauth.to_native_credentials() - # the default credentials are available - assert gcoauth.has_default_credentials() is True - assert gcoauth.default_credentials() is not None + _extracted_from_test_oauth_credentials_with_default_25() + + +# TODO Rename this here and in `test_oauth_credentials_with_default` +def _extracted_from_test_oauth_credentials_with_default_25(): + gcoauth = GcpOAuthCredentials() + resolve_configuration(gcoauth) + # project id recovered from credentials + assert gcoauth.project_id == "level-dragon-333019" + # check if credentials can be created + assert gcoauth.to_native_credentials() + # the default credentials are available + assert gcoauth.has_default_credentials() is True + assert gcoauth.default_credentials() is not None def test_oauth_credentials_native_credentials_object(environment: Any) -> None: @@ -175,7 +185,7 @@ def _assert_credentials(gcp_credentials): gcoauth.parse_native_representation(credentials) _assert_credentials(gcoauth) - # oauth credentials should fail on invalid type + # oauth credentials should fail on an invalid type with pytest.raises(InvalidGoogleNativeCredentialsType): gcpc = GcpServiceAccountCredentials() gcpc.parse_native_representation(credentials) @@ -202,7 +212,7 @@ def test_bigquery_configuration() -> None: assert config.file_upload_timeout == 1800.0 assert config.fingerprint() == digest128("chat-analytics-rasa-ci") - # credentials location is deprecated + # credential location is deprecated os.environ["CREDENTIALS__LOCATION"] = "EU" config = resolve_configuration( BigQueryClientConfiguration(dataset_name="dataset"), sections=("destination", "bigquery") @@ -229,7 +239,7 @@ def test_bigquery_configuration() -> None: def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage) -> None: # non existing job with pytest.raises(LoadJobNotExistsException): - client.restore_file_load(uniq_id() + ".") + client.restore_file_load(f"{uniq_id()}.") # bad name with pytest.raises(LoadJobTerminalException): @@ -237,11 +247,15 @@ def test_bigquery_job_errors(client: 
BigQueryClient, file_storage: FileStorage) user_table_name = prepare_table(client) - # start job with non existing file + # start a job with non-existing file with pytest.raises(FileNotFoundError): - client.start_file_load(client.schema.get_table(user_table_name), uniq_id() + ".", uniq_id()) + client.start_file_load( + client.schema.get_table(user_table_name), + f"{uniq_id()}.", + uniq_id(), + ) - # start job with invalid name + # start a job with invalid name dest_path = file_storage.save("!!aaaa", b"data") with pytest.raises(LoadJobTerminalException): client.start_file_load(client.schema.get_table(user_table_name), dest_path, uniq_id()) @@ -255,7 +269,7 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage) } job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name) - # start a job from the same file. it should fallback to retrieve job silently + # start a job from the same file. it should be a fallback to retrieve a job silently r_job = client.start_file_load( client.schema.get_table(user_table_name), file_storage.make_full_path(job.file_name()), @@ -265,7 +279,7 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage) @pytest.mark.parametrize("location", ["US", "EU"]) -def test_bigquery_location(location: str, file_storage: FileStorage) -> None: +def test_bigquery_location(location: str, file_storage: FileStorage, client) -> None: with cm_yield_client_with_storage( "bigquery", default_config_values={"credentials": {"location": location}} ) as client: @@ -278,7 +292,7 @@ def test_bigquery_location(location: str, file_storage: FileStorage) -> None: } job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name) - # start a job from the same file. it should fallback to retrieve job silently + # start a job from the same file. it should be a fallback to retrieve a job silently client.start_file_load( client.schema.get_table(user_table_name), file_storage.make_full_path(job.file_name()), @@ -313,7 +327,7 @@ def test_loading_errors(client: BigQueryClient, file_storage: FileStorage) -> No ) assert "Only optional fields can be set to NULL. 
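The location tests above feed `{"credentials": {"location": ...}}` straight into the client factory, and the configuration test earlier resolves the same setting from the environment under the `destination.bigquery.credentials` section (noting that this spelling is deprecated in favor of a destination-level setting). A hedged sketch of driving it via environment variables, with illustrative pipeline, dataset, and table names and BigQuery credentials assumed to be configured elsewhere:

import os

import dlt

# deprecated but still-resolved spelling exercised by the configuration test above
os.environ["DESTINATION__BIGQUERY__CREDENTIALS__LOCATION"] = "EU"

pipeline = dlt.pipeline(
    pipeline_name="bq_location_sketch", destination="bigquery", dataset_name="eu_dataset_sketch"
)
# the dataset created by this run should land in the EU multi-region
print(pipeline.run([{"id": 1}], table_name="demo"))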
Field: timestamp;" in job.exception() - # insert wrong type + # insert a wrong type insert_json = copy(load_json) insert_json["timestamp"] = "AA" job = expect_load_file( @@ -338,9 +352,7 @@ def test_loading_errors(client: BigQueryClient, file_storage: FileStorage) -> No above_limit = Decimal(10**29) # this will pass insert_json["parse_data__intent__id"] = below_limit - job = expect_load_file( - client, file_storage, json.dumps(insert_json), user_table_name, status="completed" - ) + expect_load_file(client, file_storage, json.dumps(insert_json), user_table_name) # this will fail insert_json["parse_data__intent__id"] = above_limit job = expect_load_file( @@ -370,9 +382,7 @@ def test_loading_errors(client: BigQueryClient, file_storage: FileStorage) -> No def prepare_oauth_json() -> Tuple[str, str]: # prepare real service.json storage = FileStorage("_secrets", makedirs=True) - with open( - common_json_case_path("oauth_client_secret_929384042504"), mode="r", encoding="utf-8" - ) as f: + with open(common_json_case_path("oauth_client_secret_929384042504"), encoding="utf-8") as f: oauth_str = f.read() dest_path = storage.save("oauth_client_secret_929384042504.json", oauth_str) return oauth_str, dest_path diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py index 4f40524196..bd45895af2 100644 --- a/tests/load/bigquery/test_bigquery_table_builder.py +++ b/tests/load/bigquery/test_bigquery_table_builder.py @@ -1,18 +1,22 @@ import os +from copy import deepcopy +from typing import Iterator, Dict + import pytest import sqlfluff -from copy import deepcopy -from dlt.common.utils import custom_environ, uniq_id -from dlt.common.schema import Schema -from dlt.common.schema.utils import new_table +import dlt from dlt.common.configuration import resolve_configuration from dlt.common.configuration.specs import GcpServiceAccountCredentialsWithoutDefaults - +from dlt.common.pendulum import pendulum +from dlt.common.schema import Schema +from dlt.common.utils import custom_environ +from dlt.common.utils import uniq_id +from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate from dlt.destinations.impl.bigquery.bigquery import BigQueryClient from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration -from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate - +from dlt.extract import DltResource +from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration from tests.load.utils import TABLE_UPDATE @@ -26,24 +30,27 @@ def test_configuration() -> None: os.environ["MYBG__CREDENTIALS__PRIVATE_KEY"] = "1234" os.environ["MYBG__CREDENTIALS__PROJECT_ID"] = "1234" - # check names normalized + # check names normalised with custom_environ({"MYBG__CREDENTIALS__PRIVATE_KEY": "---NO NEWLINE---\n"}): - C = resolve_configuration(GcpServiceAccountCredentialsWithoutDefaults(), sections=("mybg",)) - assert C.private_key == "---NO NEWLINE---\n" + c = resolve_configuration(GcpServiceAccountCredentialsWithoutDefaults(), sections=("mybg",)) + assert c.private_key == "---NO NEWLINE---\n" with custom_environ({"MYBG__CREDENTIALS__PRIVATE_KEY": "---WITH NEWLINE---\n"}): - C = resolve_configuration(GcpServiceAccountCredentialsWithoutDefaults(), sections=("mybg",)) - assert C.private_key == "---WITH NEWLINE---\n" + c = resolve_configuration(GcpServiceAccountCredentialsWithoutDefaults(), sections=("mybg",)) + assert c.private_key == "---WITH NEWLINE---\n" @pytest.fixture def gcp_client(schema: Schema) 
-> BigQueryClient: - # return client without opening connection + # return a client without opening connection creds = GcpServiceAccountCredentialsWithoutDefaults() creds.project_id = "test_project_id" + # noinspection PydanticTypeChecker return BigQueryClient( schema, - BigQueryClientConfiguration(dataset_name="test_" + uniq_id(), credentials=creds), # type: ignore[arg-type] + BigQueryClientConfiguration( + dataset_name=f"test_{uniq_id()}", credentials=creds # type: ignore[arg-type] + ), ) @@ -109,14 +116,14 @@ def test_alter_table(gcp_client: BigQueryClient) -> None: def test_create_table_with_partition_and_cluster(gcp_client: BigQueryClient) -> None: mod_update = deepcopy(TABLE_UPDATE) # timestamp - mod_update[3]["partition"] = True + mod_update[9]["partition"] = True mod_update[4]["cluster"] = True mod_update[1]["cluster"] = True sql = gcp_client._get_table_update_sql("event_test_table", mod_update, False)[0] sqlfluff.parse(sql, dialect="bigquery") # clustering must be the last assert sql.endswith("CLUSTER BY `col2`,`col5`") - assert "PARTITION BY DATE(`col4`)" in sql + assert "PARTITION BY `col10`" in sql def test_double_partition_exception(gcp_client: BigQueryClient) -> None: @@ -128,3 +135,239 @@ def test_double_partition_exception(gcp_client: BigQueryClient) -> None: with pytest.raises(DestinationSchemaWillNotUpdate) as excc: gcp_client._get_table_update_sql("event_test_table", mod_update, False) assert excc.value.columns == ["`col4`", "`col5`"] + + +def test_create_table_with_time_partition(gcp_client: BigQueryClient) -> None: + mod_update = deepcopy(TABLE_UPDATE) + mod_update[3]["partition"] = True + sql = gcp_client._get_table_update_sql("event_test_table", mod_update, False)[0] + sqlfluff.parse(sql, dialect="bigquery") + assert "PARTITION BY DATE(`col4`)" in sql + + +def test_create_table_with_date_partition(gcp_client: BigQueryClient) -> None: + mod_update = deepcopy(TABLE_UPDATE) + mod_update[9]["partition"] = True + sql = gcp_client._get_table_update_sql("event_test_table", mod_update, False)[0] + sqlfluff.parse(sql, dialect="bigquery") + assert "PARTITION BY `col10`" in sql + + +def test_create_table_with_integer_partition(gcp_client: BigQueryClient) -> None: + mod_update = deepcopy(TABLE_UPDATE) + mod_update[0]["partition"] = True + sql = gcp_client._get_table_update_sql("event_test_table", mod_update, False)[0] + sqlfluff.parse(sql, dialect="bigquery") + assert "PARTITION BY RANGE_BUCKET(`col1`, GENERATE_ARRAY(-172800000, 691200000, 86400))" in sql + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_staging_configs=True, subset=["bigquery"]), + ids=lambda x: x.name, +) +def test_bigquery_partition_by_date(destination_config: DestinationTestConfiguration) -> None: + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + + @dlt.resource( + write_disposition="merge", + primary_key="my_date_column", + columns={"my_date_column": {"data_type": "date", "partition": True, "nullable": False}}, + ) + def demo_resource() -> Iterator[Dict[str, pendulum.Date]]: + for i in range(10): + yield { + "my_date_column": pendulum.from_timestamp(1700784000 + i * 50_000).date(), + } + + @dlt.source(max_table_nesting=0) + def demo_source() -> DltResource: + return demo_resource + + pipeline.run(demo_source()) + + with pipeline.sql_client() as c: + with c.execute_query( + "SELECT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.PARTITIONS WHERE partition_id IS NOT" + " NULL);" + ) as cur: + has_partitions = cur.fetchone()[0] + assert 
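The new table-builder tests above pin the DDL produced by each `partition` hint: a `timestamp` column becomes `PARTITION BY DATE(...)`, a `date` column is partitioned directly, and a `bigint` column falls back to `RANGE_BUCKET(...)` integer-range partitioning. On the pipeline side the hint is declared on the resource's column schema; a minimal sketch mirroring the merge-by-date test (pipeline and dataset names are illustrative):

import dlt
from dlt.common.pendulum import pendulum


@dlt.resource(
    write_disposition="merge",
    primary_key="my_date_column",
    # the "partition" hint turns this column into the BigQuery partition key
    columns={"my_date_column": {"data_type": "date", "partition": True, "nullable": False}},
)
def partitioned_days():
    for i in range(10):
        yield {"my_date_column": pendulum.from_timestamp(1700784000 + i * 50_000).date()}


pipeline = dlt.pipeline(
    pipeline_name="bq_partition_sketch", destination="bigquery", dataset_name="partition_sketch"
)
print(pipeline.run(partitioned_days))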
isinstance(has_partitions, bool) + assert has_partitions + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_staging_configs=True, subset=["bigquery"]), + ids=lambda x: x.name, +) +def test_bigquery_no_partition_by_date(destination_config: DestinationTestConfiguration) -> None: + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + + @dlt.resource( + write_disposition="merge", + primary_key="my_date_column", + columns={"my_date_column": {"data_type": "date", "partition": False, "nullable": False}}, + ) + def demo_resource() -> Iterator[Dict[str, pendulum.Date]]: + for i in range(10): + yield { + "my_date_column": pendulum.from_timestamp(1700784000 + i * 50_000).date(), + } + + @dlt.source(max_table_nesting=0) + def demo_source() -> DltResource: + return demo_resource + + pipeline.run(demo_source()) + + with pipeline.sql_client() as c: + with c.execute_query( + "SELECT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.PARTITIONS WHERE partition_id IS NOT" + " NULL);" + ) as cur: + has_partitions = cur.fetchone()[0] + assert isinstance(has_partitions, bool) + assert not has_partitions + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_staging_configs=True, subset=["bigquery"]), + ids=lambda x: x.name, +) +def test_bigquery_partition_by_timestamp(destination_config: DestinationTestConfiguration) -> None: + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + + @dlt.resource( + write_disposition="merge", + primary_key="my_timestamp_column", + columns={ + "my_timestamp_column": {"data_type": "timestamp", "partition": True, "nullable": False} + }, + ) + def demo_resource() -> Iterator[Dict[str, pendulum.DateTime]]: + for i in range(10): + yield { + "my_timestamp_column": pendulum.from_timestamp(1700784000 + i * 50_000), + } + + @dlt.source(max_table_nesting=0) + def demo_source() -> DltResource: + return demo_resource + + pipeline.run(demo_source()) + + with pipeline.sql_client() as c: + with c.execute_query( + "SELECT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.PARTITIONS WHERE partition_id IS NOT" + " NULL);" + ) as cur: + has_partitions = cur.fetchone()[0] + assert isinstance(has_partitions, bool) + assert has_partitions + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_staging_configs=True, subset=["bigquery"]), + ids=lambda x: x.name, +) +def test_bigquery_no_partition_by_timestamp( + destination_config: DestinationTestConfiguration, +) -> None: + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + + @dlt.resource( + write_disposition="merge", + primary_key="my_timestamp_column", + columns={ + "my_timestamp_column": {"data_type": "timestamp", "partition": False, "nullable": False} + }, + ) + def demo_resource() -> Iterator[Dict[str, pendulum.DateTime]]: + for i in range(10): + yield { + "my_timestamp_column": pendulum.from_timestamp(1700784000 + i * 50_000), + } + + @dlt.source(max_table_nesting=0) + def demo_source() -> DltResource: + return demo_resource + + pipeline.run(demo_source()) + + with pipeline.sql_client() as c: + with c.execute_query( + "SELECT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.PARTITIONS WHERE partition_id IS NOT" + " NULL);" + ) as cur: + has_partitions = cur.fetchone()[0] + assert isinstance(has_partitions, bool) + assert not has_partitions + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_staging_configs=True, subset=["bigquery"]), + 
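Every pipeline-level test above and below verifies the result the same way: by asking `INFORMATION_SCHEMA.PARTITIONS` whether any non-null partition ids exist in the pipeline's dataset. The check is easy to reuse outside the suite; a short sketch, assuming `pipeline` has already been run against BigQuery:

def dataset_has_partitions(pipeline) -> bool:
    # True if any table in the pipeline's default dataset has at least one real partition
    with pipeline.sql_client() as client:
        with client.execute_query(
            "SELECT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.PARTITIONS"
            " WHERE partition_id IS NOT NULL);"
        ) as cursor:
            return bool(cursor.fetchone()[0])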
ids=lambda x: x.name, +) +def test_bigquery_partition_by_integer(destination_config: DestinationTestConfiguration) -> None: + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + + @dlt.resource( + columns={"some_int": {"data_type": "bigint", "partition": True, "nullable": False}}, + ) + def demo_resource() -> Iterator[Dict[str, int]]: + for i in range(10): + yield { + "some_int": i, + } + + @dlt.source(max_table_nesting=0) + def demo_source() -> DltResource: + return demo_resource + + pipeline.run(demo_source()) + + with pipeline.sql_client() as c: + with c.execute_query( + "SELECT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.PARTITIONS WHERE partition_id IS NOT" + " NULL);" + ) as cur: + has_partitions = cur.fetchone()[0] + assert isinstance(has_partitions, bool) + assert has_partitions + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_staging_configs=True, subset=["bigquery"]), + ids=lambda x: x.name, +) +def test_bigquery_no_partition_by_integer(destination_config: DestinationTestConfiguration) -> None: + pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True) + + @dlt.resource( + columns={"some_int": {"data_type": "bigint", "partition": False, "nullable": False}}, + ) + def demo_resource() -> Iterator[Dict[str, int]]: + for i in range(10): + yield { + "some_int": i, + } + + @dlt.source(max_table_nesting=0) + def demo_source() -> DltResource: + return demo_resource + + pipeline.run(demo_source()) + + with pipeline.sql_client() as c: + with c.execute_query( + "SELECT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.PARTITIONS WHERE partition_id IS NOT" + " NULL);" + ) as cur: + has_partitions = cur.fetchone()[0] + assert isinstance(has_partitions, bool) + assert not has_partitions