From def5819a0b49ea7cb061ea1eebba6a0016a61f12 Mon Sep 17 00:00:00 2001 From: Willi Date: Fri, 25 Aug 2023 15:29:42 +0530 Subject: [PATCH] adds airtable connector - creates source for a base which returns resources for airtables - supports whitelisting airtables by names or ids - implements test suite calling live API - README for Airtable source improves typing --- sources/airtable/README.md | 78 +++++++++++++++++++++++ sources/airtable/__init__.py | 68 ++++++++++++++++++++ sources/airtable/requirements.txt | 2 + sources/airtable_pipeline.py | 55 ++++++++++++++++ tests/airtable/__init__.py | 0 tests/airtable/test_airtable_source.py | 86 ++++++++++++++++++++++++++ 6 files changed, 289 insertions(+) create mode 100644 sources/airtable/README.md create mode 100644 sources/airtable/__init__.py create mode 100644 sources/airtable/requirements.txt create mode 100644 sources/airtable_pipeline.py create mode 100644 tests/airtable/__init__.py create mode 100644 tests/airtable/test_airtable_source.py diff --git a/sources/airtable/README.md b/sources/airtable/README.md new file mode 100644 index 000000000..d88b433e7 --- /dev/null +++ b/sources/airtable/README.md @@ -0,0 +1,78 @@ +--- +title: Airtable +description: dlt source for Airtable.com API +keywords: [airtable api, airtable source, airtable, base] +--- + + +# Airtable + +[Airtable](https://airtable.com/) is a spreadsheet-database hybrid, with the features of a database but applied to a spreadsheet. It is also marketed as a low‒code platform to build next‒gen apps. Spreadsheets are called *airtables* and are grouped into *bases*. +Records can link to each other. + +This [dlt source](https://dlthub.com/docs/general-usage/source) creates a [dlt resource](https://dlthub.com/docs/general-usage/resource) for every airtable and loads it into the destination. + + +## Supported methods to load airtables + +1. identify the *base* you want to load from. The ID starts with "app". 
See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids) +2. identify the *airtables* you want to load. You can identify them in three ways: + +| filter | Description | +| ---------------- | -----------------------------------------------------------| +| table_names | retrieves *airtables* from a given *base* for a list of user-defined mutable names of tables | +| table_ids | retrieves *airtables* from a given *base* for a list of immutable IDs defined by airtable.com at the creation time of the *airtable*. IDs start with "tbl". See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids) | +| empty filter | retrieves all *airtables* from a given *base* | + + +## Supported write dispositions +This connector supports the write disposition `replace`, i.e. it does a [full load](https://dlthub.com/docs/general-usage/full-loading) on every invocation. +We do not support `append`, i.e. [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading) because a *base* can contain only [up to 50k records in the most expensive plan](https://support.airtable.com/docs/airtable-plans). + +If resource consumption for data loading becomes a concern in practice, [request](https://github.com/dlt-hub/verified-sources/issues/new/choose) the `append` loading method. + + +## Initialize the pipeline + +```bash +dlt init airtable duckdb +``` + +Here, we chose duckdb as the destination. Alternatively, you can also choose redshift, bigquery, or +any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/). + + +## Add credentials + +1. [Obtain a Personal Access Token](https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens). + If you're on an enterprise plan you can create an [Airtable Service Account](https://dlthub.com/docs/dlt-ecosystem/verified-sources/chess). + Place the token into your `secrets.toml` like this: +``` +[sources.airtable] +access_token = "pat***" +``` +2. 
Follow the instructions in the [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/) + document to add credentials for your chosen destination. + + +## Run the pipeline + +1. Install the necessary dependencies by running the following command: + + ```bash + pip install -r requirements.txt + ``` + +2. Now the pipeline can be run by using the command: + + ```bash + python airtable_pipeline.py + ``` + +3. To make sure that everything is loaded as expected, use the command: + + ```bash + dlt pipeline airtable show + ``` + +💡 To explore additional customizations for this pipeline, we recommend referring to the example pipelines in `airtable_pipeline.py`. \ No newline at end of file diff --git a/sources/airtable/__init__.py b/sources/airtable/__init__.py new file mode 100644 index 000000000..946aae98c --- /dev/null +++ b/sources/airtable/__init__.py @@ -0,0 +1,68 @@ +"""Source that loads tables from Airtable. +Supports whitelisting of tables or loading of all tables from a specified base. +Supported write dispositions: replace +""" +from typing import Optional, Iterable, Iterator, List, Dict, Any + +import dlt +from dlt.extract.source import DltResource +from dlt.common.typing import TDataItem + +import pyairtable + + +@dlt.source +def airtable_source( + base_id: str, + table_ids: Optional[List[str]] = None, + table_names: Optional[List[str]] = None, + access_token: str = dlt.secrets.value, +) -> Iterable[DltResource]: + """ + Represents tables for a single Airtable base. + Args: + base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids + table_ids (Optional[List[str]]): A list of table ids to load. By default, all tables in the schema are loaded. Starts with "tbl". See https://support.airtable.com/docs/finding-airtable-ids + table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. 
Table names can change and thus filtering on names is less reliable than on ids. + access_token (str): The personal access token. See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions + """ + api = pyairtable.Api(access_token) + all_tables_url = api.build_url(f"meta/bases/{base_id}/tables") + tables = api.request(method="GET", url=all_tables_url).get("tables") + for t in tables: + if table_ids: + if t.get("id") in table_ids: + yield airtable_resource(access_token, base_id, t) + elif table_names: + if t.get("name") in table_names: + yield airtable_resource(access_token, base_id, t) + else: + yield airtable_resource(access_token, base_id, t) + + +def airtable_resource( + access_token: str, + base_id: str, + table: Dict[str, Any], +) -> DltResource: + """ + Represents a single airtable. + Args: + base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids + access_token (str): The personal access token. 
See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions + table (Dict[str, Any]): The schema metadata of a single airtable as returned by the `meta/bases/{base_id}/tables` endpoint. The keys `id`, `name`, `fields` and `primaryFieldId` are read. + """ + primary_key_id = table["primaryFieldId"] + primary_key_field = [ + field for field in table["fields"] if field["id"] == primary_key_id + ][0] + table_name: str = table["name"] + primary_key: List[str] = [primary_key_field["name"]] + air_table = pyairtable.Table(access_token, base_id, table["id"]) + air_table_generator: Iterator[List[Any]] = air_table.iterate() + return dlt.resource( + air_table_generator, + name=table_name, + primary_key=primary_key, + write_disposition="replace", # using a typed parameter crashes the typing + ) diff --git a/sources/airtable/requirements.txt b/sources/airtable/requirements.txt new file mode 100644 index 000000000..2c5209aab --- /dev/null +++ b/sources/airtable/requirements.txt @@ -0,0 +1,2 @@ +pyairtable~=2.0 +dlt>=0.3.8 \ No newline at end of file diff --git a/sources/airtable_pipeline.py b/sources/airtable_pipeline.py new file mode 100644 index 000000000..f6c45bd87 --- /dev/null +++ b/sources/airtable_pipeline.py @@ -0,0 +1,55 @@ +import dlt +from airtable import airtable_source + + +def load_entire_base(pipeline: dlt.Pipeline) -> None: + # Loads all tables inside a given base. + # Find the base ID starting with "app". See https://support.airtable.com/docs/finding-airtable-ids + all_event_planning_tables = airtable_source(base_id="appctwIznRf5lqe62") + load_info = pipeline.run(all_event_planning_tables, write_disposition="replace") + print(load_info) + + +def load_select_tables_from_base_by_id(pipeline: dlt.Pipeline) -> None: + # Loads specific table IDs. + # Starts with "tbl". 
See https://support.airtable.com/docs/finding-airtable-ids + # See example: https://airtable.com/appctwIznRf5lqe62/tblPjXnwd3V2RWgJS/ + airtables = airtable_source( + base_id="appctwIznRf5lqe62", + table_ids=["tblPjXnwd3V2RWgJS", "tbltdCacZQPxI7fV0"], + ) + load_info = pipeline.run(airtables, write_disposition="replace") + print(load_info) + + +def load_select_tables_from_base_by_name(pipeline: dlt.Pipeline) -> None: + # Loads specific table names. + # Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users. + # See example: https://airtable.com/appctwIznRf5lqe62/tblOe4fjtZfnvqAHd/ + budget_table = airtable_source( + base_id="appctwIznRf5lqe62", + table_names=["💰 Budget"], + ) + load_info = pipeline.run(budget_table, write_disposition="replace") + print(load_info) + + +def load_table_for_ci(pipeline: dlt.Pipeline) -> None: + # Setup for CI of dlt hub + questionnaire_table = airtable_source( + base_id="appcChDyP0pZeC76v", table_ids=["tbl1sN4CpPv8pBll4"] + ) + load_info = pipeline.run(questionnaire_table, write_disposition="replace") + print(load_info) + + +if __name__ == "__main__": + # configure the pipeline with your destination details + pipeline = dlt.pipeline( + pipeline_name="airtable", destination="duckdb", dataset_name="airtable_data" + ) + + load_table_for_ci(pipeline) + # load_select_tables_from_base_by_id(pipeline) + # load_select_tables_from_base_by_name(pipeline) + # load_entire_base(pipeline) diff --git a/tests/airtable/__init__.py b/tests/airtable/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/airtable/test_airtable_source.py b/tests/airtable/test_airtable_source.py new file mode 100644 index 000000000..1f594517e --- /dev/null +++ b/tests/airtable/test_airtable_source.py @@ -0,0 +1,86 @@ +from tests.utils import ( + ALL_DESTINATIONS, + assert_load_info, + load_table_counts, + assert_query_data, +) +import pytest +import dlt +from dlt.common.utils import uniq_id +from 
sources.airtable import airtable_source + + +def make_pipeline(destination_name: str) -> dlt.Pipeline: + return dlt.pipeline( + pipeline_name="airtable_test", + destination=destination_name, + dataset_name="airtable_test_data" + uniq_id(), + ) + + +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_load_table_by_id(destination_name: str) -> None: + pipeline = make_pipeline(destination_name) + questionnaire_table = airtable_source( + base_id="appcChDyP0pZeC76v", table_ids=["tbl1sN4CpPv8pBll4"] + ) + run_single_table_assertions(pipeline, questionnaire_table) + + +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_load_table_by_name(destination_name: str) -> None: + pipeline = make_pipeline(destination_name) + questionnaire_table = airtable_source( + base_id="appcChDyP0pZeC76v", table_names=["Sheet1"] + ) + run_single_table_assertions(pipeline, questionnaire_table) + + +def run_single_table_assertions(pipeline, questionnaire_table): + row_count: int = 22 + airtable_name: str = "Sheet1" + + load_info = pipeline.run(questionnaire_table, write_disposition="replace") + + assert_load_info(load_info) + + loaded_table_names = [t["name"] for t in pipeline.default_schema.data_tables()] + assert loaded_table_names == [airtable_name.lower()] + + counts_of_rows = load_table_counts(pipeline, *loaded_table_names) + assert counts_of_rows[airtable_name.lower()] == row_count + + query_string = ( + "select fields__question_1 from sheet1 where fields__name = 'Tina Quinn'" + ) + expected_table_data = ["Maybe"] + assert_query_data(pipeline, query_string, expected_table_data) + + +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_load_all_tables_in_base(destination_name: str) -> None: + all_event_planning_tables = airtable_source(base_id="appctwIznRf5lqe62") + pipeline = make_pipeline(destination_name) + load_info = pipeline.run(all_event_planning_tables, write_disposition="replace") + assert_load_info(load_info) + + 
loaded_table_names = [t["name"] for t in pipeline.default_schema.data_tables()] + expected_table_names = [ + "_attendees", + "_budget", + "_schedule", + "_speakers", + "_schedule__fields__speaker_sx", + "_speakers__fields__speaking_at", + ] + assert set(loaded_table_names) == set(expected_table_names) + + counts_of_rows = load_table_counts(pipeline, *loaded_table_names) + assert counts_of_rows == { + "_attendees": 11, + "_budget": 7, + "_schedule": 19, + "_speakers": 9, + "_speakers__fields__speaking_at": 16, + "_schedule__fields__speaker_sx": 16, + }