Skip to content

Commit

Permalink
CSV uploads for ClickHouse Cloud (#236)
Browse files Browse the repository at this point in the history
* CSV uploads

* ~ move function

* Set `wait_end_of_query=1` for clickhouse cloud when creating the table

* Only support uploads for CH Cloud DBs

* Fix reflection warnings

* Use order by to specify primary key

* set select_sequential_consistency in connection details

* Avoid setting select_sequential_consistency for on-premise

* Remove unused imports

* ~ 49.11

* ~ fix test and add test for `select_sequential_consistency`
  • Loading branch information
calherries authored May 17, 2024
1 parent b0aca28 commit 409984e
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
uses: actions/checkout@v2
with:
repository: metabase/metabase
ref: v0.49.6
ref: v0.49.11

- name: Remove incompatible tests
# dataset-definition-test tests test data definition,
Expand Down
123 changes: 116 additions & 7 deletions src/metabase/driver/clickhouse.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
(ns metabase.driver.clickhouse
"Driver for ClickHouse databases"
#_{:clj-kondo/ignore [:unsorted-required-namespaces]}
(:require [clojure.string :as str]
(:require [clojure.core.memoize :as memoize]
[clojure.string :as str]
[honey.sql :as sql]
[metabase [config :as config]]
[metabase.driver :as driver]
[metabase.driver.clickhouse-introspection]
Expand All @@ -12,8 +14,11 @@
[metabase.driver.sql-jdbc [common :as sql-jdbc.common]
[connection :as sql-jdbc.conn]]
[metabase.driver.sql-jdbc.execute :as sql-jdbc.execute]
[metabase.driver.sql.query-processor :as sql.qp]
[metabase.driver.sql.util :as sql.u]
[metabase.util.log :as log]))
[metabase.upload :as upload]
[metabase.util.log :as log])
(:import [com.clickhouse.jdbc.internal ClickHouseStatementImpl]))

(set! *warn-on-reflection* true)

Expand All @@ -32,17 +37,17 @@
:test/jvm-timezone-setting false
:connection-impersonation false
:schemas true
:datetime-diff true}]
:datetime-diff true
:upload-with-auto-pk false}]

(defmethod driver/database-supports? [:clickhouse feature] [_driver _feature _db] supported?))

;; Fallback connection settings. `connection-details->spec*` starts from this map and substitutes
;; the value found here for any of these keys whose supplied value is nil or missing.
;; Note that :port is kept as a string here.
(def ^:private default-connection-details
{:user "default" :password "" :dbname "default" :host "localhost" :port "8123"})

(defmethod sql-jdbc.conn/connection-details->spec :clickhouse
[_ details]
;; ensure defaults merge on top of nils
(let [details (reduce-kv (fn [m k v] (assoc m k (or v (k default-connection-details))))
(defn- connection-details->spec* [details]
(let [;; ensure defaults merge on top of nils
details (reduce-kv (fn [m k v] (assoc m k (or v (k default-connection-details))))
default-connection-details
details)
{:keys [user password dbname host port ssl use-no-proxy]} details
Expand All @@ -61,6 +66,33 @@
:product_name product-name}
(sql-jdbc.common/handle-additional-options details :separator-style :url))))

(def ^:private ^{:arglists '([db-details])} cloud?
  "Does the server described by `db-details` report itself as running in cloud mode?

  Opens a connection built from `db-details` and reads the `cloud_mode` row of `system.settings`.
  Returns true when its value is '1', false for any other value, and nil when the server has no
  such row. Results are memoized per `db-details` map."
  (let [cloud-mode-query "SELECT value='1' FROM system.settings WHERE name='cloud_mode'"
        read-cloud-flag  (fn [^java.sql.Connection conn]
                           (with-open [statement (.prepareStatement conn cloud-mode-query)
                                       results   (.executeQuery statement)]
                             ;; no row at all -> nil rather than false
                             (when (.next results)
                               (.getBoolean results 1))))
        lookup           (fn [db-details]
                           (sql-jdbc.execute/do-with-connection-with-options
                            :clickhouse
                            (connection-details->spec* db-details)
                            nil
                            read-cloud-flag))]
    ;; Entries live for 48 hours; the TTL exists only so that stale entries are eventually evicted.
    (memoize/ttl lookup :ttl/threshold (* 48 60 60 1000))))

(defmethod sql-jdbc.conn/connection-details->spec :clickhouse
  [_driver details]
  ;; Build the regular JDBC spec first, then decorate it for cloud deployments only.
  (let [base-spec (connection-details->spec* details)]
    (if (cloud? details)
      ;; With select_sequential_consistency enabled, data can be queried from any replica in
      ;; CH Cloud immediately after it has been written.
      (assoc base-spec :select_sequential_consistency true)
      base-spec)))

;; The :uploads feature is decided per database: it is reported as supported only when `cloud?`
;; returns a truthy value for that database's connection details.
(defmethod driver/database-supports? [:clickhouse :uploads]
  [_driver _feature db]
  (-> db :details cloud?))

(defmethod driver/can-connect? :clickhouse
[driver details]
(if config/is-test?
Expand Down Expand Up @@ -112,6 +144,83 @@
:semantic-version {:major (.getInt rset 2)
:minor (.getInt rset 3)}})))))

