Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit

Permalink
feat: Handle query execution manually for bigquery (#428)
Browse files Browse the repository at this point in the history
* Handle SQL execution manually for bigquery

* adapt tests
  • Loading branch information
chamini2 authored Jun 30, 2022
1 parent 611e6ad commit 39bdf19
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 16 deletions.
2 changes: 1 addition & 1 deletion integration_tests/features/execute_sql_function.feature
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ Feature: `execute_sql` function
And the following scripts are ran:
| execute_sql_model_one.run_macro.py |
And the script execute_sql_model_one.run_macro.py output file has the lines:
| Model dataframe first row: | my_int_times_ten 10.0 |
| Model dataframe first row: | my_int_times_ten 10 |
8 changes: 4 additions & 4 deletions integration_tests/features/write_to_model_function.feature
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ Feature: `write_to_model` function
And the following scripts are ran:
| some_model.write_to_source_twice.py | other_model.complete_model.py | third_model.complete_model.py |
And the script other_model.complete_model.py output file has the lines:
| my_int None | my_int 3.0 | size 1 |
| my_int 0.0 | my_int 3.0 | size 1 |
And the script third_model.complete_model.py output file has the lines:
| my_int None | my_int 3.0 | size 1 |
| my_int 0.0 | my_int 3.0 | size 1 |
And the script some_model.write_to_source_twice.py output file has the lines:
| my_float 1.2 |

Expand All @@ -29,7 +29,7 @@ Feature: `write_to_model` function
And the following scripts are ran:
| other_model.complete_model.py |
And the script other_model.complete_model.py output file has the lines:
| my_int None | my_int 3.0 | size 1 |
| my_int 0.0 | my_int 3.0 | size 1 |

@TODO-postgres
@TODO-snowflake
Expand Down Expand Up @@ -57,4 +57,4 @@ Feature: `write_to_model` function
Then the following models are calculated:
| model_with_array.py |
And the script model_with_array.py output file has the lines:
| my_array: ["some", "other"] | other_array: [1, 2, 3] |
| my_array: ['some', 'other'] | other_array: [1, 2, 3] |
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

output = ""

df: pd.DataFrame = ref(model_name)
df: pd.DataFrame = ref(model_name).fillna(0)
df.columns = df.columns.str.lower() # Snowflake has uppercase columns
df = df.astype({"my_int": float})
output += f"my_int {df.my_int[0]}\n"

df.my_int = 3
Expand All @@ -19,8 +20,9 @@

write_to_model(df) # default: overwrite

df: pd.DataFrame = ref(model_name)
df: pd.DataFrame = ref(model_name).fillna(0)
df.columns = df.columns.str.lower() # Snowflake has uppercase columns
df = df.astype({"my_int": float})
output += f"my_int {df.my_int[0]}\n"
output += f"size {len(df)}\n"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

model_name = context.current_model.name
df = ref(model_name)
output = f"my_array: {df['my_array'][0]}"
output += f"\nother_array: {df['other_array'][0]}"
print(df)
df.info()
output = f"my_array: {list(df['my_array'][0])}"
output += f"\nother_array: {list(df['other_array'][0])}"
temp_dir = os.environ["temp_dir"]
with open(os.path.join(temp_dir, model_name + ".txt"), "w") as file:
file.write(output)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
"""

df: pd.DataFrame = execute_sql(query)
df.columns = df.columns.str.lower() # Snowflake has uppercase columns
# Cast since some get float
df = df.astype({"my_int_times_ten": int})

output = f"\nModel dataframe first row:\n{df.iloc[0]}"
temp_dir = os.environ["temp_dir"]
Expand Down
43 changes: 36 additions & 7 deletions src/faldbt/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@
from sqlalchemy.sql import Insert


class WriteModeEnum(Enum):
APPEND = "append"
OVERWRITE = "overwrite"


DBT_V1 = dbt.semver.VersionSpecifier.from_version_string("1.0.0")
DBT_VCURRENT = dbt.version.get_installed_version()

IS_DBT_V1PLUS = DBT_VCURRENT.compare(DBT_V1) >= 0
IS_DBT_V0 = not IS_DBT_V1PLUS

if IS_DBT_V0:
from faldbt.cp.contracts.sql import ResultTable, RemoteRunResult
from faldbt.cp.contracts.sql import RemoteRunResult
else:
from dbt.contracts.sql import ResultTable, RemoteRunResult
from dbt.contracts.sql import RemoteRunResult


class WriteModeEnum(Enum):
APPEND = "append"
OVERWRITE = "overwrite"


@dataclass
Expand Down Expand Up @@ -114,6 +114,9 @@ def _execute_sql(
if adapter is None:
adapter = _get_adapter(project_dir, profiles_dir, profile_target, config=config)

if adapter.type() == "bigquery":
return _bigquery_execute_sql(adapter, sql, open_conn)

# HACK: we need to include uniqueness (UUID4) to avoid clashes
name = "SQL:" + str(hash(sql)) + ":" + str(uuid4())
with _existing_or_new_connection(adapter, name, open_conn) as is_new:
Expand Down Expand Up @@ -558,6 +561,32 @@ def _existing_or_new_connection(


# Adapter: BigQuery
def _bigquery_execute_sql(
    adapter: BaseAdapter, sql: str, open_conn: bool
) -> Tuple[AdapterResponse, pd.DataFrame]:
    """Execute `sql` on BigQuery directly via the google-cloud-bigquery client.

    Bypasses dbt's generic SQL execution path (used by `_execute_sql` for
    other adapters) and returns the query results as a DataFrame together
    with an `AdapterResponse` whose `rows_affected` is the result row count.

    :param adapter: a dbt adapter; must be of type "bigquery".
    :param sql: the SQL statement to run.
    :param open_conn: whether a new connection may be opened (forwarded to
        `_existing_or_new_connection`).
    """
    assert adapter.type() == "bigquery"

    # Imported lazily so non-BigQuery users don't need the package installed.
    import google.cloud.bigquery as bigquery

    # HACK: we need to include uniqueness (UUID4) to avoid clashes
    name = "bigquery:execute_sql:" + str(hash(sql)) + ":" + str(uuid4())
    with _existing_or_new_connection(adapter, name, open_conn):
        connection_manager: BaseConnectionManager = adapter.connections  # type: ignore
        conn = connection_manager.get_thread_connection()
        client: bigquery.Client = conn.handle  # type: ignore

        job = client.query(sql)
        df = job.to_dataframe()
        if job.destination:
            # The destination table's metadata carries the authoritative
            # row count for queries materialized into a table.
            query_table = client.get_table(job.destination)
            num_rows = query_table.num_rows
        else:
            # BUG FIX: `df.size` is rows * columns (total element count);
            # `len(df)` is the number of rows, which is what rows_affected
            # should report.
            num_rows = len(df)

        # TODO: better AdapterResponse
        return AdapterResponse("OK", rows_affected=num_rows), df


def _bigquery_write_relation(
data: pd.DataFrame,
project_dir: str,
Expand Down

0 comments on commit 39bdf19

Please sign in to comment.