From ea6a41fd23edde24fbb658a57a80421f66c66bbe Mon Sep 17 00:00:00 2001 From: Willi Date: Tue, 29 Aug 2023 19:28:33 +0530 Subject: [PATCH] changes airtable IDs to dlthub's CI account, adds schema hints to silence warnings and improve documentation refactoring according to latest recommendations of pyairtable simplify airtable filters by following the paradigm of the pyairtable API which treats table names and table IDs equally updates pyairtable dependency --- poetry.lock | 41 +++++++++++++++--- pyproject.toml | 5 ++- sources/airtable/README.md | 40 ++++++++++++++---- sources/airtable/__init__.py | 28 ++++++------- sources/airtable/requirements.txt | 2 +- sources/airtable_pipeline.py | 57 ++++++++++++++++++-------- tests/airtable/test_airtable_source.py | 4 +- 7 files changed, 126 insertions(+), 51 deletions(-) diff --git a/poetry.lock b/poetry.lock index cd7b4e14a..0f95906ee 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. 
[[package]] name = "aiohttp" @@ -1334,11 +1334,11 @@ files = [ google-auth = ">=2.14.1,<3.0.dev0" googleapis-common-protos = ">=1.56.2,<2.0.dev0" grpcio = [ - {version = ">=1.33.2,<2.0dev", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""}, + {version = ">=1.33.2,<2.0dev", optional = true, markers = "extra == \"grpc\""}, {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, ] grpcio-status = [ - {version = ">=1.33.2,<2.0.dev0", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""}, + {version = ">=1.33.2,<2.0.dev0", optional = true, markers = "extra == \"grpc\""}, {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, ] protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<5.0.0.dev0" @@ -1932,6 +1932,17 @@ docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker perf = ["ipython"] testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)", "pytest-ruff"] +[[package]] +name = "inflection" +version = "0.5.1" +description = "A port of Ruby on Rails inflector to Python" +optional = false +python-versions = ">=3.5" +files = [ + {file = "inflection-0.5.1-py2.py3-none-any.whl", hash = "sha256:f38b2b640938a4f35ade69ac3d053042959b62a0f1076a5bbaa1b9526605a8a2"}, + {file = "inflection-0.5.1.tar.gz", hash = "sha256:1a29730d366e996aaacffb2f1f1cb9593dc38e2ddd30c91250c6dde09ea9b417"}, +] + [[package]] name = "iniconfig" version = "2.0.0" @@ -2876,8 +2887,8 @@ files = [ [package.dependencies] numpy = [ {version = ">=1.20.3", markers = "python_version < \"3.10\""}, + {version = ">=1.21.0", 
markers = "python_version >= \"3.10\""}, {version = ">=1.23.2", markers = "python_version >= \"3.11\""}, - {version = ">=1.21.0", markers = "python_version >= \"3.10\" and python_version < \"3.11\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -3328,6 +3339,24 @@ all = ["apache-bookkeeper-client (>=4.16.1)", "fastavro (==1.7.3)", "grpcio (>=1 avro = ["fastavro (==1.7.3)"] functions = ["apache-bookkeeper-client (>=4.16.1)", "grpcio (>=1.8.2)", "prometheus-client", "protobuf (>=3.6.1,<=3.20.3)", "ratelimit"] +[[package]] +name = "pyairtable" +version = "2.1.0.post1" +description = "Python Client for the Airtable API" +optional = false +python-versions = "*" +files = [ + {file = "pyairtable-2.1.0.post1-py2.py3-none-any.whl", hash = "sha256:a80eb85f7c020bf41679bb00ca57da11aeaa43769afbc73619276798a2ca182e"}, + {file = "pyairtable-2.1.0.post1.tar.gz", hash = "sha256:e588249e68cf338dcdca9908537ed16d5a22ae72345ec930022b230ba96e5f84"}, +] + +[package.dependencies] +inflection = "*" +pydantic = "*" +requests = ">=2.22.0" +typing-extensions = "*" +urllib3 = ">=1.26" + [[package]] name = "pyarrow" version = "12.0.1" @@ -4284,7 +4313,7 @@ files = [ ] [package.dependencies] -greenlet = {version = "!=0.4.17", markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\""} +greenlet = {version = "!=0.4.17", markers = "platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\""} typing-extensions = ">=4.2.0" [package.extras] @@ -5207,4 +5236,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = "^3.8.1" -content-hash = "0e89e2b8d1aee8797c4675deca30610061e0aa4892184e163d23b19d978bffb8" +content-hash 
= "19e2b9ea4846700a37b97b4192137dfc566cea522f254f6a2a5e980b01877bfc" diff --git a/pyproject.toml b/pyproject.toml index cca9c9013..976b80bcc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,9 +70,12 @@ tiktoken = "^0.4.0" [tool.poetry.group.mongodb.dependencies] pymongo = "^4.3.3" +[tool.poetry.group.airtable.dependencies] +pyairtable = "^2.1.0.post1" + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.black] -include = '.*py$' +include = '.*py$' \ No newline at end of file diff --git a/sources/airtable/README.md b/sources/airtable/README.md index d88b433e7..f87797f1b 100644 --- a/sources/airtable/README.md +++ b/sources/airtable/README.md @@ -18,18 +18,40 @@ This [dlt source](https://dlthub.com/docs/general-usage/source) creates a [dlt r 1. identify the *base* you want to load from. The ID starts with "app". See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids) 2. identify the *airtables* you want to load. You can identify in three ways: -| filter | Description | -| ---------------- | -----------------------------------------------------------| -| table_names | retrieves *airtables* from a given *base* for a list of user-defined mutable names of tables | -| table_ids | retrieves *airtables* from a given *base* for a list of immutable IDs defined by airtable.com at the creation time of the *airtable*. IDs start with "tbl". See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids) | -| empty filter | retrieves all *airtables* from a given *base* | + 1. retrieve *airtables* from a given *base* for a list of user-defined mutable names of tables + 2. retrieve *airtables* from a given *base* for a list of immutable IDs defined by airtable.com at the creation time of the *airtable*. IDs start with "tbl". See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids) + 3. 
empty filter: retrieve all *airtables* from a given *base* ## Supported write dispositions This connector supports the write disposition `replace`, i.e. it does a [full load](https://dlthub.com/docs/general-usage/full-loading) on every invocation. -We do not support `append`, i.e. [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading) because a *base* can contain only [up to 50k records in the most expensive plan](https://support.airtable.com/docs/airtable-plans). -If resource consumption for data loading becomes a concern in practice [request](https://github.com/dlt-hub/verified-sources/issues/new/choose) the `append` loading method. +To support `append`, i.e. [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading), there are two possibilities: + +### Parametrize the `pipeline.run` method + +```python + event_base = airtable_source( + base_id="app7RlqvdoOmJm9XR", + table_names=["💰 Budget"], + ) + load_info = pipeline.run(event_base, write_disposition="append") +``` + +### Customize the resource using the `apply_hints` method + +This approach further allows you to [adjust the schema](https://dlthub.com/docs/general-usage/resource#adjust-schema). +```python + event_base = airtable_source( + base_id="app7RlqvdoOmJm9XR", + table_names=["💰 Budget"], + ) + event_base.resources["💰 Budget"].apply_hints( + write_disposition="merge", + columns={"Item": {"name": "Item", "data_type": "text"}}, + ) + load_info = pipeline.run(event_base) +``` ## Initialize the pipeline @@ -38,8 +60,8 @@ If resource consumption for data loading becomes a concern in practice [request] dlt init airtable duckdb ``` -Here, we chose duckdb as the destination. Alternatively, you can also choose redshift, bigquery, or -any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/). +Here, we chose duckdb as the destination. 
Alternatively, you can also choose redshift, bigquery, or any of the other +[destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/). ## Add credentials diff --git a/sources/airtable/__init__.py b/sources/airtable/__init__.py index 946aae98c..26c1bc8bb 100644 --- a/sources/airtable/__init__.py +++ b/sources/airtable/__init__.py @@ -14,7 +14,6 @@ @dlt.source def airtable_source( base_id: str, - table_ids: Optional[List[str]] = None, table_names: Optional[List[str]] = None, access_token: str = dlt.secrets.value, ) -> Iterable[DltResource]: @@ -22,35 +21,31 @@ def airtable_source( Represents tables for a single Airtable base. Args: base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids - table_ids (Optional[List[str]]): A list of table ids to load. By default, all tables in the schema are loaded. Starts with "tbl". See https://support.airtable.com/docs/finding-airtable-ids - table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. Table names can change and thus filtering on names is less reliable than on ids. + table_names (Optional[List[str]]): A list of table IDs or table names to load. Unless specified otherwise, all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl". See https://support.airtable.com/docs/finding-airtable-ids access_token (str): The personal access token. 
See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions """ api = pyairtable.Api(access_token) all_tables_url = api.build_url(f"meta/bases/{base_id}/tables") tables = api.request(method="GET", url=all_tables_url).get("tables") for t in tables: - if table_ids: - if t.get("id") in table_ids: - yield airtable_resource(access_token, base_id, t) - elif table_names: - if t.get("name") in table_names: - yield airtable_resource(access_token, base_id, t) + if table_names: + if t.get("id") in table_names or t.get("name") in table_names: + yield airtable_resource(api, base_id, t) else: - yield airtable_resource(access_token, base_id, t) + yield airtable_resource(api, base_id, t) def airtable_resource( - access_token: str, + api: pyairtable.Api, base_id: str, table: Dict[str, Any], ) -> DltResource: """ Represents a single airtable. Args: + api (pyairtable.Api): The API connection object base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids - access_token (str): The personal access token. 
See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions - table (TDataItem): The iterable created by pyairtable representing the data of an airtable + table (Dict[str, Any]): Metadata about an airtable, does not contain the actual records """ primary_key_id = table["primaryFieldId"] primary_key_field = [ @@ -58,11 +53,14 @@ def airtable_resource( ][0] table_name: str = table["name"] primary_key: List[str] = [primary_key_field["name"]] - air_table = pyairtable.Table(access_token, base_id, table["id"]) + air_table = api.table(base_id, table["id"]) + + # Table.iterate() supports rich customization options, such as chunk size, fields, cell format, timezone, locale, and view air_table_generator: Iterator[List[Any]] = air_table.iterate() + return dlt.resource( air_table_generator, name=table_name, primary_key=primary_key, - write_disposition="replace", # using a typed parameter crashes the typing + write_disposition="replace", ) diff --git a/sources/airtable/requirements.txt b/sources/airtable/requirements.txt index 2c5209aab..8fb63f024 100644 --- a/sources/airtable/requirements.txt +++ b/sources/airtable/requirements.txt @@ -1,2 +1,2 @@ -pyairtable~=2.0 +pyairtable~=2.1 dlt>=0.3.8 \ No newline at end of file diff --git a/sources/airtable_pipeline.py b/sources/airtable_pipeline.py index f6c45bd87..59301874f 100644 --- a/sources/airtable_pipeline.py +++ b/sources/airtable_pipeline.py @@ -5,7 +5,22 @@ def load_entire_base(pipeline: dlt.Pipeline) -> None: # Loads all tables inside a given base. # Find the base ID starting with "app". 
See https://support.airtable.com/docs/finding-airtable-ids - all_event_planning_tables = airtable_source(base_id="appctwIznRf5lqe62") + all_event_planning_tables = airtable_source(base_id="app7RlqvdoOmJm9XR") + + # typing columns to silence warnings + all_event_planning_tables.resources["📆 Schedule"].apply_hints( + columns={"Activity": {"name": "Activity", "data_type": "text"}} + ) + all_event_planning_tables.resources["🎤 Speakers"].apply_hints( + columns={"Name": {"name": "Name", "data_type": "text"}} + ) + all_event_planning_tables.resources["🪑 Attendees"].apply_hints( + columns={"Name": {"name": "Name", "data_type": "text"}} + ) + all_event_planning_tables.resources["💰 Budget"].apply_hints( + columns={"Item": {"name": "Item", "data_type": "text"}} + ) + load_info = pipeline.run(all_event_planning_tables, write_disposition="replace") print(load_info) @@ -13,11 +28,12 @@ def load_entire_base(pipeline: dlt.Pipeline) -> None: def load_select_tables_from_base_by_id(pipeline: dlt.Pipeline) -> None: # Loads specific table IDs. # Starts with "tbl". See https://support.airtable.com/docs/finding-airtable-ids - # See example: https://airtable.com/appctwIznRf5lqe62/tblPjXnwd3V2RWgJS/ + # See example: https://airtable.com/app7RlqvdoOmJm9XR/tblKHM5s3AujfSbAH airtables = airtable_source( - base_id="appctwIznRf5lqe62", - table_ids=["tblPjXnwd3V2RWgJS", "tbltdCacZQPxI7fV0"], + base_id="app7RlqvdoOmJm9XR", + table_names=["tblKHM5s3AujfSbAH", "tbloBrS8PnoO63aMP"], ) + load_info = pipeline.run(airtables, write_disposition="replace") print(load_info) @@ -25,21 +41,28 @@ def load_select_tables_from_base_by_id(pipeline: dlt.Pipeline) -> None: def load_select_tables_from_base_by_name(pipeline: dlt.Pipeline) -> None: # Loads specific table names. # Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users. 
- # See example: https://airtable.com/appctwIznRf5lqe62/tblOe4fjtZfnvqAHd/ - budget_table = airtable_source( - base_id="appctwIznRf5lqe62", + # See example: https://airtable.com/app7RlqvdoOmJm9XR/tblJCTXfjwOETmvy2/ + event_base = airtable_source( + base_id="app7RlqvdoOmJm9XR", table_names=["💰 Budget"], ) - load_info = pipeline.run(budget_table, write_disposition="replace") + event_base.resources["💰 Budget"].apply_hints( + primary_key="Item", columns={"Item": {"name": "Item", "data_type": "text"}} + ) + load_info = pipeline.run(event_base, write_disposition="replace") print(load_info) -def load_table_for_ci(pipeline: dlt.Pipeline) -> None: - # Setup for CI of dlt hub - questionnaire_table = airtable_source( - base_id="appcChDyP0pZeC76v", table_ids=["tbl1sN4CpPv8pBll4"] +def load_and_customize_write_disposition(pipeline: dlt.Pipeline) -> None: + questionnaire = airtable_source( + base_id="appcChDyP0pZeC76v", table_names=["tbl1sN4CpPv8pBll4"] + ) + questionnaire.resources["Sheet1"].apply_hints( + primary_key="Name", + columns={"Name": {"name": "Name", "data_type": "text"}}, + write_disposition="merge", ) - load_info = pipeline.run(questionnaire_table, write_disposition="replace") + load_info = pipeline.run(questionnaire) print(load_info) @@ -49,7 +72,7 @@ def load_table_for_ci(pipeline: dlt.Pipeline) -> None: pipeline_name="airtable", destination="duckdb", dataset_name="airtable_data" ) - load_table_for_ci(pipeline) - # load_select_tables_from_base_by_id(pipeline) - # load_select_tables_from_base_by_name(pipeline) - # load_entire_base(pipeline) + load_entire_base(pipeline) + load_select_tables_from_base_by_id(pipeline) + load_select_tables_from_base_by_name(pipeline) + load_and_customize_write_disposition(pipeline) diff --git a/tests/airtable/test_airtable_source.py b/tests/airtable/test_airtable_source.py index 1f594517e..be573140e 100644 --- a/tests/airtable/test_airtable_source.py +++ b/tests/airtable/test_airtable_source.py @@ -22,7 +22,7 @@ def 
make_pipeline(destination_name: str) -> dlt.Pipeline: def test_load_table_by_id(destination_name: str) -> None: pipeline = make_pipeline(destination_name) questionnaire_table = airtable_source( - base_id="appcChDyP0pZeC76v", table_ids=["tbl1sN4CpPv8pBll4"] + base_id="appcChDyP0pZeC76v", table_names=["tbl1sN4CpPv8pBll4"] ) run_single_table_assertions(pipeline, questionnaire_table) @@ -59,7 +59,7 @@ def run_single_table_assertions(pipeline, questionnaire_table): @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) def test_load_all_tables_in_base(destination_name: str) -> None: - all_event_planning_tables = airtable_source(base_id="appctwIznRf5lqe62") + all_event_planning_tables = airtable_source(base_id="app7RlqvdoOmJm9XR") pipeline = make_pipeline(destination_name) load_info = pipeline.run(all_event_planning_tables, write_disposition="replace") assert_load_info(load_info)