adds airtable connector
- creates source for a base which returns resources for airtables
- supports whitelisting airtables by names or ids
- implements test suite calling live API
- README for Airtable source

improves typing
willi-mueller committed Aug 29, 2023
1 parent 123f681 commit def5819
Showing 6 changed files with 289 additions and 0 deletions.
78 changes: 78 additions & 0 deletions sources/airtable/README.md
@@ -0,0 +1,78 @@
---
title: Airtable
description: dlt source for Airtable.com API
keywords: [airtable api, airtable source, airtable, base]
---


# Airtable

[Airtable](https://airtable.com/) is a spreadsheet-database hybrid: it offers the features of a database applied to a spreadsheet, and is also marketed as a low-code platform for building next-gen apps. Spreadsheets are called *airtables* and are grouped into *bases*. Records can link to each other.

This [dlt source](https://dlthub.com/docs/general-usage/source) creates a [dlt resource](https://dlthub.com/docs/general-usage/resource) for every airtable and loads it into the destination.


## Supported methods to load airtables

1. Identify the *base* you want to load from. Its ID starts with "app". See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids).
2. Identify the *airtables* you want to load. You can select them in one of three ways:

| Filter | Description |
| ---------------- | -----------------------------------------------------------|
| `table_names` | retrieves the *airtables* from a given *base* whose user-defined names match the list. Names are mutable, so this filter is less reliable than `table_ids` |
| `table_ids` | retrieves the *airtables* from a given *base* whose immutable IDs match the list. IDs are assigned by airtable.com when the *airtable* is created and start with "tbl". See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids) |
| empty filter | retrieves all *airtables* from a given *base* |
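
The three filter modes can be sketched as a pure function over the table metadata that Airtable's Metadata API returns. This is an illustration only, not the connector's actual code; `select_tables` and the sample metadata are hypothetical:

```python
from typing import Any, Dict, List, Optional


def select_tables(
    tables: List[Dict[str, Any]],
    table_ids: Optional[List[str]] = None,
    table_names: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Mirror of the filter semantics: ids win over names,
    and an empty filter selects everything."""
    if table_ids:
        return [t for t in tables if t["id"] in table_ids]
    if table_names:
        return [t for t in tables if t["name"] in table_names]
    return tables


tables = [
    {"id": "tblAAA", "name": "💰 Budget"},
    {"id": "tblBBB", "name": "Schedule"},
]
print([t["name"] for t in select_tables(tables, table_names=["💰 Budget"])])
# → ['💰 Budget']
```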


## Supported write dispositions
This connector supports the write disposition `replace`, i.e. it performs a [full load](https://dlthub.com/docs/general-usage/full-loading) on every invocation.
We do not support `append`, i.e. [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading), because a *base* can contain only [up to 50k records even on the most expensive plan](https://support.airtable.com/docs/airtable-plans).

If resource consumption for data loading becomes a concern in practice, please [request](https://github.com/dlt-hub/verified-sources/issues/new/choose) the `append` loading method.
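
Conceptually, a full load with `replace` overwrites the destination table on every run, while `append` would keep existing rows and add new ones. A minimal, destination-agnostic sketch with a hypothetical `load` helper (plain Python, not dlt's implementation):

```python
from typing import Any, Dict, List


def load(
    destination: Dict[str, List[Any]],
    table: str,
    records: List[Any],
    write_disposition: str = "replace",
) -> None:
    """Toy model of write dispositions: `replace` drops existing
    rows before writing, `append` keeps them."""
    if write_disposition == "replace":
        destination[table] = list(records)
    elif write_disposition == "append":
        destination.setdefault(table, []).extend(records)
    else:
        raise ValueError(f"unsupported write disposition: {write_disposition}")


destination: Dict[str, List[Any]] = {}
load(destination, "budget", [{"id": 1}, {"id": 2}])
load(destination, "budget", [{"id": 1}, {"id": 2}])  # re-run: no duplicates
print(len(destination["budget"]))  # → 2
```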


## Initialize the pipeline

```bash
dlt init airtable duckdb
```

Here, we chose duckdb as the destination. Alternatively, you can choose redshift, bigquery, or
any other [destination](https://dlthub.com/docs/dlt-ecosystem/destinations/).


## Add credentials

1. [Obtain a Personal Access Token](https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens).
If you're on an enterprise plan, you can create an [Airtable Service Account](https://dlthub.com/docs/dlt-ecosystem/verified-sources/chess) instead.
Place the token into your `secrets.toml` like this:
```toml
[sources.airtable]
access_token = "pat***"
```
2. Follow the instructions in the [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/)
document to add credentials for your chosen destination.
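
Alternatively, dlt can read credentials from environment variables. A sketch, assuming dlt's standard double-underscore naming convention for config sections:

```shell
# Equivalent to setting access_token under [sources.airtable] in secrets.toml
export SOURCES__AIRTABLE__ACCESS_TOKEN="pat***"
```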


## Run the pipeline

1. Install the necessary dependencies by running the following command:

```bash
pip install -r requirements.txt
```

2. Now run the pipeline with:

```bash
python airtable_pipeline.py
```

3. To make sure that everything is loaded as expected, use the command:

```bash
dlt pipeline airtable_pipeline show
```

💡 To explore additional customizations for this pipeline, we recommend referring to the example pipelines in `airtable_pipeline.py`.
68 changes: 68 additions & 0 deletions sources/airtable/__init__.py
@@ -0,0 +1,68 @@
"""Source that loads tables form Airtable.
Supports whitelisting of tables or loading of all tables from a specified base.
Supported write dispositions: replace
"""
from typing import Optional, Iterable, Iterator, List, Dict, Any

import dlt
from dlt.extract.source import DltResource

import pyairtable


@dlt.source
def airtable_source(
base_id: str,
table_ids: Optional[List[str]] = None,
table_names: Optional[List[str]] = None,
access_token: str = dlt.secrets.value,
) -> Iterable[DltResource]:
"""
Represents tables for a single Airtable base.
Args:
base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
table_ids (Optional[List[str]]): A list of table ids to load. By default, all tables in the schema are loaded. Starts with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. Table names can change and thus filtering on names is less reliable than on ids.
access_token (str): The personal access token. See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
"""
api = pyairtable.Api(access_token)
all_tables_url = api.build_url(f"meta/bases/{base_id}/tables")
tables = api.request(method="GET", url=all_tables_url).get("tables")
    for t in tables:
        # table_ids takes precedence over table_names
        if table_ids:
            if t.get("id") in table_ids:
                yield airtable_resource(access_token, base_id, t)
        elif table_names:
            if t.get("name") in table_names:
                yield airtable_resource(access_token, base_id, t)
        else:
            yield airtable_resource(access_token, base_id, t)


def airtable_resource(
access_token: str,
base_id: str,
table: Dict[str, Any],
) -> DltResource:
"""
Represents a single airtable.
Args:
base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
access_token (str): The personal access token. See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
table (TDataItem): The iterable created by pyairtable representing the data of an airtable
"""
primary_key_id = table["primaryFieldId"]
    primary_key_field = next(
        field for field in table["fields"] if field["id"] == primary_key_id
    )
table_name: str = table["name"]
primary_key: List[str] = [primary_key_field["name"]]
air_table = pyairtable.Table(access_token, base_id, table["id"])
air_table_generator: Iterator[List[Any]] = air_table.iterate()
return dlt.resource(
air_table_generator,
name=table_name,
primary_key=primary_key,
write_disposition="replace", # using a typed parameter crashes the typing
)
2 changes: 2 additions & 0 deletions sources/airtable/requirements.txt
@@ -0,0 +1,2 @@
pyairtable~=2.0
dlt>=0.3.8
55 changes: 55 additions & 0 deletions sources/airtable_pipeline.py
@@ -0,0 +1,55 @@
import dlt
from airtable import airtable_source


def load_entire_base(pipeline: dlt.Pipeline) -> None:
# Loads all tables inside a given base.
# Find the base ID starting with "app". See https://support.airtable.com/docs/finding-airtable-ids
all_event_planning_tables = airtable_source(base_id="appctwIznRf5lqe62")
load_info = pipeline.run(all_event_planning_tables, write_disposition="replace")
print(load_info)


def load_select_tables_from_base_by_id(pipeline: dlt.Pipeline) -> None:
# Loads specific table IDs.
# Starts with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
# See example: https://airtable.com/appctwIznRf5lqe62/tblPjXnwd3V2RWgJS/
airtables = airtable_source(
base_id="appctwIznRf5lqe62",
table_ids=["tblPjXnwd3V2RWgJS", "tbltdCacZQPxI7fV0"],
)
load_info = pipeline.run(airtables, write_disposition="replace")
print(load_info)


def load_select_tables_from_base_by_name(pipeline: dlt.Pipeline) -> None:
# Loads specific table names.
# Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
# See example: https://airtable.com/appctwIznRf5lqe62/tblOe4fjtZfnvqAHd/
budget_table = airtable_source(
base_id="appctwIznRf5lqe62",
table_names=["💰 Budget"],
)
load_info = pipeline.run(budget_table, write_disposition="replace")
print(load_info)


def load_table_for_ci(pipeline: dlt.Pipeline) -> None:
# Setup for CI of dlt hub
questionnaire_table = airtable_source(
base_id="appcChDyP0pZeC76v", table_ids=["tbl1sN4CpPv8pBll4"]
)
load_info = pipeline.run(questionnaire_table, write_disposition="replace")
print(load_info)


if __name__ == "__main__":
# configure the pipeline with your destination details
pipeline = dlt.pipeline(
pipeline_name="airtable", destination="duckdb", dataset_name="airtable_data"
)

load_table_for_ci(pipeline)
# load_select_tables_from_base_by_id(pipeline)
# load_select_tables_from_base_by_name(pipeline)
# load_entire_base(pipeline)
Empty file added tests/airtable/__init__.py
86 changes: 86 additions & 0 deletions tests/airtable/test_airtable_source.py
@@ -0,0 +1,86 @@
from tests.utils import (
ALL_DESTINATIONS,
assert_load_info,
load_table_counts,
assert_query_data,
)
import pytest
import dlt
from dlt.common.utils import uniq_id
from sources.airtable import airtable_source


def make_pipeline(destination_name: str) -> dlt.Pipeline:
return dlt.pipeline(
pipeline_name="airtable_test",
destination=destination_name,
dataset_name="airtable_test_data" + uniq_id(),
)


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_load_table_by_id(destination_name: str) -> None:
pipeline = make_pipeline(destination_name)
questionnaire_table = airtable_source(
base_id="appcChDyP0pZeC76v", table_ids=["tbl1sN4CpPv8pBll4"]
)
run_single_table_assertions(pipeline, questionnaire_table)


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_load_table_by_name(destination_name: str) -> None:
pipeline = make_pipeline(destination_name)
questionnaire_table = airtable_source(
base_id="appcChDyP0pZeC76v", table_names=["Sheet1"]
)
run_single_table_assertions(pipeline, questionnaire_table)


def run_single_table_assertions(pipeline, questionnaire_table):
row_count: int = 22
airtable_name: str = "Sheet1"

load_info = pipeline.run(questionnaire_table, write_disposition="replace")

assert_load_info(load_info)

loaded_table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
assert loaded_table_names == [airtable_name.lower()]

counts_of_rows = load_table_counts(pipeline, *loaded_table_names)
assert counts_of_rows[airtable_name.lower()] == row_count

query_string = (
"select fields__question_1 from sheet1 where fields__name = 'Tina Quinn'"
)
expected_table_data = ["Maybe"]
assert_query_data(pipeline, query_string, expected_table_data)


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_load_all_tables_in_base(destination_name: str) -> None:
all_event_planning_tables = airtable_source(base_id="appctwIznRf5lqe62")
pipeline = make_pipeline(destination_name)
load_info = pipeline.run(all_event_planning_tables, write_disposition="replace")
assert_load_info(load_info)

loaded_table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
expected_table_names = [
"_attendees",
"_budget",
"_schedule",
"_speakers",
"_schedule__fields__speaker_sx",
"_speakers__fields__speaking_at",
]
assert set(loaded_table_names) == set(expected_table_names)

counts_of_rows = load_table_counts(pipeline, *loaded_table_names)
assert counts_of_rows == {
"_attendees": 11,
"_budget": 7,
"_schedule": 19,
"_speakers": 9,
"_speakers__fields__speaking_at": 16,
"_schedule__fields__speaker_sx": 16,
}
