Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dremio #367

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@
- [ ] Snowflake
- [ ] Google BigQuery
- [ ] Databricks
- [ ] Spark
- [ ] Dremio
- [ ] N/A
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ env.sh
.venv

.DS_Store

integration_test_project/package-lock.yml
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The package currently supports
- Spark :white_check_mark:
- Snowflake :white_check_mark:
- Google BigQuery :white_check_mark:
- Dremio :white_check_mark:
- Postgres :white_check_mark:

Models included:
Expand Down
3 changes: 2 additions & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ models:
+full_refresh: false
+persist_docs:
# Databricks doesn't offer column-level support for persisting docs
columns: '{{ target.name != "databricks" }}'
columns: '{{ target.name not in ["databricks", "dremio"] }}'
relation: '{{ target.name not in ["dremio"] }}'
4 changes: 2 additions & 2 deletions integration_test_project/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ vars:

models:
+persist_docs:
relation: true
columns: true
columns: '{{ target.type != "dremio" }}'
relation: '{{ target.type != "dremio" }}'
seeds:
+quote_columns: false

Expand Down
4 changes: 4 additions & 0 deletions integration_test_project/example-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ export DBT_ENV_SECRET_DATABRICKS_TOKEN=
export DBT_ENV_SECRET_GCP_PROJECT=
export DBT_ENV_SPARK_DRIVER_PATH= # /Library/simba/spark/lib/libsparkodbc_sbu.dylib on a Mac
export DBT_ENV_SPARK_ENDPOINT= # The endpoint ID from the Databricks HTTP path
export DBT_ENV_DREMIO_USER=
export DBT_ENV_DREMIO_PASSWORD=
export DBT_ENV_DREMIO_SOFTWARE_HOST=
export DBT_ENV_OBJECT_STORAGE_SOURCE=
export DBT_ENV_OBJECT_STORAGE_PATH= # referenced by the dremio target in profiles.yml

# dbt environment variables, change these
export DBT_VERSION="1_5_0"
Expand Down
19 changes: 15 additions & 4 deletions integration_test_project/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ dbt_artifacts:
role: "{{ env_var('DBT_ENV_SECRET_SNOWFLAKE_TEST_ROLE') }}"
database: "{{ env_var('DBT_ENV_SECRET_SNOWFLAKE_TEST_DATABASE') }}"
warehouse: "{{ env_var('DBT_ENV_SECRET_SNOWFLAKE_TEST_WAREHOUSE') }}"
schema: dbt_artifacts_test_commit_{{ env_var('DBT_VERSION', '') }}_{{ env_var('GITHUB_SHA_OVERRIDE', '') if env_var('GITHUB_SHA_OVERRIDE', '') else env_var('GITHUB_SHA') }}
schema: &schema dbt_artifacts_test_commit_{{ env_var('DBT_VERSION', '') }}_{{ env_var('GITHUB_SHA_OVERRIDE', '') if env_var('GITHUB_SHA_OVERRIDE', '') else env_var('GITHUB_SHA') }}
threads: 8
databricks:
type: databricks
schema: dbt_artifacts_test_commit_{{ env_var('DBT_VERSION', '') }}_{{ env_var('GITHUB_SHA_OVERRIDE', '') if env_var('GITHUB_SHA_OVERRIDE', '') else env_var('GITHUB_SHA') }}
schema: *schema
host: "{{ env_var('DBT_ENV_SECRET_DATABRICKS_HOST') }}"
http_path: "{{ env_var('DBT_ENV_SECRET_DATABRICKS_HTTP_PATH') }}"
token: "{{ env_var('DBT_ENV_SECRET_DATABRICKS_TOKEN') }}"
threads: 8
spark:
type: spark
method: odbc
schema: dbt_artifacts_test_commit_spark_{{ env_var('DBT_VERSION', '') }}_{{ env_var('GITHUB_SHA_OVERRIDE', '') if env_var('GITHUB_SHA_OVERRIDE', '') else env_var('GITHUB_SHA') }}
schema: *schema
host: "{{ env_var('DBT_ENV_SECRET_DATABRICKS_HOST') }}"
driver: "{{ env_var('DBT_ENV_SPARK_DRIVER_PATH') }}"
endpoint: "{{ env_var('DBT_ENV_SPARK_ENDPOINT') }}"
Expand All @@ -38,7 +38,7 @@ dbt_artifacts:
type: bigquery
method: oauth
project: "{{ env_var('DBT_ENV_SECRET_GCP_PROJECT') }}"
dataset: dbt_artifacts_test_commit_{{ env_var('DBT_VERSION', '') }}_{{ env_var('GITHUB_SHA_OVERRIDE', '') if env_var('GITHUB_SHA_OVERRIDE', '') else env_var('GITHUB_SHA') }}
dataset: *schema
threads: 8
timeout_seconds: 300
priority: interactive
Expand All @@ -52,3 +52,14 @@ dbt_artifacts:
dbname: postgres
schema: public
threads: 8
dremio:
type: dremio
user: "{{ env_var('DBT_ENV_DREMIO_USER') }}"
password: "{{ env_var('DBT_ENV_DREMIO_PASSWORD') }}"
software_host: "{{ env_var('DBT_ENV_DREMIO_SOFTWARE_HOST') }}"
port: 443
threads: 8
use_ssl: true
dremio_space: *schema
object_storage_path: "{{ env_var('DBT_ENV_OBJECT_STORAGE_PATH') }}"
object_storage_source: "{{ env_var('DBT_ENV_OBJECT_STORAGE_SOURCE') }}"
15 changes: 14 additions & 1 deletion macros/database_specific_helpers/generate_surrogate_key.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@

