Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix alter for partitions #79

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions dbt/adapters/vertica/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class verticaCredentials(Credentials):
# backup_server_node: Optional[str] = None

# additional_info = {
# 'password': str,
# 'password': str,
# 'backup_server_node': list# invalid value to be set in a connection string
# }

Expand Down Expand Up @@ -97,9 +97,9 @@ def open(cls, connection):
'connection_load_balance':credentials.connection_load_balance,
'session_label': f'dbt_{credentials.username}',
'retries': credentials.retries,

'backup_server_node':credentials.backup_server_node,

}

# if credentials.ssl.lower() in {'true', 'yes', 'please'}:
Expand All @@ -119,16 +119,16 @@ def open(cls, connection):
context = ssl.create_default_context()
conn_info['ssl'] = context
logger.debug(f'SSL is on')

def connect():
handle = vertica_python.connect(**conn_info)
logger.debug(f':P Connection work {handle}')
connection.state = 'open'
connection.handle = handle
logger.debug(f':P Connected to database: {credentials.database} at {credentials.host} at {handle}')
return handle




except Exception as exc:
Expand Down Expand Up @@ -184,6 +184,45 @@ def cancel(self, connection):
logger.debug(':P Cancel query')
connection.handle.cancel()

@classmethod
def get_result_from_cursor(cls, cursor: Any) -> agate.Table:
data: List[Any] = []
column_names: List[str] = []

if cursor.description is not None:
column_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()

# check result for every query if there are some queries with ; separator
while cursor.nextset():
check = cursor._message
if isinstance(check, ErrorResponse):
logger.debug(f'Cursor message is: {check}')
self.release()
raise dbt.exceptions.DatabaseException(str(check))

data = cls.process_results(column_names, rows)

return dbt.clients.agate_helper.table_from_data_flat(data, column_names)

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[AdapterResponse, agate.Table]:
sql = self._add_query_comment(sql)
_, cursor = self.add_query(sql, auto_begin)
response = self.get_response(cursor)
if fetch:
table = self.get_result_from_cursor(cursor)
else:
table = dbt.clients.agate_helper.empty_table()
while cursor.nextset():
check = cursor._message
if isinstance(check, vertica_python.vertica.messages.ErrorResponse):
logger.debug(f'Cursor message is: {check}')
self.release()
raise dbt.exceptions.DatabaseException(str(check))
return response, table


@contextmanager
def exception_handler(self, sql):
Expand All @@ -197,4 +236,3 @@ def exception_handler(self, sql):
logger.debug(f':P Error: {exc}')
self.release()
raise dbt.exceptions.RuntimeException(str(exc))

Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,40 @@
{%- set partition_by_string = config.get('partition_by_string', default=none) -%}
{%- set partition_by_group_by_string = config.get('partition_by_group_by_string', default=none) -%}
{%- set partition_by_active_count = config.get('partition_by_active_count', default=none) -%}

create {% if temporary: -%}local temporary{%- endif %} table
{{ relation.include(database=(not temporary), schema=(not temporary)) }}
{% if temporary: -%}on commit preserve rows{%- endif %}
INCLUDE SCHEMA PRIVILEGES as (
{{ sql }}
)

{% if order_by is not none -%}
order by {{ order_by }}
{% endif -%}
{% if not temporary: %}
{% if order_by is not none -%}
order by {{ order_by }}
{% endif -%}

{% if segmented_by_string is not none -%}
segmented BY {{ segmented_by_string }} {% if segmented_by_all_nodes %} ALL NODES {% endif %}
{% endif %}
{% if segmented_by_string is not none -%}
segmented BY {{ segmented_by_string }} {% if segmented_by_all_nodes %} ALL NODES {% endif %}
{% endif %}

{% if no_segmentation =='True' or no_segmentation=='true' -%}
UNSEGMENTED ALL NODES
{% endif -%}
{% if no_segmentation =='True' or no_segmentation=='true' -%}
UNSEGMENTED ALL NODES
{% endif -%}

{% if ksafe is not none -%}
ksafe {{ ksafe }}
{% endif -%}

{% if partition_by_string is not none -%}
; alter table {{ relation.include(database=(not temporary), schema=(not temporary)) }} partition BY {{ partition_by_string }}
{% if partition_by_string is not none and partition_by_group_by_string is not none -%}
group by {{ partition_by_group_by_string }}
{% endif %}
{% if partition_by_string is not none and partition_by_active_count is not none %}
SET ACTIVEPARTITIONCOUNT {{ partition_by_active_count }}
{% if ksafe is not none -%}
ksafe {{ ksafe }}
{% endif -%}

{% if partition_by_string is not none -%}
; alter table {{ relation.include(database=(not temporary), schema=(not temporary)) }} partition BY {{ partition_by_string }}
{% if partition_by_string is not none and partition_by_group_by_string is not none -%}
group by {{ partition_by_group_by_string }}
{% endif %}
{% if partition_by_string is not none and partition_by_active_count is not none %}
SET ACTIVEPARTITIONCOUNT {{ partition_by_active_count }}
{% endif %}
{% endif %}
{% endif %}
{% endif %}
;
{% endmacro %}