
Schema contract not being respected #2127

Open
ubiquitousbyte opened this issue Dec 9, 2024 · 6 comments

Comments


ubiquitousbyte commented Dec 9, 2024

dlt version

1.4.0

Describe the problem

I am pulling data from an external API (Jira) and want to expose a fixed schema in my destination. For this purpose, I have defined the columns I want dlt to load at the destination and I pass them into my pipeline.

It's important to note that I want only the columns defined in the fixed schema to be loaded in the destination. For this reason, I set discard_value on the schema contract for columns. However, the pipeline also loads other columns coming from the resource that are not part of the fixed column list I have provided.

Expected behavior

What I am looking for is the freeze-and-trim strategy defined by @sh-rp in #135.

Steps to reproduce

Create a source that generates a multitude of columns. Define a column set and a schema contract that should only import the predefined column set. dlt imports all of the columns coming from the source, even though it should only import the fixed column set.

Example:

import dlt


@dlt.resource(name="some_test_resource", primary_key="id", write_disposition="replace")
def test_resource():
    for i in range(0, 10):
        yield {"id": i, "name": f"test_{i}", "column_i_dont_want": True}


columns_i_want = [
    {"name": "id", "data_type": "text", "nullable": False, "primary_key": True},
    {"name": "name", "data_type": "text", "nullable": True},
]

pipeline = dlt.pipeline(pipeline_name="test-pipeline")
pipeline.run(
    test_resource,
    columns=columns_i_want,
    schema_contract={
        "columns": "discard_value",
        "data_type": "freeze",
    },
    destination="duckdb",
)

Connect to duckdb and describe the schema:

describe test_pipeline_dataset.some_test_resource;
┌────────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│    column_name     │ column_type │  null   │   key   │ default │  extra  │
│      varchar       │   varchar   │ varchar │ varchar │ varchar │ varchar │
├────────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ id                 │ VARCHAR     │ NO      │         │         │         │
│ name               │ VARCHAR     │ YES     │         │         │         │
│ column_i_dont_want │ BOOLEAN     │ YES     │         │         │         │
│ _dlt_load_id       │ VARCHAR     │ NO      │         │         │         │
│ _dlt_id            │ VARCHAR     │ NO      │         │         │         │
└────────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘

The column_i_dont_want column should not be present in the schema.

Operating system

Linux, macOS

Runtime environment

Local

Python version

3.11

dlt data source

A custom source for Jira.

dlt destination

No response

Other deployment details

No response

Additional information

No response

sh-rp (Collaborator) commented Dec 9, 2024

@ubiquitousbyte I think the columns should be defined on the resource, not as an argument to the pipeline's run method. Try that out and let me know what you find. I would also use dev_mode=True to make sure that no old schema that already contains this column is reused.

ubiquitousbyte (Author) commented Dec 9, 2024

@sh-rp dev_mode=True was a good tip, thanks! I was cleaning up the state manually via dlt pipeline test-pipeline drop --drop-all.
Unfortunately, defining the columns on the resource does not change anything in the output.

This example

import dlt


@dlt.resource(
    name="some_test_resource",
    primary_key="id",
    write_disposition="replace",
    columns={
        "id": {"data_type": "text", "nullable": False, "primary_key": True},
        "name": {"data_type": "text", "nullable": True},
    },
    schema_contract={"columns": "discard_value", "data_type": "freeze"},
)
def test_resource():
    for i in range(0, 10):
        yield {
            "id": i,
            "name": f"test_{i}",
            "column_i_dont_want": True,
        }


pipeline = dlt.pipeline(pipeline_name="test-pipeline", dev_mode=True)
pipeline.run(
    test_resource,
    destination="duckdb",
)

still creates the column_i_dont_want column in the destination.

D describe test_pipeline_dataset_20241209065623.some_test_resource;
┌────────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│    column_name     │ column_type │  null   │   key   │ default │  extra  │
│      varchar       │   varchar   │ varchar │ varchar │ varchar │ varchar │
├────────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ id                 │ VARCHAR     │ NO      │         │         │         │
│ name               │ VARCHAR     │ YES     │         │         │         │
│ column_i_dont_want │ BOOLEAN     │ YES     │         │         │         │
│ _dlt_load_id       │ VARCHAR     │ NO      │         │         │         │
│ _dlt_id            │ VARCHAR     │ NO      │         │         │         │
└────────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘

sh-rp (Collaborator) commented Dec 10, 2024

Hey @ubiquitousbyte, I just tested this out. Our implementation lets the first run of the pipeline pass because the table does not yet exist in the schema. So if you run your resource with the correct fields on the first run, the schema contract takes effect on subsequent runs:


import dlt


@dlt.resource(
    name="some_test_resource",
    primary_key="id",
    write_disposition="replace",
    columns={
        "id": {"data_type": "text", "nullable": False, "primary_key": True},
        "name": {"data_type": "text", "nullable": True},
    },
    schema_contract={"columns": "discard_value", "data_type": "freeze"},
)
def test_resource():
    for i in range(0, 10):
        yield {
            "id": i,
            "name": f"test_{i}",
        }


@dlt.resource(
    name="some_test_resource",
)
def test_resource_other_data():
    for i in range(0, 10):
        yield {
            "id": i,
            "name": f"test_{i}",
            "column_i_dont_want": True,
        }


pipeline = dlt.pipeline(pipeline_name="test-pipeline", dev_mode=True)
pipeline.run(
    test_resource,
    destination="duckdb",
)
pipeline.run(
    test_resource_other_data,
    destination="duckdb",
)

This is by design so that the contracts work properly: we don't consider column hints to be a full schema. The docs for this are here: https://dlthub.com/docs/general-usage/schema-contracts#contracts-on-new-tables

rudolfix (Collaborator) commented

@sh-rp wouldn't it work with a Pydantic model instead? I think we consider that a full table schema.
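
A minimal sketch of that suggestion (the model name and exact field types are assumptions, not from the issue):

import dlt
from typing import Optional

from pydantic import BaseModel


# Hypothetical model mirroring the fixed column set from the repro above.
class SomeTestRow(BaseModel):
    id: str
    name: Optional[str]


@dlt.resource(
    name="some_test_resource",
    primary_key="id",
    write_disposition="replace",
    columns=SomeTestRow,  # a Pydantic model is treated as a full table schema
    schema_contract={"columns": "discard_value", "data_type": "freeze"},
)
def test_resource():
    for i in range(0, 10):
        yield {"id": str(i), "name": f"test_{i}", "column_i_dont_want": True}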

ubiquitousbyte (Author) commented Dec 11, 2024

Shouldn't there be an option to use a fixed schema, irrespective of how it was defined? That would render utility functions for column removal unnecessary, i.e., this part of the documentation would not be needed anymore.

We use a YAML configuration file where we keep the schema. That file is loaded in and injected as column hints into the dlt resource. Operational personnel touch that file and should have full control over the schema that is ingested into the destination. I would like them to stick to the configuration file rather than modifying Pydantic models.
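
For illustration, a hypothetical sketch of that setup (the file name, YAML layout, and fetch function are made up, not the real configuration):

import dlt
import yaml

# jira_columns.yaml might contain, e.g.:
#   - name: id
#     data_type: text
#     nullable: false
#     primary_key: true
#   - name: name
#     data_type: text
#     nullable: true
with open("jira_columns.yaml") as f:
    column_hints = yaml.safe_load(f)


@dlt.resource(name="some_test_resource", columns=column_hints)
def test_resource():
    yield from fetch_jira_rows()  # hypothetical function calling the Jira API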

sh-rp (Collaborator) commented Dec 16, 2024

@ubiquitousbyte, the column hints on the resource decorator can also be used to provide just part of the schema, for example when you know that your incoming data has one decimal field whose precision you need to set, so we always consider these column hints incomplete and do not apply a full contract to them. As @rudolfix suggests, Pydantic models would work, or you can work with an import schema to achieve what you want: https://dlthub.com/docs/walkthroughs/adjust-a-schema
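
A minimal sketch of the import schema route from that walkthrough (paths are illustrative): dlt exports the inferred schema to the export folder on each run, and picks up operator edits to the file in the import folder on subsequent runs.

import dlt

pipeline = dlt.pipeline(
    pipeline_name="test-pipeline",
    destination="duckdb",
    # both paths are illustrative; see the walkthrough linked above
    import_schema_path="schemas/import",
    export_schema_path="schemas/export",
)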