;; Maps a Metabase upload column type to the ClickHouse column type used when creating the upload
;; table. Every mapped type is wrapped in Nullable(...). An upload type that is not listed makes
;; `case` throw, because there is no default clause.
(defmethod driver/upload-type->database-type :clickhouse
  [_driver upload-type]
  (case upload-type
    ;; both string flavours share one ClickHouse type
    (::upload/varchar-255 ::upload/text) "Nullable(String)"
    ::upload/int                         "Nullable(Int64)"
    ::upload/float                       "Nullable(Float64)"
    ::upload/boolean                     "Nullable(Boolean)"
    ::upload/date                        "Nullable(Date32)"
    ::upload/datetime                    "Nullable(DateTime64(3))"
    ;; FIXME: should be `Nullable(DateTime64(3))`
    ::upload/offset-datetime             nil))

(defmethod driver/table-name-length-limit :clickhouse
  [_]
  ;; FIXME: 206 is only an approximation. ClickHouse uses table/column names as file names, so the
  ;; real limit is whatever the underlying filesystem allows.
  206)

(defn- quote-name
  "Backtick-quotes an identifier given as a string, keyword or symbol. The name is split on `.`
  and every segment is quoted separately, so \"db.tbl\" becomes \"`db`.`tbl`\"."
  [s]
  (->> (str/split (name s) #"\.")
       (map (fn [segment] (str "`" segment "`")))
       (str/join ".")))

(defn- create-table!-sql
  "Returns the SQL text of a `CREATE TABLE` statement for `table-name`.

  `column-definitions` is a collection of [column-name type-spec] pairs; each type-spec is spliced
  into the statement verbatim. The engine is hard-coded to MergeTree, so this only works with
  ClickHouse Cloud and single-node on-premise deployments at the moment. The columns listed under
  `:primary-key` become the table's ORDER BY expression."
  [driver table-name column-definitions & {:keys [primary-key]}]
  (let [column-specs    (mapv (fn [[column-name type-spec]]
                                [column-name [:raw type-spec]])
                              column-definitions)
        ;; only the SQL string is wanted from the formatted result
        create-clause   (first (sql/format {:create-table (keyword table-name)
                                            :with-columns column-specs}
                                           :quoted true
                                           :dialect (sql.qp/quote-style driver)))
        order-by-clause (format "ORDER BY (%s)" (str/join ", " (map quote-name primary-key)))]
    (str/join "\n" [create-clause "ENGINE = MergeTree" order-by-clause])))

;; Creates the upload table by running the statement produced by `create-table!-sql` on a
;; write-enabled connection. The statement goes through the ClickHouse client request object
;; rather than plain JDBC so that a per-request setting can be attached to it.
(defmethod driver/create-table! :clickhouse
  [driver db-id table-name column-definitions & {:keys [primary-key]}]
  (sql-jdbc.execute/do-with-connection-with-options
   driver
   db-id
   {:write? true}
   (fn [^java.sql.Connection conn]
     (with-open [jdbc-stmt (.createStatement conn)]
       (let [^ClickHouseStatementImpl ch-stmt (.unwrap jdbc-stmt ClickHouseStatementImpl)
             ch-request                       (.getRequest ch-stmt)]
         ;; NOTE(review): presumably makes the server respond only once the DDL has finished;
         ;; confirm against the ClickHouse HTTP interface documentation.
         (.set ch-request "wait_end_of_query" "1")
         (let [^String ddl (create-table!-sql driver table-name column-definitions :primary-key primary-key)]
           ;; the response is opened only so that with-open closes it again
           (with-open [_response (.executeAndWait (.query ch-request ddl))])))))))

;; Inserts `values` (a sequence of rows, each a sequence of column values in the order given by
;; `column-names`) into `table-name` using a single batched prepared statement on a write-enabled
;; connection. Does nothing and returns nil when `values` is empty; otherwise returns the result
;; of `.executeBatch`.
(defmethod driver/insert-into! :clickhouse
  [driver db-id table-name column-names values]
  (when (seq values)
    (sql-jdbc.execute/do-with-connection-with-options
     driver
     db-id
     {:write? true}
     (fn [^java.sql.Connection conn]
       (let [sql (format "INSERT INTO %s (%s)" (quote-name table-name) (str/join ", " (map quote-name column-names)))]
         (with-open [ps (.prepareStatement conn sql)]
           (doseq [row values]
             ;; empty rows are skipped entirely, they are not added to the batch
             (when (seq row)
               ;; JDBC parameter indexes are 1-based
               (doseq [[idx v] (map-indexed (fn [x y] [(inc x) y]) row)]
                 (condp isa? (type v)
                   java.lang.String        (.setString ps idx v)
                   java.lang.Boolean       (.setBoolean ps idx v)
                   java.lang.Long          (.setLong ps idx v)
                   ;; Float columns are created as Nullable(Float64), so bind the full double.
                   ;; `.setFloat` would narrow the value to 32 bits, losing precision, and the
                   ;; checked coercion rejects magnitudes outside the float range.
                   java.lang.Double        (.setDouble ps idx v)
                   java.math.BigInteger    (.setObject ps idx v)
                   java.time.LocalDate     (.setObject ps idx v)
                   java.time.LocalDateTime (.setObject ps idx v)
                   ;; nil (and any type not listed above) ends up here
                   (.setString ps idx v)))
               (.addBatch ps)))
           (doall (.executeBatch ps))))))))

;;; ------------------------------------------ User Impersonation ------------------------------------------

(defmethod driver.sql/set-role-statement :clickhouse
Expand Down
70 changes: 42 additions & 28 deletions test/metabase/driver/clickhouse_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[cljc.java-time.temporal.chrono-unit :as chrono-unit]
[clojure.test :refer :all]
[metabase.driver :as driver]
[metabase.driver.clickhouse :as clickhouse]
[metabase.driver.clickhouse-data-types-test]
[metabase.driver.clickhouse-introspection-test]
[metabase.driver.clickhouse-substitution-test]
Expand Down Expand Up @@ -56,34 +57,47 @@
(offset-date-time/parse shanghai-now date-time-formatter/iso-offset-date-time))))))))

