From dd89a0be85f4eedc2672fc3e5852da6836f9e85d Mon Sep 17 00:00:00 2001 From: Amit Srivastava Date: Wed, 18 Dec 2024 11:27:32 -0800 Subject: [PATCH] [computes] fixed session-type extraction for connectors The problem is that connector based query execution is not able to reuse session to fetch results. The frontend is sending the correct session_id but our session fetching logic got broken when the computes was implemented. we are now looking for the session_type from compute['name'] for computes, connector['name'] for connector and then snippets['type'] for old config file based hive/impala sessions. A related change is to make use of session for get_log and check_status calls if the frontend is sending it. --- apps/beeswax/src/beeswax/common.py | 26 ++++++++++++------- apps/beeswax/src/beeswax/server/dbms.py | 12 ++++----- .../beeswax/server/hive_metastore_server.py | 2 +- .../src/beeswax/server/hive_server2_lib.py | 18 ++++++------- .../src/jobbrowser/apis/query_api.py | 3 ++- .../notebook/src/notebook/connectors/base.py | 6 ++--- .../src/notebook/connectors/hiveserver2.py | 5 ++-- 7 files changed, 39 insertions(+), 33 deletions(-) diff --git a/apps/beeswax/src/beeswax/common.py b/apps/beeswax/src/beeswax/common.py index 7a759b1a8f4..1185e2aad29 100644 --- a/apps/beeswax/src/beeswax/common.py +++ b/apps/beeswax/src/beeswax/common.py @@ -91,15 +91,26 @@ def tokenize_and_convert(item, key=None): return sorted(collection, key=lambda i: tokenize_and_convert(i, key=key)) -def is_compute(cluster): +def find_compute_in_cluster(cluster): if not cluster: - return False + return None connector = cluster.get('connector') compute = cluster.get('compute') - def compute_check(x): + def _compute_check(x): return x and x.get('type') in COMPUTE_TYPES - return compute_check(cluster) or compute_check(connector) or compute_check(compute) + + return ( + cluster if _compute_check(cluster) + else compute if _compute_check(compute) + else connector if _compute_check(connector) else None) + + +def extract_session_type(snippet): + compute = find_compute_in_cluster(snippet) + if compute and compute.get('name'): + return compute['name'] + return snippet.get('type') if snippet else None ''' @@ -119,13 +130,8 @@ def find_compute(cluster=None, user=None, dialect=None, namespace_id=None): connector = cluster.get('connector') compute = cluster.get('compute') - def compute_check(x): - return x and x.get('type') in COMPUTE_TYPES - # Pick the most probable compute object - selected_compute = (cluster if compute_check(cluster) - else compute if compute_check(compute) - else connector if compute_check(connector) else None) + selected_compute = find_compute_in_cluster(cluster) # If found, we will attempt to reload it, first by id then by name if selected_compute: diff --git a/apps/beeswax/src/beeswax/server/dbms.py b/apps/beeswax/src/beeswax/server/dbms.py index 6defed37743..5396a052f69 100644 --- a/apps/beeswax/src/beeswax/server/dbms.py +++ b/apps/beeswax/src/beeswax/server/dbms.py @@ -29,7 +29,7 @@ from kazoo.client import KazooClient from azure.abfs import abfspath -from beeswax.common import apply_natural_sort, is_compute +from beeswax.common import apply_natural_sort, find_compute_in_cluster from beeswax.conf import ( APPLY_NATURAL_SORT_MAX, AUTH_PASSWORD, @@ -164,7 +164,7 @@ def get(user, query_server=None, cluster=None): def get_query_server_config(name='beeswax', connector=None): - if connector and (has_connectors() or is_compute(connector)): + if connector and (has_connectors() or find_compute_in_cluster(connector)): LOG.debug("Query via connector %s (%s)" % (name, connector.get('type'))) query_server = get_query_server_config_via_connector(connector) else: @@ -1042,14 +1042,14 @@ def use(self, database, session=None): query = hql_query('USE `%s`' % database) return self.client.use(query, session=session) - def get_log(self, query_handle, start_over=True): - return self.client.get_log(query_handle, start_over) + def get_log(self, query_handle, start_over=True, session=None): + return self.client.get_log(query_handle, start_over, session=session) def get_state(self, handle): return self.client.get_state(handle) - def get_operation_status(self, handle): - return self.client.get_operation_status(handle) + def get_operation_status(self, handle, session=None): + return self.client.get_operation_status(handle, session=session) def execute_and_wait(self, query, timeout_sec=30.0, sleep_interval=0.5): """ diff --git a/apps/beeswax/src/beeswax/server/hive_metastore_server.py b/apps/beeswax/src/beeswax/server/hive_metastore_server.py index 965afaa7894..82b9be94731 100644 --- a/apps/beeswax/src/beeswax/server/hive_metastore_server.py +++ b/apps/beeswax/src/beeswax/server/hive_metastore_server.py @@ -142,7 +142,7 @@ def get_state(self, handle): def close(self, handle): pass - def get_operation_status(self, handle): + def get_operation_status(self, handle, session=None): return MockFinishedOperation() def get_default_configuration(self, *args, **kwargs): diff --git a/apps/beeswax/src/beeswax/server/hive_server2_lib.py b/apps/beeswax/src/beeswax/server/hive_server2_lib.py index d4ac2c41c0b..47752b586e8 100644 --- a/apps/beeswax/src/beeswax/server/hive_server2_lib.py +++ b/apps/beeswax/src/beeswax/server/hive_server2_lib.py @@ -1103,18 +1103,18 @@ def fetch_log(self, operation_handle, orientation=TFetchOrientation.FETCH_NEXT, return '\n'.join(lines) - def get_operation_status(self, operation_handle): + def get_operation_status(self, operation_handle, session=None): req = TGetOperationStatusReq(operationHandle=operation_handle) - (res, session) = self.call(self._client.GetOperationStatus, req) + (res, session) = self.call(self._client.GetOperationStatus, req, session=session) return res - def get_log(self, operation_handle): + def get_log(self, operation_handle, session=None): try: req = TGetLogReq(operationHandle=operation_handle) - (res, session) = self.call(self._client.GetLog, req) + (res, session) = self.call(self._client.GetLog, req, session=session) return res.log except Exception as e: - if 'Invalid query handle' in str(e): + if 'Invalid query handle' in str(e) or 'Invalid or unknown query handle' in str(e): message = 'Invalid query handle' LOG.error('%s: %s' % (message, e)) else: @@ -1436,9 +1436,9 @@ def get_state(self, handle): res = self._client.get_operation_status(operationHandle) return HiveServerQueryHistory.STATE_MAP[res.operationState] - def get_operation_status(self, handle): + def get_operation_status(self, handle, session=None): operationHandle = handle.get_rpc_handle() - return self._client.get_operation_status(operationHandle) + return self._client.get_operation_status(operationHandle, session=session) def use(self, query, session=None): data = self._client.execute_query(query, session=session) @@ -1482,11 +1482,11 @@ def close_session(self, session): def dump_config(self): return 'Does not exist in HS2' - def get_log(self, handle, start_over=True): + def get_log(self, handle, start_over=True, session=None): operationHandle = handle.get_rpc_handle() if beeswax_conf.USE_GET_LOG_API.get() or self.query_server.get('dialect') == 'impala': - return self._client.get_log(operationHandle) + return self._client.get_log(operationHandle, session=session) else: if start_over: orientation = TFetchOrientation.FETCH_FIRST diff --git a/apps/jobbrowser/src/jobbrowser/apis/query_api.py b/apps/jobbrowser/src/jobbrowser/apis/query_api.py index a92c6fa5eb2..13b881f2f7c 100644 --- a/apps/jobbrowser/src/jobbrowser/apis/query_api.py +++ b/apps/jobbrowser/src/jobbrowser/apis/query_api.py @@ -29,6 +29,7 @@ from babel import localtime from django.utils.translation import gettext as _ +from beeswax.common import extract_session_type from desktop.lib import export_csvxls from impala.conf import COORDINATOR_UI_SPNEGO from jobbrowser.apis.base_api import Api @@ -54,7 +55,7 @@ def _get_api(user, cluster=None): server_url = compute['options'].get('api_url') else: # TODO: multi computes if snippet.get('compute') or snippet['type'] has computes - application = cluster['compute']['type'] if cluster.get('compute') else cluster.get('interface', 'impala') + application = extract_session_type(cluster) or 'impala' session = Session.objects.get_session(user, application=application) server_url = _get_impala_server_url(session) return get_impalad_api(user=user, url=server_url) diff --git a/desktop/libs/notebook/src/notebook/connectors/base.py b/desktop/libs/notebook/src/notebook/connectors/base.py index 22c770868c0..f74fae99164 100644 --- a/desktop/libs/notebook/src/notebook/connectors/base.py +++ b/desktop/libs/notebook/src/notebook/connectors/base.py @@ -25,7 +25,7 @@ from django.utils.encoding import smart_str from django.utils.translation import gettext as _ -from beeswax.common import find_compute, is_compute +from beeswax.common import find_compute, find_compute_in_cluster from desktop.auth.backend import is_admin from desktop.conf import TASK_SERVER, has_connectors, is_cdw_compute_enabled from desktop.lib import export_csvxls @@ -402,7 +402,7 @@ def patch_snippet_for_connector(snippet, user=None): Connector backward compatibility switcher. # TODO Connector unification """ - if is_compute(snippet): + if find_compute_in_cluster(snippet): snippet['connector'] = find_compute(cluster=snippet, user=user) if snippet['connector'] and snippet['connector'].get('dialect'): snippet['dialect'] = snippet['connector']['dialect'] @@ -433,7 +433,7 @@ def get_api(request, snippet): if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user): LOG.debug('Using the interpreter from snippet') interpreter = snippet.get('interpreter') - elif is_cdw_compute_enabled(): + elif find_compute_in_cluster(snippet): LOG.debug("Finding the compute from db using snippet: %s" % snippet) interpreter = find_compute(cluster=snippet, user=request.user) if interpreter is None: diff --git a/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py b/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py index 21a24d6b225..30760d3a68a 100644 --- a/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py +++ b/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py @@ -28,7 +28,7 @@ from django.urls import reverse from django.utils.translation import gettext as _ -from beeswax.common import is_compute +from beeswax.common import extract_session_type from desktop.auth.backend import is_admin from desktop.conf import USE_DEFAULT_CONFIGURATION, has_connectors from desktop.lib.conf import BoundConfig @@ -321,8 +321,7 @@ def execute(self, notebook, snippet): db = self._get_db(snippet, interpreter=self.interpreter) statement = self._get_current_statement(notebook, snippet) - compute = snippet.get('compute', {}) - session_type = compute['name'] if is_compute(snippet) and compute.get('name') else snippet['type'] + session_type = extract_session_type(snippet) session = self._get_session(notebook, session_type) query = self._prepare_hql_query(snippet, statement['statement'], session)