Skip to content
This repository has been archived by the owner on May 31, 2023. It is now read-only.

Commit

Permalink
Support large DBs by requesting only tables defined in manifest (Tomm…
Browse files Browse the repository at this point in the history
…e#86)

Co-authored-by: Justas Cernas <[email protected]>
  • Loading branch information
JustasCe and Justas Cernas authored Jun 1, 2022
1 parent 192136a commit 0546ca9
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
44 changes: 41 additions & 3 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from uuid import uuid4
import agate
import re
import boto3
from botocore.exceptions import ClientError
from typing import Optional
from itertools import chain
from threading import Lock
from typing import Dict, Iterator, Optional, Set
from uuid import uuid4

from dbt.adapters.base import available
from dbt.adapters.base.impl import GET_CATALOG_MACRO_NAME
from dbt.adapters.base.relation import InformationSchema
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.athena import AthenaConnectionManager
from dbt.adapters.athena.relation import AthenaRelation
from dbt.adapters.athena.relation import AthenaRelation, AthenaSchemaSearchMap
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import Manifest
from dbt.events import AdapterLogger
logger = AdapterLogger("Athena")

Expand Down Expand Up @@ -109,3 +114,36 @@ def quote_seed_column(
self, column: str, quote_config: Optional[bool]
) -> str:
return super().quote_seed_column(column, False)

def _get_one_catalog(
    self,
    information_schema: InformationSchema,
    schemas: Dict[str, Optional[Set[str]]],
    manifest: Manifest,
) -> agate.Table:
    """Run the catalog macro for one information schema and filter the result.

    Args:
        information_schema: Forwarded to the macro as ``information_schema``.
        schemas: Forwarded to the macro as ``schemas``.
        manifest: Handed to both the macro execution and the filter step.

    Returns:
        Whatever ``self._catalog_filter_table`` produces from the macro's table.
    """
    # The full manifest is supplied so that any local project overrides of the
    # catalog macro are picked up.
    raw_catalog = self.execute_macro(
        GET_CATALOG_MACRO_NAME,
        kwargs={"information_schema": information_schema, "schemas": schemas},
        manifest=manifest,
    )
    return self._catalog_filter_table(raw_catalog, manifest)


def _get_catalog_schemas(self, manifest: Manifest) -> AthenaSchemaSearchMap:
    """Build the search map of relations the catalog query should cover.

    Every node in ``manifest.nodes`` that is relational and not an ephemeral
    model, followed by every entry in ``manifest.sources``, is converted with
    ``self.Relation.create_from`` and added to a new ``AthenaSchemaSearchMap``.
    """
    search_map = AthenaSchemaSearchMap()

    # Materialise the filtered nodes up front, before any relation is created.
    relational_nodes = []
    for candidate in manifest.nodes.values():
        if candidate.is_relational and not candidate.is_ephemeral_model:
            relational_nodes.append(candidate)

    catalog_nodes: Iterator[CompileResultNode] = chain(
        relational_nodes,
        manifest.sources.values(),
    )
    for catalog_node in catalog_nodes:
        search_map.add(self.Relation.create_from(self.config, catalog_node))
    return search_map
20 changes: 19 additions & 1 deletion dbt/adapters/athena/relation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dataclasses import dataclass
from typing import Dict, Optional, Set

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.adapters.base.relation import BaseRelation, InformationSchema, Policy


@dataclass
Expand All @@ -14,3 +15,20 @@ class AthenaIncludePolicy(Policy):
class AthenaRelation(BaseRelation):
quote_character: str = ""
include_policy: Policy = AthenaIncludePolicy()

class AthenaSchemaSearchMap(Dict[InformationSchema, Dict[str, Set[Optional[str]]]]):
    """Record which schemas and relations to look up per information schema.

    Each information schema key maps to a dict of schema name -> set of
    relation names. Schema and relation names are lowercased before they are
    stored so that differently-cased spellings collapse into a single entry.
    """

    def add(self, relation: AthenaRelation):
        """Register ``relation`` under its information schema and schema.

        A relation whose ``schema`` is ``None`` is filed under the ``None`` key.
        """
        info_schema = relation.information_schema_only()
        per_schema = self.setdefault(info_schema, {})
        schema_key: Optional[str] = (
            relation.schema.lower() if relation.schema is not None else None
        )
        table_name = relation.name.lower()
        per_schema.setdefault(schema_key, set()).add(table_name)
18 changes: 15 additions & 3 deletions dbt/include/athena/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,21 @@

)

{%- for schema in schemas -%}
select * from catalog where lower("table_schema") = lower('{{ schema }}')
{%- if not loop.last %} union all {% endif -%}
{%- for schema, relations in schemas.items() -%}
{%- for relation_batch in relations|batch(100) %}
select * from catalog
where "table_schema" = lower('{{ schema }}')
and (
{%- for relation in relation_batch -%}
"table_name" = lower('{{ relation }}')
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)

{%- if not loop.last %} union all {% endif -%}
{%- endfor -%}

{%- if not loop.last %} union all {% endif -%}
{%- endfor -%}
)
)
Expand Down

0 comments on commit 0546ca9

Please sign in to comment.