Skip to content

Commit

Permalink
changes airtable IDs to dlthub's CI account, adds schema hints to sil…
Browse files Browse the repository at this point in the history
…ence warnings and improve documentation
  • Loading branch information
willi-mueller committed Aug 25, 2023
1 parent 3bff642 commit a257313
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 21 deletions.
32 changes: 28 additions & 4 deletions sources/airtable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,33 @@ This [dlt source](https://dlthub.com/docs/general-usage/source) creates a [dlt r

## Supported write dispositions
This connector supports the write disposition `replace`, i.e. it does a [full load](https://dlthub.com/docs/general-usage/full-loading) on every invocation.
We do not support `append`, i.e. [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading) because a *base* can contain only [up to 50k records in the most expensive plan](https://support.airtable.com/docs/airtable-plans).

If resource consumption for data loading becomes a concern in practice [request](https://github.com/dlt-hub/verified-sources/issues/new/choose) the `append` loading method.
To support `append`, i.e. [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading), there are two possibilities:

### Parametrize the `pipeline.run` method

```python
event_base = airtable_source(
base_id="app7RlqvdoOmJm9XR",
table_names=["💰 Budget"],
)
load_info = pipeline.run(event_base, write_disposition="append")
```

### Customize the resource using the `apply_hints` method

This approach additionally allows you to [adjust the schema](https://dlthub.com/docs/general-usage/resource#adjust-schema)
```python
event_base = airtable_source(
base_id="app7RlqvdoOmJm9XR",
table_names=["💰 Budget"],
)
event_base.resources["💰 Budget"].apply_hints(
write_disposition="merge",
columns={"Item": {"name": "Item", "data_type": "text"}},
)
load_info = pipeline.run(event_base)
```


## Initialize the pipeline
Expand All @@ -38,8 +62,8 @@ If resource consumption for data loading becomes a concern in practice [request]
dlt init airtable duckdb
```

Here, we chose duckdb as the destination. Alternatively, you can also choose redshift, bigquery, or
any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).
Here, we chose duckdb as the destination. Alternatively, you can also choose redshift, bigquery, or any of the other
[destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).


## Add credentials
Expand Down
55 changes: 39 additions & 16 deletions sources/airtable_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,64 @@
def load_entire_base(pipeline: dlt.Pipeline) -> None:
    """Load every table of a single Airtable base, replacing previously loaded data.

    Args:
        pipeline: configured dlt pipeline that receives the loaded tables.
    """
    # Loads all tables inside a given base.
    # Find the base ID starting with "app".
    # See https://support.airtable.com/docs/finding-airtable-ids
    all_event_planning_tables = airtable_source(base_id="app7RlqvdoOmJm9XR")

    # Explicitly type one column per table to silence schema-inference warnings.
    all_event_planning_tables.resources["📆 Schedule"].apply_hints(
        columns={"Activity": {"name": "Activity", "data_type": "text"}}
    )
    all_event_planning_tables.resources["🎤 Speakers"].apply_hints(
        columns={"Name": {"name": "Name", "data_type": "text"}}
    )
    all_event_planning_tables.resources["🪑 Attendees"].apply_hints(
        columns={"Name": {"name": "Name", "data_type": "text"}}
    )
    all_event_planning_tables.resources["💰 Budget"].apply_hints(
        columns={"Item": {"name": "Item", "data_type": "text"}}
    )

    # Full refresh on every run; see the README on supported write dispositions.
    load_info = pipeline.run(all_event_planning_tables, write_disposition="replace")
    print(load_info)


def load_select_tables_from_base_by_id(pipeline: dlt.Pipeline) -> None:
    """Load only the given table IDs from one base, replacing previously loaded data.

    Filtering by IDs is the most reliable selection method: unlike table
    names, IDs cannot be changed by Airtable users.

    Args:
        pipeline: configured dlt pipeline that receives the loaded tables.
    """
    # Table IDs start with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
    # See example: https://airtable.com/app7RlqvdoOmJm9XR/tblKHM5s3AujfSbAH
    airtables = airtable_source(
        base_id="app7RlqvdoOmJm9XR",
        table_ids=["tblKHM5s3AujfSbAH", "tbloBrS8PnoO63aMP"],
    )

    load_info = pipeline.run(airtables, write_disposition="replace")
    print(load_info)


def load_select_tables_from_base_by_name(pipeline: dlt.Pipeline) -> None:
    """Load only the tables selected by name, with schema hints applied.

    Filtering by names is less reliable than filtering by IDs because names
    can be changed by Airtable users.

    Args:
        pipeline: configured dlt pipeline that receives the loaded table.
    """
    # See example: https://airtable.com/app7RlqvdoOmJm9XR/tblJCTXfjwOETmvy2/
    event_base = airtable_source(
        base_id="app7RlqvdoOmJm9XR",
        table_names=["💰 Budget"],
    )
    # Set a primary key and type the "Item" column to silence inference warnings.
    event_base.resources["💰 Budget"].apply_hints(
        primary_key="Item", columns={"Item": {"name": "Item", "data_type": "text"}}
    )
    load_info = pipeline.run(event_base, write_disposition="replace")
    print(load_info)


def load_and_customize_write_disposition(pipeline: dlt.Pipeline) -> None:
    """Load one table and override its write disposition via `apply_hints`.

    Demonstrates setting `write_disposition="merge"` on the resource itself
    instead of passing it to `pipeline.run`.

    Args:
        pipeline: configured dlt pipeline that receives the loaded table.
    """
    questionnaire = airtable_source(
        base_id="appcChDyP0pZeC76v", table_ids=["tbl1sN4CpPv8pBll4"]
    )
    # Merge on "Name" and type the column to silence inference warnings.
    questionnaire.resources["Sheet1"].apply_hints(
        primary_key="Name",
        columns={"Name": {"name": "Name", "data_type": "text"}},
        write_disposition="merge",
    )
    # No write_disposition here: the hint set on the resource above applies.
    load_info = pipeline.run(questionnaire)
    print(load_info)


Expand All @@ -49,7 +72,7 @@ def load_table_for_ci(pipeline: dlt.Pipeline) -> None:
pipeline_name="airtable", destination="duckdb", dataset_name="airtable_data"
)

load_table_for_ci(pipeline)
# load_select_tables_from_base_by_id(pipeline)
# load_select_tables_from_base_by_name(pipeline)
# load_entire_base(pipeline)
load_entire_base(pipeline)
load_select_tables_from_base_by_id(pipeline)
load_select_tables_from_base_by_name(pipeline)
load_and_customize_write_disposition(pipeline)
2 changes: 1 addition & 1 deletion tests/airtable/test_airtable_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def run_single_table_assertions(pipeline, questionnaire_table):

@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_load_all_tables_in_base(destination_name: str) -> None:
    """Loading a whole base (no table filter) succeeds on every destination."""
    all_event_planning_tables = airtable_source(base_id="app7RlqvdoOmJm9XR")
    pipeline = make_pipeline(destination_name)
    load_info = pipeline.run(all_event_planning_tables, write_disposition="replace")
    assert_load_info(load_info)
Expand Down

0 comments on commit a257313

Please sign in to comment.