Skip to content

Commit

Permalink
Don't generate table names with "." (#110)
Browse files Browse the repository at this point in the history
* Don't generate table names with "."

Since snowflake is the only DB that can handle table names with periods,
we should ensure taht tables we generate don't have periods.

* fix tests

* fix final test
  • Loading branch information
dimberman committed Feb 16, 2022
1 parent 472165a commit 16a50fd
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 7 deletions.
8 changes: 6 additions & 2 deletions src/astro/sql/operators/sql_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,13 @@ def _get_dataframe(self, table: Table):
self.hook = PostgresHook(
postgres_conn_id=table.conn_id, schema=table.database
)
schema = table.schema or get_schema()
query = (
sql.SQL("SELECT * FROM {input_table}")
.format(input_table=sql.Identifier(table.table_name))
sql.SQL("SELECT * FROM {schema}.{input_table}")
.format(
schema=sql.Identifier(schema),
input_table=sql.Identifier(table.table_name),
)
.as_string(self.hook.get_conn())
)
return self.hook.get_pandas_df(query)
Expand Down
4 changes: 3 additions & 1 deletion src/astro/sql/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ def to_table(self, table_name: str, schema: str) -> Table:
def create_table_name(context):
ti: TaskInstance = context["ti"]
dag_run: DagRun = ti.get_dagrun()
table_name = f"{dag_run.dag_id}_{ti.task_id}_{dag_run.id}".replace("-", "_")
table_name = f"{dag_run.dag_id}_{ti.task_id}_{dag_run.id}".replace(
"-", "_"
).replace(".", "__")
if not table_name.isidentifier():
table_name = f'"{table_name}"'
return table_name
24 changes: 20 additions & 4 deletions tests/operators/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ def my_df_func(df: pandas.DataFrame):
res = self.create_and_run_task(
my_df_func,
(),
{"df": Table("actor", conn_id="postgres_conn", database="pagila")},
{
"df": Table(
"actor", conn_id="postgres_conn", database="pagila", schema="public"
)
},
)
assert (
XCom.get_one(
Expand All @@ -133,7 +137,11 @@ def my_df_func(df: pandas.DataFrame):

res = self.create_and_run_task(
my_df_func,
(Table("actor", conn_id="postgres_conn", database="pagila"),),
(
Table(
"actor", conn_id="postgres_conn", database="pagila", schema="public"
),
),
{},
)
assert (
Expand All @@ -150,8 +158,16 @@ def my_df_func(actor_df: pandas.DataFrame, film_df: pandas.DataFrame):

res = self.create_and_run_task(
my_df_func,
(Table("actor", conn_id="postgres_conn", database="pagila"),),
{"film_df": Table("film", conn_id="postgres_conn", database="pagila")},
(
Table(
"actor", conn_id="postgres_conn", database="pagila", schema="public"
),
),
{
"film_df": Table(
"film", conn_id="postgres_conn", database="pagila", schema="public"
)
},
)
assert (
XCom.get_one(
Expand Down
5 changes: 5 additions & 0 deletions tests/parsers/postgres_simple_tasks/test_astro.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
conn_id: postgres_conn
database: pagila
---
SELECT * FROM actor
1 change: 1 addition & 0 deletions tests/parsers/postgres_simple_tasks/test_inheritance.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT * FROM {{test_astro}} LIMIT 10
19 changes: 19 additions & 0 deletions tests/parsers/test_sql_directory_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,22 @@ def test_parse_creates_xcom(self):
rendered_tasks = aql.render(dir_path + "/single_task_dag")

test_utils.run_dag(self.dag)

def test_parse_to_dataframe(self):
"""
Runs two tasks with a direct dependency, the DAG will fail if task two can not inherit the table produced by task 1
:return:
"""
import pandas as pd

from astro.dataframe import dataframe as adf

@adf
def dataframe_func(df: pd.DataFrame):
print(df.to_string)

with self.dag:
rendered_tasks = aql.render(dir_path + "/postgres_simple_tasks")
dataframe_func(rendered_tasks["test_inheritance"])

test_utils.run_dag(self.dag)

0 comments on commit 16a50fd

Please sign in to comment.