{%- endfor -%}

{{ dbt.hash(dbt.concat(fields)) }}
{{ hash(dbt.concat(fields)) }}

{%- endmacro -%}

{#- Adapter-dispatched hash wrapper used by generate_surrogate_key above.
    Lets individual adapters override the hashing SQL. -#}
{%- macro hash(field) -%}
{{ adapter.dispatch('hash', 'dbt_artifacts')(field) }}
{%- endmacro -%}

{#- Default: delegate to dbt's cross-database hash implementation. -#}
{%- macro default__hash(field) -%}
{{ dbt.hash(field) }}
{%- endmacro -%}

-- FIXME: See https://github.com/dremio/dbt-dremio/issues/189
{#- Dremio override: work around the dbt-dremio issue linked above by
    casting to varchar and calling md5 directly. -#}
{%- macro dremio__hash(field) -%}
md5(cast({{ field }} as varchar))
{%- endmacro -%}
8 changes: 8 additions & 0 deletions macros/database_specific_helpers/type_helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
json
{% endmacro %}

{#- Dremio override for type_json: fall back to varchar, so JSON payloads
    are stored as plain strings on Dremio. -#}
{%- macro dremio__type_json() -%}
varchar
{%- endmacro -%}

{#- ARRAY -#}

{% macro type_array() %}
Expand All @@ -43,3 +47,7 @@
{% macro bigquery__type_array() %}
array<string>
{% endmacro %}

{#- Dremio override for type_array: fall back to varchar, so array values
    are stored as plain strings on Dremio. -#}
{%- macro dremio__type_array() -%}
varchar
{%- endmacro -%}
11 changes: 11 additions & 0 deletions macros/string.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{#- Escapes a value for safe embedding inside a single-quoted SQL string
    literal; dispatched per adapter. -#}
{%- macro escape_string(field) -%}
{{ return(adapter.dispatch('escape_string', 'dbt_artifacts')(field)) }}
{%- endmacro -%}

{#- Default: backslash-escape backslashes and both quote characters
    (the same chain used inline by the Snowflake upload macros). -#}
{%- macro default__escape_string(field) -%}
{{ field | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}
{%- endmacro -%}

{#- Dremio: escape a single quote by doubling it (ANSI style);
    backslashes are left untouched. -#}
{%- macro dremio__escape_string(field) -%}
{{ field | replace("'", "''") }}
{%- endmacro -%}
16 changes: 16 additions & 0 deletions macros/timestamp.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{#- Casts a timestamp-like value to the adapter's timestamp type;
    dispatched per adapter. -#}
{%- macro cast_as_timestamp(field) -%}
{{ return(adapter.dispatch('cast_as_timestamp', 'dbt_artifacts')(field)) }}
{%- endmacro -%}

{#- Default: plain SQL cast. NOTE(review): the value is interpolated
    unquoted, so callers must pass an already-quoted literal or a column
    reference -- confirm against call sites. -#}
{%- macro default__cast_as_timestamp(field) -%}
cast({{ field }} as timestamp)
{%- endmacro -%}

{#- Dremio: run the value through truncate_timestamp first (which also
    wraps it in quotes) before casting. -#}
{%- macro dremio__cast_as_timestamp(field) -%}
cast({{ dbt_artifacts.truncate_timestamp(field) }} as timestamp)
{%- endmacro -%}

{#- Renders SQL that trims a timestamp string to at most millisecond
    precision while keeping any trailing offset/zone suffix.
    Regex group 1 = "YYYY-MM-DD hh:mm:ss[.fff]" (extra fractional digits
    are dropped by the \d* between groups); group 3 = "+hh:mm"/"-hh:mm"/
    "Z"/a three-letter zone, or empty.
    NOTE(review): `field` is embedded INSIDE a quoted literal here, so it
    must be a plain value (e.g. run_started_at), not a column expression,
    and must not contain quotes -- confirm against callers. Presumably
    this works around Dremio failing to cast higher-precision strings;
    TODO confirm. -#}
{%- macro truncate_timestamp(field) -%}
{%- set pattern = '^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(\.\d{0,3})?)\d*(\+\d{2}:\d{2}|\-\d{2}:\d{2}|Z|[A-Z]{3}|)$' -%}
concat(regexp_extract('{{ field }}', '{{ pattern }}', 1), regexp_extract('{{ field }}', '{{ pattern }}', 3))
{%- endmacro -%}
34 changes: 34 additions & 0 deletions macros/upload_individual_datasets/upload_exposures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,40 @@
{% endif %}
{%- endmacro %}

{#- Builds the VALUES list for the exposures upload on Dremio.
    Returns "" when there are no exposures to upload.
    Fix: the tojson(...) outputs for owner, depends_on_nodes and tags are
    embedded in single-quoted literals, so they are now routed through
    dbt_artifacts.escape_string -- consistent with all_results below and
    with the invocations macro -- otherwise a single quote in e.g. an
    owner name breaks the generated INSERT. -#}
{% macro dremio__get_exposures_dml_sql(exposures) -%}

    {% if exposures != [] %}
    {% set exposure_values %}
        {% for exposure in exposures -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ dbt_artifacts.escape_string(exposure.unique_id) }}', {# node_id #}
                {{ dbt_artifacts.cast_as_timestamp(run_started_at) }}, {# run_started_at #}
                '{{ dbt_artifacts.escape_string(exposure.name) }}', {# name #}
                '{{ exposure.type }}', {# type #}
                '{{ dbt_artifacts.escape_string(tojson(exposure.owner)) }}', {# owner #}
                '{{ exposure.maturity }}', {# maturity #}
                '{{ dbt_artifacts.escape_string(exposure.original_file_path) }}', {# path #}
                '{{ dbt_artifacts.escape_string(exposure.description) }}', {# description #}
                '{{ exposure.url }}', {# url #}
                '{{ exposure.package_name }}', {# package_name #}
                '{{ dbt_artifacts.escape_string(tojson(exposure.depends_on.nodes)) }}', {# depends_on_nodes #}
                '{{ dbt_artifacts.escape_string(tojson(exposure.tags)) }}', {# tags #}
                {% if var('dbt_artifacts_exclude_all_results', false) %}
                    null
                {% else %}
                    '{{ dbt_artifacts.escape_string(tojson(exposure)) }}' {# all_results #}
                {% endif %}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
    {% endset %}
    {{ exposure_values }}
    {% else %}
    {{ return("") }}
    {% endif %}
{% endmacro -%}

{% macro postgres__get_exposures_dml_sql(exposures) -%}
{% if exposures != [] %}

Expand Down
54 changes: 54 additions & 0 deletions macros/upload_individual_datasets/upload_invocations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,60 @@

{% endmacro -%}

{#- Builds the single-row VALUES tuple for the invocations upload on
    Dremio. Free-text values are escaped via dbt_artifacts.escape_string
    so embedded single quotes cannot break the generated INSERT.
    Fix: the env_vars loop hand-rolled `| replace("'", "''")`; it now uses
    the shared dbt_artifacts.escape_string helper (identical output on
    Dremio), consistent with the dbt_vars and metadata loops below. -#}
{% macro dremio__get_invocations_dml_sql() -%}
    {% set invocation_values %}
    (
        '{{ invocation_id }}', {# command_invocation_id #}
        '{{ dbt_version }}', {# dbt_version #}
        '{{ project_name }}', {# project_name #}
        {{ dbt_artifacts.cast_as_timestamp(run_started_at) }}, {# run_started_at #}
        '{{ flags.WHICH }}', {# dbt_command #}
        '{{ flags.FULL_REFRESH }}', {# full_refresh_flag #}
        '{{ target.profile_name }}', {# target_profile_name #}
        '{{ target.name }}', {# target_name #}
        '{{ target.schema }}', {# target_schema #}
        {{ target.threads }}, {# target_threads #}

        {# dbt Cloud metadata: unset env vars render as '' and collapse to SQL null #}
        nullif('{{ env_var('DBT_CLOUD_PROJECT_ID', '') }}', ''), {# dbt_cloud_project_id #}
        nullif('{{ env_var('DBT_CLOUD_JOB_ID', '') }}', ''), {# dbt_cloud_job_id #}
        nullif('{{ env_var('DBT_CLOUD_RUN_ID', '') }}', ''), {# dbt_cloud_run_id #}
        nullif('{{ env_var('DBT_CLOUD_RUN_REASON_CATEGORY', '') }}', ''), {# dbt_cloud_run_reason_category #}
        nullif('{{ dbt_artifacts.escape_string(env_var('DBT_CLOUD_RUN_REASON', '')) }}', ''), {# dbt_cloud_run_reason #}

        {% if var('env_vars', none) %}
            {% set env_vars_dict = {} %}
            {% for env_variable in var('env_vars') %}
                {% do env_vars_dict.update({env_variable: dbt_artifacts.escape_string(env_var(env_variable, ''))}) %}
            {% endfor %}
            '{{ tojson(env_vars_dict) }}', {# env_vars #}
        {% else %}
            null, {# env_vars #}
        {% endif %}

        {% if var('dbt_vars', none) %}
            {% set dbt_vars_dict = {} %}
            {% for dbt_var in var('dbt_vars') %}
                {% do dbt_vars_dict.update({dbt_var: (dbt_artifacts.escape_string(var(dbt_var, '')))}) %}
            {% endfor %}
            '{{ tojson(dbt_vars_dict) }}', {# dbt_vars #}
        {% else %}
            null, {# dbt_vars #}
        {% endif %}

        '{{ dbt_artifacts.escape_string(tojson(invocation_args_dict)) }}', {# invocation_args #}

        {% set metadata_env = {} %}
        {% for key, value in dbt_metadata_envs.items() %}
            {% do metadata_env.update({key: (dbt_artifacts.escape_string(value))}) %}
        {% endfor %}
        '{{ dbt_artifacts.escape_string(tojson(metadata_env)) }}' {# dbt_custom_envs #}

    )
    {% endset %}
    {{ invocation_values }}

{% endmacro -%}

{% macro postgres__get_invocations_dml_sql() -%}
{% set invocation_values %}
(
Expand Down
93 changes: 67 additions & 26 deletions macros/upload_individual_datasets/upload_model_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -104,32 +104,14 @@
{% endif %}
{%- endmacro %}

{% macro snowflake__get_model_executions_dml_sql(models) -%}
{% macro dremio__get_model_executions_dml_sql(models) -%}
{% if models != [] %}
{% set model_execution_values %}
select
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(6) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(7) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(12) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(13) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(14) }},
{{ adapter.dispatch('column_identifier', 'dbt_artifacts')(15) }},
{{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(16)) }}
from values
{% for model in models -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.node.unique_id }}', {# node_id #}
'{{ run_started_at }}', {# run_started_at #}
{{ dbt_artifacts.cast_as_timestamp(run_started_at) }}, {# run_started_at #}

{% set config_full_refresh = model.node.config.full_refresh %}
{% if config_full_refresh is none %}
Expand All @@ -141,18 +123,18 @@
'{{ model.status }}', {# status #}

{% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% if compile_started_at %}{{ dbt_artifacts.cast_as_timestamp(compile_started_at) }}{% else %}cast(null as timestamp){% endif %}, {# compile_started_at #}
{% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}
{% if query_completed_at %}{{ dbt_artifacts.cast_as_timestamp(query_completed_at) }}{% else %}cast(null as timestamp){% endif %}, {# query_completed_at #}

{{ model.execution_time }}, {# total_node_runtime #}
try_cast('{{ model.adapter_response.rows_affected }}' as int), {# rows_affected #}
cast({{ model.execution_time }} as float), {# total_node_runtime #}
null, -- rows_affected not available {# Only available in Snowflake & BigQuery #}
'{{ model.node.config.materialized }}', {# materialization #}
'{{ model.node.schema }}', {# schema #}
'{{ model.node.name }}', {# name #}
'{{ model.node.alias }}', {# alias #}
'{{ model.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #}
'{{ tojson(model.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #}
'{{ dbt_artifacts.escape_string(model.message) }}', {# message #}
'{{ dbt_artifacts.escape_string(tojson(model.adapter_response)) }}' {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
Expand Down Expand Up @@ -203,3 +185,62 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{#- Builds the "select ... from values" payload for the model executions
    upload on Snowflake. Column names are resolved through the dispatched
    column_identifier helper and the final column is converted back to a
    JSON variant via parse_json. Returns "" when no models ran.
    Fix: the message/adapter_response inline escape chains duplicated
    default__escape_string exactly; they now call
    dbt_artifacts.escape_string (dispatch falls back to the default on
    Snowflake, so the rendered SQL is identical). -#}
{% macro snowflake__get_model_executions_dml_sql(models) -%}
    {% if models != [] %}
        {% set model_execution_values %}
        select
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(6) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(7) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(12) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(13) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(14) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(15) }},
            {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(16)) }}
        from values
        {% for model in models -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ model.node.unique_id }}', {# node_id #}
                '{{ run_started_at }}', {# run_started_at #}

                {# node-level full_refresh config wins; fall back to the CLI flag #}
                {% set config_full_refresh = model.node.config.full_refresh %}
                {% if config_full_refresh is none %}
                    {% set config_full_refresh = flags.FULL_REFRESH %}
                {% endif %}
                '{{ config_full_refresh }}', {# was_full_refresh #}

                '{{ model.thread_id }}', {# thread_id #}
                '{{ model.status }}', {# status #}

                {% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
                {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
                {% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
                {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

                {{ model.execution_time }}, {# total_node_runtime #}
                try_cast('{{ model.adapter_response.rows_affected }}' as int), {# rows_affected #}
                '{{ model.node.config.materialized }}', {# materialization #}
                '{{ model.node.schema }}', {# schema #}
                '{{ model.node.name }}', {# name #}
                '{{ model.node.alias }}', {# alias #}
                '{{ dbt_artifacts.escape_string(model.message) }}', {# message #}
                '{{ dbt_artifacts.escape_string(tojson(model.adapter_response)) }}' {# adapter_response #}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ model_execution_values }}
    {% else %}
        {{ return("") }}
    {% endif %}
{% endmacro -%}
Loading
Loading