(deftest ^:parallel clickhouse-connection-string
(testing "connection with no additional options"
(is (= ctd/default-connection-params
(sql-jdbc.conn/connection-details->spec
:clickhouse
{}))))
(testing "custom connection with additional options"
(is (= (merge
ctd/default-connection-params
{:subname "//myclickhouse:9999/foo?sessionTimeout=42"
:user "bob"
:password "qaz"
:use_no_proxy true
:ssl true})
(sql-jdbc.conn/connection-details->spec
:clickhouse
{:host "myclickhouse"
:port 9999
:user "bob"
:password "qaz"
:dbname "foo"
:use-no-proxy true
:additional-options "sessionTimeout=42"
:ssl true}))))
(testing "nil dbname handling"
(is (= ctd/default-connection-params
(sql-jdbc.conn/connection-details->spec
:clickhouse
{:dbname nil})))))
(mt/with-dynamic-redefs [;; This function's implementation requires the connection details to actually connect to the
;; database, which is orthogonal to the purpose of this test.
clickhouse/cloud? (constantly false)]
(testing "connection with no additional options"
(is (= ctd/default-connection-params
(sql-jdbc.conn/connection-details->spec
:clickhouse
{}))))
(testing "custom connection with additional options"
(is (= (merge
ctd/default-connection-params
{:subname "//myclickhouse:9999/foo?sessionTimeout=42"
:user "bob"
:password "qaz"
:use_no_proxy true
:ssl true})
(sql-jdbc.conn/connection-details->spec
:clickhouse
{:host "myclickhouse"
:port 9999
:user "bob"
:password "qaz"
:dbname "foo"
:use-no-proxy true
:additional-options "sessionTimeout=42"
:ssl true}))))
(testing "nil dbname handling"
(is (= ctd/default-connection-params
(sql-jdbc.conn/connection-details->spec
:clickhouse
{:dbname nil}))))))

;; With `cloud?` stubbed to return true, the spec built from empty connection details must equal
;; the default connection params plus `:select_sequential_consistency true`.
(deftest ^:parallel clickhouse-connection-string-select-sequential-consistency
(mt/with-dynamic-redefs [;; This function's implementation requires the connection details to actually
;; connect to the database, which is orthogonal to the purpose of this test.
clickhouse/cloud? (constantly true)]
(testing "connection with no additional options"
(is (= (assoc ctd/default-connection-params :select_sequential_consistency true)
(sql-jdbc.conn/connection-details->spec
:clickhouse
{}))))))

(deftest ^:parallel clickhouse-tls
(mt/test-driver
Expand Down
2 changes: 2 additions & 0 deletions test/metabase/test/data/clickhouse.clj
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@

;; Ignores all arguments and returns nil for :clickhouse.
(defmethod sql.tx/add-fk-sql :clickhouse [& _] nil) ; TODO - fix me

;; Always returns the schema name "default" for :clickhouse.
(defmethod sql.tx/session-schema :clickhouse [_] "default")

;; Always returns false for :clickhouse.
;; NOTE(review): presumably this keeps test datasets from using time-of-day columns; confirm.
(defmethod tx/supports-time-type? :clickhouse [_driver] false)

(defn rows-without-index
Expand Down

0 comments on commit 409984e

Please sign in to comment.