If the table does not already exist, it will be created.

-This function only supports writer protocol version 2 currently. When
-attempting to write to an existing table with a higher min_writer_version,
-this function will throw DeltaProtocolError.
+The pyarrow writer supports protocol version 2 currently and won't be updated.
+For higher protocol support use engine='rust'; this will become the default
+eventually.

Note that this function does NOT register this table in a data catalog.

A locking mechanism is needed to prevent unsafe concurrent writes to a
-delta lake directory when writing to S3. DynamoDB is the only available
-locking provider at the moment in delta-rs. To enable DynamoDB as the
-locking provider, you need to set the AWS_S3_LOCKING_PROVIDER to 'dynamodb'
-as a storage_option or as an environment variable.
-
-Additionally, you must create a DynamoDB table with the name 'delta_rs_lock_table'
-so that it can be automatically discovered by delta-rs. Alternatively, you can
-use a table name of your choice, but you must set the DELTA_DYNAMO_TABLE_NAME
-variable to match your chosen table name. The required schema for the DynamoDB
-table is as follows:
-
-Please note that this locking mechanism is not compatible with any other
-locking mechanisms, including the one used by Spark.
+delta lake directory when writing to S3. For more information on the setup, follow
+this usage guide: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/
Delta tables store file-level metadata information, which allows for a powerful optimization called file skipping.
+
This page explains how Delta Lake implements file skipping, how to optimize your tables to maximize file skipping, and the benefits of file skipping.
+
Let’s start by looking at the file-level metadata in Delta tables.
+
Delta Lake file metadata
+
Delta Lake stores metadata about each file's min/max values in the table. Query engines can skip entire files when they don’t contain data that’s relevant to the query.
+
Suppose you have a Delta table with data stored in two files and has the following metadata.
+
filename | min_name | max_name | min_age | max_age
---------|----------|----------|---------|--------
fileA    | alice    | joy      | 12      | 46
fileB    | allan    | linda    | 34      | 78

Suppose you want to run the following query: select * from the_table where age < 20.
+
The engine only needs to read fileA to execute this query. fileB has a min_age of 34, so we know there aren’t any rows of data with an age less than 20.
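The metadata check above can be sketched in a few lines of plain Python. This is a toy illustration of the idea, not the deltalake API; the stats dictionaries mirror the table shown earlier.

```python
# Toy sketch of file skipping: an engine compares the query predicate
# (age < 20) against per-file min/max stats from the transaction log.
file_stats = [
    {"filename": "fileA", "min_age": 12, "max_age": 46},
    {"filename": "fileB", "min_age": 34, "max_age": 78},
]

def files_for_age_below(stats, threshold):
    # A file can only contain rows with age < threshold if its min_age is below it.
    return [f["filename"] for f in stats if f["min_age"] < threshold]

print(files_for_age_below(file_stats, 20))  # ['fileA']
```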
+
The benefit of file skipping depends on the query and the data layout of the Delta table. Some queries cannot take advantage of any file skipping. Here’s an example query that does not benefit from file skipping: select * from the_table group by age.
+
Let’s recreate this example with Polars to drive the point home.
+-------+-----+
| name  | age |
| ---   | --- |
| str   | i64 |
+=======+=====+
| alice | 12  |
+-------+-----+

Polars can use the Delta table metadata to skip the file that does not contain data relevant to the query.
+
How Delta Lake implements file skipping
+
Here’s how engines execute queries on Delta tables:
+
+
1. Start by reading the transaction log to get the file paths, file sizes, and min/max values for each column
2. Parse the query and push down the predicates to skip files
3. Read the minimal subset of the files needed for the query
+
+
Some file formats don’t allow for file skipping. For example, CSV files don’t have file-level metadata, so query engines can’t read a minimal subset of the data. The query engine has to check all the files, even if they don’t contain any relevant data.
+
When data is in Parquet files, the query engine can open up all the files, read the footers, build the file-level metadata, and perform file skipping. Fetching metadata in each file is slower than grabbing the pre-built file-level metadata from the transaction log.
+
Now, let’s see how to structure your tables to allow for more file skipping.
+
File skipping for different file sizes
+
Delta tables store data in files. Smaller files allow for more file skipping compared to bigger files.
+
However, an excessive number of small files isn’t good because it creates I/O overhead and slows down queries.
+
Your Delta tables should have files that are “right-sized”. For a table with 150 GB of data, 5 GB files would probably be too large, and 10 KB files would be too small. It’s generally best to store data in files that are between 100 MB and 1 GB.
+
Delta Lake has an optimize function that performs small file compaction, so you don’t need to program this logic yourself.
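As a rough illustration of what compaction does, here is a toy greedy bin-packing sketch. The function name, file sizes, and 256 MB target are made up for illustration; this is not the optimize implementation.

```python
# Toy sketch of small-file compaction: greedily pack small files into
# output files close to a target size (illustrative only).
def plan_compaction(file_sizes_mb, target_mb=256):
    bins, current, current_size = [], [], 0
    for size in file_sizes_mb:
        if current and current_size + size > target_mb:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins

# 1000 x 1 MB files become 4 right-sized files instead of 1000 tiny ones.
print(len(plan_compaction([1] * 1000, target_mb=256)))  # 4
```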
+
Now, let's investigate how to store data in files to maximize the file skipping opportunities.
+
How to maximize file skipping
+
You can maximize file-skipping by colocating similar data in the same files.
+
Suppose you have a table with test scores and frequently run queries that filter based on the test_score column.
Suppose you want to run the following query: select * from exams where test_score > 90.
+
This query cannot skip files, given the current organization of the data. You can rearrange the data to colocate similar test scores in the same files to allow for file skipping. Here’s the new layout:
The query (select * from exams where test_score > 90) can skip two of the three files with the new Delta table layout. The query engine only has to read fileF for this query.
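A toy sketch of why colocation matters: the score values and layouts below are invented, but they show how sorting changes the per-file max values that a `test_score > 90` style filter checks.

```python
# Same rows, two layouts. Colocating similar values lets files be skipped.
def files_to_read(files, threshold):
    # A file must be read only if its max value can exceed the threshold.
    return sum(1 for f in files if max(f) > threshold)

interleaved = [[55, 95, 70], [60, 92, 80], [65, 99, 75]]
sorted_layout = [[55, 60, 65], [70, 75, 80], [92, 95, 99]]

print(files_to_read(interleaved, 90))    # 3 (every file must be read)
print(files_to_read(sorted_layout, 90))  # 1 (only the last file)
```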
+
Now, let’s look at how file skipping works with string values.
+
How file skipping works with strings
+
File skipping is also effective when filtering on string values.
+
Suppose you have a table with person_name and country columns. There are millions of rows of data. Here are the first three rows of data:
+
person_name | country
------------|--------
person1     | angola
person2     | china
person3     | mexico

The Delta table contains three files with the following metadata:
+
filename | min_country | max_country
---------|-------------|------------
fileA    | albania     | mali
fileB    | libya       | paraguay
fileC    | oman        | zimbabwe

Suppose you want to run the following query: select * from some_people where country = 'austria'.
+
You only need to read the data in fileA to run this query. The min_country value for fileB and fileC are greater than “austria”, so we know those files don’t contain any data relevant to the query.
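The string comparison is plain lexicographic ordering, which can be sketched as follows (a toy illustration mirroring the stats above, not the deltalake API).

```python
# Toy sketch: min/max string stats per file, checked lexicographically.
country_stats = [
    ("fileA", "albania", "mali"),
    ("fileB", "libya", "paraguay"),
    ("fileC", "oman", "zimbabwe"),
]

def files_possibly_containing(stats, value):
    # A file may contain `value` only if min_country <= value <= max_country.
    return [name for name, lo, hi in stats if lo <= value <= hi]

print(files_possibly_containing(country_stats, "austria"))  # ['fileA']
```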
+
File skipping can also be a robust optimization for string values. Now, let’s see how file skipping works for partitioned tables.
+
File skipping for partitioned tables
+
You can partition Delta tables for file skipping as well. Suppose we have the same data as in the previous section, but the table is partitioned by country.
+
Here’s the Delta table:
+
filename | partition
---------|----------
fileA    | albania
fileB    | libya
fileC    | oman
fileD    | jamaica
fileE    | albania
fileF    | oman

+
Suppose you want to run the following query on this partitioned table: select * from some_partitioned_table where country = 'albania'.
+
You only need to read fileA and fileE to execute this query. Delta Lake provides the file-level partition metadata in the transaction log so that this query will run quickly.
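Partition pruning can be sketched the same way (a toy illustration using the partition values above).

```python
# Toy sketch: partition pruning reads only files whose partition value matches.
partitions = {
    "fileA": "albania", "fileB": "libya", "fileC": "oman",
    "fileD": "jamaica", "fileE": "albania", "fileF": "oman",
}

def prune(partition_map, country):
    return sorted(name for name, part in partition_map.items() if part == country)

print(prune(partitions, "albania"))  # ['fileA', 'fileE']
```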
+
Conclusion
+
Delta Lake allows for file skipping, which is a powerful performance optimization.
+
Delta Lake also provides built-in utilities to colocate data in the same files like partitioning, Z Ordering, and compaction to improve file skipping.
+
Delta Lake users need to know how to assess the tradeoffs of these techniques to optimize file skipping. Users also need to understand the most frequent query patterns of their tables to allow for maximal file skipping.
diff --git a/index.html b/index.html
index adf8bd7894..c8e9ff3bc5 100644
--- a/index.html
+++ b/index.html
@@ -707,21 +707,85 @@
-
This is the documentation for the native Rust/Python implementation of Delta Lake. It is based on the delta-rs Rust library and requires no Spark or JVM dependencies. For the PySpark implementation, see delta-spark instead.
This module provides the capability to read, write, and manage Delta Lake tables with Python or Rust without Spark or Java. It uses Apache Arrow under the hood, so is compatible with other Arrow-native or integrated libraries such as pandas, DuckDB, and Polars.
\"Rust deltalake\" refers to the Rust API of delta-rs (no Spark dependency)
\"Python deltalake\" refers to the Python API of delta-rs (no Spark dependency)
\"Delta Spark\" refers to the Scala impementation of the Delta Lake transaction log protocol. This depends on Spark and Java.
"},{"location":"#why-implement-the-delta-lake-transaction-log-protocol-in-rust-and-scala","title":"Why implement the Delta Lake transaction log protocol in Rust and Scala?","text":"
Delta Spark depends on Java and Spark, which is fine for many use cases, but not all Delta Lake users want to depend on these libraries. delta-rs allows using Delta Lake in Rust or other native projects where using a JVM is not an option.
Python deltalake lets you query Delta tables without depending on Java/Scala.
Suppose you want to query a Delta table with pandas on your local machine. Python deltalake makes it easy to query the table with a simple pip install command - no need to install Java.
Delta tables store data in Parquet files and persist file-level metadata in the transaction log.
This offers two main performance advantages:
File skipping based on metadata that\u2019s quickly accessible
Easy identification of all file paths for the table, compared to file listing operations that can be slow, especially on cloud object stores
Delta Lake stores min/max values for each column of each file in the table. Certain queries can skip entire files based on the metadata. File skipping can be a massive performance optimization.
Delta Lake also makes it easy to rearrange data in the table, so more file skipping is possible. For example, the table can be partitioned or Z Ordered, so that similar data is colocated in the same files and data skipping is optimal for your query patterns.
For data lakes, you need to run file listing operations to get the file paths before you can actually read the data. Listing all the files in a data lake can take a long time, especially if there are a lot of files and they are stored in Hive-style partitions.
Delta Lake stores all the file paths in the transaction log. So you can quickly get the file paths directly from the log and then run your query. Delta Lake also stores the file-level metadata in the transaction log which is quicker than opening all the files in the data lake and grabbing the metadata from the file footer.
Many basic data operations are hard in data lakes but quite easy with Delta Lake. The only data operation that\u2019s easy in a data lake is appending data. Delta Lake makes all data operations easy, including the following:
Appends
Upserts
Deletes
Replace where
Even deleting a few rows of data from a data lake is hard. It\u2019s even harder if you want to run the operation in a performant manner.
Delta Lake makes it easy to run common data operations and executes them performantly under the hood.
Delta Lake also executes write operations as transactions, which makes data operations safer and prevents downtime. Write operations will cause data lakes to be in an unstable state while the computation is running. For example, if you read a data lake while a delete operation is running, then you may get the wrong data.
Let\u2019s explore the benefits of reliable transactions in more detail.
Delta Lake tables are interoperable and can be read/written by multiple different query engines.
For example, you can create a Delta table with Spark, append to it with pandas, and then read it with Polars.
Delta tables are powerful because they are interoperable with various query engines and computation runtimes.
Suppose you have a Delta table that\u2019s updated with an AWS Lambda function every 5 minutes. There is only a small amount of data collected every 5 minutes, so a lightweight runtime like AWS Lambda is sufficient.
Further suppose that the overall table is quite large. So when you want to perform DML operations or query the whole table, your team uses a Spark cluster.
Delta Lake is flexible to allow these types of operations from multiple readers and writers. This provides teams with the flexibility to choose the right tool for the job.
"},{"location":"why-use-delta-lake/#support-for-many-languages","title":"Support for many languages","text":"
Delta tables can be queried with a variety of different languages. This project provides APIs for Rust and Python users and does not depend on Java or Scala. This project is a great alternative for pandas, Polars, DuckDB, or DataFusion.
Delta Lake supports many languages and even more language support is coming soon!
"},{"location":"why-use-delta-lake/#support-on-multiple-clouds","title":"Support on multiple clouds","text":"
Delta Lake supports multiple clouds including GCP, AWS, and Azure.
You can also use Delta Lake on your local machine or in an on-prem environment.
Delta Lake is a mature table format that offers users tons of advantages over a data lake with virtually no downsides.
Once you start using Delta Lake, you will never want to go back to data lakes that expose you to a variety of dangerous bugs, poor performance, and reliability issues.
The Delta Lake community is also welcoming and open. We gladly accept new contributors and help users with their questions.
Refers to the Databricks Unity Catalog <https://docs.databricks.com/data-governance/unity-catalog/index.html>_
","boost":2},{"location":"api/delta_writer/","title":"Writer","text":"","boost":10},{"location":"api/delta_writer/#write-to-delta-tables","title":"Write to Delta Tables","text":"","boost":10},{"location":"api/delta_writer/#deltalake.write_deltalake","title":"deltalake.write_deltalake","text":"
If the table does not already exist, it will be created.
This function only supports writer protocol version 2 currently. When attempting to write to an existing table with a higher min_writer_version, this function will throw DeltaProtocolError.
Note that this function does NOT register this table in a data catalog.
A locking mechanism is needed to prevent unsafe concurrent writes to a delta lake directory when writing to S3. DynamoDB is the only available locking provider at the moment in delta-rs. To enable DynamoDB as the locking provider, you need to set the AWS_S3_LOCKING_PROVIDER to 'dynamodb' as a storage_option or as an environment variable.
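For illustration, the two settings described above can be passed as storage_options. The bucket path and custom table name below are placeholder values, and the actual write call is commented out because it needs live AWS credentials.

```python
# Hypothetical example values. Only AWS_S3_LOCKING_PROVIDER is required when
# using the default 'delta_rs_lock_table' DynamoDB table name.
storage_options = {
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",
    # Needed only if you chose a custom DynamoDB table name:
    "DELTA_DYNAMO_TABLE_NAME": "my_delta_lock_table",
}
# write_deltalake("s3://my-bucket/my-table", data, storage_options=storage_options)
print(sorted(storage_options))  # ['AWS_S3_LOCKING_PROVIDER', 'DELTA_DYNAMO_TABLE_NAME']
```

Both keys can alternatively be set as environment variables instead of storage_options.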
Additionally, you must create a DynamoDB table with the name 'delta_rs_lock_table' so that it can be automatically discovered by delta-rs. Alternatively, you can use a table name of your choice, but you must set the DELTA_DYNAMO_TABLE_NAME variable to match your chosen table name. The required schema for the DynamoDB table is as follows:
Data to write. If passing iterable, the schema must also be given.
required schemaOptional[Union[Schema, Schema]]
Optional schema to write.
Nonepartition_byOptional[Union[List[str], str]]
List of columns to partition the table by. Only required when creating a new table.
NonefilesystemOptional[FileSystem]
Optional filesystem to pass to PyArrow. If not provided will be inferred from uri. The file system has to be rooted in the table root. Use pyarrow.fs.SubTreeFileSystem to adopt the root of pyarrow file systems.
How to handle existing data. Default is to error if table already exists. If 'append', will add new data. If 'overwrite', will replace table with new data. If 'ignore', will not write anything if table already exists.
Optional write options for Parquet (ParquetFileWriteOptions). Can be provided with defaults using ParquetFileWriteOptions().make_write_options(). Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx#L492-L533 for the list of available options. Only used in pyarrow engine.
Nonemax_partitionsOptional[int]
the maximum number of partitions that will be used. Only used in pyarrow engine.
Nonemax_open_filesint
Limits the maximum number of files that can be left open while writing. If an attempt is made to open too many files then the least recently used file will be closed. If this setting is set too low you may end up fragmenting your data into many small files. Only used in pyarrow engine.
1024max_rows_per_fileint
Maximum number of rows per file. If greater than 0 then this will limit how many rows are placed in any single file. Otherwise there will be no limit and one file will be created in each output directory unless files need to be closed to respect max_open_files min_rows_per_group: Minimum number of rows per group. When the value is set, the dataset writer will batch incoming data and only write the row groups to the disk when sufficient rows have accumulated. Only used in pyarrow engine.
10 * 1024 * 1024max_rows_per_groupint
Maximum number of rows per group. If the value is set, then the dataset writer may split up large incoming batches into multiple row groups. If this value is set, then min_rows_per_group should also be set.
If none and compression has a level, the default level will be used, only relevant for GZIP: levels (1-9), BROTLI: levels (1-11), ZSTD: levels (1-22),
None","boost":10},{"location":"api/delta_writer/#convert-to-delta-tables","title":"Convert to Delta Tables","text":"","boost":10},{"location":"api/delta_writer/#deltalake.convert_to_deltalake","title":"deltalake.convert_to_deltalake","text":"
Currently only HIVE partitioned tables are supported. Convert to delta creates a transaction log commit with add actions, and additional properties provided such as configuration, name, and description.
Parameters:
Name Type Description Default uriUnion[str, Path]
URI of a table.
required partition_byOptional[Schema]
Optional partitioning schema if table is partitioned.
Nonepartition_strategyOptional[Literal['hive']]
Optional partition strategy to read and convert
NonemodeLiteral['error', 'ignore']
How to handle existing data. Default is to error if table already exists. If 'ignore', will not convert anything if table already exists.
","boost":2},{"location":"api/schema/","title":"Schema","text":"","boost":2},{"location":"api/schema/#schema-and-field","title":"Schema and field","text":"
Schemas, fields, and data types are provided in the deltalake.schema submodule.
get schema with all variable size types (list, binary, string) as large variants (with int64 indices). This is for compatibility with systems like Polars that only support the large versions of Arrow types.
Create the Delta Table from a path with an optional version. Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URI. Depending on the storage backend used, you could provide options values using the storage_options parameter.
Parameters:
Name Type Description Default table_uriUnion[str, Path, PathLike[str]]
the path of the DeltaTable
required versionOptional[int]
version of the DeltaTable
Nonestorage_optionsOptional[Dict[str, str]]
a dictionary of the options to use for the storage backend
Nonewithout_filesbool
If True, will load table without tracking files. Some append-only applications might have no need of tracking any files. So, the DeltaTable will be loaded with a significant memory reduction.
Falselog_buffer_sizeOptional[int]
Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage api. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of cpus.
Delete expired log files before current version from table. The table log retention is based on the configuration.logRetentionDuration value, 30 days by default.
How to handle existing data. Default is to error if table already exists. If 'append', returns a not-supported error if the table exists. If 'overwrite', will CREATE_OR_REPLACE table. If 'ignore', will not do anything if table already exists. Defaults to \"error\".
Delete records from a Delta Table that satisfy a predicate.
When a predicate is not provided then all records are deleted from the Delta Table. Otherwise a scan of the Delta table is performed to mark any files that contain records that satisfy the predicate. Once files are determined they are rewritten without the records.
Parameters:
Name Type Description Default predicateOptional[str]
a SQL where clause. If not passed, will delete all rows.
Nonewriter_propertiesOptional[WriterProperties]
Pass writer properties to the Rust parquet writer.
Nonecustom_metadataOptional[Dict[str, str]]
custom metadata that will be added to the transaction commit.
Get the list of files as absolute URIs, including the scheme (e.g. \"s3://\").
Local files will be just plain absolute paths, without a scheme. (That is, no 'file://' prefix.)
Use the partition_filters parameter to retrieve a subset of files that match the given filters.
Parameters:
Name Type Description Default partition_filtersOptional[List[Tuple[str, str, Any]]]
the partition filters that will be used for getting the matched files
None
Returns:
Type Description List[str]
list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable
Predicates are expressed in disjunctive normal form (DNF), like [(\"x\", \"=\", \"a\"), ...]. DNF allows arbitrary boolean logical combinations of single partition predicates. The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective filter from multiple partition predicates. Each tuple has format: (key, op, value) and compares the key with the value. The supported op are: =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set or a tuple. The supported type for value is str. Use empty string '' for Null partition value.
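A toy evaluator for this DNF filter format, as an illustration of the semantics described here (not the library's internal implementation):

```python
# Evaluate one conjunction of (key, op, value) partition predicates
# against a dict of partition values. All tuples must match (AND).
def matches(partition_values, conjunction):
    for key, op, value in conjunction:
        actual = partition_values.get(key, "")
        if op == "=" and actual != value:
            return False
        if op == "!=" and actual == value:
            return False
        if op == "in" and actual not in value:
            return False
        if op == "not in" and actual in value:
            return False
    return True

row = {"x": "a", "y": "2021"}
print(matches(row, [("x", "=", "a"), ("y", "in", ["2020", "2021"])]))  # True
print(matches(row, [("x", "!=", "a")]))  # False
```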
The paths are as they are saved in the delta log, which may either be relative to the table root or absolute URIs.
Parameters:
Name Type Description Default partition_filtersOptional[List[Tuple[str, str, Any]]]
the partition filters that will be used for getting the matched files
None
Returns:
Type Description List[str]
list of the .parquet files referenced for the current version of the DeltaTable
Predicates are expressed in disjunctive normal form (DNF), like [(\"x\", \"=\", \"a\"), ...]. DNF allows arbitrary boolean logical combinations of single partition predicates. The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective filter from multiple partition predicates. Each tuple has format: (key, op, value) and compares the key with the value. The supported op are: =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set or a tuple. The supported type for value is str. Use empty string '' for Null partition value.
Name Type Description Default data_catalogDataCatalog
the Catalog to use for getting the storage location of the Delta Table
required database_namestr
the database name inside the Data Catalog
required table_namestr
the table name inside the Data Catalog
required data_catalog_idOptional[str]
the identifier of the Data Catalog
NoneversionOptional[int]
version of the DeltaTable
Nonelog_buffer_sizeOptional[int]
Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage api. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of cpus.
Add actions represent the files that currently make up the table. This data is a low-level representation parsed from the transaction log.
Parameters:
Name Type Description Default flattenbool
whether to flatten the schema. Partition values columns are given the prefix partition., statistics (null_count, min, and max) are given the prefix null_count., min., and max., and tags the prefix tags.. Nested field names are concatenated with ..
False
Returns:
Type Description RecordBatch
a PyArrow RecordBatch containing the add action data.
Load/time travel a DeltaTable to a specified version number, or a timestamp version of the table. If a string is passed then the argument should be an RFC 3339 and ISO 8601 date and time string format.
Parameters:
Name Type Description Default versionUnion[int, str, datetime]
the identifier of the version of the DeltaTable to load
Time travel Delta table to the latest version that's created at or before provided datetime_string argument. The datetime_string argument should be an RFC 3339 and ISO 8601 date and time string.
Deprecated
load_version and load_with_datetime have been combined into DeltaTable.load_as_version.
Parameters:
Name Type Description Default datetime_stringstr
the identifier of the datetime point of the DeltaTable to load
Pass the source data which you want to merge on the target delta table, providing a predicate in SQL query like format. You can also specify on what to do when the underlying data types do not match the underlying table.
Parameters:
Name Type Description Default sourceUnion[Table, RecordBatch, RecordBatchReader, Dataset, DataFrame]
source data
required predicatestr
SQL like predicate on how to merge
required source_aliasOptional[str]
Alias for the source table
Nonetarget_aliasOptional[str]
Alias for the target table
Noneerror_on_type_mismatchbool
specify whether merge will return an error if data types are mismatched (default = True)
Truewriter_propertiesOptional[WriterProperties]
Pass writer properties to the Rust parquet writer
Nonelarge_dtypesbool
If True, the data schema is kept in large_dtypes.
Truecustom_metadataOptional[Dict[str, str]]
custom metadata that will be added to the transaction commit.
Repair the Delta Table by auditing active files that do not exist in the underlying filesystem and removing them. This can be useful when there are accidental deletions or corrupted files.
Active files are ones that have an add action in the log, but no corresponding remove action. This operation creates a new FSCK transaction containing a remove action for each of the missing or corrupted files.
Parameters:
Name Type Description Default dry_runbool
when activated, list only the files, otherwise add remove actions to transaction log. Defaults to False.
Falsecustom_metadataOptional[Dict[str, str]]
custom metadata that will be added to the transaction commit.
None
Returns: The metrics from repair (FSCK) action.
Example
from deltalake import DeltaTable\ndt = DeltaTable('TEST')\ndt.repair(dry_run=False)\n
Build a pandas dataframe using data from the DeltaTable.
Parameters:
Name Type Description Default partitionsOptional[List[Tuple[str, str, Any]]]
A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
NonecolumnsOptional[List[str]]
The columns to project. This can be a list of column names to include (order and duplicates will be preserved)
NonefilesystemOptional[Union[str, FileSystem]]
A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
NonefiltersOptional[FilterType]
A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass partitions
Build a PyArrow Dataset using data from the DeltaTable.
Parameters:
Name Type Description Default partitionsOptional[List[Tuple[str, str, Any]]]
A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
NonefilesystemOptional[Union[str, FileSystem]]
A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
Build a PyArrow Table using data from the DeltaTable.
Parameters:
Name Type Description Default partitionsOptional[List[Tuple[str, str, Any]]]
A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
NonecolumnsOptional[List[str]]
The columns to project. This can be a list of column names to include (order and duplicates will be preserved)
NonefilesystemOptional[Union[str, FileSystem]]
A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
NonefiltersOptional[FilterType]
A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass partitions
Update a matched table row based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the row to be updated.
Parameters:
Name Type Description Default updatesDict[str, str]
a mapping of column name to update SQL expression.
Update all source fields to target fields; source and target are required to have the same field names. If a predicate is specified, then it must evaluate to true for the row to be updated.
Parameters:
Name Type Description Default predicateOptional[str]
Update a target row that has no matches in the source based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the row to be updated.
Parameters:
Name Type Description Default updatesDict[str, str]
a mapping of column name to update SQL expression.
Insert a new row to the target table based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the new row to be inserted.
Parameters:
Name Type Description Default updatesdict
a mapping of column name to insert SQL expression.
Insert a new row to the target table, updating all source fields to target fields. Source and target are required to have the same field names. If a predicate is specified, then it must evaluate to true for the new row to be inserted.
Parameters:
Name Type Description Default predicateOptional[str]
Compacts small files to reduce the total number of files in the table.
This operation is idempotent; if run twice on the same table (assuming it has not been updated) it will do nothing the second time.
If this operation happens concurrently with any operations other than append, it will fail.
Parameters:
Name Type Description Default partition_filtersOptional[FilterType]
the partition filters that will be used for getting the matched files
Nonetarget_sizeOptional[int]
desired file size after bin-packing files, in bytes. If not provided, will attempt to read the table configuration value delta.targetFileSize. If that value isn't set, will use default value of 256MB.
Nonemax_concurrent_tasksOptional[int]
the maximum number of concurrent tasks to use for file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory.
minimum interval in seconds or as timedeltas before a new commit is created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you want a commit per partition.
Nonewriter_propertiesOptional[WriterProperties]
Pass writer properties to the Rust parquet writer.
Nonecustom_metadataOptional[Dict[str, str]]
custom metadata that will be added to the transaction commit.
None
Returns:
Type Description Dict[str, Any]
the metrics from optimize
Example
Use a timedelta object to specify the seconds, minutes or hours of the interval.
Reorders the data using a Z-order curve to improve data skipping.
This also performs compaction, so the same parameters as compact() apply.
Parameters:

- columns (Iterable[str], required): the columns to use for Z-ordering. There must be at least one column.
- partition_filters (Optional[FilterType], default None): the partition filters that will be used for getting the matched files.
- target_size (Optional[int], default None): desired file size after bin-packing files, in bytes. If not provided, will attempt to read the table configuration value delta.targetFileSize. If that value isn't set, a default of 256MB is used.
- max_concurrent_tasks (Optional[int], default None): the maximum number of concurrent tasks to use for file compaction. Defaults to the number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory.
- max_spill_size (int, default 20GB): the maximum number of bytes to spill to disk.
- min_commit_interval (Optional[Union[int, timedelta]], default None): minimum interval, in seconds or as a timedelta, before a new commit is created. Useful for long-running executions. Set to 0 or timedelta(0) if you want a commit per partition.
- writer_properties (Optional[WriterProperties], default None): pass writer properties to the Rust parquet writer.
- custom_metadata (Optional[Dict[str, str]], default None): custom metadata that will be added to the transaction commit.

Returns:

- Dict[str, Any]: the metrics from optimize.
Example
Use a timedelta object to specify the seconds, minutes or hours of the interval.
The Parquet file stores the data that was written. The _delta_log directory stores metadata about the transactions. Let's inspect the _delta_log/00000000000000000000.json file.
This transaction adds a data file and marks the two existing data files for removal. Marking a file for removal in the transaction log is known as "tombstoning the file" or a "logical delete". This is different from a "physical delete", which actually removes the data file from storage.
"},{"location":"how-delta-lake-works/architecture-of-delta-table/#how-delta-table-operations-differ-from-data-lakes","title":"How Delta table operations differ from data lakes","text":"
Data lakes consist of data files persisted in storage. They don't have a transaction log that retains metadata about the transactions.
Data lakes perform transactions differently than Delta tables.
When you perform an overwrite transaction with a Delta table, you logically delete the existing data without physically removing it.
Data lakes don't support logical deletes, so you have to physically delete the data from storage.
Logical data operations are safer because they can be rolled back if they don't complete successfully. Physically removing data from storage can be dangerous, especially if it's before a transaction is complete.
We're now ready to look into Delta Lake ACID transactions in more detail.
"},{"location":"how-delta-lake-works/delta-lake-acid-transactions/","title":"Delta Lake Transactions","text":"
This page teaches you about Delta Lake transactions and why transactions are important in production data settings. Data lakes don't support transactions, which is a huge downside: they offer a poor user experience, lack functionality, and can easily be corrupted.
Transactions on Delta Lake tables are operations that change the state of the table and record descriptive entries (metadata) of those changes to the Delta Lake transaction log. Here are some examples of transactions:
Deleting rows
Appending to the table
Compacting small files
Upserting
Overwriting rows
All Delta Lake write operations are transactions in Delta tables. Reads actually aren't technically transactions because they don't result in new entries being appended to the transaction log.
"},{"location":"how-delta-lake-works/delta-lake-acid-transactions/#what-are-transactions","title":"What are transactions?","text":"
Transactions are any Delta operations that change the underlying files of a Delta table and result in new metadata entries in the transaction log. Some Delta operations rearrange data in the existing table (like Z Ordering the table or compacting the small files), and these are also transactions. Let's look at a simple example.
Suppose you have a Delta table with the following data:
Notice the 00000000000000000001.json file that was added to the transaction log to record this transaction. Let's inspect the content of the file.
Transactions are recorded in the transaction log. The transaction log is also referred to as the table metadata and is the _delta_log directory in storage.
Let's see how Delta Lake implements transactions.
"},{"location":"how-delta-lake-works/delta-lake-acid-transactions/#how-delta-lake-implements-transactions","title":"How Delta Lake implements transactions","text":"
Here is how Delta Lake implements transactions:
Read the existing metadata
Read the existing Parquet data files
Write the Parquet files for the current transaction
Record the new transaction in the transaction log (if there are no conflicts)
Let's recall our delete operation from the prior section and see how it fits into this transaction model:
We read the existing metadata to find the file paths for the existing Parquet files
We read the existing Parquet files and identify the files that contain data that should be removed
We write new Parquet files with the deleted data filtered out
Once the new Parquet files are written, we check for conflicts and then make an entry in the transaction log. The next section will discuss transaction conflicts in more detail.
Blind append operations can skip a few steps and are executed as follows:
Write the Parquet files for the current transaction
Record the new transaction in the metadata
Delta implements non-locking MVCC (multi-version concurrency control), so writers optimistically write new data and simply abandon the transaction if a conflict is detected at commit time. The alternative would be to take a lock at the start, guaranteeing the transaction immediately at the cost of blocking other writers.
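The optimistic commit step can be sketched with a put-if-absent primitive. Here the local filesystem's `O_CREAT | O_EXCL` flag stands in for the object store's conditional put; the function and payload names are illustrative, not the actual log format:

```python
import json
import os
import tempfile

log_dir = tempfile.mkdtemp()  # stands in for the _delta_log directory

def try_commit(version: int, actions: dict) -> bool:
    """Atomically create the next log entry; fail if another writer won the race."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # O_CREAT | O_EXCL is put-if-absent: it errors if the file already exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # conflict: abandon, re-read the log, and retry as version + 1
    with os.fdopen(fd, "w") as f:
        json.dump(actions, f)
    return True

first = try_commit(0, {"add": {"path": "part-0.parquet"}})
second = try_commit(0, {"add": {"path": "part-1.parquet"}})  # loses the race
print(first, second)  # True False
```

The losing writer keeps its Parquet files but gets no log entry, which is exactly the abandoned-transaction behavior described above.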
Let's look at the case when a Delta Lake transaction conflicts.
"},{"location":"how-delta-lake-works/delta-lake-acid-transactions/#how-delta-lake-transactions-can-conflict","title":"How Delta Lake transactions can conflict","text":"
Suppose you have a transaction that deletes a row of data that's stored in FileA (Transaction 1). While this job is running, there is another transaction that deletes some other rows in FileA (Transaction 2). Transaction 1 finishes running first and is recorded in the metadata.
Before Transaction 2 is recorded as a transaction, it will check the metadata, find that Transaction 2 conflicts with a transaction that was already recorded (from Transaction 1), and error without recording a new transaction.
Transaction 2 will write Parquet data files, but will not be recorded as a transaction, so its data files will be ignored. The zombie Parquet files can be easily cleaned up via subsequent vacuum operations.
Transaction 2 must fail otherwise it would cause the data to be incorrect.
Delta Lake transactions prevent users from making changes that would corrupt the table. Transaction conflict behavior can differ based on isolation level, which controls the degree to which a transaction must be isolated from modifications made by other concurrent transactions. More about this in the concurrency section.
"},{"location":"how-delta-lake-works/delta-lake-acid-transactions/#transactions-rely-on-atomic-primitives-storage-guarantees","title":"Transactions rely on atomic primitives storage guarantees","text":"
Suppose you have two transactions that are finishing at the exact same time. Both of these transactions look at the existing Delta Lake transaction log, see that the latest transaction was 003.json, and determine that the next entry should be 004.json.
If both transactions are recorded in the 004.json file, then one of them will be clobbered, and the transaction log entry for the clobbered metadata entry will be lost.
Delta tables rely on storage systems that provide atomic primitives for safe concurrency. The storage system must allow Delta Lake to write a file only if it does not already exist, and error out otherwise. The storage system must NOT permit concurrent writers to overwrite existing metadata entries.
Some clouds have filesystems that don't explicitly support these atomic primitives, and therefore must be coupled with other services to provide the necessary guarantees.
"},{"location":"how-delta-lake-works/delta-lake-acid-transactions/#delta-lake-transactions-are-only-for-a-single-table","title":"Delta Lake transactions are only for a single table","text":"
Delta Lake transactions are only valid for a single table.
Some databases offer transaction support for operations that impact multiple tables. Delta Lake does not support multi-table transactions.
"},{"location":"how-delta-lake-works/delta-lake-acid-transactions/#data-lakes-dont-support-transactions","title":"Data lakes don\u2019t support transactions","text":"
Data lakes consist of many files in a storage system (e.g. a cloud storage system) and don't support transactions.
Data lakes don't have a metadata layer, conflict resolution, or any way to store information about transactions.
Data lakes are prone to multiple types of errors because they don't support transactions:
Easy to corrupt
Downtime/unstable state while jobs are running
Operations can conflict
Data lakes have many downsides and it's almost always better to use a lakehouse storage system like Delta Lake compared to a data lake.
We've already explored how Delta Lake supports transactions. This section explains how Delta Lake transactions have the Atomic, Consistent, Isolated and Durable (ACID transaction) properties. Reading this section is optional.
ACID transactions are commonplace in databases but notably absent for data lakes.
Delta Lake's ACID transaction support is one of the major reasons it is almost always a better option than a data lake.
Let's explore how Delta Lake allows for ACID transactions.
Atomic transactions
An atomic transaction either fully completes or fully fails, with nothing in between.
Delta Lake transactions are atomic, unlike data lake transactions that are not atomic.
Suppose you have a job that's writing 100 files to a table. Further suppose that the job errors out and the cluster dies after writing 40 files:
For a Delta table, no additional data will be added to the table. Parquet files were written to storage, but the job errored, so no transaction log entry was added and no data was added to the table.
For a data lake, the 40 files are added and the transaction "partially succeeds".
For data tables, it's almost always preferable to have a transaction that "fully fails" instead of one that "partially succeeds", because partial writes are hard to unwind and debug.
Delta Lake implements atomic transactions by writing data files first before making a new entry in the Delta transaction log.
These guarantees are provided at the protocol level through the "transaction" abstraction. We've already discussed what constitutes a transaction for Delta Lake.
If there is an error with the transaction and some files don't get written, then no metadata entry is made and the partial data write is ignored. The zombie Parquet files can be easily cleaned up via subsequent vacuum operations.
Now let\u2019s look at how Delta Lake also provides consistent transactions.
Consistent transactions
Consistency means that transactions won't violate integrity constraints on the Delta table.
Delta Lake has two types of consistency checks:
Schema enforcement checks
Column constraints
Schema enforcement checks verify that new data appended to a Delta table matches the schema of the existing table. You cannot append data with a different schema, unless you enable schema evolution.
Delta Lake column constraints allow users to specify the requirements of data that's added to a Delta table. For example, if you have an age column with a constraint that requires the value to be positive, then Delta Lake will reject appends of any data that doesn't meet the constraint.
Data lakes don't support schema enforcement or column constraints. That's another reason why data lakes are not ACID-compliant.
Isolated transactions
Isolation means that transactions are applied to a Delta table sequentially.
Delta Lake transactions are persisted in monotonically increasing transaction files, as we saw in the previous example. First 00000000000000000000.json, then 00000000000000000001.json, then 00000000000000000002.json, and so on.
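The naming scheme can be reproduced with simple zero-padding, which keeps lexicographic listing order equal to commit order:

```python
# Delta log entries are zero-padded to 20 digits, e.g. 00000000000000000001.json
versions = [f"{v:020d}.json" for v in range(3)]
print(versions)
# ['00000000000000000000.json', '00000000000000000001.json', '00000000000000000002.json']

# Lexicographic order of the file names matches numeric commit order,
# so a plain directory listing yields the transaction history in order.
assert sorted(versions) == versions
```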
Delta Lake uses concurrency control to ensure that transactions are executed sequentially, even when user operations are performed concurrently. The next page of this guide explains concurrency in Delta Lake in detail.
Durable transactions
Delta tables are generally persisted in cloud object stores which provide durability guarantees.
Durability means that all transactions that are successfully completed will always remain persisted, even if there are service outages or program crashes.
Suppose you have a Delta table that's persisted in Azure Blob Storage. The Delta table transactions that are committed will always remain available, even in these circumstances:
When there are Azure service outages
If a computation cluster that's writing to the Delta table crashes for some reason
Two operations are running concurrently and one of them fails
Successful transactions are always registered in the Delta table and persisted no matter what.
Delta Lake supports transactions which provide necessary reliability guarantees for production data systems.
Vanilla data lakes don't provide transactions, and this can cause nasty bugs and a bad user experience. Let's look at a couple of scenarios in which the lack of transactions causes a poor user experience:
While running a compaction operation on a data lake, newly compacted "right sized" files are added before the small files are deleted. If you read the data lake while this operation is running, you will see duplicate data.
While writing to a data lake, a job might fail, which leaves behind partially written files. These files are corrupt, which means that the data lake cannot be read until the corrupt files are manually removed.
Users want to run a simple DML operation like deleting a few rows of data, which requires a few files to be rewritten. This operation renders the data lake unusable until it's done running.
Transactions are a key advantage of Delta Lake vs. data lakes. There are many other advantages, but proper transactions are necessary in production data environments.
"},{"location":"integrations/delta-lake-arrow/","title":"Delta Lake Arrow Integrations","text":"
Delta Lake tables can be exposed as Arrow tables and Arrow datasets, which allows for interoperability with a variety of query engines.
This page shows you how to convert Delta tables to Arrow data structures and teaches you the difference between Arrow tables and Arrow datasets. Tables are "eager" and datasets are "lazy", which has important performance implications: keep reading to learn more!
"},{"location":"integrations/delta-lake-arrow/#delta-lake-to-arrow-dataset","title":"Delta Lake to Arrow Dataset","text":"
Delta tables can easily be exposed as Arrow datasets. This makes it easy for any query engine that can read Arrow datasets to read a Delta table.
Let's take a look at the h2o groupby dataset that contains 9 columns of data. Here are three representative rows of data:
Arrow datasets allow for the predicates to get pushed down to the query engine, so the query is executed quickly.
"},{"location":"integrations/delta-lake-arrow/#delta-lake-to-arrow-table","title":"Delta Lake to Arrow Table","text":"
You can also run the same query with DuckDB on an Arrow table:
```python
quack = duckdb.arrow(table.to_pyarrow_table())
quack.filter("id1 = 'id016' and v2 > 10")
```
This returns the same result, but it runs slower.
"},{"location":"integrations/delta-lake-arrow/#difference-between-arrow-dataset-and-arrow-table","title":"Difference between Arrow Dataset and Arrow Table","text":"
Arrow Datasets are lazy and allow for full predicate pushdown, unlike Arrow tables, which are eagerly loaded into memory.
The previous DuckDB queries were run on a 1 billion row dataset that's roughly 50 GB when stored as an uncompressed CSV file. Here are the runtimes when the data is stored in a Delta table and the queries are executed on a 2021 MacBook M1 with 64 GB of RAM:
Arrow table: 17.1 seconds
Arrow dataset: 0.01 seconds
The query runs much faster on an Arrow dataset because the predicates can be pushed down to the query engine and lots of data can be skipped.
Arrow tables are eagerly materialized in memory and don't allow for the same amount of data skipping.
"},{"location":"integrations/delta-lake-arrow/#multiple-query-engines-can-query-arrow-datasets","title":"Multiple query engines can query Arrow Datasets","text":"
Other query engines like DataFusion can also query Arrow datasets, see the following example:
```python
from datafusion import SessionContext

ctx = SessionContext()
ctx.register_dataset("my_dataset", table.to_pyarrow_dataset())
ctx.sql("select * from my_dataset where v2 > 5")
```
Delta tables can easily be exposed as Arrow tables/datasets.
Therefore any query engine that can read an Arrow table/dataset can also read a Delta table.
Arrow datasets allow for more predicates to be pushed down to the query engine, so they can deliver better performance than Arrow tables.
"},{"location":"integrations/delta-lake-datafusion/","title":"Using Delta Lake with DataFusion","text":"
This page explains how to use Delta Lake with DataFusion.
Delta Lake offers DataFusion users better performance and more features compared to other formats like CSV or Parquet.
Delta Lake works well with the DataFusion Rust API and the DataFusion Python API. It's a great option for all DataFusion users.
Delta Lake also depends on DataFusion to implement SQL-related functionality under the hood. We will also discuss this dependency at the end of this guide in case you're interested in learning more about the symbiotic relationship between the two libraries.
"},{"location":"integrations/delta-lake-datafusion/#delta-lake-performance-benefits-for-datafusion-users","title":"Delta Lake performance benefits for DataFusion users","text":"
Let's run some DataFusion queries on a Parquet file and a Delta table with the same data to learn more about the performance benefits of Delta Lake.
Suppose you have the following dataset with 1 billion rows and 9 columns. Here are the first three rows of data:
```python
ctx.sql("select id1, sum(v1) as v1 from my_delta_table where id1='id096' group by id1")
```
That query takes 2.8 seconds to execute.
Let's register the same dataset as a Parquet table, run the same query, and compare the runtime difference.
Register the Parquet table and run the query:
```python
path = "G1_1e9_1e2_0_0.parquet"
ctx.register_parquet("my_parquet_table", path)
ctx.sql("select id1, sum(v1) as v1 from my_parquet_table where id1='id096' group by id1")
```
This query takes 5.3 seconds to run.
Parquet stores data in row groups and DataFusion can intelligently skip row groups that don't contain relevant data, so the query is faster than a file format like CSV which doesn't support row group skipping.
Delta Lake stores file-level metadata information in the transaction log, so it can skip entire files when queries are executed. Delta Lake can skip entire files and then skip row groups within the individual files. This makes Delta Lake even faster than Parquet files, especially for larger datasets spread across many files.
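File skipping boils down to a range check against per-file min/max statistics. A minimal sketch (the stats dictionaries are illustrative, not the actual transaction log format):

```python
# Each entry mimics the per-file min/max statistics Delta stores in its log.
files = [
    {"path": "part-0.parquet", "min_v1": 0,   "max_v1": 50},
    {"path": "part-1.parquet", "min_v1": 51,  "max_v1": 100},
    {"path": "part-2.parquet", "min_v1": 101, "max_v1": 150},
]

def files_for_predicate(files, lo, hi):
    """Keep only files whose [min, max] range can contain rows with lo <= v1 <= hi."""
    return [f["path"] for f in files if f["max_v1"] >= lo and f["min_v1"] <= hi]

# Only one file can possibly contain rows matching 60 <= v1 <= 70;
# the other two are skipped without ever being opened.
print(files_for_predicate(files, 60, 70))  # ['part-1.parquet']
```

Because the stats live in the transaction log, the engine decides which files to skip before opening any Parquet footer.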
"},{"location":"integrations/delta-lake-datafusion/#delta-lake-features-for-datafusion-users","title":"Delta Lake features for DataFusion users","text":"
Delta Lake also provides other features that are useful for DataFusion users like ACID transactions, concurrency protection, time travel, versioned data, and more.
"},{"location":"integrations/delta-lake-datafusion/#why-delta-lake-depends-on-datafusion","title":"Why Delta Lake depends on DataFusion","text":"
Delta Lake depends on DataFusion to provide some end-user features.
DataFusion is useful in providing SQL-related Delta Lake features. Some examples:
Update and merge are written in terms of SQL expressions.
Invariants and constraints are written in terms of SQL expressions.
Anytime we have to evaluate SQL, we need some sort of SQL engine. We use DataFusion for that.
Delta Lake is a great file format for DataFusion users.
Delta Lake also uses DataFusion to provide some end-user features.
DataFusion and Delta Lake have a wonderful symbiotic relationship and play very nicely with each other.
See this guide for more information on Delta Lake and PyArrow and why PyArrow Datasets are often a better option than PyArrow tables.
"},{"location":"integrations/delta-lake-pandas/","title":"Using Delta Lake with pandas","text":"
Delta Lake is a great storage system for pandas analyses. This page shows how easy it is to use Delta Lake with pandas, the unique features Delta Lake offers pandas users, and how Delta Lake can make your pandas analyses run faster.
Delta Lake is very easy to install for pandas analyses: just run pip install deltalake.
Delta Lake allows for performance optimizations, so pandas queries can run much faster than the same query run on data stored in CSV or Parquet. See the following chart for the query runtime for a Delta table compared with CSV/Parquet.
Z Ordered Delta tables run this query much faster than when the data is stored in Parquet or CSV. Let's dive in deeper and see how Delta Lake makes pandas faster.
"},{"location":"integrations/delta-lake-pandas/#delta-lake-makes-pandas-queries-run-faster","title":"Delta Lake makes pandas queries run faster","text":"
There are a few reasons Delta Lake can make pandas queries run faster:
column pruning: only grabbing the columns relevant for a query
file skipping: only reading files with data for the query
row group skipping: only reading row groups with data for the query
Z ordering data: colocating similar data in the same files, so file skipping is more effective
Reading less data (fewer columns and/or fewer rows) is how Delta Lake makes pandas queries run faster.
Parquet allows for column pruning and row group skipping, but doesn't support file-level skipping or Z Ordering. CSV doesn't support any of these performance optimizations.
Let's take a look at a sample dataset and run a query to see the performance enhancements offered by Delta Lake.
Suppose you have a 1 billion row dataset with 9 columns, here are the first three rows of the dataset:
Parquet stores data in row groups and allows for row group skipping when filter predicates are set. Run the Parquet query again with row group skipping enabled:
Here are the contents after the overwrite operation (version 2 of the Delta table):
```
+-------+----------+
| num   | letter   |
|-------+----------|
| 8     | dd       |
| 9     | ee       |
+-------+----------+
```
Read in the Delta table and it will grab the latest version by default:
```
DeltaTable("tmp/some-table").to_pandas()

+-------+----------+
| num   | letter   |
|-------+----------|
| 11    | aa       |
| 22    | bb       |
+-------+----------+
```
You can easily time travel back to version 0 of the Delta table:
```
DeltaTable("tmp/some-table", version=0).to_pandas()

+-------+----------+
| num   | letter   |
|-------+----------|
| 1     | a        |
| 2     | b        |
| 3     | c        |
+-------+----------+
```
You can also time travel to version 1 of the Delta table:
```
DeltaTable("tmp/some-table", version=1).to_pandas()

+-------+----------+
| num   | letter   |
|-------+----------|
| 1     | a        |
| 2     | b        |
| 3     | c        |
| 8     | dd       |
| 9     | ee       |
+-------+----------+
```
Time travel is a powerful feature that pandas users cannot access with CSV or Parquet.
Delta tables only allow you to append DataFrames with a matching schema by default. Suppose you have a DataFrame with num and animal columns, which is different from the Delta table, which has num and letter columns.
Try to append this DataFrame with a mismatched schema to the existing table:
This transaction will be rejected and will return the following error message:
```
ValueError: Schema of data does not match table schema
Data schema:
num: int64
animal: string
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 474
Table Schema:
num: int64
letter: string
```
Schema enforcement protects your table from getting corrupted by appending data with mismatched schema. Parquet and CSV don't offer schema enforcement for pandas users.
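Conceptually, the check amounts to comparing the incoming schema to the table's schema before committing. A minimal sketch (the dict-based schema representation is illustrative, not the library's internal one):

```python
def enforce_schema(table_schema: dict, data_schema: dict) -> None:
    """Reject writes whose column names or types differ from the table schema."""
    if data_schema != table_schema:
        raise ValueError(
            "Schema of data does not match table schema\n"
            f"Data schema: {data_schema}\nTable schema: {table_schema}"
        )

table_schema = {"num": "int64", "letter": "string"}
enforce_schema(table_schema, {"num": "int64", "letter": "string"})  # passes silently

try:
    enforce_schema(table_schema, {"num": "int64", "animal": "string"})
except ValueError as e:
    print("rejected:", type(e).__name__)
```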
"},{"location":"integrations/delta-lake-pandas/#overwriting-schema-of-table","title":"Overwriting schema of table","text":"
You can overwrite the table contents and schema by setting the overwrite_schema option. Here's how to overwrite the table contents:
Here are the contents of the table after the values and schema have been overwritten:
```
+-------+----------+
| num   | animal   |
|-------+----------|
| 5     | cat      |
| 6     | dog      |
+-------+----------+
```
"},{"location":"integrations/delta-lake-pandas/#in-memory-vs-in-storage-data-changes","title":"In-memory vs. in-storage data changes","text":"
It's important to distinguish between data stored in-memory and data stored on disk when understanding the functionality offered by Delta Lake.
pandas loads data from storage (CSV, Parquet, or Delta Lake) into in-memory DataFrames.
pandas makes it easy to modify the data in memory, say update a column value. It's not easy to update a column value in storage systems like CSV or Parquet using pandas.
Delta Lake makes it easy for pandas users to update data in storage.
"},{"location":"integrations/delta-lake-pandas/#why-delta-lake-allows-for-faster-queries","title":"Why Delta Lake allows for faster queries","text":"
Delta tables store data in many files and metadata about the files in the transaction log. Delta Lake allows for certain queries to skip entire files, which makes pandas queries run much faster.
Delta Lake provides many features that make it an excellent format for pandas analyses:
performance optimizations make pandas queries run faster
data management features make pandas analyses more reliable
advanced features allow you to perform more complex pandas analyses
Python deltalake offers pandas users a better experience compared with CSV/Parquet.
"},{"location":"integrations/delta-lake-polars/","title":"Using Delta Lake with polars","text":"
This page explains why Delta Lake is a great storage system for Polars analyses.
You will learn how to create Delta tables with Polars, how to query Delta tables with Polars, and the unique advantages Delta Lake offers the Polars community.
Here are some amazing benefits that Delta Lake provides Polars users:
time travel
ACID transactions for reliable writes
better performance with file skipping
enhanced file skipping via Z Ordering
ability to rollback mistakes
and many, many more
Let's start by showing how to use Polars with Delta Lake, explore how Delta Lake can make Polars queries run faster, and then look at all the cool features Delta Lake offers Polars users.
"},{"location":"integrations/delta-lake-polars/#creating-a-delta-lake-table-with-polars","title":"Creating a Delta Lake table with Polars","text":"
Create a Polars DataFrame and write it out to a Delta table:
```python
import polars as pl

df = pl.DataFrame({"x": [1, 2, 3]})
df.write_delta("tmp/bear_delta_lake")
```
This dataset is 50GB when stored in an uncompressed CSV file. Let's run some queries on this dataset when it's stored in different file formats with Polars.
This section will show the runtime for a query when the data is stored in CSV, Parquet, and Delta Lake and explain why Delta tables are the fastest.
Start by running a query on an uncompressed CSV file with read_csv:
This query runs in 8.3 seconds. It's much faster than the CSV query because Polars is optimized to skip row groups in Parquet files that don't contain data that's relevant for the query.
This query runs in 7.2 seconds. Polars can run this query faster because it can inspect the Delta transaction log and skip entire files that don't contain relevant data before performing the ordinary Parquet row group skipping.
Finally run the query on the Delta table after it has been Z Ordered by id1:
This query runs in 3.5 seconds. The query on the Z Ordered Delta table is even faster because similar data has been co-located in the same files. This allows for even greater data skipping.
Polars can leverage file skipping to query Delta tables very quickly.
"},{"location":"integrations/delta-lake-polars/#why-polars-is-fast-with-delta-lake","title":"Why Polars is fast with Delta Lake","text":"
Delta tables consist of metadata in a transaction log and data stored in Parquet files.
When Polars queries a Delta table, it starts by consulting the transaction log to understand the metadata of each file in the Delta table. This allows Polars to quickly identify which files should be skipped by the query.
CSV files don't contain any such metadata, so file skipping isn't an option. Polars can skip Parquet files based on metadata, but it needs to open up each file and read the metadata, which is slower than grabbing the file-level metadata directly from the transaction log.
Parquet doesn't allow users to easily Z Order the data and colocate similar data in the same row groups. The Z Order optimizations are only supported in Delta tables.
Delta Lake offers Polars users unique performance optimizations.
"},{"location":"integrations/delta-lake-polars/#other-delta-lake-features-relevant-for-polars-users","title":"Other Delta Lake features relevant for Polars users","text":"
Here are the contents of the Delta table after the overwrite operation:
```
+-------+----------+
| num   | letter   |
|-------+----------|
| 11    | aa       |
| 22    | bb       |
+-------+----------+
```
Overwriting just performs a logical delete. It doesn't physically remove the previous data from storage. Time travel back to the previous version to confirm that the old version of the table is still accessible.
```
dt = DeltaTable("tmp/some-table", version=1)

+-------+----------+
| num   | letter   |
|-------+----------|
| 1     | a        |
| 2     | b        |
| 3     | c        |
| 8     | dd       |
| 9     | ee       |
+-------+----------+
```
"},{"location":"usage/constraints/","title":"Adding a Constraint to a table","text":"
Check constraints are a way to enforce that only data that meets the constraint is allowed to be added to the table.
"},{"location":"usage/constraints/#add-the-constraint","title":"Add the Constraint","text":"Python Rust
DeltaTable
from deltalake import DeltaTable\n\ndt = DeltaTable(\"../rust/tests/data/simple_table\")\n\n# Check the schema before hand\nprint(dt.schema())\n# Add the constraint to the table.\ndt.alter.add_constraint({\"id_gt_0\": \"id > 0\"})\n
DeltaTable
let table = deltalake::open_table(\"../rust/tests/data/simple_table\").await?;\nlet ops = DeltaOps(table);\nops.with_constraint(\"id_gt_0\", \"id > 0\").await?;\n
After you have added the constraint to the table, attempting to append data that violates the constraint will throw an error.
"},{"location":"usage/constraints/#verify-the-constraint-by-trying-to-add-some-data","title":"Verify the constraint by trying to add some data","text":"Python Rust
from deltalake import write_deltalake\nimport pandas as pd\n\ndf = pd.DataFrame({\"id\": [-1]})\nwrite_deltalake(dt, df, mode=\"append\", engine=\"rust\")\n# _internal.DeltaProtocolError: Invariant violations: [\"Check or Invariant (id > 0) violated by value in row: [-1]\"]\n
Here are the contents of the Delta table after the delete operation has been performed:
```
+-------+----------+
| num   | letter   |
|-------+----------|
| 1     | a        |
| 2     | b        |
+-------+----------+
```
"},{"location":"usage/examining-table/","title":"Examining a Table","text":""},{"location":"usage/examining-table/#metadata","title":"Metadata","text":"
The delta log maintains basic metadata about a table, including:
A unique id
A name, if provided
A description, if provided
The list of partitionColumns.
The created_time of the table
A map of table configuration. This includes fields such as delta.appendOnly, which if true indicates the table is not meant to have data deleted from it.
Get metadata from a table with the DeltaTable.metadata() method:
The schema for the table is also saved in the transaction log. It can either be retrieved in the Delta Lake form as Schema or as a PyArrow schema. The first allows you to introspect any column-level metadata stored in the schema, while the latter represents the schema the table will be loaded into.
Use DeltaTable.schema to retrieve the delta lake schema:
Depending on what system wrote the table, the delta table may have provenance information describing what operations were performed on the table, when, and by whom. This information is retained for 30 days by default, unless otherwise specified by the table configuration delta.logRetentionDuration.
Note
This information is not written by all writers and different writers may use different schemas to encode the actions. For Spark\\'s format, see: https://docs.delta.io/latest/delta-utility.html#history-schema
To view the available history, use DeltaTable.history:
from deltalake import DeltaTable\n\ndt = DeltaTable(\"../rust/tests/data/simple_table\")\ndt.history()\n
The active state for a delta table is determined by the Add actions, which provide the list of files that are part of the table and metadata about them, such as creation time, size, and statistics. You can get a data frame of the add actions data using DeltaTable.get_add_actions:
The deltalake project can be installed via pip for Python or Cargo for Rust.
"},{"location":"usage/installation/#install-delta-lake-for-python","title":"Install Delta Lake for Python","text":"
With pip:
pip install deltalake\n
With Conda:
conda install -c conda-forge deltalake\n
"},{"location":"usage/installation/#install-delta-lake-for-rust","title":"Install Delta Lake for Rust","text":"
With Cargo:
cargo add deltalake\n
"},{"location":"usage/installation/#run-delta-lake-and-pandas-in-a-jupyter-notebook","title":"Run Delta Lake and pandas in a Jupyter Notebook","text":"
You can easily run Delta Lake and pandas in a Jupyter notebook.
Create an environment file with the dependencies as follows:
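A minimal `deltalake-minimal.yml` might look like this (the exact package list is illustrative; pin versions as needed):

```yaml
name: deltalake-minimal
channels:
  - conda-forge
dependencies:
  - python=3.11
  - deltalake
  - pandas
  - jupyterlab
```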
Create a virtual environment with the dependencies:
conda env create -f deltalake-minimal.yml\n
Open the Jupyter notebook and run commands as follows:
"},{"location":"usage/loading-table/","title":"Loading a Delta Table","text":"
A DeltaTable represents the state of a delta table at a particular version. This includes which files are currently part of the table, the schema of the table, and other metadata such as creation time.
Python Rust
DeltaTable
from deltalake import DeltaTable\n\ndt = DeltaTable(\"../rust/tests/data/delta-0.2.0\")\nprint(f\"Version: {dt.version()}\")\nprint(f\"Files: {dt.files()}\")\n
DeltaTable
let table = deltalake::open_table(\"../rust/tests/data/simple_table\").await.unwrap();\nprintln!(\"Version: {}\", table.version());\nprintln!(\"Files: {}\", table.get_files());\n
Depending on your storage backend, you could use the storage_options parameter to provide some configuration. Configuration is defined for specific backends - s3 options, azure options, gcs options.
The configuration can also be provided via the environment, and the basic service provider is derived from the URL being used. We try to support many of the well-known formats to identify basic service properties.
S3:
s3://\\<bucket>/\\<path>
s3a://\\<bucket>/\\<path>
Azure:
az://\\<container>/\\<path>
adl://\\<container>/\\<path>
abfs://\\<container>/\\<path>
GCS:
gs://\\<bucket>/\\<path>
Alternatively, if you have a data catalog you can load it by reference to a database and table name. Currently only AWS Glue is supported.
For AWS Glue catalog, use AWS environment variables to authenticate.
While delta always needs its internal storage backend, properly configured, to manage the delta log, it may sometimes be advantageous - and is common practice in the arrow world - to customize the storage interface used for reading the bulk data.
deltalake will work with any storage compliant with pyarrow.fs.FileSystem, however the root of the filesystem has to be adjusted to point at the root of the Delta table. We can achieve this by wrapping the custom filesystem into a pyarrow.fs.SubTreeFileSystem.
Previous table versions may not exist if they have been vacuumed, in which case an exception will be thrown. See Vacuuming tables for more information.
Vacuuming a table will delete any files that have been marked for deletion. This may make some past versions of a table invalid, so this can break time travel. However, it will save storage space. Vacuum will retain files in a certain window, by default one week, so time travel will still work in shorter ranges.
Delta tables usually don't delete old files automatically, so vacuuming regularly is considered good practice, unless the table is only appended to.
Use DeltaTable.vacuum to perform the vacuum operation. Note that to prevent accidental deletion, the function performs a dry-run by default: it will only list the files to be deleted. Pass dry_run=False to actually delete files.
>>> dt = DeltaTable(\"../rust/tests/data/simple_table\")\n>>> dt.vacuum()\n['../rust/tests/data/simple_table/part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet',\n '../rust/tests/data/simple_table/part-00190-8ac0ae67-fb1d-461d-a3d3-8dc112766ff5-c000.snappy.parquet',\n '../rust/tests/data/simple_table/part-00164-bf40481c-4afd-4c02-befa-90f056c2d77a-c000.snappy.parquet',\n ...]\n>>> dt.vacuum(dry_run=False) # Don't run this unless you are sure!\n
This guide teaches you how to use Delta Lake. You will learn how to create Delta tables, run queries, perform DML operations, and optimize your tables.
It's easy to use Delta Lake with pandas, Polars, Rust, or any other PyArrow-like DataFrame library.
See the Spark Delta Lake documentation if you're using Delta Lake with Spark.
Delta tables can be queried in several ways. By loading as Arrow data or an Arrow dataset, they can be used by compatible engines such as Pandas and DuckDB. By passing on the list of files, they can be loaded into other engines such as Dask.
Delta tables are often larger than can fit into memory on a single computer, so this module provides ways to read only the parts of the data you need. Partition filters allow you to skip reading files that are part of irrelevant partitions. Only loading the columns required also saves memory. Finally, some methods allow reading tables batch-by-batch, allowing you to process the whole table while only having a portion loaded at any given time.
To load into Pandas or a PyArrow table use the DeltaTable.to_pandas and DeltaTable.to_pyarrow_table methods, respectively. Both of these support filtering partitions and selecting particular columns.
Converting to a PyArrow Dataset allows you to filter on columns other than partition columns and load the result as a stream of batches rather than a single table. Convert to a dataset using DeltaTable.to_pyarrow_dataset. Filters applied to datasets will use the partition values and file statistics from the Delta transaction log and push down any other filters to the scanning operation.
PyArrow datasets may also be passed to compatible query engines, such as DuckDB
>>> import duckdb\n>>> ex_data = duckdb.arrow(dataset)\n>>> ex_data.filter(\"year = 2021 and value > 4\").project(\"value\")\n---------------------\n-- Expression Tree --\n---------------------\nProjection [value]\n Filter [year=2021 AND value>4]\n arrow_scan(140409099470144, 4828104688, 1000000)\n\n---------------------\n-- Result Columns --\n---------------------\n- value (VARCHAR)\n\n---------------------\n-- Result Preview --\n---------------------\nvalue\nVARCHAR\n[ Rows: 3]\n6\n7\n5\n
Finally, you can always pass the list of file paths to an engine. For example, you can pass them to dask.dataframe.read_parquet:
For overwrites and appends, use write_deltalake. If the table does not already exist, it will be created. The data parameter will accept a Pandas DataFrame, a PyArrow Table, or an iterator of PyArrow Record Batches.
Note: write_deltalake accepts a Pandas DataFrame, but will convert it to a Arrow table before writing. See caveats in pyarrow:python/pandas.
By default, writes create a new table and error if it already exists. This is controlled by the mode parameter, which mirrors the behavior of Spark's pyspark.sql.DataFrameWriter.saveAsTable DataFrame method. To overwrite pass in mode='overwrite' and to append pass in mode='append':
write_deltalake will raise ValueError if the schema of the data passed to it differs from the existing table's schema. If you wish to alter the schema as part of an overwrite pass in overwrite_schema=True.
"},{"location":"usage/writing-delta-tables/#overwriting-a-partition","title":"Overwriting a partition","text":"
You can overwrite a specific partition by using mode=\"overwrite\" together with partition_filters. This will remove all files within the matching partition and insert your data as new files. This can only be done on one partition at a time. All of the input data must belong to that partition or else the method will raise an error.
This method could also be used to insert a new partition if one doesn't already exist, making this operation idempotent.
"},{"location":"usage/optimize/delta-lake-z-order/","title":"Delta Lake Z Order","text":"
This section explains how to Z Order a Delta table.
Z Ordering colocates similar data in the same files, which allows for better file skipping and faster queries.
Suppose you have a table with first_name, age, and country columns.
If you Z Order the data by the country column, then individuals from the same country will be stored in the same files. When you subsequently query the data for individuals from a given country, it will execute faster because more data can be skipped.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/","title":"Delta Lake small file compaction with optimize","text":"
This post shows you how to perform small file compaction using the optimize method. This was added to the DeltaTable class in version 0.9.0. This command rearranges the small files into larger files, which reduces the number of files and speeds up queries.
This is very helpful for workloads that append frequently. For example, if you have a table that is appended to every 10 minutes, after a year you will have 52,560 files in the table. If the table is partitioned by another dimension, you will have 52,560 files per partition; with just 100 unique values that's millions of files. By running optimize periodically, you can reduce the number of files in the table to a more manageable number.
Typically, you will run optimize less frequently than you append data. If possible, you might run optimize once you know you have finished writing to a particular partition. For example, on a table partitioned by date, you might append data every 10 minutes, but only run optimize once a day at the end of the day. This will ensure you don't need to compact the same data twice.
This section will also teach you about how to use vacuum to physically remove files from storage that are no longer needed. You\u2019ll often want vacuum after running optimize to remove the small files from storage once they\u2019ve been compacted into larger files.
Let\u2019s start with an example to explain these key concepts. All the code covered in this post is stored in this notebook in case you\u2019d like to follow along.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#create-a-delta-table-with-small-files","title":"Create a Delta table with small files","text":"
Let\u2019s start by creating a Delta table with a lot of small files so we can demonstrate the usefulness of the optimize command.
Start by writing a function that generates one thousand rows of random data given a timestamp.
def record_observations(date: datetime) -> pa.Table:\n \"\"\"Pulls data for a certain datetime\"\"\"\n nrows = 1000\n return pa.table(\n {\n \"date\": pa.array([date.date()] * nrows),\n \"timestamp\": pa.array([date] * nrows),\n \"value\": pc.random(nrows),\n }\n )\n
Let\u2019s run this function and observe the output:
Let\u2019s write 100 hours worth of data to the Delta table.
# Every hour starting at midnight on 2021-01-01\nhours_iter = (datetime(2021, 1, 1) + timedelta(hours=i) for i in itertools.count())\n\n# Write 100 hours worth of data\nfor timestamp in itertools.islice(hours_iter, 100):\n write_deltalake(\n \"observation_data\",\n record_observations(timestamp),\n partition_by=[\"date\"],\n mode=\"append\",\n )\n
This data was appended to the Delta table in 100 separate transactions, so the table will contain 100 transaction log entries and 100 data files. You can see the number of files with the files() method.
Each of these Parquet files is tiny - only 10 KB. Let\u2019s see how to compact these tiny files into larger files, which is more efficient for data queries.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#compact-small-files-in-the-delta-table-with-optimize","title":"Compact small files in the Delta table with optimize","text":"
Let\u2019s run the optimize command to compact the existing small files into larger files:
The optimize operation has added 5 new files and marked 100 existing files for removal (this is also known as \u201ctombstoning\u201d files). It has compacted the 100 tiny files into 5 larger files.
Let\u2019s append some more data to the Delta table and see how we can selectively run optimize on the new data that\u2019s added.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#handling-incremental-updates-with-optimize","title":"Handling incremental updates with optimize","text":"
Let\u2019s append another 24 hours of data to the Delta table:
for timestamp in itertools.islice(hours_iter, 24):\n write_deltalake(\n dt,\n record_observations(timestamp),\n partition_by=[\"date\"],\n mode=\"append\",\n )\n
We can use get_add_actions() to introspect the table state. We can see that 2021-01-06 has only a few hours of data so far, so we don't want to optimize that yet. But 2021-01-05 has all 24 hours of data, so it's ready to be optimized.
This optimize operation tombstones 21 small data files and adds one file with all the existing data properly condensed. Let\u2019s take a look at a portion of the _delta_log/00000000000000000125.json file, which is the transaction log entry that corresponds with this incremental optimize command.
The transaction log indicates that many files have been tombstoned and one file is added, as expected.
The Delta Lake optimize command \u201cremoves\u201d data by marking the data files as removed in the transaction log. The optimize command doesn\u2019t physically delete the Parquet file from storage. Optimize performs a \u201clogical remove\u201d not a \u201cphysical remove\u201d.
Delta Lake uses logical operations so you can time travel back to earlier versions of your data. You can vacuum your Delta table to physically remove Parquet files from storage if you don\u2019t need to time travel and don\u2019t want to pay to store the tombstoned files.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#vacuuming-after-optimizing","title":"Vacuuming after optimizing","text":"
The vacuum command deletes all files from storage that are marked for removal in the transaction log and older than the retention period which is 7 days by default.
It\u2019s normally a good idea to have a retention period of at least 7 days. For purposes of this example, we will set the retention period to zero, just so you can see how the files get removed from storage. Adjusting the retention period in this manner isn\u2019t recommended for production use cases.
All the partitions only contain a single file now, except for the date=2021-01-06 partition that has not been compacted yet.
An entire partition won\u2019t necessarily get compacted to a single data file when optimize is run. Each partition has data files that are condensed to the target file size.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#what-causes-the-small-file-problem","title":"What causes the small file problem?","text":"
Delta tables can accumulate small files for a variety of reasons:
User error: users can accidentally write files that are too small. Users should sometimes repartition in memory before writing to disk to avoid appending files that are too small.
Frequent appends: systems that append more often tend to write more, smaller files. A pipeline that appends every minute will generally generate ten times as many small files compared to a system that appends every ten minutes.
Appending to partitioned data lakes with high cardinality columns can also cause small files. If you append every hour to a table that\u2019s partitioned on a column with 1,000 distinct values, then every append could create 1,000 new files. Partitioning by date avoids this problem because the data isn\u2019t split up across partitions in this manner.
This page showed you how to create a Delta table with many small files, compact the small files into larger files with optimize, and remove the tombstoned files from storage with vacuum.
You also learned about how to incrementally optimize partitioned Delta tables, so you only compact newly added data.
An excessive number of small files slows down Delta table queries, so periodic compaction is important. Make sure to properly maintain your Delta tables, so performance does not degrade over time.
"}]}
{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"The deltalake package","text":"
This is the documentation for the native Rust/Python implementation of Delta Lake. It is based on the delta-rs Rust library and requires no Spark or JVM dependencies. For the PySpark implementation, see delta-spark instead.
This module provides the capability to read, write, and manage Delta Lake tables with Python or Rust without Spark or Java. It uses Apache Arrow under the hood, so is compatible with other Arrow-native or integrated libraries such as pandas, DuckDB, and Polars.
\"Rust deltalake\" refers to the Rust API of delta-rs (no Spark dependency)
\"Python deltalake\" refers to the Python API of delta-rs (no Spark dependency)
\"Delta Spark\" refers to the Scala implementation of the Delta Lake transaction log protocol. This depends on Spark and Java.
"},{"location":"#why-implement-the-delta-lake-transaction-log-protocol-in-rust-and-scala","title":"Why implement the Delta Lake transaction log protocol in Rust and Scala?","text":"
Delta Spark depends on Java and Spark, which is fine for many use cases, but not all Delta Lake users want to depend on these libraries. delta-rs allows using Delta Lake in Rust or other native projects where using a JVM is not an option.
Python deltalake lets you query Delta tables without depending on Java/Scala.
Suppose you want to query a Delta table with pandas on your local machine. Python deltalake makes it easy to query the table with a simple pip install command - no need to install Java.
Delta tables store data in Parquet files and persist file-level metadata in the transaction log.
This offers two main performance advantages:
File skipping based on metadata that\u2019s quickly accessible
Easy identification of all file paths for the table, compared to file listing operations that can be slow, especially on cloud object stores
Delta Lake stores min/max values for each column of each file in the table. Certain queries can skip entire files based on the metadata. File skipping can be a massive performance optimization.
Delta Lake also makes it easy to rearrange data in the table, so more file skipping is possible. For example, the table can be partitioned or Z Ordered, so that similar data is colocated in the same files and data skipping is optimal for your query patterns.
For data lakes, you need to run file listing operations to get the file paths before you can actually read the data. Listing all the files in a data lake can take a long time, especially if there are a lot of files and they are stored in Hive-style partitions.
Delta Lake stores all the file paths in the transaction log. So you can quickly get the file paths directly from the log and then run your query. Delta Lake also stores the file-level metadata in the transaction log which is quicker than opening all the files in the data lake and grabbing the metadata from the file footer.
Many basic data operations are hard in data lakes but quite easy with Delta Lake. The only data operation that\u2019s easy in a data lake is appending data. Delta Lake makes all data operations easy, including the following:
Appends
Upserts
Deletes
Replace where
Even deleting a few rows of data from a data lake is hard. It\u2019s even harder if you want to run the operation in a performant manner.
Delta Lake makes it easy to run common data operations and executes them performantly under the hood.
Delta Lake also executes write operations as transactions, which makes data operations safer and prevents downtime. Write operations will cause data lakes to be in an unstable state while the computation is running. For example, if you read a data lake while a delete operation is running, then you may get the wrong data.
Let\u2019s explore the benefits of reliable transactions in more detail.
Delta Lake tables are interoperable and can be read/written by multiple different query engines.
For example, you can create a Delta table with Spark, append to it with pandas, and then read it with Polars.
Delta tables are powerful because they are interoperable with various query engines and computation runtimes.
Suppose you have a Delta table that\u2019s updated with an AWS Lambda function every 5 minutes. There is only a small amount of data collected every 5 minutes, so a lightweight runtime like AWS Lambda is sufficient.
Further suppose that the overall table is quite large. So when you want to perform DML operations or query the whole table, your team uses a Spark cluster.
Delta Lake is flexible to allow these types of operations from multiple readers and writers. This provides teams with the flexibility to choose the right tool for the job.
"},{"location":"why-use-delta-lake/#support-for-many-languages","title":"Support for many languages","text":"
Delta tables can be queried with a variety of different languages. This project provides APIs for Rust and Python users and does not depend on Java or Scala. This project is a great alternative for pandas, Polars, DuckDB, or DataFusion.
Delta Lake supports many languages and even more language support is coming soon!
"},{"location":"why-use-delta-lake/#support-on-multiple-clouds","title":"Support on multiple clouds","text":"
Delta Lake supports multiple clouds including GCP, AWS, and Azure.
You can also use Delta Lake on your local machine or in an on-prem environment.
Delta Lake is a mature table format that offers users tons of advantages over a data lake with virtually no downsides.
Once you start using Delta Lake, you will never want to go back to data lakes that expose you to a variety of dangerous bugs, poor performance, and reliability issues.
The Delta Lake community is also welcoming and open. We gladly accept new contributors and help users with their questions.
Refers to the Databricks Unity Catalog: https://docs.databricks.com/data-governance/unity-catalog/index.html
","boost":2},{"location":"api/delta_writer/","title":"Writer","text":"","boost":10},{"location":"api/delta_writer/#write-to-delta-tables","title":"Write to Delta Tables","text":"","boost":10},{"location":"api/delta_writer/#deltalake.write_deltalake","title":"deltalake.write_deltalake","text":"
If the table does not already exist, it will be created.
The pyarrow writer supports protocol version 2 currently and won't be updated. For higher protocol support use engine='rust', this will become the default eventually.
A locking mechanism is needed to prevent unsafe concurrent writes to a delta lake directory when writing to S3. For more information on the setup, follow this usage guide: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/
Parameters:
Name Type Description Default table_or_uriUnion[str, Path, DeltaTable]
Data to write. If passing iterable, the schema must also be given.
required schemaOptional[Union[Schema, Schema]]
Optional schema to write.
Nonepartition_byOptional[Union[List[str], str]]
List of columns to partition the table by. Only required when creating a new table.
NonefilesystemOptional[FileSystem]
Optional filesystem to pass to PyArrow. If not provided will be inferred from uri. The file system has to be rooted in the table root. Use the pyarrow.fs.SubTreeFileSystem, to adopt the root of pyarrow file systems.
How to handle existing data. Default is to error if table already exists. If 'append', will add new data. If 'overwrite', will replace table with new data. If 'ignore', will not write anything if table already exists.
Optional write options for Parquet (ParquetFileWriteOptions). Can be provided with defaults using ParquetFileWriteOptions().make_write_options(). Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx#L492-L533 for the list of available options. Only used in pyarrow engine.
Nonemax_partitionsOptional[int]
the maximum number of partitions that will be used. Only used in pyarrow engine.
Nonemax_open_filesint
Limits the maximum number of files that can be left open while writing. If an attempt is made to open too many files then the least recently used file will be closed. If this setting is set too low you may end up fragmenting your data into many small files. Only used in pyarrow engine.
1024max_rows_per_fileint
Maximum number of rows per file. If greater than 0 then this will limit how many rows are placed in any single file. Otherwise there will be no limit and one file will be created in each output directory unless files need to be closed to respect max_open_files. min_rows_per_group: Minimum number of rows per group. When the value is set, the dataset writer will batch incoming data and only write the row groups to the disk when sufficient rows have accumulated. Only used in pyarrow engine.
10 * 1024 * 1024max_rows_per_groupint
Maximum number of rows per group. If the value is set, then the dataset writer may split up large incoming batches into multiple row groups. If this value is set, then min_rows_per_group should also be set.
If None and the compression has a level, the default level will be used. Only relevant for GZIP (levels 1-9), BROTLI (levels 1-11), and ZSTD (levels 1-22).
None","boost":10},{"location":"api/delta_writer/#convert-to-delta-tables","title":"Convert to Delta Tables","text":"","boost":10},{"location":"api/delta_writer/#deltalake.convert_to_deltalake","title":"deltalake.convert_to_deltalake","text":"
Currently only HIVE partitioned tables are supported. Convert to delta creates a transaction log commit with add actions, and additional properties provided such as configuration, name, and description.
Parameters:
Name Type Description Default uriUnion[str, Path]
URI of a table.
required partition_byOptional[Schema]
Optional partitioning schema if table is partitioned.
Nonepartition_strategyOptional[Literal['hive']]
Optional partition strategy to read and convert
NonemodeLiteral['error', 'ignore']
How to handle existing data. Default is to error if table already exists. If 'ignore', will not convert anything if table already exists.
","boost":2},{"location":"api/schema/","title":"Schema","text":"","boost":2},{"location":"api/schema/#schema-and-field","title":"Schema and field","text":"
Schemas, fields, and data types are provided in the deltalake.schema submodule.
Get the schema with all variable-size types (list, binary, string) as large variants (with int64 indices). This is for compatibility with systems like Polars that only support the large versions of Arrow types.
Create the Delta Table from a path with an optional version. Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URI. Depending on the storage backend used, you could provide options values using the storage_options parameter.
Parameters:
Name Type Description Default table_uriUnion[str, Path, PathLike[str]]
the path of the DeltaTable
required versionOptional[int]
version of the DeltaTable
Nonestorage_optionsOptional[Dict[str, str]]
a dictionary of the options to use for the storage backend
Nonewithout_filesbool
If True, will load table without tracking files. Some append-only applications might have no need of tracking any files. So, the DeltaTable will be loaded with a significant memory reduction.
Falselog_buffer_sizeOptional[int]
Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage api. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of cpus.
Delete expired log files before current version from table. The table log retention is based on the configuration.logRetentionDuration value, 30 days by default.
How to handle existing data. Default is to error if the table already exists. If 'append', raises a not-supported error if the table exists. If 'overwrite', will CREATE_OR_REPLACE the table. If 'ignore', will not do anything if the table already exists. Defaults to \"error\".
Delete records from a Delta Table that satisfy a predicate.
When a predicate is not provided then all records are deleted from the Delta Table. Otherwise a scan of the Delta table is performed to mark any files that contain records that satisfy the predicate. Once files are determined they are rewritten without the records.
Parameters:
Name Type Description Default predicateOptional[str]
a SQL where clause. If not passed, will delete all rows.
Nonewriter_propertiesOptional[WriterProperties]
Pass writer properties to the Rust parquet writer.
Nonecustom_metadataOptional[Dict[str, str]]
custom metadata that will be added to the transaction commit.
Get the list of files as absolute URIs, including the scheme (e.g. \"s3://\").
Local files will be just plain absolute paths, without a scheme. (That is, no 'file://' prefix.)
Use the partition_filters parameter to retrieve a subset of files that match the given filters.
Parameters:
Name Type Description Default partition_filtersOptional[List[Tuple[str, str, Any]]]
the partition filters that will be used for getting the matched files
None
Returns:
Type Description List[str]
list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable
Predicates are expressed in disjunctive normal form (DNF), like [(\"x\", \"=\", \"a\"), ...]. DNF allows arbitrary boolean logical combinations of single partition predicates. The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective predicate over multiple partitions. Each tuple has format: (key, op, value) and compares the key with the value. The supported op are: =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set or a tuple. The supported type for value is str. Use empty string '' for Null partition value.
The paths are as they are saved in the delta log, which may either be relative to the table root or absolute URIs.
Parameters:
- partition_filters (Optional[List[Tuple[str, str, Any]]], default None): the partition filters that will be used for getting the matched files.

Returns:
- List[str]: a list of the .parquet files referenced for the current version of the DeltaTable.
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...]. DNF allows arbitrary boolean logical combinations of single partition predicates. The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND) of multiple partition predicates, forming a more selective filter. Each tuple has the format (key, op, value) and compares the key with the value. The supported ops are =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set, or a tuple. The supported type for value is str. Use the empty string '' for a null partition value.
Parameters:
- data_catalog (DataCatalog, required): the Catalog to use for getting the storage location of the Delta Table.
- database_name (str, required): the database name inside the Data Catalog.
- table_name (str, required): the table name inside the Data Catalog.
- data_catalog_id (Optional[str], default None): the identifier of the Data Catalog.
- version (Optional[int], default None): version of the DeltaTable.
- log_buffer_size (Optional[int], default None): number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage API. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of CPUs.
Add actions represent the files that currently make up the table. This data is a low-level representation parsed from the transaction log.
Parameters:
- flatten (bool, default False): whether to flatten the schema. Partition value columns are given the prefix "partition.", statistics (null_count, min, and max) are given the prefixes "null_count.", "min.", and "max.", and tags the prefix "tags.". Nested field names are concatenated with ".".

Returns:
- RecordBatch: a PyArrow RecordBatch containing the add action data.
Load/time travel a DeltaTable to a specified version number, or to the version at a given timestamp. If a string is passed, it should be an RFC 3339 / ISO 8601 date and time string.
Parameters:
- version (Union[int, str, datetime]): the identifier of the version of the DeltaTable to load.
Time travel the Delta table to the latest version that was created at or before the provided datetime_string argument. The datetime_string argument should be an RFC 3339 / ISO 8601 date and time string.
Deprecated
load_version and load_with_datetime have been combined into DeltaTable.load_as_version.
Parameters:
- datetime_string (str): the identifier of the datetime point of the DeltaTable to load.
Pass the source data which you want to merge into the target Delta table, providing a predicate in a SQL-like format. You can also specify what to do when the underlying data types do not match those of the underlying table.
Parameters:
- source (Union[Table, RecordBatch, RecordBatchReader, Dataset, DataFrame], required): source data.
- predicate (str, required): SQL-like predicate on how to merge.
- source_alias (Optional[str], default None): alias for the source table.
- target_alias (Optional[str], default None): alias for the target table.
- error_on_type_mismatch (bool, default True): specify if merge will return an error if data types are mismatching.
- writer_properties (Optional[WriterProperties], default None): pass writer properties to the Rust parquet writer.
- large_dtypes (bool, default True): if True, the data schema is kept in large_dtypes.
- custom_metadata (Optional[Dict[str, str]], default None): custom metadata that will be added to the transaction commit.
Repair the Delta table by auditing active files that do not exist in the underlying filesystem and removing them. This can be useful when there are accidental deletions or corrupted files.
Active files are ones that have an add action in the log, but no corresponding remove action. This operation creates a new FSCK transaction containing a remove action for each of the missing or corrupted files.
Parameters:
- dry_run (bool, default False): when activated, list only the files; otherwise add remove actions to the transaction log. Defaults to False.
- custom_metadata (Optional[Dict[str, str]], default None): custom metadata that will be added to the transaction commit.
Returns: The metrics from repair (FSCK) action.
Example
from deltalake import DeltaTable
dt = DeltaTable('TEST')
dt.repair(dry_run=False)
Build a pandas dataframe using data from the DeltaTable.
Parameters:
- partitions (Optional[List[Tuple[str, str, Any]]], default None): a list of partition filters; see help(DeltaTable.files_by_partitions) for filter syntax.
- columns (Optional[List[str]], default None): the columns to project. This can be a list of column names to include (order and duplicates will be preserved).
- filesystem (Optional[Union[str, FileSystem]], default None): a concrete implementation of the PyArrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem.
- filters (Optional[FilterType], default None): a disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter, you do not need to pass partitions.
Build a PyArrow Dataset using data from the DeltaTable.
Parameters:
- partitions (Optional[List[Tuple[str, str, Any]]], default None): a list of partition filters; see help(DeltaTable.files_by_partitions) for filter syntax.
- filesystem (Optional[Union[str, FileSystem]], default None): a concrete implementation of the PyArrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem.
Build a PyArrow Table using data from the DeltaTable.
Parameters:
- partitions (Optional[List[Tuple[str, str, Any]]], default None): a list of partition filters; see help(DeltaTable.files_by_partitions) for filter syntax.
- columns (Optional[List[str]], default None): the columns to project. This can be a list of column names to include (order and duplicates will be preserved).
- filesystem (Optional[Union[str, FileSystem]], default None): a concrete implementation of the PyArrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem.
- filters (Optional[FilterType], default None): a disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter, you do not need to pass partitions.
Update a matched table row based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the row to be updated.
Parameters:
- updates (Dict[str, str]): a mapping of column name to update SQL expression.
Updating all source fields to target fields, source and target are required to have the same field names. If a predicate is specified, then it must evaluate to true for the row to be updated.
Parameters:
- predicate (Optional[str], default None): an optional SQL predicate; if specified, it must evaluate to true for the row to be updated.
Update a target row that has no matches in the source based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the row to be updated.
Parameters:
- updates (Dict[str, str]): a mapping of column name to update SQL expression.
Insert a new row to the target table based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the new row to be inserted.
Parameters:
- updates (dict): a mapping of column name to insert SQL expression.
Insert a new row to the target table, updating all source fields to target fields. Source and target are required to have the same field names. If a predicate is specified, then it must evaluate to true for the new row to be inserted.
Parameters:
- predicate (Optional[str], default None): an optional SQL predicate; if specified, it must evaluate to true for the new row to be inserted.
Compacts small files to reduce the total number of files in the table.
This operation is idempotent; if run twice on the same table (assuming it has not been updated) it will do nothing the second time.
If this operation happens concurrently with any operations other than append, it will fail.
Parameters:
- partition_filters (Optional[FilterType], default None): the partition filters that will be used for getting the matched files.
- target_size (Optional[int], default None): desired file size after bin-packing files, in bytes. If not provided, will attempt to read the table configuration value delta.targetFileSize. If that value isn't set, will use the default value of 256MB.
- max_concurrent_tasks (Optional[int], default None): the maximum number of concurrent tasks to use for file compaction. Defaults to the number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory.
- min_commit_interval (Optional[Union[int, timedelta]], default None): minimum interval, in seconds or as a timedelta, before a new commit is created. An interval is useful for long-running executions. Set to 0 or timedelta(0) if you want a commit per partition.
- writer_properties (Optional[WriterProperties], default None): pass writer properties to the Rust parquet writer.
- custom_metadata (Optional[Dict[str, str]], default None): custom metadata that will be added to the transaction commit.

Returns:
- Dict[str, Any]: the metrics from optimize.
Example
Use a timedelta object to specify the seconds, minutes or hours of the interval.
Reorders the data using a Z-order curve to improve data skipping.
This also performs compaction, so the same parameters as compact() apply.
Parameters:
- columns (Iterable[str], required): the columns to use for Z-ordering. There must be at least one column.
- partition_filters (Optional[FilterType], default None): the partition filters that will be used for getting the matched files.
- target_size (Optional[int], default None): desired file size after bin-packing files, in bytes. If not provided, will attempt to read the table configuration value delta.targetFileSize. If that value isn't set, will use the default value of 256MB.
- max_concurrent_tasks (Optional[int], default None): the maximum number of concurrent tasks to use for file compaction. Defaults to the number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory.
- max_spill_size (int): the maximum number of bytes to spill to disk. Defaults to 20GB.
- min_commit_interval (Optional[Union[int, timedelta]], default None): minimum interval, in seconds or as a timedelta, before a new commit is created. An interval is useful for long-running executions. Set to 0 or timedelta(0) if you want a commit per partition.
- writer_properties (Optional[WriterProperties], default None): pass writer properties to the Rust parquet writer.
- custom_metadata (Optional[Dict[str, str]], default None): custom metadata that will be added to the transaction commit.

Returns:
- Dict[str, Any]: the metrics from optimize.
Example
Use a timedelta object to specify the seconds, minutes or hours of the interval.
The Parquet file stores the data that was written. The _delta_log directory stores metadata about the transactions. Let's inspect the _delta_log/00000000000000000000.json file.
This transaction adds a data file and marks the two existing data files for removal. Marking a file for removal in the transaction log is known as "tombstoning the file" or a "logical delete". This is different from a "physical delete", which actually removes the data file from storage.
How Delta table operations differ from data lakes
Data lakes consist of data files persisted in storage. They don't have a transaction log that retains metadata about the transactions.
Data lakes perform transactions differently than Delta tables.
When you perform an overwrite transaction with a Delta table, you logically delete the existing data without physically removing it.
Data lakes don't support logical deletes, so you have to physically delete the data from storage.
Logical data operations are safer because they can be rolled back if they don't complete successfully. Physically removing data from storage can be dangerous, especially if it's before a transaction is complete.
We're now ready to look into Delta Lake ACID transactions in more detail.
Delta Lake Transactions
This page teaches you about Delta Lake transactions and why transactions are important in production data settings. Data lakes don't support transactions, and this is a huge downside: they offer a poor user experience, lack functionality, and can easily be corrupted.
Transactions on Delta Lake tables are operations that change the state of a table and record descriptive entries (metadata) of those changes to the Delta Lake transaction log. Here are some examples of transactions:
Deleting rows
Appending to the table
Compacting small files
Upserting
Overwriting rows
All Delta Lake write operations are transactions in Delta tables. Reads actually aren't technically transactions because they don't result in new entries being appended to the transaction log.
What are transactions?
Transactions are any Delta operation that changes the underlying files of a Delta table and results in new metadata entries in the transaction log. Some Delta operations rearrange data in the existing table (like Z Ordering the table or compacting the small files), and these are also transactions. Let's look at a simple example.
Suppose you have a Delta table with the following data:
Notice the 00000000000000000001.json file that was added to the transaction log to record this transaction. Let's inspect the content of the file.
Transactions are recorded in the transaction log. The transaction log is also referred to as the table metadata and is the _delta_log directory in storage.
Let's see how Delta Lake implements transactions.
How Delta Lake implements transactions
Here is how Delta Lake implements transactions:
Read the existing metadata
Read the existing Parquet data files
Write the Parquet files for the current transaction
Record the new transaction in the transaction log (if there are no conflicts)
Let's recall our delete operation from the prior section and see how it fits into this transaction model:
We read the existing metadata to find the file paths for the existing Parquet files
We read the existing Parquet files and identify the files that contain data that should be removed
We write new Parquet files with the deleted data filtered out
Once the new Parquet files are written, we check for conflicts and then make an entry in the transaction log. The next section will discuss transaction conflicts in more detail.
Blind append operations can skip a few steps and are executed as follows:
Write the Parquet files for the current transaction
Record the new transaction in the metadata
Delta implements non-locking MVCC (multi-version concurrency control), so writers optimistically write new data and simply abandon the transaction if it conflicts at the end. The alternative would be acquiring a lock at the start, thereby guaranteeing the transaction immediately.
Let's look at the case when a Delta Lake transaction conflicts.
How Delta Lake transactions can conflict
Suppose you have a transaction that deletes a row of data that's stored in FileA (Transaction 1). While this job is running, there is another transaction that deletes some other rows in FileA (Transaction 2). Transaction 1 finishes running first and is recorded in the metadata.
Before Transaction 2 is recorded, it will check the metadata, find that it conflicts with a transaction that was already recorded (Transaction 1), and error without recording a new transaction.
Transaction 2 will write Parquet data files, but will not be recorded as a transaction, so the data files will be ignored. The zombie Parquet files can be easily cleaned up via subsequent vacuum operations.
Transaction 2 must fail otherwise it would cause the data to be incorrect.
Delta Lake transactions prevent users from making changes that would corrupt the table. Transaction conflict behavior can differ based on isolation level, which controls the degree to which a transaction must be isolated from modifications made by other concurrent transactions. More about this in the concurrency section.
Transactions rely on atomic primitives storage guarantees
Suppose you have two transactions that are finishing at the exact same time. Both of these transactions look at the existing Delta Lake transaction log, see that the latest transaction was 003.json, and determine that the next entry should be 004.json.
If both transactions are recorded in the 004.json file, then one of them will be clobbered, and the transaction log entry for the clobbered metadata entry will be lost.
Delta tables rely on storage systems that provide atomic primitives for safe concurrency. The storage system must allow Delta Lake to write the file, only if it does not exist already, and error out otherwise. The storage system must NOT permit concurrent writers to overwrite existing metadata entries.
Some clouds have filesystems that don't explicitly support these atomic primitives, and therefore must be coupled with other services to provide the necessary guarantees.
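As a purely illustrative sketch of "write only if it does not already exist" semantics (not delta-rs's actual implementation), a local filesystem provides this atomic primitive via O_EXCL:

```python
import os
import tempfile

log_dir = tempfile.mkdtemp()  # stand-in for a _delta_log directory

def put_if_absent(path: str, data: bytes) -> bool:
    """Atomically create a file, failing if it already exists (O_EXCL)."""
    try:
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # another writer already committed this version
    with os.fdopen(fd, "wb") as f:
        f.write(data)
    return True

entry = os.path.join(log_dir, "00000000000000000004.json")
winner = put_if_absent(entry, b"{}")  # first writer succeeds
loser = put_if_absent(entry, b"{}")   # concurrent writer fails and must retry with 005
```

Object stores need an equivalent conditional-put guarantee; where one is missing, an external service (such as a locking provider) supplies it.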
Delta Lake transactions are only for a single table
Delta Lake transactions are only valid for a single table.
Some databases offer transaction support for operations that impact multiple tables. Delta Lake does not support multi-table transactions.
Data lakes don't support transactions
Data lakes consist of many files in a storage system (e.g. a cloud storage system) and don't support transactions.
Data lakes don't have a metadata layer, conflict resolution, or any way to store information about transactions.
Data lakes are prone to multiple types of errors because they don't support transactions:
Easy to corrupt
Downtime/unstable state while jobs are running
Operations can conflict
Data lakes have many downsides, and it's almost always better to use a lakehouse storage system like Delta Lake than a data lake.
We've already explored how Delta Lake supports transactions. This section explains how Delta Lake transactions have the Atomic, Consistent, Isolated, and Durable (ACID) properties. Reading this section is optional.
ACID transactions are commonplace in databases but notably absent for data lakes.
Delta Lake's ACID transaction support is one of the major reasons it is almost always a better option than a data lake.
Let's explore how Delta Lake allows for ACID transactions.
Atomic transactions
An atomic transaction either fully completes or fully fails, with nothing in between.
Delta Lake transactions are atomic, unlike data lake transactions that are not atomic.
Suppose you have a job that's writing 100 files to a table. Further suppose that the job errors out and the cluster dies after writing 40 files:
For a Delta table, no additional data will be added to the table. Parquet files were written to the table, but the job errored, so no transaction log entry was added and no data was added to the table.
For a data lake, the 40 files are added and the transaction "partially succeeds".
For data tables, it's almost always preferable to have a transaction that "fully fails" instead of one that "partially succeeds", because partial writes are hard to unwind and debug.
Delta Lake implements atomic transactions by writing data files first before making a new entry in the Delta transaction log.
These guarantees are provided at the protocol level through the "transaction" abstraction. We've already discussed what constitutes a transaction for Delta Lake.
If there is an error with the transaction and some files don't get written, then no metadata entry is made and the partial data write is ignored. The zombie Parquet files can be easily cleaned up via subsequent vacuum operations.
Now let's look at how Delta Lake also provides consistent transactions.
Consistent transactions
Consistency means that transactions won't violate integrity constraints on the Delta table.
Delta Lake has two types of consistency checks:
Schema enforcement checks
Column constraints
Schema enforcement checks verify that new data appended to a Delta table matches the schema of the existing table. You cannot append data with a different schema, unless you enable schema evolution.
Delta Lake column constraints allow users to specify the requirements of data that's added to a Delta table. For example, if you have an age column with a constraint that requires the value to be positive, then Delta Lake will reject appends of any data that doesn't meet the constraint.
Data lakes don't support schema enforcement or column constraints. That's another reason why data lakes are not ACID-compliant.
Isolated transactions
Isolation means that transactions are applied to a Delta table sequentially.
Delta Lake transactions are persisted in monotonically increasing transaction files, as we saw in the previous example. First 00000000000000000000.json, then 00000000000000000001.json, then 00000000000000000002.json, and so on.
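The naming scheme above can be sketched in a couple of lines: log entries are zero-padded to a fixed width, so lexicographic order matches version order.

```python
def commit_filename(version: int) -> str:
    # Delta log entries are zero-padded to 20 digits so that string sorting
    # (as done by object-store listings) matches numeric version order.
    return f"{version:020d}.json"

names = [commit_filename(v) for v in range(3)]
# ["00000000000000000000.json", "00000000000000000001.json", "00000000000000000002.json"]
```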
Delta Lake uses concurrency control to ensure that transactions are executed sequentially, even when user operations are performed concurrently. The next page of this guide explains concurrency in Delta Lake in detail.
Durable transactions
Delta tables are generally persisted in cloud object stores which provide durability guarantees.
Durability means that all transactions that are successfully completed will always remain persisted, even if there are service outages or program crashes.
Suppose you have a Delta table that's persisted in Azure Blob Storage. The Delta table transactions that are committed will always remain available, even in these circumstances:
When there are Azure service outages
If a computation cluster that's writing the Delta table crashes for some reason
Two operations are running concurrently and one of them fails
Successful transactions are always registered in the Delta table and persisted no matter what.
Delta Lake supports transactions which provide necessary reliability guarantees for production data systems.
Vanilla data lakes don't provide transactions, and this can cause nasty bugs and a bad user experience. Let's look at a couple of scenarios when the lack of transactions causes a poor user experience:
While running a compaction operation on a data lake, newly compacted "right sized" files are added before the small files are deleted. If you read the data lake while this operation is running, you will see duplicate data.
While writing to a data lake, a job might fail, which leaves behind partially written files. These files are corrupt, which means that the data lake cannot be read until the corrupt files are manually removed.
Users want to run a simple DML operation like deleting a few rows of data, which requires a few files to be rewritten. This operation renders the data lake unusable until it's done running.
Transactions are a key advantage of Delta Lake vs. data lakes. There are many other advantages, but proper transactions are necessary in production data environments.
Delta Lake File Skipping
Delta tables store file-level metadata information, which allows for a powerful optimization called file skipping.
This page explains how Delta Lake implements file skipping, how to optimize your tables to maximize file skipping, and the benefits of file skipping.
Let's start by looking at the file-level metadata in Delta tables.
Delta Lake file metadata
Delta Lake stores metadata about each file's min/max values in the table. Query engines can skip entire files when they don't contain data that's relevant to the query.
Suppose you have a Delta table with data stored in two files and the following metadata.
filename  min_name  max_name  min_age  max_age
fileA     alice     joy       12       46
fileB     allan     linda     34       78
Suppose you want to run the following query: select * from the_table where age < 20.
The engine only needs to read fileA to execute this query. fileB has a min_age of 34, so we know there aren't any rows of data with an age less than 20.
The benefit of file skipping depends on the query and the data layout of the Delta table. Some queries cannot take advantage of any file skipping. Here's an example query that does not benefit from file skipping: select * from the_table group by age.
Let's recreate this example with Polars to drive the point home.
+-------+-----+
| name  | age |
| ---   | --- |
| str   | i64 |
+=============+
| alice | 12  |
+-------+-----+
Polars can use the Delta table metadata to skip the file that does not contain data relevant to the query.
How Delta Lake implements file skipping
Here's how engines execute queries on Delta tables:
Start by reading the transaction log to get the file paths, file sizes, and min/max values for each column
Parse the query and push down the predicates to skip files
Read the minimal subset of the files needed for the query
Some file formats don't allow for file skipping. For example, CSV files don't have file-level metadata, so query engines can't read a minimal subset of the data. The query engine has to check all the files, even if they don't contain any relevant data.
When data is in Parquet files, the query engine can open up all the files, read the footers, build the file-level metadata, and perform file skipping. Fetching metadata in each file is slower than grabbing the pre-built file-level metadata from the transaction log.
Now, let's see how to structure your tables to allow for more file skipping.
File skipping for different file sizes
Delta tables store data in files. Smaller files allow for more file skipping compared to bigger files.
However, an excessive number of small files isn't good because it creates I/O overhead and slows down queries.
Your Delta tables should have files that are "right-sized". For a table with 150 GB of data, 5 GB files would probably be too large, and 10 KB files would be too small. It's generally best to store data in files that are between 100 MB and 1 GB.
Delta Lake has an optimize function that performs small file compaction, so you don\u2019t need to program this logic yourself.
Now, let's investigate how to store data in files to maximize the file skipping opportunities.
How to maximize file skipping
You can maximize file-skipping by colocating similar data in the same files.
Suppose you have a table with test scores and frequently run queries that filter based on the test_score column.
Suppose you want to run the following query: select * from exams where test_score > 90.
This query cannot skip files, given the current organization of the data. You can rearrange the data to colocate similar test scores in the same files to allow for file skipping. Here's the new layout:
The query (select * from exams where test_score > 90) can skip two of the three files with the new Delta table layout. The query engine only has to read fileF for this query.
Now, let's look at how file skipping works with string values.
How file skipping works with strings
File skipping is also effective when filtering on string values.
Suppose you have a table with person_name and country columns. There are millions of rows of data. Here are the first three rows of data:
The Delta table contains three files with the following metadata:
filename  min_country  max_country
fileA     albania      mali
fileB     libia        paraguay
fileC     oman         zimbabwe
Suppose you want to run the following query: select * from some_people where country = 'austria'.
You only need to read the data in fileA to run this query. The min_country values for fileB and fileC are greater than "austria", so we know those files don't contain any data relevant to the query.
File skipping can also be a robust optimization for string values. Now, let's see how file skipping works for partitioned tables.
File skipping for partitioned tables
You can partition Delta tables for file skipping as well. Suppose we have the same data as in the previous section, but the table is partitioned by country.
Suppose you want to run the following query on this partitioned table: select * from some_partitioned_table where country = 'albania'.
You only need to read fileA and fileE to execute this query. Delta Lake provides the file-level partition metadata in the transaction log so that this query will run quickly.
Delta Lake allows for file skipping, which is a powerful performance optimization.
Delta Lake also provides built-in utilities to colocate data in the same files like partitioning, Z Ordering, and compaction to improve file skipping.
Delta Lake users need to know how to assess the tradeoffs of these techniques to optimize file skipping. Users also need to understand the most frequent query patterns of their tables to allow for maximal file skipping.
Delta Lake Arrow Integrations
Delta Lake tables can be exposed as Arrow tables and Arrow datasets, which allows for interoperability with a variety of query engines.
This page shows you how to convert Delta tables to Arrow data structures and teaches you the difference between Arrow tables and Arrow datasets. Tables are "eager" and datasets are "lazy", which has important performance implications. Keep reading to learn more!
Delta Lake to Arrow Dataset
Delta tables can easily be exposed as Arrow datasets. This makes it easy for any query engine that can read Arrow datasets to read a Delta table.
Let's take a look at the h2o groupby dataset that contains 9 columns of data. Here are three representative rows of data:
Arrow datasets allow for the predicates to get pushed down to the query engine, so the query is executed quickly.
Delta Lake to Arrow Table
You can also run the same query with DuckDB on an Arrow table:
quack = duckdb.arrow(table.to_pyarrow_table())
quack.filter("id1 = 'id016' and v2 > 10")
This returns the same result, but it runs slower.
"},{"location":"integrations/delta-lake-arrow/#difference-between-arrow-dataset-and-arrow-table","title":"Difference between Arrow Dataset and Arrow Table","text":"
Arrow Datasets are lazy and allow for full predicate pushdown unlike Arrow tables which are eagerly loaded into memory.
The previous DuckDB queries were run on a 1 billion row dataset that's roughly 50 GB when stored as an uncompressed CSV file. Here are the runtimes when the data is stored in a Delta table and the queries are executed on a 2021 Macbook M1 with 64 GB of RAM:
Arrow table: 17.1 seconds
Arrow dataset: 0.01 seconds
The query runs much faster on an Arrow dataset because the predicates can be pushed down to the query engine and lots of data can be skipped.
Arrow tables are eagerly materialized in memory and don't allow for the same amount of data skipping.
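The eager versus lazy distinction can be illustrated with plain Python iterators. This is a conceptual sketch of why laziness allows skipping, not how Arrow itself is implemented:

```python
def row_source(n, counter):
    """Yield n rows one at a time, counting how many are actually produced."""
    for i in range(n):
        counter["produced"] += 1
        yield i

# Lazy (dataset-style): stop after the first 3 matches; only 5 rows are produced.
lazy_counter = {"produced": 0}
lazy = (r for r in row_source(1_000_000, lazy_counter) if r % 2 == 0)
first_three = [next(lazy) for _ in range(3)]

# Eager (table-style): every row is materialized before filtering.
eager_counter = {"produced": 0}
eager_three = [r for r in row_source(1_000_000, eager_counter) if r % 2 == 0][:3]

print(first_three, lazy_counter["produced"])   # [0, 2, 4] 5
print(eager_three, eager_counter["produced"])  # [0, 2, 4] 1000000
```

Both approaches return the same answer, but the lazy pipeline touches a tiny fraction of the data, which is the same reason the Arrow dataset query above finishes in a hundredth of a second.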
"},{"location":"integrations/delta-lake-arrow/#multiple-query-engines-can-query-arrow-datasets","title":"Multiple query engines can query Arrow Datasets","text":"
Other query engines like DataFusion can also query Arrow datasets, see the following example:
from datafusion import SessionContext\n\nctx = SessionContext()\nctx.register_dataset(\"my_dataset\", table.to_pyarrow_dataset())\nctx.sql(\"select * from my_dataset where v2 > 5\")\n
Delta tables can easily be exposed as Arrow tables/datasets.
Therefore any query engine that can read an Arrow table/dataset can also read a Delta table.
Arrow datasets allow for more predicates to be pushed down to the query engine, so they can deliver better performance than Arrow tables.
"},{"location":"integrations/delta-lake-datafusion/","title":"Using Delta Lake with DataFusion","text":"
This page explains how to use Delta Lake with DataFusion.
Delta Lake offers DataFusion users better performance and more features compared to other formats like CSV or Parquet.
Delta Lake works well with the DataFusion Rust API and the DataFusion Python API. It's a great option for all DataFusion users.
Delta Lake also depends on DataFusion to implement SQL-related functionality under the hood. We will also discuss this dependency at the end of this guide in case you're interested in learning more about the symbiotic relationship between the two libraries.
"},{"location":"integrations/delta-lake-datafusion/#delta-lake-performance-benefits-for-datafusion-users","title":"Delta Lake performance benefits for DataFusion users","text":"
Let's run some DataFusion queries on a Parquet file and a Delta table with the same data to learn more about the performance benefits of Delta Lake.
Suppose you have the following dataset with 1 billion rows and 9 columns. Here are the first three rows of data:
ctx.sql(\"select id1, sum(v1) as v1 from my_delta_table where id1='id096' group by id1\")\n
That query takes 2.8 seconds to execute.
Let's register the same dataset as a Parquet table, run the same query, and compare the runtime difference.
Register the Parquet table and run the query:
path = \"G1_1e9_1e2_0_0.parquet\"\nctx.register_parquet(\"my_parquet_table\", path)\nctx.sql(\"select id1, sum(v1) as v1 from my_parquet_table where id1='id096' group by id1\")\n
This query takes 5.3 seconds to run.
Parquet stores data in row groups and DataFusion can intelligently skip row groups that don't contain relevant data, so the query is faster than a file format like CSV which doesn't support row group skipping.
Delta Lake stores file-level metadata information in the transaction log, so it can skip entire files when queries are executed. Delta Lake can skip entire files and then skip row groups within the individual files. This makes Delta Lake even faster than Parquet files, especially for larger datasets spread across many files.
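The file-level skipping described above can be sketched in plain Python. The per-file min/max statistics below are illustrative, not from a real transaction log:

```python
# Sketch of file skipping from min/max column statistics, as recorded
# per file in the Delta transaction log (values here are illustrative).
file_stats = [
    {"file": "part-0.parquet", "min_id1": "id001", "max_id1": "id050"},
    {"file": "part-1.parquet", "min_id1": "id051", "max_id1": "id100"},
    {"file": "part-2.parquet", "min_id1": "id101", "max_id1": "id150"},
]

def files_to_scan(stats, value):
    """Keep only the files whose [min, max] range could contain the value."""
    return [s["file"] for s in stats if s["min_id1"] <= value <= s["max_id1"]]

# For the filter id1 = 'id096', only one of three files needs to be read.
print(files_to_scan(file_stats, "id096"))  # ['part-1.parquet']
```

Because these statistics live in the transaction log, the engine decides which files to read before opening any Parquet file at all.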
"},{"location":"integrations/delta-lake-datafusion/#delta-lake-features-for-datafusion-users","title":"Delta Lake features for DataFusion users","text":"
Delta Lake also provides other features that are useful for DataFusion users like ACID transactions, concurrency protection, time travel, versioned data, and more.
"},{"location":"integrations/delta-lake-datafusion/#why-delta-lake-depends-on-datafusion","title":"Why Delta Lake depends on DataFusion","text":"
Delta Lake depends on DataFusion to provide some end-user features.
DataFusion is useful in providing SQL-related Delta Lake features. Some examples:
Update and merge are written in terms of SQL expressions.
Invariants and constraints are written in terms of SQL expressions.
Anytime we have to evaluate SQL, we need some sort of SQL engine. We use DataFusion for that.
Delta Lake is a great file format for DataFusion users.
Delta Lake also uses DataFusion to provide some end-user features.
DataFusion and Delta Lake have a wonderful symbiotic relationship and play very nicely with each other.
See this guide for more information on Delta Lake and PyArrow and why PyArrow Datasets are often a better option than PyArrow tables.
"},{"location":"integrations/delta-lake-pandas/","title":"Using Delta Lake with pandas","text":"
Delta Lake is a great storage system for pandas analyses. This page shows how it's easy to use Delta Lake with pandas, the unique features Delta Lake offers pandas users, and how Delta Lake can make your pandas analyses run faster.
Delta Lake is very easy to install for pandas analyses, just run pip install deltalake.
Delta Lake allows for performance optimizations, so pandas queries can run much faster than queries run on data stored in CSV or Parquet. See the following chart for the query runtime for a Delta table compared with CSV/Parquet.
Z Ordered Delta tables run this query much faster than when the data is stored in Parquet or CSV. Let's dive in deeper and see how Delta Lake makes pandas faster.
"},{"location":"integrations/delta-lake-pandas/#delta-lake-makes-pandas-queries-run-faster","title":"Delta Lake makes pandas queries run faster","text":"
There are a few reasons Delta Lake can make pandas queries run faster:
column pruning: only grabbing the columns relevant for a query
file skipping: only reading files with data for the query
row group skipping: only reading row groups with data for the query
Z ordering data: colocating similar data in the same files, so file skipping is more effective
Reading less data (fewer columns and/or fewer rows) is how Delta Lake makes pandas queries run faster.
Parquet allows for column pruning and row group skipping, but doesn't support file-level skipping or Z Ordering. CSV doesn't support any of these performance optimizations.
Let's take a look at a sample dataset and run a query to see the performance enhancements offered by Delta Lake.
Suppose you have a 1 billion row dataset with 9 columns, here are the first three rows of the dataset:
Parquet stores data in row groups and allows for row group skipping when filter predicates are set. Run the Parquet query again with row group skipping enabled:
Here are the contents after the overwrite operation (version 2 of the Delta table):
+-------+----------+\n| num | letter |\n|-------+----------|\n| 8 | dd |\n| 9 | ee |\n+-------+----------+\n
Read in the Delta table and it will grab the latest version by default:
DeltaTable(\"tmp/some-table\").to_pandas()\n\n+-------+----------+\n| num | letter |\n|-------+----------|\n| 11 | aa |\n| 22 | bb |\n+-------+----------+\n
You can easily time travel back to version 0 of the Delta table:
DeltaTable(\"tmp/some-table\", version=0).to_pandas()\n\n+-------+----------+\n| num | letter |\n|-------+----------|\n| 1 | a |\n| 2 | b |\n| 3 | c |\n+-------+----------+\n
You can also time travel to version 1 of the Delta table:
DeltaTable(\"tmp/some-table\", version=1).to_pandas()\n\n+-------+----------+\n| num | letter |\n|-------+----------|\n| 1 | a |\n| 2 | b |\n| 3 | c |\n| 8 | dd |\n| 9 | ee |\n+-------+----------+\n
Time travel is a powerful feature that pandas users cannot access with CSV or Parquet.
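How a reader resolves a given version can be sketched by replaying the add and remove actions in the transaction log. The log entries below are illustrative, loosely mirroring the example table above (an initial write, an append, then an overwrite):

```python
# Conceptual sketch: a table version is the set of files left after
# replaying add/remove actions up to and including that version.
log = [
    [("add", "f1"), ("add", "f2"), ("add", "f3")],           # version 0: create
    [("add", "f4"), ("add", "f5")],                          # version 1: append
    [("remove", "f1"), ("remove", "f2"), ("remove", "f3"),   # version 2: overwrite
     ("remove", "f4"), ("remove", "f5"), ("add", "f6")],
]

def files_at_version(log, version):
    """Replay log entries 0..version and return the active file set."""
    active = set()
    for entry in log[: version + 1]:
        for action, path in entry:
            active.add(path) if action == "add" else active.discard(path)
    return sorted(active)

print(files_at_version(log, 0))  # ['f1', 'f2', 'f3']
print(files_at_version(log, 2))  # ['f6']
```

Time traveling to version 0 simply means stopping the replay early, which is why old versions stay readable until their files are vacuumed.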
Delta tables only allow you to append DataFrames with a matching schema by default. Suppose you have a DataFrame with num and animal columns, which differs from the Delta table's num and letter columns.
Try to append this DataFrame with a mismatched schema to the existing table:
This transaction will be rejected and will return the following error message:
ValueError: Schema of data does not match table schema\nData schema:\nnum: int64\nanimal: string\n-- schema metadata --\npandas: '{\"index_columns\": [{\"kind\": \"range\", \"name\": null, \"start\": 0, \"' + 474\nTable Schema:\nnum: int64\nletter: string\n
Schema enforcement protects your table from getting corrupted by appending data with mismatched schema. Parquet and CSV don't offer schema enforcement for pandas users.
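The enforcement check itself amounts to comparing the incoming data's schema to the table schema before committing the transaction. Here is a minimal sketch, not deltalake's actual implementation, reusing the error message shown above:

```python
# Illustrative schema enforcement: reject an append whose column names or
# types differ from the table schema.
table_schema = {"num": "int64", "letter": "string"}

def check_append(table_schema, data_schema):
    """Raise before any data is committed if the schemas don't match."""
    if data_schema != table_schema:
        raise ValueError("Schema of data does not match table schema")

check_append(table_schema, {"num": "int64", "letter": "string"})  # ok
try:
    check_append(table_schema, {"num": "int64", "animal": "string"})
except ValueError as e:
    print(e)  # Schema of data does not match table schema
```

Because the check happens before the transaction commits, a rejected append leaves the table completely untouched.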
"},{"location":"integrations/delta-lake-pandas/#overwriting-schema-of-table","title":"Overwriting schema of table","text":"
You can overwrite the table contents and schema by setting the overwrite_schema option. Here's how to overwrite the table contents:
Here are the contents of the table after the values and schema have been overwritten:
+-------+----------+\n| num | animal |\n|-------+----------|\n| 5 | cat |\n| 6 | dog |\n+-------+----------+\n
"},{"location":"integrations/delta-lake-pandas/#in-memory-vs-in-storage-data-changes","title":"In-memory vs. in-storage data changes","text":"
It's important to distinguish between data stored in-memory and data stored on disk when understanding the functionality offered by Delta Lake.
pandas loads data from storage (CSV, Parquet, or Delta Lake) into in-memory DataFrames.
pandas makes it easy to modify the data in memory, say update a column value. It's not easy to update a column value in storage systems like CSV or Parquet using pandas.
Delta Lake makes it easy for pandas users to update data in storage.
"},{"location":"integrations/delta-lake-pandas/#why-delta-lake-allows-for-faster-queries","title":"Why Delta Lake allows for faster queries","text":"
Delta tables store data in many files and metadata about the files in the transaction log. Delta Lake allows for certain queries to skip entire files, which makes pandas queries run much faster.
Delta Lake provides many features that make it an excellent format for pandas analyses:
performance optimizations make pandas queries run faster
data management features make pandas analyses more reliable
advanced features allow you to perform more complex pandas analyses
Python deltalake offers pandas users a better experience compared with CSV/Parquet.
"},{"location":"integrations/delta-lake-polars/","title":"Using Delta Lake with polars","text":"
This page explains why Delta Lake is a great storage system for Polars analyses.
You will learn how to create Delta tables with Polars, how to query Delta tables with Polars, and the unique advantages Delta Lake offers the Polars community.
Here are some amazing benefits that Delta Lake provides Polars users:
time travel
ACID transactions for reliable writes
better performance with file skipping
enhanced file skipping via Z Ordering
ability to rollback mistakes
and many, many more
Let's start by showing how to use Polars with Delta Lake, explore how Delta Lake can make Polars queries run faster, and then look at all the cool features Delta Lake offers Polars users.
"},{"location":"integrations/delta-lake-polars/#creating-a-delta-lake-table-with-polars","title":"Creating a Delta Lake table with Polars","text":"
Create a Polars DataFrame and write it out to a Delta table:
import polars as pl\n\ndf = pl.DataFrame({\"x\": [1, 2, 3]})\ndf.write_delta(\"tmp/bear_delta_lake\")\n
This dataset is 50GB when stored in an uncompressed CSV file. Let's run some queries on this dataset when it's stored in different file formats with Polars.
This section will show the runtime for a query when the data is stored in CSV, Parquet, and Delta Lake and explain why Delta tables are the fastest.
Start by running a query on an uncompressed CSV file with read_csv:
This query runs in 8.3 seconds. It's much faster because Polars is optimized to skip row groups in Parquet files that don't contain data that's relevant for the query.
This query runs in 7.2 seconds. Polars can run this query faster because it can inspect the Delta transaction log and skip entire files that don't contain relevant data before performing the ordinary Parquet row group skipping.
Finally run the query on the Delta table after it has been Z Ordered by id1:
This query runs in 3.5 seconds. The query on the Z Ordered Delta table is even faster because similar data has been co-located in the same files. This allows for even greater data skipping.
Polars can leverage file skipping to query Delta tables very quickly.
"},{"location":"integrations/delta-lake-polars/#why-polars-is-fast-with-delta-lake","title":"Why Polars is fast with Delta Lake","text":"
Delta tables consist of metadata in a transaction log and data stored in Parquet files.
When Polars queries a Delta table, it starts by consulting the transaction log to understand the metadata of each file in the Delta table. This allows for Polars to quickly identify which files should be skipped by the query.
CSV files don't contain any such metadata, so file skipping isn't an option. Polars can skip Parquet files based on metadata, but it needs to open up each file and read the metadata, which is slower than grabbing the file-level metadata directly from the transaction log.
Parquet doesn't allow users to easily Z Order the data and colocate similar data in the same row groups. The Z Order optimizations are only supported in Delta tables.
Delta Lake offers Polars users unique performance optimizations.
"},{"location":"integrations/delta-lake-polars/#other-delta-lake-features-relevant-for-polars-users","title":"Other Delta Lake features relevant for Polars users","text":"
Here are the contents of the Delta table after the overwrite operation:
+-------+----------+\n| num | letter |\n|-------+----------|\n| 11 | aa |\n| 22 | bb |\n+-------+----------+\n
Overwriting just performs a logical delete. It doesn't physically remove the previous data from storage. Time travel back to the previous version to confirm that the old version of the table is still accessible.
dt = DeltaTable(\"tmp/some-table\", version=1)\n\n+-------+----------+\n| num | letter |\n|-------+----------|\n| 1 | a |\n| 2 | b |\n| 3 | c |\n| 8 | dd |\n| 9 | ee |\n+-------+----------+\n
"},{"location":"usage/constraints/","title":"Adding a Constraint to a table","text":"
Check constraints are a way to enforce that only data that meets the constraint is allowed to be added to the table.
"},{"location":"usage/constraints/#add-the-constraint","title":"Add the Constraint","text":"Python Rust
DeltaTable
from deltalake import DeltaTable\n\ndt = DeltaTable(\"../rust/tests/data/simple_table\")\n\n# Check the schema before hand\nprint(dt.schema())\n# Add the constraint to the table.\ndt.alter.add_constraint({\"id_gt_0\": \"id > 0\"})\n
DeltaTable
let table = deltalake::open_table(\"../rust/tests/data/simple_table\").await?;\nlet ops = DeltaOps(table);\nops.with_constraint(\"id_gt_0\", \"id > 0\").await?;\n
After you have added the constraint to the table attempting to append data to the table that violates the constraint will instead throw an error.
"},{"location":"usage/constraints/#verify-the-constraint-by-trying-to-add-some-data","title":"Verify the constraint by trying to add some data","text":"Python Rust
from deltalake import write_deltalake\nimport pandas as pd\n\ndf = pd.DataFrame({\"id\": [-1]})\nwrite_deltalake(dt, df, mode=\"append\", engine=\"rust\")\n# _internal.DeltaProtocolError: Invariant violations: [\"Check or Invariant (id > 0) violated by value in row: [-1]\"]\n
Here are the contents of the Delta table after the delete operation has been performed:
+-------+----------+\n| num | letter |\n|-------+----------|\n| 1 | a |\n| 2 | b |\n+-------+----------+\n
"},{"location":"usage/examining-table/","title":"Examining a Table","text":""},{"location":"usage/examining-table/#metadata","title":"Metadata","text":"
The delta log maintains basic metadata about a table, including:
A unique id
A name, if provided
A description, if provided
The list of partitionColumns.
The created_time of the table
A map of table configuration. This includes fields such as delta.appendOnly, which if true indicates the table is not meant to have data deleted from it.
Get metadata from a table with the DeltaTable.metadata() method:
The schema for the table is also saved in the transaction log. It can either be retrieved in the Delta Lake form as Schema or as a PyArrow schema. The first allows you to introspect any column-level metadata stored in the schema, while the latter represents the schema the table will be loaded into.
Use DeltaTable.schema to retrieve the delta lake schema:
Depending on what system wrote the table, the delta table may have provenance information describing what operations were performed on the table, when, and by whom. This information is retained for 30 days by default, unless otherwise specified by the table configuration delta.logRetentionDuration.
Note
This information is not written by all writers and different writers may use different schemas to encode the actions. For Spark's format, see: https://docs.delta.io/latest/delta-utility.html#history-schema
To view the available history, use DeltaTable.history:
from deltalake import DeltaTable\n\ndt = DeltaTable(\"../rust/tests/data/simple_table\")\ndt.history()\n
The active state for a delta table is determined by the Add actions, which provide the list of files that are part of the table and metadata about them, such as creation time, size, and statistics. You can get a data frame of the add actions data using DeltaTable.get_add_actions:
The deltalake project can be installed via pip for Python or Cargo for Rust.
"},{"location":"usage/installation/#install-delta-lake-for-python","title":"Install Delta Lake for Python","text":"
With pip:
pip install deltalake\n
With Conda:
conda install -c conda-forge deltalake\n
"},{"location":"usage/installation/#install-delta-lake-for-rust","title":"Install Delta Lake for Rust","text":"
With Cargo:
cargo add deltalake\n
"},{"location":"usage/installation/#run-delta-lake-and-pandas-in-a-jupyter-notebook","title":"Run Delta Lake and pandas in a Jupyter Notebook","text":"
You can easily run Delta Lake and pandas in a Jupyter notebook.
Create an environment file with the dependencies as follows:
Create a virtual environment with the dependencies:
conda env create -f deltalake-minimal.yml\n
Open the Jupyter notebook and run commands as follows:
"},{"location":"usage/loading-table/","title":"Loading a Delta Table","text":"
A DeltaTable represents the state of a delta table at a particular version. This includes which files are currently part of the table, the schema of the table, and other metadata such as creation time.
Python Rust
DeltaTable
from deltalake import DeltaTable\n\ndt = DeltaTable(\"../rust/tests/data/delta-0.2.0\")\nprint(f\"Version: {dt.version()}\")\nprint(f\"Files: {dt.files()}\")\n
DeltaTable
let table = deltalake::open_table(\"../rust/tests/data/simple_table\").await.unwrap();\nprintln!(\"Version: {}\", table.version());\nprintln!(\"Files: {}\", table.get_files());\n
Depending on your storage backend, you could use the storage_options parameter to provide some configuration. Configuration is defined for specific backends - s3 options, azure options, gcs options.
The configuration can also be provided via the environment, and the basic service provider is derived from the URL being used. We try to support many of the well-known formats to identify basic service properties.
S3:
s3://\\<bucket>/\\<path>
s3a://\\<bucket>/\\<path>
Azure:
az://\\<container>/\\<path>
adl://\\<container>/\\<path>
abfs://\\<container>/\\<path>
GCS:
gs://\\<bucket>/\\<path>
Alternatively, if you have a data catalog you can load it by reference to a database and table name. Currently only AWS Glue is supported.
For AWS Glue catalog, use AWS environment variables to authenticate.
While Delta Lake always needs its internal storage backend, properly configured, to manage the delta log, it may sometimes be advantageous (and is common practice in the Arrow world) to customize the storage interface used for reading the bulk data.
deltalake will work with any storage compliant with pyarrow.fs.FileSystem, however the root of the filesystem has to be adjusted to point at the root of the Delta table. We can achieve this by wrapping the custom filesystem into a pyarrow.fs.SubTreeFileSystem.
Previous table versions may not exist if they have been vacuumed, in which case an exception will be thrown. See Vacuuming tables for more information.
Vacuuming a table will delete any files that have been marked for deletion. This may make some past versions of a table invalid, so this can break time travel. However, it will save storage space. Vacuum will retain files in a certain window, by default one week, so time travel will still work in shorter ranges.
Delta tables usually don't delete old files automatically, so vacuuming regularly is considered good practice, unless the table is only appended to.
Use DeltaTable.vacuum to perform the vacuum operation. Note that to prevent accidental deletion, the function performs a dry-run by default: it will only list the files to be deleted. Pass dry_run=False to actually delete files.
>>> dt = DeltaTable(\"../rust/tests/data/simple_table\")\n>>> dt.vacuum()\n['../rust/tests/data/simple_table/part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet',\n '../rust/tests/data/simple_table/part-00190-8ac0ae67-fb1d-461d-a3d3-8dc112766ff5-c000.snappy.parquet',\n '../rust/tests/data/simple_table/part-00164-bf40481c-4afd-4c02-befa-90f056c2d77a-c000.snappy.parquet',\n ...]\n>>> dt.vacuum(dry_run=False) # Don't run this unless you are sure!\n
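The retention check vacuum applies can be sketched like this (timestamps, file names, and the helper are illustrative, not deltalake's actual code):

```python
from datetime import datetime, timedelta

# Sketch of vacuum's retention check: only tombstoned files whose removal
# is older than the retention window are eligible for deletion.
now = datetime(2021, 1, 15)
tombstones = {
    "old.parquet": datetime(2021, 1, 1),     # tombstoned 14 days ago
    "recent.parquet": datetime(2021, 1, 14), # tombstoned 1 day ago
}

def deletable(tombstones, now, retention=timedelta(days=7)):
    """List tombstoned files outside the retention window (default one week)."""
    return sorted(f for f, removed in tombstones.items() if now - removed > retention)

print(deletable(tombstones, now))  # ['old.parquet']
```

The recently tombstoned file survives, which is what keeps short-range time travel working after a default vacuum.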
This guide teaches you how to use Delta Lake. You will learn how to create Delta tables, run queries, perform DML operations, and optimize your tables.
It's easy to use Delta Lake with pandas, Polars, Rust, or any other PyArrow-like DataFrame library.
See the Spark Delta Lake documentation if you're using Delta Lake with Spark.
Delta tables can be queried in several ways. By loading as Arrow data or an Arrow dataset, they can be used by compatible engines such as Pandas and DuckDB. By passing on the list of files, they can be loaded into other engines such as Dask.
Delta tables are often larger than can fit into memory on a single computer, so this module provides ways to read only the parts of the data you need. Partition filters allow you to skip reading files that are part of irrelevant partitions. Only loading the columns required also saves memory. Finally, some methods allow reading tables batch-by-batch, allowing you to process the whole table while only having a portion loaded at any given time.
To load into Pandas or a PyArrow table use the DeltaTable.to_pandas and DeltaTable.to_pyarrow_table methods, respectively. Both of these support filtering partitions and selecting particular columns.
Converting to a PyArrow Dataset allows you to filter on columns other than partition columns and load the result as a stream of batches rather than a single table. Convert to a dataset using DeltaTable.to_pyarrow_dataset. Filters applied to datasets will use the partition values and file statistics from the Delta transaction log and push down any other filters to the scanning operation.
PyArrow datasets may also be passed to compatible query engines, such as DuckDB
>>> import duckdb\n>>> ex_data = duckdb.arrow(dataset)\n>>> ex_data.filter(\"year = 2021 and value > 4\").project(\"value\")\n---------------------\n-- Expression Tree --\n---------------------\nProjection [value]\n Filter [year=2021 AND value>4]\n arrow_scan(140409099470144, 4828104688, 1000000)\n\n---------------------\n-- Result Columns --\n---------------------\n- value (VARCHAR)\n\n---------------------\n-- Result Preview --\n---------------------\nvalue\nVARCHAR\n[ Rows: 3]\n6\n7\n5\n
Finally, you can always pass the list of file paths to an engine. For example, you can pass them to dask.dataframe.read_parquet:
"},{"location":"usage/optimize/delta-lake-z-order/","title":"Delta Lake Z Order","text":"
This section explains how to Z Order a Delta table.
Z Ordering colocates similar data in the same files, which allows for better file skipping and faster queries.
Suppose you have a table with first_name, age, and country columns.
If you Z Order the data by the country column, then individuals from the same country will be stored in the same files. When you subsequently query the data for individuals from a given country, it will execute faster because more data can be skipped.
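Under the hood, Z Ordering is based on interleaving the bits of several column values (a Morton code) so that rows close in every dimension sort near each other. Here is a minimal pure-Python sketch of the idea; it is illustrative, not delta-rs's actual implementation:

```python
def z_value(x, y, bits=8):
    """Interleave the bits of x and y (a Morton code), the basis of Z ordering."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # bit i of x goes to even position
        z |= ((y >> i) & 1) << (2 * i + 1)  # bit i of y goes to odd position
    return z

points = [(0, 0), (1, 1), (7, 0), (0, 7), (7, 7)]
# Sorting by z_value keeps points that are close in BOTH dimensions together.
print(sorted(points, key=lambda p: z_value(*p)))
# [(0, 0), (1, 1), (7, 0), (0, 7), (7, 7)]
```

Writing the table in this interleaved order means each file covers a small region in all of the Z Ordered columns at once, so min/max file skipping works well for filters on any of them.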
"},{"location":"usage/optimize/small-file-compaction-with-optimize/","title":"Delta Lake small file compaction with optimize","text":"
This post shows you how to perform small file compaction using the optimize method. This was added to the DeltaTable class in version 0.9.0. This command rearranges the small files into larger files, which reduces the number of files and speeds up queries.
This is very helpful for workloads that append frequently. For example, if you have a table that is appended to every 10 minutes, after a year you will have 52,560 files in the table. If the table is partitioned by another dimension, you will have 52,560 files per partition; with just 100 unique values that's millions of files. By running optimize periodically, you can reduce the number of files in the table to a more manageable number.
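The file-count arithmetic above is easy to verify:

```python
# Appends every 10 minutes means 6 appends per hour, all year long.
appends_per_year = 6 * 24 * 365
print(appends_per_year)  # 52560

# With a second partitioning dimension of 100 unique values, each append
# can write one file per partition value.
print(appends_per_year * 100)  # 5256000
```

Over five million files for a single year of writes is exactly the kind of accumulation that periodic optimize runs keep in check.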
Typically, you will run optimize less frequently than you append data. If possible, you might run optimize once you know you have finished writing to a particular partition. For example, on a table partitioned by date, you might append data every 10 minutes, but only run optimize once a day at the end of the day. This will ensure you don't need to compact the same data twice.
This section will also teach you about how to use vacuum to physically remove files from storage that are no longer needed. You\u2019ll often want vacuum after running optimize to remove the small files from storage once they\u2019ve been compacted into larger files.
Let\u2019s start with an example to explain these key concepts. All the code covered in this post is stored in this notebook in case you\u2019d like to follow along.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#create-a-delta-table-with-small-files","title":"Create a Delta table with small files","text":"
Let\u2019s start by creating a Delta table with a lot of small files so we can demonstrate the usefulness of the optimize command.
Start by writing a function that generates one thousand rows of random data given a timestamp.
def record_observations(date: datetime) -> pa.Table:\n \"\"\"Pulls data for a certain datetime\"\"\"\n nrows = 1000\n return pa.table(\n {\n \"date\": pa.array([date.date()] * nrows),\n \"timestamp\": pa.array([date] * nrows),\n \"value\": pc.random(nrows),\n }\n )\n
Let\u2019s run this function and observe the output:
Let\u2019s write 100 hours worth of data to the Delta table.
# Every hour starting at midnight on 2021-01-01\nhours_iter = (datetime(2021, 1, 1) + timedelta(hours=i) for i in itertools.count())\n\n# Write 100 hours worth of data\nfor timestamp in itertools.islice(hours_iter, 100):\n write_deltalake(\n \"observation_data\",\n record_observations(timestamp),\n partition_by=[\"date\"],\n mode=\"append\",\n )\n
This data was appended to the Delta table in 100 separate transactions, so the table will contain 100 transaction log entries and 100 data files. You can see the number of files with the files() method.
Each of these Parquet files is tiny - only 10 KB. Let\u2019s see how to compact these tiny files into larger files, which is more efficient for data queries.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#compact-small-files-in-the-delta-table-with-optimize","title":"Compact small files in the Delta table with optimize","text":"
Let\u2019s run the optimize command to compact the existing small files into larger files:
The optimize operation has added 5 new files and marked 100 existing files for removal (this is also known as \u201ctombstoning\u201d files). It has compacted the 100 tiny files into 5 larger files.
Let\u2019s append some more data to the Delta table and see how we can selectively run optimize on the new data that\u2019s added.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#handling-incremental-updates-with-optimize","title":"Handling incremental updates with optimize","text":"
Let\u2019s append another 24 hours of data to the Delta table:
for timestamp in itertools.islice(hours_iter, 24):\n write_deltalake(\n dt,\n record_observations(timestamp),\n partition_by=[\"date\"],\n mode=\"append\",\n )\n
We can use get_add_actions() to introspect the table state. We can see that 2021-01-06 has only a few hours of data so far, so we don't want to optimize that yet. But 2021-01-05 has all 24 hours of data, so it's ready to be optimized.
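The decision of which partitions are safe to optimize can be sketched as checking which date partitions already hold a complete day of data. The counts below are illustrative, matching the example:

```python
# Hours of data present per date partition (illustrative: 2021-01-05 is
# complete, while 2021-01-06 is still receiving appends).
hours_per_partition = {
    "date=2021-01-04": 24,
    "date=2021-01-05": 24,
    "date=2021-01-06": 4,
}

def partitions_ready_to_optimize(hours, full_day=24):
    """Only optimize partitions that will not receive further appends."""
    return sorted(p for p, h in hours.items() if h >= full_day)

print(partitions_ready_to_optimize(hours_per_partition))
# ['date=2021-01-04', 'date=2021-01-05']
```

Skipping the still-filling partition avoids compacting the same data twice: once now, and again after the remaining hours arrive.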
This optimize operation tombstones 21 small data files and adds one file with all the existing data properly condensed. Let\u2019s take a look at a portion of the _delta_log/00000000000000000125.json file, which is the transaction log entry that corresponds to this incremental optimize command.
The transaction log indicates that many files have been tombstoned and one file is added, as expected.
The Delta Lake optimize command \u201cremoves\u201d data by marking the data files as removed in the transaction log. The optimize command doesn\u2019t physically delete the Parquet file from storage. Optimize performs a \u201clogical remove\u201d not a \u201cphysical remove\u201d.
Delta Lake uses logical operations so you can time travel back to earlier versions of your data. You can vacuum your Delta table to physically remove Parquet files from storage if you don\u2019t need to time travel and don\u2019t want to pay to store the tombstoned files.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#vacuuming-after-optimizing","title":"Vacuuming after optimizing","text":"
The vacuum command deletes all files from storage that are marked for removal in the transaction log and older than the retention period which is 7 days by default.
It\u2019s normally a good idea to have a retention period of at least 7 days. For purposes of this example, we will set the retention period to zero, just so you can see how the files get removed from storage. Adjusting the retention period in this manner isn\u2019t recommended for production use cases.
All the partitions only contain a single file now, except for the date=2021-01-06 partition that has not been compacted yet.
An entire partition won\u2019t necessarily get compacted to a single data file when optimize is run. Each partition has data files that are condensed to the target file size.
"},{"location":"usage/optimize/small-file-compaction-with-optimize/#what-causes-the-small-file-problem","title":"What causes the small file problem?","text":"
Delta tables can accumulate small files for a variety of reasons:
User error: users can accidentally write files that are too small. Users should sometimes repartition in memory before writing to disk to avoid appending files that are too small.
Frequent appends: systems that append more often tend to produce more, smaller files. A pipeline that appends every minute will generally generate ten times as many small files as one that appends every ten minutes.
Appending to partitioned data lakes with high cardinality columns can also cause small files. If you append every hour to a table that\u2019s partitioned on a column with 1,000 distinct values, then every append could create 1,000 new files. Partitioning by date avoids this problem because the data isn\u2019t split up across partitions in this manner.
This page showed you how to create a Delta table with many small files, compact the small files into larger files with optimize, and remove the tombstoned files from storage with vacuum.
You also learned about how to incrementally optimize partitioned Delta tables, so you only compact newly added data.
An excessive number of small files slows down Delta table queries, so periodic compaction is important. Make sure to properly maintain your Delta tables, so performance does not degrade over time.
For overwrites and appends, use write_deltalake. If the table does not already exist, it will be created. The data parameter will accept a Pandas DataFrame, a PyArrow Table, or an iterator of PyArrow Record Batches.
Note: write_deltalake accepts a Pandas DataFrame, but will convert it to an Arrow table before writing. See caveats in pyarrow:python/pandas.
By default, writes create a new table and error if it already exists. This is controlled by the mode parameter, which mirrors the behavior of Spark's pyspark.sql.DataFrameWriter.saveAsTable DataFrame method. To overwrite, pass in mode='overwrite'; to append, pass in mode='append':
write_deltalake will raise ValueError if the schema of the data passed to it differs from the existing table's schema. If you wish to alter the schema as part of an overwrite, pass in overwrite_schema=True.
"},{"location":"usage/writing/#overwriting-a-partition","title":"Overwriting a partition","text":"
You can overwrite a specific partition by using mode=\"overwrite\" together with partition_filters. This will remove all files within the matching partition and insert your data as new files. This can only be done on one partition at a time. All of the input data must belong to that partition or else the method will raise an error.
DynamoDB is the only locking provider available in delta-rs at the moment. To enable it, set AWS_S3_LOCKING_PROVIDER to 'dynamodb' in storage_options or as an environment variable.
Additionally, you must create a DynamoDB table with the name delta_log so that it can be automatically recognized by delta-rs. Alternatively, you can use a table name of your choice, but you must set the DELTA_DYNAMO_TABLE_NAME variable to match your chosen table name. The required schema for the DynamoDB table is as follows:
This locking mechanism is compatible with the one used by Apache Spark. The tablePath property, denoting the root url of the delta table itself, is part of the primary key, and all writers intending to write to the same table must match this property precisely. In Spark, S3 URLs are prefixed with s3a://, and a table in delta-rs must be configured accordingly.
The following code allows creating the necessary table from the AWS cli:
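The CLI command itself is not included in this extract; based on the key schema described above (tablePath as the partition key plus a fileName sort key, both strings), it looks roughly like the following sketch. Treat the provisioned throughput values as placeholders:

```shell
aws dynamodb create-table \
  --table-name delta_log \
  --attribute-definitions AttributeName=tablePath,AttributeType=S \
                          AttributeName=fileName,AttributeType=S \
  --key-schema AttributeName=tablePath,KeyType=HASH \
               AttributeName=fileName,KeyType=RANGE \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
```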
You can find additional information in the delta-rs documentation, which also includes recommendations on configuring a time-to-live (TTL) for the table so it does not grow indefinitely.
"},{"location":"usage/writing/writing-to-s3-with-locking-provider/#enable-unsafe-writes-in-s3-opt-in","title":"Enable unsafe writes in S3 (opt-in)","text":"
If for some reason you don't want to use DynamoDB as your locking mechanism, you can set the AWS_S3_ALLOW_UNSAFE_RENAME variable to true to enable unsafe S3 writes.
Deleting rows from a Delta Lake table