- 
                Notifications
    
You must be signed in to change notification settings  - Fork 28.9k
 
[SPARK-54108][CONNECT] Revise execute* methods of SparkConnectStatement #52810
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| if (resultSet != null) { | ||
| -1 | ||
| } else { | ||
| 0 // always return 0 because affected rows is not supported yet | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can not be supported soon because it requires non-trivial work on the classic sql module to add metrics for all data writing commands and modify the connect protocol to retrieve them back to the client.
| val df = conn.spark.sql(sql) | ||
| val sparkResult = df.collectResult() | ||
| operationId = sparkResult.operationId | ||
| if (hasResultSet(sparkResult)) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked some well-known JDBC drivers, and the implementation can be classified into:
- have a simple JDBC driver-side SQL parser to classify SQL type, e.g., DML, DDL, and justify by the SQL type whether it will return result sets
 - Trino does this better, the server sets the query type in the analyzing phase and returns such info to the client before execution.
 - blindly execute the queries and check the returned result sets
 
here we use the last approach.
| 0 | ||
| private def hasResultSet(sparkResult: SparkResult[_]): Boolean = { | ||
| // suppose this works in most cases | ||
| sparkResult.schema.length > 0 | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't find a counterexample after a quick thought, please let me know if you have a more reliable approach
| 
           cc @LuciferYang  | 
    
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks ok
        
          
                ...ent/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                ...ent/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                ...ent/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
          
            Show resolved
            Hide resolved
        
              
          
                ...dbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
              
                Outdated
          
            Show resolved
            Hide resolved
        
      ### What changes were proposed in this pull request?
This PR revises the following 3 `execute*` methods and one additional `getUpdateCount` method  of `SparkConnectStatement` that are defined in `java.sql.Statement`
```java
    /**
     * Executes the given SQL statement, which returns a single
     * {code ResultSet} object.
     *<p>
     * <strong>Note:</strong>This method cannot be called on a
     * {code PreparedStatement} or {code CallableStatement}.
     * param sql an SQL statement to be sent to the database, typically a
     *        static SQL {code SELECT} statement
     * return a {code ResultSet} object that contains the data produced
     *         by the given query; never {code null}
     * throws SQLException if a database access error occurs,
     * this method is called on a closed {code Statement}, the given
     *            SQL statement produces anything other than a single
     *            {code ResultSet} object, the method is called on a
     * {code PreparedStatement} or {code CallableStatement}
     * throws SQLTimeoutException when the driver has determined that the
     * timeout value that was specified by the {code setQueryTimeout}
     * method has been exceeded and has at least attempted to cancel
     * the currently running {code Statement}
     */
    ResultSet executeQuery(String sql) throws SQLException;
    /**
     * Executes the given SQL statement, which may be an {code INSERT},
     * {code UPDATE}, or {code DELETE} statement or an
     * SQL statement that returns nothing, such as an SQL DDL statement.
     *<p>
     * <strong>Note:</strong>This method cannot be called on a
     * {code PreparedStatement} or {code CallableStatement}.
     * param sql an SQL Data Manipulation Language (DML) statement, such as {code INSERT}, {code UPDATE} or
     * {code DELETE}; or an SQL statement that returns nothing,
     * such as a DDL statement.
     *
     * return either (1) the row count for SQL Data Manipulation Language (DML) statements
     *         or (2) 0 for SQL statements that return nothing
     *
     * throws SQLException if a database access error occurs,
     * this method is called on a closed {code Statement}, the given
     * SQL statement produces a {code ResultSet} object, the method is called on a
     * {code PreparedStatement} or {code CallableStatement}
     * throws SQLTimeoutException when the driver has determined that the
     * timeout value that was specified by the {code setQueryTimeout}
     * method has been exceeded and has at least attempted to cancel
     * the currently running {code Statement}
     */
    int executeUpdate(String sql) throws SQLException;
    /**
     * Executes the given SQL statement, which may return multiple results.
     * In some (uncommon) situations, a single SQL statement may return
     * multiple result sets and/or update counts.  Normally you can ignore
     * this unless you are (1) executing a stored procedure that you know may
     * return multiple results or (2) you are dynamically executing an
     * unknown SQL string.
     * <P>
     * The {code execute} method executes an SQL statement and indicates the
     * form of the first result.  You must then use the methods
     * {code getResultSet} or {code getUpdateCount}
     * to retrieve the result, and {code getMoreResults} to
     * move to any subsequent result(s).
     * <p>
     *<strong>Note:</strong>This method cannot be called on a
     * {code PreparedStatement} or {code CallableStatement}.
     * param sql any SQL statement
     * return {code true} if the first result is a {code ResultSet}
     *         object; {code false} if it is an update count or there are
     *         no results
     * throws SQLException if a database access error occurs,
     * this method is called on a closed {code Statement},
     * the method is called on a
     * {code PreparedStatement} or {code CallableStatement}
     * throws SQLTimeoutException when the driver has determined that the
     * timeout value that was specified by the {code setQueryTimeout}
     * method has been exceeded and has at least attempted to cancel
     * the currently running {code Statement}
     * see #getResultSet
     * see #getUpdateCount
     * see #getMoreResults
     */
    boolean execute(String sql) throws SQLException;
    /**
     *  Retrieves the current result as an update count;
     *  if the result is a {code ResultSet} object or there are no more results, -1
     *  is returned. This method should be called only once per result.
     *
     * return the current result as an update count; -1 if the current result is a
     * {code ResultSet} object or there are no more results
     * throws SQLException if a database access error occurs or
     * this method is called on a closed {code Statement}
     * see #execute
     */
    int getUpdateCount() throws SQLException;
```
### Why are the changes needed?
Make the implementation respect the JDBC API specification.
### Does this PR introduce _any_ user-facing change?
No, Connect JDBC Driver is an unreleased feature.
### How was this patch tested?
New UTs are added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52810 from pan3793/SPARK-54108.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit edfd357)
Signed-off-by: yangjie01 <[email protected]>
    | 
           Merged into master/branch-4.1. Thanks @pan3793  | 
    
| 
           Thank you, @pan3793 and @LuciferYang .  | 
    
What changes were proposed in this pull request?
This PR revises the following 3
execute*methods and one additionalgetUpdateCountmethod ofSparkConnectStatementthat are defined injava.sql.StatementWhy are the changes needed?
Make the implementation respect the JDBC API specification.
Does this PR introduce any user-facing change?
No, Connect JDBC Driver is an unreleased feature.
How was this patch tested?
New UTs are added.
Was this patch authored or co-authored using generative AI tooling?
No.