0.10.0 (2023-12-18)
Breaking Changes
- Upgrade etl-entities from v1 to v2 (#172). This implies that HWM classes now have a different internal structure than they used to.

  etl-entities < 2:

  ```python
  from etl_entities.old_hwm import IntHWM as OldIntHWM
  from etl_entities.source import Column, Table
  from etl_entities.process import Process

  hwm = OldIntHWM(
      process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
      source=Table(name="schema.table", instance="postgres://host:5432/db"),
      column=Column(name="col1"),
      value=123,
  )
  ```

  etl-entities >= 2:

  ```python
  from etl_entities.hwm import ColumnIntHWM

  hwm = ColumnIntHWM(
      name="some_unique_name",
      description="any value you want",
      source="schema.table",
      expression="col1",
      value=123,
  )
  ```
  Breaking change: If you used HWM classes from the etl_entities module, you should rewrite your code to make it compatible with the new version.

  More details:

  - HWM classes used by previous onETL versions were moved from etl_entities to the etl_entities.old_hwm submodule. They are kept for compatibility reasons, but are planned to be removed in the etl-entities v3 release.
  - New HWM classes have a flat structure instead of a nested one.
  - New HWM classes have a mandatory name attribute (it was known as qualified_name before).
  - Type aliases used while serializing and deserializing HWM objects to dict representation were changed too: int -> column_int (see the sketch right after this list).
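  For illustration, here is roughly how the serialized dict representation changes. This is a sketch only: the field names follow the upgrade procedure shown below, the values are placeholders, and it is not an exhaustive schema.

  ```python
  # etl-entities < 2: nested structure, type alias "int"
  old_serialized = {
      "type": "int",
      "column": {"name": "col1", "partition": {}},
      "source": {"name": "schema.table", "instance": "postgres://host:5432/db"},
      "process": {"name": "myprocess", "task": "abc", "dag": "cde", "host": "myhost"},
      "value": "123",
  }

  # etl-entities >= 2: flat structure, type alias "column_int", mandatory "name"
  new_serialized = {
      "type": "column_int",
      "name": "some_unique_name",
      "entity": "schema.table",
      "expression": "col1",
      "value": 123,
  }
  ```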
  To make migration simpler, you can use a new method:

  ```python
  old_hwm = OldIntHWM(...)

  new_hwm = old_hwm.as_new_hwm()
  ```

  which automatically converts all fields from the old structure to the new one, including qualified_name -> name.
  Breaking changes:

  - Methods BaseHWMStore.get() and BaseHWMStore.save() were renamed to get_hwm() and set_hwm(). They can now be used only with new HWM classes from etl_entities.hwm; old HWM classes are not supported. If you used them in your code, please update it accordingly (a short usage sketch follows at the end of this item).
  - YAMLHWMStore CANNOT read files created by older onETL versions (0.9.x or older).
  Upgrade procedure:

  ```python
  # pip install onetl==0.9.5

  # Get qualified_name for HWM

  # Option 1. HWM is built manually
  from etl_entities import IntHWM, FileListHWM
  from etl_entities.source import Column, Table, RemoteFolder
  from etl_entities.process import Process

  # for column HWM
  old_column_hwm = IntHWM(
      process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
      source=Table(name="schema.table", instance="postgres://host:5432/db"),
      column=Column(name="col1"),
  )
  qualified_name = old_column_hwm.qualified_name
  # "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"

  # for file HWM
  old_file_hwm = FileListHWM(
      process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
      source=RemoteFolder(name="/absolute/path", instance="ftp://ftp.server:21"),
  )
  qualified_name = old_file_hwm.qualified_name
  # "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost"

  # Option 2. HWM is generated automatically (by DBReader/FileDownloader)
  # See onETL logs and search for a string like qualified_name = '...'
  qualified_name = "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"

  # Get .yml file path by qualified_name
  import os
  from pathlib import PurePosixPath
  from onetl.hwm.store import YAMLHWMStore

  # here you should pass the same arguments as used on production, if any
  yaml_hwm_store = YAMLHWMStore()
  hwm_path = yaml_hwm_store.get_file_path(qualified_name)
  print(hwm_path)
  # for column HWM
  # LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/col1__schema.table__postgres_host_5432_db__cde.abc.myprocess__myhost.yml')
  # for file HWM
  # LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/file_list__absolute_path__ftp_ftp.server_21__cde.abc.myprocess__myhost.yml')

  # Read raw .yml file content
  from yaml import safe_load, dump

  raw_old_hwm_items = safe_load(hwm_path.read_text())
  print(raw_old_hwm_items)
  # for column HWM
  # [
  #     {
  #         "column": {"name": "col1", "partition": {}},
  #         "modified_time": "2023-12-18T10:39:47.377378",
  #         "process": {"dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc"},
  #         "source": {"instance": "postgres://host:5432/db", "name": "schema.table"},
  #         "type": "int",
  #         "value": "123",
  #     },
  # ]
  # for file HWM
  # [
  #     {
  #         "modified_time": "2023-12-18T11:15:36.478462",
  #         "process": {"dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc"},
  #         "source": {"instance": "ftp://ftp.server:21", "name": "/absolute/path"},
  #         "type": "file_list",
  #         "value": ["file1.txt", "file2.txt"],
  #     },
  # ]

  # Convert file content to the new structure, compatible with onETL 0.10.x
  raw_new_hwm_items = []
  for old_hwm in raw_old_hwm_items:
      new_hwm = {"name": qualified_name, "modified_time": old_hwm["modified_time"]}

      if "column" in old_hwm:
          new_hwm["expression"] = old_hwm["column"]["name"]
      new_hwm["entity"] = old_hwm["source"]["name"]
      old_hwm.pop("process", None)

      if old_hwm["type"] == "int":
          new_hwm["type"] = "column_int"
          new_hwm["value"] = old_hwm["value"]
      elif old_hwm["type"] == "date":
          new_hwm["type"] = "column_date"
          new_hwm["value"] = old_hwm["value"]
      elif old_hwm["type"] == "datetime":
          new_hwm["type"] = "column_datetime"
          new_hwm["value"] = old_hwm["value"]
      elif old_hwm["type"] == "file_list":
          new_hwm["type"] = "file_list"
          new_hwm["value"] = [
              os.fspath(PurePosixPath(old_hwm["source"]["name"]).joinpath(path))
              for path in old_hwm["value"]
          ]
      else:
          raise ValueError("WAT?")

      raw_new_hwm_items.append(new_hwm)

  print(raw_new_hwm_items)
  # for column HWM
  # [
  #     {
  #         "name": "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost",
  #         "modified_time": "2023-12-18T10:39:47.377378",
  #         "expression": "col1",
  #         "entity": "schema.table",
  #         "type": "column_int",
  #         "value": 123,
  #     },
  # ]
  # for file HWM
  # [
  #     {
  #         "name": "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost",
  #         "modified_time": "2023-12-18T11:15:36.478462",
  #         "entity": "/absolute/path",
  #         "type": "file_list",
  #         "value": ["/absolute/path/file1.txt", "/absolute/path/file2.txt"],
  #     },
  # ]

  # Save file with new content
  with open(hwm_path, "w") as file:
      dump(raw_new_hwm_items, file)

  # Stop the Python interpreter and update onETL
  # pip install onetl==0.10.0

  # Check that the new .yml file can be read
  from onetl.hwm.store import YAMLHWMStore

  qualified_name = ...

  # here you should pass the same arguments as used on production, if any
  yaml_hwm_store = YAMLHWMStore()
  yaml_hwm_store.get_hwm(qualified_name)
  # for column HWM
  # ColumnIntHWM(
  #     name='col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost',
  #     description='',
  #     entity='schema.table',
  #     value=123,
  #     expression='col1',
  #     modified_time=datetime.datetime(2023, 12, 18, 10, 39, 47, 377378),
  # )
  # for file HWM
  # FileListHWM(
  #     name='file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost',
  #     description='',
  #     entity=AbsolutePath('/absolute/path'),
  #     value=frozenset({AbsolutePath('/absolute/path/file1.txt'), AbsolutePath('/absolute/path/file2.txt')}),
  #     expression=None,
  #     modified_time=datetime.datetime(2023, 12, 18, 11, 15, 36, 478462),
  # )

  # That's all!
  ```
  But most users use other HWM Store implementations, which do not have such issues.
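  For reference, here is a minimal sketch of the renamed HWM Store methods mentioned above, using MemoryHWMStore and a new-style HWM class (the HWM name and values are placeholders):

  ```python
  from etl_entities.hwm import ColumnIntHWM
  from etl_entities.hwm_store import MemoryHWMStore

  hwm_store = MemoryHWMStore()

  hwm = ColumnIntHWM(
      name="some_unique_name",
      source="schema.table",
      expression="col1",
      value=123,
  )

  # onETL 0.9.x and older: hwm_store.save(hwm) and hwm_store.get(qualified_name)
  hwm_store.set_hwm(hwm)
  fetched = hwm_store.get_hwm("some_unique_name")
  ```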
- Several classes and functions were moved from onetl to etl_entities:

  onETL 0.9.x and older:

  ```python
  from onetl.hwm.store import (
      detect_hwm_store,
      BaseHWMStore,
      HWMStoreClassRegistry,
      register_hwm_store_class,
      HWMStoreManager,
      MemoryHWMStore,
  )
  ```

  onETL 0.10.x and newer:

  ```python
  from etl_entities.hwm_store import (
      detect_hwm_store,
      BaseHWMStore,
      HWMStoreClassRegistry,
      register_hwm_store_class,
      HWMStoreManager,
      MemoryHWMStore,
  )
  ```

  They can still be imported from the old module, but this is deprecated and will be removed in the v1.0.0 release.
- Change the way of passing HWM to DBReader and FileDownloader classes:

  onETL 0.9.x and older:

  ```python
  reader = DBReader(
      connection=...,
      source=...,
      hwm_column="col1",
  )
  ```

  onETL 0.10.x and newer:

  ```python
  reader = DBReader(
      connection=...,
      source=...,
      hwm=DBReader.AutoDetectHWM(
          # name is mandatory now!
          name="my_unique_hwm_name",
          expression="col1",
      ),
  )
  ```

  onETL 0.9.x and older:

  ```python
  reader = DBReader(
      connection=...,
      source=...,
      hwm_column=(
          "col1",
          "cast(col1 as date)",
      ),
  )
  ```

  onETL 0.10.x and newer:

  ```python
  reader = DBReader(
      connection=...,
      source=...,
      hwm=DBReader.AutoDetectHWM(
          # name is mandatory now!
          name="my_unique_hwm_name",
          expression="cast(col1 as date)",
      ),
  )
  ```

  onETL 0.9.x and older:

  ```python
  downloader = FileDownloader(
      connection=...,
      source_path=...,
      target_path=...,
      hwm_type="file_list",
  )
  ```

  onETL 0.10.x and newer:

  ```python
  downloader = FileDownloader(
      connection=...,
      source_path=...,
      target_path=...,
      hwm=FileListHWM(
          # name is mandatory now!
          name="another_unique_hwm_name",
      ),
  )
  ```
  New HWM classes have a mandatory name attribute which should be passed explicitly, instead of being generated automatically under the hood.

  Automatic name generation using the old DBReader.hwm_column / FileDownloader.hwm_type syntax is still supported, but will be removed in the v1.0.0 release. (#179)
- Performance of reading with Incremental and Batch strategies has been drastically improved. (#182)

  Before and after, in details:

  DBReader.run() + incremental/batch strategy behavior in versions 0.9.x and older:

  - Get the table schema by making a query SELECT * FROM table WHERE 1=0 (if DBReader.columns contains *).
  - Expand * to real column names from the table, add hwm_column to the list, and remove duplicates (as some RDBMS do not allow them).
  - Create a dataframe from a query like SELECT hwm_expression AS hwm_column, ...other table columns... FROM table WHERE hwm_expression > prev_hwm.value.
  - Determine the HWM class using the dataframe schema: df.schema[hwm_column].dataType.
  - Determine the max HWM column value using Spark: df.select(max(hwm_column)).collect().
  - Use max(hwm_column) as the next HWM value, and save it to the HWM Store.
  - Return the dataframe to the user.
  This was far from ideal:

  - Dataframe content (all rows, or just the changed ones) was loaded from the source into Spark only to get min/max values of a specific column.

  - The step of fetching the table schema and then substituting column names in the next query caused some unexpected errors. For example, if the source contained columns with mixed name case, like "CamelColumn" or "spaced column", column names were not escaped during query generation, leading to queries that could not be executed by the database.

    So users had to explicitly pass column names to DBReader, wrapping mixed-case names in double quotes:

    ```python
    reader = DBReader(
        connection=...,
        source=...,
        columns=[  # passing '*' here leads to wrong SQL query generation
            "normal_column",
            '"CamelColumn"',
            '"spaced column"',
            ...,
        ],
    )
    ```
  - Using DBReader with IncrementalStrategy could lead to reading rows that were already read before.

    The dataframe was created from a query with a WHERE clause like hwm.expression > prev_hwm.value, not hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value.

    So if new rows appeared in the source after the HWM value was determined, they could be read by accessing the dataframe content (because Spark dataframes are lazy), leading to inconsistencies between the HWM value and the dataframe content.

    This may lead to issues when DBReader.run() reads some data and updates the HWM value, and the next call of DBReader.run() reads rows that were already read in the previous run.
  DBReader.run() + incremental/batch strategy behavior in versions 0.10.x and newer:

  - Detect the type of the HWM expression: SELECT hwm.expression FROM table WHERE 1=0.
  - Determine the corresponding Spark type df.schema[0], and then determine the matching HWM class (if DBReader.AutoDetectHWM is used).
  - Get min/max values by querying the source: SELECT MAX(hwm.expression) FROM table WHERE hwm.expression >= prev_hwm.value.
  - Use max(hwm.expression) as the next HWM value, and save it to the HWM Store.
  - Create a dataframe from the query SELECT ...table columns... FROM table WHERE hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value, baking the new HWM value into the query.
  - Return the dataframe to the user.
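  To make the new flow concrete, here is a minimal usage sketch (the connection object is a placeholder, and the table and HWM names are arbitrary):

  ```python
  from onetl.db import DBReader
  from onetl.strategy import IncrementalStrategy

  reader = DBReader(
      connection=...,  # any supported DB connection
      source="schema.table",
      hwm=DBReader.AutoDetectHWM(name="my_unique_hwm_name", expression="col1"),
  )

  with IncrementalStrategy():
      # onETL first fetches MAX(col1) from the source and uses it as the next HWM value,
      # and only then builds a dataframe bounded by both the previous and the new HWM values,
      # so the dataframe content always matches the saved HWM
      df = reader.run()
  ```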
  Improvements:

  - Allow the source to calculate min/max instead of loading everything into Spark. This should be faster on large amounts of data (up to 2x), because we do not transfer all the data from the source to Spark. It can be even faster if the source has indexes for the HWM column.
  - The columns list is passed to the source as-is, without any resolving on the DBReader side. So you can pass DBReader(columns=["*"]) to read tables with mixed column naming.
  - Restrict dataframe content to always match the HWM values, which means the same row is never read twice.

  Breaking change: the HWM column is no longer implicitly added to the dataframe. It used to be a part of the SELECT clause, but now it is mentioned only in the WHERE clause.

  So if you had code like this, you have to rewrite it:

  onETL 0.9.x and older:

  ```python
  reader = DBReader(
      connection=...,
      source=...,
      columns=[
          "col1",
          "col2",
      ],
      hwm_column="hwm_col",
  )

  df = reader.run()

  # hwm_column value is in the dataframe
  assert df.columns == ["col1", "col2", "hwm_col"]
  ```

  onETL 0.10.x and newer:

  ```python
  reader = DBReader(
      connection=...,
      source=...,
      columns=[
          "col1",
          "col2",
          # add hwm_column explicitly
          "hwm_col",
      ],
      hwm_column="hwm_col",
  )

  df = reader.run()

  # if the columns list is not updated, this will fail
  assert df.columns == ["col1", "col2", "hwm_col"]
  ```

  onETL 0.9.x and older:

  ```python
  reader = DBReader(
      connection=...,
      source=...,
      columns=[
          "col1",
          "col2",
      ],
      hwm_column=(
          "hwm_col",
          "cast(hwm_col as int)",
      ),
  )

  df = reader.run()

  # hwm_expression value is in the dataframe
  assert df.columns == ["col1", "col2", "hwm_col"]
  ```

  onETL 0.10.x and newer:

  ```python
  reader = DBReader(
      connection=...,
      source=...,
      columns=[
          "col1",
          "col2",
          # add hwm_expression explicitly
          "cast(hwm_col as int) as hwm_col",
      ],
      hwm_column=(
          "hwm_col",
          "cast(hwm_col as int)",
      ),
  )

  df = reader.run()

  # if the columns list is not updated, this will fail
  assert df.columns == ["col1", "col2", "hwm_col"]
  ```

  But most users just use columns=["*"] anyway, so they won't see any changes.
- FileDownloader.run() now updates the HWM in the HWM Store not after each file is successfully downloaded, but after all files were handled.

  This is because:

  - FileDownloader can be used with DownloadOptions(workers=N), which could lead to a race condition: one thread could save one HWM value to the HWM Store while another thread saves a different value.
  - FileDownloader can download hundreds or thousands of files, and issuing a request to the HWM Store for each file could potentially DDoS the HWM Store. (#189)

  There is an exception handler which tries to save the HWM to the HWM Store if the download process was interrupted. But if it was interrupted by force, e.g. by a SIGKILL signal, the HWM will not be saved to the HWM Store, so some already downloaded files may be downloaded again next time. An unexpected process kill may produce other negative effects as well, like partially downloaded files, so this is expected behavior.
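  A minimal sketch of this scenario (placeholders ... as in the snippets above; treating the exact options class for workers=N as an assumption, since its import path may differ between versions):

  ```python
  from etl_entities.hwm import FileListHWM
  from onetl.file import FileDownloader

  downloader = FileDownloader(
      connection=...,
      source_path=...,
      target_path=...,
      hwm=FileListHWM(name="another_unique_hwm_name"),
      # assumption: parallel downloads enabled via the options class mentioned above
      options=FileDownloader.Options(workers=4),
  )

  # since 0.10.0 the HWM is pushed to the HWM Store once, after all files were handled,
  # instead of after every downloaded file
  result = downloader.run()
  ```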
Features
- Add Python 3.12 compatibility. (#167)
- Excel file format now can be used with Spark 3.5.0. (#187)
- SnapshotBatchStrategy and IncrementalBatchStrategy do not raise exceptions if the source does not contain any data. Instead, they stop at the first iteration and return an empty dataframe (see the sketch after this list). (#188)
- Cache the result of connection.check() in high-level classes like DBReader, FileDownloader and so on. This makes logs less verbose. (#190)
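A minimal sketch of the new batch strategy behavior, assuming a DBReader configured as in the examples above (the reader object and the step value are placeholders):

```python
from onetl.strategy import SnapshotBatchStrategy

with SnapshotBatchStrategy(step=100) as batches:
    for _ in batches:
        # if the source contains no data, the strategy now stops at the first
        # iteration and run() returns an empty dataframe instead of raising
        df = reader.run()
```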
Bug Fixes
- Fix @slot and @hook decorators returning methods with missing arguments in their signatures (Pylance, VS Code). (#183)
- Kafka connector documentation said that it supports reading topic data incrementally by passing group.id or groupIdPrefix. Actually, this is not true, because Spark does not report back to Kafka which messages were consumed. So currently users can only read the whole topic; no incremental reads are supported.