
Releases: MobileTeleSystems/onetl

0.10.1 (2024-02-05)


Features

  • Add support for incremental strategies for the Kafka connection:

    from onetl.connection import Kafka
    from onetl.db import DBReader
    from onetl.hwm import AutoDetectHWM  # assumed import path; may differ between onETL versions
    from onetl.strategy import IncrementalStrategy
    
    reader = DBReader(
        connection=Kafka(...),
        source="topic_name",
        hwm=AutoDetectHWM(name="some_hwm_name", expression="offset"),
    )
    
    with IncrementalStrategy():
        df = reader.run()

    This lets you resume reading data from a Kafka topic starting at the last committed offset from your previous run. (#202)

  • Add has_data and raise_if_no_data methods to the DBReader class. (#203)
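
    A minimal sketch of using them (the postgres connection object is a placeholder):

        from onetl.db import DBReader

        reader = DBReader(connection=postgres, source="schema.table")

        # check for new data before running a heavy pipeline
        if reader.has_data():
            df = reader.run()

        # or fail fast when the source is empty
        reader.raise_if_no_data()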

  • Update VMware Greenplum connector from 2.1.4 to 2.3.0. This implies:

    • Greenplum.get_packages() method now accepts an optional package_version argument, which allows overriding the version of the Greenplum connector package. (#208)
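
      For example, the connector version can now be pinned explicitly (a sketch; version values are illustrative):

          from onetl.connection import Greenplum

          # default behavior, returning the bundled 2.3.0 package
          Greenplum.get_packages()

          # pin a specific connector version
          Greenplum.get_packages(package_version="2.1.4")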

0.10.0 (2023-12-18)


Breaking Changes

  • Upgrade etl-entities from v1 to v2 (#172).

    This implies that HWM classes now have a different internal structure than they used to.

    etl-entities < 2:

        from etl_entities.old_hwm import IntHWM as OldIntHWM
        from etl_entities.source import Column, Table
        from etl_entities.process import Process

        hwm = OldIntHWM(
            process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
            source=Table(name="schema.table", instance="postgres://host:5432/db"),
            column=Column(name="col1"),
            value=123,
        )

    etl-entities >= 2:

        from etl_entities.hwm import ColumnIntHWM

        hwm = ColumnIntHWM(
            name="some_unique_name",
            description="any value you want",
            source="schema.table",
            expression="col1",
            value=123,
        )

    Breaking change: if you used HWM classes from the etl_entities module, you should rewrite your code to make it compatible with the new version.

    More details

    • HWM classes used by previous onETL versions were moved from etl_entities to the etl_entities.old_hwm submodule. They are kept for compatibility reasons, but are planned to be removed in the etl-entities v3 release.
    • New HWM classes have a flat structure instead of a nested one.
    • New HWM classes have a mandatory name attribute (previously known as qualified_name).
    • Type aliases used while serializing and deserializing HWM objects to dict representation were changed too: int -> column_int.

    To make migration simpler, you can use a new method:

    old_hwm = OldIntHWM(...)
    new_hwm = old_hwm.as_new_hwm()

    It automatically converts all fields from the old structure to the new one, including qualified_name -> name.

  • Breaking changes:

    • Methods BaseHWMStore.get() and BaseHWMStore.save() were renamed to get_hwm() and set_hwm().
    • They can now be used only with the new HWM classes from etl_entities.hwm; old HWM classes are not supported.

    If you used them in your code, please update it accordingly.
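
    A sketch of the new calls, using MemoryHWMStore and ColumnIntHWM from etl_entities v2:

        from etl_entities.hwm import ColumnIntHWM
        from etl_entities.hwm_store import MemoryHWMStore

        hwm_store = MemoryHWMStore()

        hwm = ColumnIntHWM(name="some_unique_name", source="schema.table", expression="col1", value=123)
        hwm_store.set_hwm(hwm)  # was: hwm_store.save(hwm)

        hwm = hwm_store.get_hwm("some_unique_name")  # was: hwm_store.get(...)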

  • YAMLHWMStore CANNOT read files created by older onETL versions (0.9.x or older).

    Upgrade procedure
    # pip install onetl==0.9.5
    
    # Get qualified_name for HWM
    
    
    # Option 1. HWM is built manually
    from etl_entities import IntHWM, FileListHWM
    from etl_entities.source import Column, Table, RemoteFolder
    from etl_entities.process import Process
    
    # for column HWM
    old_column_hwm = IntHWM(
        process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
        source=Table(name="schema.table", instance="postgres://host:5432/db"),
        column=Column(name="col1"),
    )
    qualified_name = old_column_hwm.qualified_name
    # "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"
    
    # for file HWM
    old_file_hwm = FileListHWM(
        process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
        source=RemoteFolder(name="/absolute/path", instance="ftp://ftp.server:21"),
    )
    qualified_name = old_file_hwm.qualified_name
    # "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost"
    
    
    # Option 2. HWM is generated automatically (by DBReader/FileDownloader)
    # See onETL logs and search for string like qualified_name = '...'
    
    qualified_name = "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"
    
    
    # Get .yml file path by qualified_name
    
    import os
    from pathlib import PurePosixPath
    from onetl.hwm.store import YAMLHWMStore
    
    # here you should pass the same arguments as used on production, if any
    yaml_hwm_store = YAMLHWMStore()
    hwm_path = yaml_hwm_store.get_file_path(qualified_name)
    print(hwm_path)
    
    # for column HWM
    # LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/col1__schema.table__postgres_host_5432_db__cde.abc.myprocess__myhost.yml')
    
    # for file HWM
    # LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/file_list__absolute_path__ftp_ftp.server_21__cde.abc.myprocess__myhost.yml')
    
    
    # Read raw .yml file content
    
    from yaml import safe_load, dump
    
    raw_old_hwm_items = safe_load(hwm_path.read_text())
    print(raw_old_hwm_items)
    
    # for column HWM
    # [
    #   {
    #     "column": { "name": "col1", "partition": {} },
    #     "modified_time": "2023-12-18T10: 39: 47.377378",
    #     "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
    #     "source": { "instance": "postgres: //host:5432/db", "name": "schema.table" },
    #     "type": "int",
    #     "value": "123",
    #   },
    # ]
    
    # for file HWM
    # [
    #   {
    #     "modified_time": "2023-12-18T11:15:36.478462",
    #     "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
    #     "source": { "instance": "ftp://ftp.server:21", "name": "/absolute/path" },
    #     "type": "file_list",
    #     "value": ["file1.txt", "file2.txt"],
    #   },
    # ]
    
    
    # Convert file content to new structure, compatible with onETL 0.10.x
    raw_new_hwm_items = []
    for old_hwm in raw_old_hwm_items:
        new_hwm = {"name": qualified_name, "modified_time": old_hwm["modified_time"]}
    
        if "column" in old_hwm:
            new_hwm["expression"] = old_hwm["column"]["name"]
        new_hwm["entity"] = old_hwm["source"]["name"]
        old_hwm.pop("process", None)
    
        if old_hwm["type"] == "int":
            new_hwm["type"] = "column_int"
            new_hwm["value"] = old_hwm["value"]
    
        elif old_hwm["type"] == "date":
            new_hwm["type"] = "column_date"
            new_hwm["value"] = old_hwm["value"]
    
        elif old_hwm["type"] == "datetime":
            new_hwm["type"] = "column_datetime"
            new_hwm["value"] = old_hwm["value"]
    
        elif old_hwm["type"] == "file_list":
            new_hwm["type"] = "file_list"
            new_hwm["value"] = [
                os.fspath(PurePosixPath(old_hwm["source"]["name"]).joinpath(path))
                for path in old_hwm["value"]
            ]
    
        else:
            raise ValueError("WAT?")
    
        raw_new_hwm_items.append(new_hwm)
    
    
    print(raw_new_hwm_items)
    # for column HWM
    # [
    #   {
    #     "name": "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost",
    #     "modified_time": "2023-12-18T10:39:47.377378",
    #     "expression": "col1",
    #     "source": "schema.table",
    #     "type": "column_int",
    #     "value": 123,
    #   },
    # ]
    
    # for file HWM
    # [
    #   {
    #     "name": "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost",
    #     "modified_time": "2023-12-18T11:15:36.478462",
    #     "entity": "/absolute/path",
    #     "type": "file_list",
    #     "value": ["/absolute/path/file1.txt", "/absolute/path/file2.txt"],
    #   },
    # ]
    
    
    # Save file with new content
    with open(hwm_path, "w") as file:
        dump(raw_new_hwm_items, file)
    
    
    # Stop Python interpreter and update onETL
    # pip install onetl==0.10.0
    # Check that new .yml file can be read
    
    from onetl.hwm.store import YAMLHWMStore
    
    qualified_name = ...
    
    # here you should pass the same arguments as used on production, if any
    yaml_hwm_store = YAMLHWMStore()
    yaml_hwm_store.get_hwm(qualified_name)
    
    # for column HWM
    # ColumnIntHWM(
    #     name='col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost',
    #     description='',
    #     entity='schema.table',
    #     value=123,
    #     expression='col1',
    #     modified_time=datetime.datetime(2023, 12, 18, 10, 39, 47, 377378),
    # )
    
    # for file HWM
    # FileListHWM(
    #     name='file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost',
    #     description='',
    #     entity=AbsolutePath('/absolute/path'),
    #     value=frozenset({AbsolutePath('/absolute/path/file1.txt'), AbsolutePath('/absolute/path/file2.txt')}),
    #     expression=None,
    #     modified_time=datetime.datetime(2023, 12, 18, 11, 15, 36, 478462)
    # )
    
    
    # That's all!

    But most users use other HWM store implementations, which do not have this issue.

  • Several classes and functions were moved from onetl to etl_entities:

    onETL 0.9.x and older:

        from onetl.hwm.store import (
            detect_hwm_store,
            BaseHWMStore,
            HWMStoreClassRegistry,
            register_hwm_store_class,
            HWMStoreManager,
            MemoryHWMStore,
        )

    onETL 0.10.x and newer:

        from etl_entities.hwm_store import (
            detect_hwm_store,
            BaseHWMStore,
            HWMStoreClassRegistry,
            register_hwm_store_class,
            HWMStoreManager,
            MemoryHWMStore,
        )

    They s...


0.9.5 (2023-10-10)


Features

  • Add XML file format support. (#163)
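
    A minimal read sketch for the new XML format (a hedged example: the connection object is a placeholder, and the row_tag option name is an assumption):

        from onetl.file import FileDFReader
        from onetl.file.format import XML

        reader = FileDFReader(
            connection=spark_hdfs,  # any FileDFConnection
            source_path="/source/path",
            format=XML(row_tag="item"),  # assumed option name for the XML row element
        )
        df = reader.run()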
  • Tested compatibility with Spark 3.5.0. MongoDB and Excel do not support it yet, but all other packages do. (#159)

Improvements

  • Add a check to all DB and FileDF connections that the Spark session is alive. (#164)

Bug Fixes

  • Fix Hive.check() behavior when Hive Metastore is not available. (#164)

0.9.4 (2023-09-26)


Features

  • Add Excel file format support. (#148)
  • Add Samba file connection. It is now possible to download and upload files to Samba shared folders using FileDownloader/FileUploader. (#150)
  • Add if_exists="ignore" and error to Hive.WriteOptions (#143)
  • Add if_exists="ignore" and error to JDBC.WriteOptions (#144)
  • Add if_exists="ignore" and error to MongoDB.WriteOptions (#145)

Improvements

  • Add documentation about different ways of passing packages to Spark session. (#151)

  • Drastically improve Greenplum documentation:

    • Added information about network ports, grants, pg_hba.conf and so on.
    • Added interaction schemas for reading, writing and executing statements in Greenplum.
    • Added recommendations about reading data from views and JOIN results from Greenplum. (#154)
  • Make .fetch and .execute methods of DB connections thread-safe. Each thread works with its own connection. (#156)
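
    For example, queries can now run concurrently (a sketch; the postgres connection object is a placeholder):

        from concurrent.futures import ThreadPoolExecutor

        def run_query(statement):
            # each thread transparently gets its own DB connection;
            # the same applies to connection.execute(...)
            return postgres.fetch(statement)

        with ThreadPoolExecutor(max_workers=2) as pool:
            results = list(pool.map(run_query, ["SELECT 1", "SELECT 2"]))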

  • Call .close() on FileConnection when it is removed by the garbage collector. (#156)

Bug Fixes

  • Fix issue where stopping the Python interpreter calls JDBCMixin.close() and prints exceptions to the log. (#156)

0.9.3 (2023-09-06)


Bug Fixes

  • Fix documentation build

0.9.2 (2023-09-06)


Features

  • Add if_exists="ignore" and error to Greenplum.WriteOptions (#142)

Improvements

  • Improve validation messages while writing dataframe to Kafka. (#131)
  • Improve documentation:
    • Add notes about reading and writing to database connections documentation
    • Add notes about executing statements in JDBC and Greenplum connections

Bug Fixes

  • Fixed validation of the headers column written to Kafka with the default Kafka.WriteOptions(): the default value was False, but instead of raising an exception, the column value was just ignored. (#131)
  • Fix reading data from Oracle with partitioningMode="range" without explicitly set lowerBound / upperBound. (#133)
  • Update Kafka documentation with SSLProtocol usage. (#136)
  • Raise exception if someone tries to read data from Kafka topic which does not exist. (#138)
  • Allow passing Kafka topics with names like some.topic.name to DBReader. The same applies to MongoDB collections. (#139)

0.9.1 (2023-08-17)


Bug Fixes

  • Fixed a bug where the number of threads created by FileDownloader / FileUploader / FileMover was not min(workers, len(files)) but max(workers, len(files)), leading to too many workers being created for large file lists.

0.9.0 (2023-08-17)


Breaking Changes

  • Rename methods:

    • DBConnection.read_df -> DBConnection.read_source_as_df
    • DBConnection.write_df -> DBConnection.write_df_to_target (#66)
  • Rename classes:

    • HDFS.slots -> HDFS.Slots
    • Hive.slots -> Hive.Slots

    Old names are left intact, but will be removed in v1.0.0 (#103)

  • Rename options to make them self-explanatory:

    • Hive.WriteOptions(mode="append") -> Hive.WriteOptions(if_exists="append")
    • Hive.WriteOptions(mode="overwrite_table") -> Hive.WriteOptions(if_exists="replace_entire_table")
    • Hive.WriteOptions(mode="overwrite_partitions") -> Hive.WriteOptions(if_exists="replace_overlapping_partitions")
    • JDBC.WriteOptions(mode="append") -> JDBC.WriteOptions(if_exists="append")
    • JDBC.WriteOptions(mode="overwrite") -> JDBC.WriteOptions(if_exists="replace_entire_table")
    • Greenplum.WriteOptions(mode="append") -> Greenplum.WriteOptions(if_exists="append")
    • Greenplum.WriteOptions(mode="overwrite") -> Greenplum.WriteOptions(if_exists="replace_entire_table")
    • MongoDB.WriteOptions(mode="append") -> Greenplum.WriteOptions(if_exists="append")
    • MongoDB.WriteOptions(mode="overwrite") -> Greenplum.WriteOptions(if_exists="replace_entire_collection")
    • FileDownloader.Options(mode="error") -> FileDownloader.Options(if_exists="error")
    • FileDownloader.Options(mode="ignore") -> FileDownloader.Options(if_exists="ignore")
    • FileDownloader.Options(mode="overwrite") -> FileDownloader.Options(if_exists="replace_file")
    • FileDownloader.Options(mode="delete_all") -> FileDownloader.Options(if_exists="replace_entire_directory")
    • FileUploader.Options(mode="error") -> FileUploader.Options(if_exists="error")
    • FileUploader.Options(mode="ignore") -> FileUploader.Options(if_exists="ignore")
    • FileUploader.Options(mode="overwrite") -> FileUploader.Options(if_exists="replace_file")
    • FileUploader.Options(mode="delete_all") -> FileUploader.Options(if_exists="replace_entire_directory")
    • FileMover.Options(mode="error") -> FileMover.Options(if_exists="error")
    • FileMover.Options(mode="ignore") -> FileMover.Options(if_exists="ignore")
    • FileMover.Options(mode="overwrite") -> FileMover.Options(if_exists="replace_file")
    • FileMover.Options(mode="delete_all") -> FileMover.Options(if_exists="replace_entire_directory")

    Old names are left intact, but will be removed in v1.0.0 (#108)
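
    For example (both spellings work until v1.0.0):

        from onetl.connection import Hive

        # onETL 0.8.x and older
        options = Hive.WriteOptions(mode="overwrite_partitions")

        # onETL 0.9.0 and newer
        options = Hive.WriteOptions(if_exists="replace_overlapping_partitions")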

  • Rename onetl.log.disable_clients_logging() to onetl.log.setup_clients_logging(). (#120)

Features

  • Add new methods returning Maven packages for a specific connection class (see the sketch after this list):

    • Clickhouse.get_packages()
    • MySQL.get_packages()
    • Postgres.get_packages()
    • Teradata.get_packages()
    • MSSQL.get_packages(java_version="8")
    • Oracle.get_packages(java_version="8")
    • Greenplum.get_packages(scala_version="2.12")
    • MongoDB.get_packages(scala_version="2.12")
    • Kafka.get_packages(spark_version="3.4.1", scala_version="2.12")

    Deprecate old syntax:

    • Clickhouse.package
    • MySQL.package
    • Postgres.package
    • Teradata.package
    • MSSQL.package
    • Oracle.package
    • Greenplum.package_spark_2_3
    • Greenplum.package_spark_2_4
    • Greenplum.package_spark_3_2
    • MongoDB.package_spark_3_2
    • MongoDB.package_spark_3_3
    • MongoDB.package_spark_3_4 (#87)
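
    These packages are typically passed to the Spark session; a minimal sketch (the connection choice is illustrative):

        from pyspark.sql import SparkSession

        from onetl.connection import Clickhouse

        spark = (
            SparkSession.builder
            .appName("onetl")
            .config("spark.jars.packages", ",".join(Clickhouse.get_packages()))
            .getOrCreate()
        )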
  • Allow setting the client modules' log level in onetl.log.setup_clients_logging().

    Allow enabling logging of underlying client modules in onetl.log.setup_logging() by passing the additional argument enable_clients=True. This is useful for debugging. (#120)
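
    A sketch of both calls:

        import logging

        from onetl.log import setup_clients_logging, setup_logging

        # set up onETL logging and also enable logs of underlying client modules
        setup_logging(level=logging.INFO, enable_clients=True)

        # or control the client modules' log level separately
        setup_clients_logging(level=logging.DEBUG)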

  • Added support for reading data from and writing data to Kafka topics.

    New classes were added for these operations.

    Currently, Kafka does not support incremental read strategies; this will be implemented in future releases.
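
    A minimal write sketch, assuming an existing Spark session and DataFrame (addresses and cluster name are placeholders):

        from onetl.connection import Kafka
        from onetl.db import DBWriter

        kafka = Kafka(
            addresses=["broker1:9092"],
            cluster="my-cluster",
            spark=spark,
        )

        writer = DBWriter(connection=kafka, target="topic_name")
        writer.run(df)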

  • Added support for reading files as a Spark DataFrame and saving a DataFrame as files.

    New classes were added for these operations.

    FileDFConnections:

    • SparkLocalFS
    • SparkHDFS
    • SparkS3

    High-level classes:

    • FileDFReader (#73)
    • FileDFWriter (#81)

    File formats:

    • Avro
    • CSV
    • JSON
    • JSONLine
    • ORC
    • Parquet
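
    A minimal read sketch using these classes, assuming an existing Spark session and local CSV files (paths and options are illustrative):

        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import CSV

        local_fs = SparkLocalFS(spark=spark)

        reader = FileDFReader(
            connection=local_fs,
            source_path="/source/path",
            format=CSV(header=True),
        )
        df = reader.run()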

Improvements

  • Remove redundant checks for driver availability in Greenplum and MongoDB connections. (#67)
  • The check of Java class availability was moved from the .check() method to the connection constructor. (#97)

0.8.1 (2023-07-10)


Features

  • Add @slot decorator to public methods of:

    • DBConnection
    • FileConnection
    • DBReader
    • DBWriter
    • FileDownloader
    • FileUploader
    • FileMover (#49)
  • Add workers field to FileDownloader / FileUploader / FileMover Options classes.

    This allows speeding up all file operations using parallel threads. (#57)
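
    A sketch (the connection object and worker count are illustrative):

        from onetl.file import FileDownloader

        downloader = FileDownloader(
            connection=sftp,  # any file connection
            source_path="/remote/path",
            local_path="/local/path",
            options=FileDownloader.Options(workers=4),  # run up to 4 parallel threads
        )
        result = downloader.run()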

Improvements

  • Add documentation for HWM store .get and .save methods. (#49)
  • Improve Readme:
    • Move Quick start section from documentation
    • Add Non-goals section
    • Fix code blocks indentation (#50)
  • Improve Contributing guide:
    • Move Develop section from Readme
    • Move docs/changelog/README.rst content
    • Add Limitations section
    • Add instruction of creating a fork and building documentation (#50)
  • Remove duplicated checks for source file existence in FileDownloader / FileMover. (#57)
  • Update default logging format to include thread name. (#57)

Bug Fixes

  • Fix S3.list_dir('/') returning an empty list on the latest Minio version. (#58)

0.8.0 (2023-05-31)


Breaking Changes

  • Rename methods of FileConnection classes:

    • get_directory -> resolve_dir
    • get_file -> resolve_file
    • listdir -> list_dir
    • mkdir -> create_dir
    • rmdir -> remove_dir

    The new naming should be more consistent.
    These methods were undocumented in previous versions, but someone could have been using them, so this is a breaking change. (#36)

  • Deprecate onetl.core.FileFilter class, replace it with new classes:

    • onetl.file.filter.Glob
    • onetl.file.filter.Regexp
    • onetl.file.filter.ExcludeDir

    Old class will be removed in v1.0.0. (#43)

  • Deprecate onetl.core.FileLimit class, replace it with new class onetl.file.limit.MaxFilesCount.

    Old class will be removed in v1.0.0. (#44)

  • Change behavior of BaseFileLimit.reset method.

    This method should now return self instead of None. The return value can be the same limit object or a copy; this is an implementation detail. (#44)

  • Replaced FileDownloader.filter and .limit with new options .filters and .limits:

    Before:

        FileDownloader(
            ...,
            filter=FileFilter(glob="*.txt", exclude_dir="/path"),
            limit=FileLimit(count_limit=10),
        )

    After:

        FileDownloader(
            ...,
            filters=[Glob("*.txt"), ExcludeDir("/path")],
            limits=[MaxFilesCount(10)],
        )

    This allows developers to implement their own filter and limit classes and combine them with existing ones.

    The old behavior is still supported, but will be removed in v1.0.0. (#45)

  • Removed the default value for FileDownloader.limits; users should pass the limits list explicitly. (#45)

  • Move classes from the onetl.core module:

    from onetl.core import DBReader
    from onetl.core import DBWriter
    from onetl.core import FileDownloader
    from onetl.core import FileUploader
    from onetl.core import FileResult
    from onetl.core import FileSet

    to the new onetl.db and onetl.file modules:

    from onetl.db import DBReader
    from onetl.db import DBWriter
    
    from onetl.file import FileDownloader
    from onetl.file import FileUploader
    
    # not a public interface
    from onetl.file.file_result import FileResult
    from onetl.file.file_set import FileSet

    Imports from the old onetl.core module can still be used, but are marked as deprecated. The module will be removed in v1.0.0. (#46)

Features

  • Add rename_dir method.

    Method was added to following connections:

    • FTP
    • FTPS
    • HDFS
    • SFTP
    • WebDAV

    It allows renaming/moving a directory to a new path together with all its content.

    S3 does not have directories, so there is no such method in that class. (#40)
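
    A sketch, assuming an HDFS connection (host, paths, and argument order are placeholders following the source/target convention of other methods):

        from onetl.connection import HDFS

        hdfs = HDFS(host="namenode.domain.com")

        # move the directory with all its content to a new path
        hdfs.rename_dir("/old/path", "/new/path")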

  • Add onetl.file.FileMover class.

    It allows moving files between directories of a remote file system. The signature is almost the same as FileDownloader's, but without HWM support. (#42)
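
    A sketch (the connection object and paths are placeholders):

        from onetl.file import FileMover

        mover = FileMover(
            connection=sftp,  # any file connection
            source_path="/remote/inbox",
            target_path="/remote/archive",
        )
        result = mover.run()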

Improvements

  • Document all public methods in FileConnection classes:

    • download_file
    • resolve_dir
    • resolve_file
    • get_stat
    • is_dir
    • is_file
    • list_dir
    • create_dir
    • path_exists
    • remove_file
    • rename_file
    • remove_dir
    • upload_file
    • walk (#39)
  • Update documentation of the check method of all connections: add usage examples and document the result type. (#39)

  • Add new exception type FileSizeMismatchError.

    The connection.download_file and connection.upload_file methods now raise this new exception type instead of RuntimeError if the target file has a different size than the source after download/upload. (#39)
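
    A sketch of handling it (the import path is an assumption based on onETL's exception module):

        from onetl.exception import FileSizeMismatchError

        try:
            connection.download_file("/remote/file.csv", "/local/file.csv")
        except FileSizeMismatchError:
            # target file size differs from the source, e.g. an interrupted transfer
            ...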

  • Add new exception type DirectoryExistsError - it is raised if target directory already exists. (#40)

  • Improved FileDownloader / FileUploader exception logging.

    If DEBUG logging is enabled, the exception is printed with a stacktrace instead of only the exception message. (#42)

  • Updated documentation of FileUploader.

    • The class does not support read strategies; a note was added to the documentation.
    • Added examples of using the run method with an explicit file list, with both absolute and relative paths.
    • Fixed outdated imports and class names in examples. (#42)
  • Updated documentation of the DownloadResult class: fixed outdated imports and class names. (#42)

  • Improved file filters documentation section.

    Document the interface class onetl.base.BaseFileFilter and the match_all_filters function. (#43)

  • Improved file limits documentation section.

    Document the interface class onetl.base.BaseFileLimit and the limits_stop_at / limits_reached / reset_limits functions. (#44)

  • Added changelog.

    The changelog is generated from separate news files using towncrier. (#47)

Misc

  • Improved CI workflow for tests:
    • If a developer hasn't changed the source code of a specific connector or its dependencies, tests run only against the maximum supported versions of Spark, Python, Java and the db/file server.
    • If a developer made changes in a specific connector, in core classes, or in dependencies, tests run for both minimal and maximum versions.
    • Once a week, all tests run against both minimal and latest versions to detect breaking changes in dependencies.
    • The minimal tested Spark version is 2.3.1 instead of 2.4.8. (#32)

Full Changelog: 0.7.2...0.8.0