Skip to content

Commit

Permalink
bringing the readers in line with the writers, xtdb#2506
Browse files Browse the repository at this point in the history
still to do:

- use these in the EE - we still delegate back to `IMonoVectorReader` et al.
  • Loading branch information
jarohen committed Jul 14, 2023
1 parent bc80730 commit 7d02935
Show file tree
Hide file tree
Showing 105 changed files with 3,331 additions and 2,354 deletions.
7 changes: 4 additions & 3 deletions .clj-kondo/config.edn
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
xtdb.util/with-open clojure.core/with-open
clojure.test.check.clojure-test/defspec clojure.test/deftest
clojure.test.check.properties/for-all clojure.core/for
juxt.clojars-mirrors.nextjdbc.v1v2v674.next.jdbc/with-transaction next.jdbc/with-transaction}
:hooks {:analyze-call {xtdb.pgwire/def-msg hooks/pgwire-def-msg}}
juxt.clojars-mirrors.nextjdbc.v1v2v674.next.jdbc/with-transaction next.jdbc/with-transaction
:hooks {:analyze-call {xtdb.pgwire/def-msg hooks/pgwire-def-msg
xtdb.vector.reader/def-reader-factory hooks/rdr-def-reader-factory}}
:linters {:not-empty? {:level :off}
:unresolved-symbol {:level :error
:exclude [(clojure.test/is [=plan-file])]}}}
:exclude [(clojure.test/is [=plan-file])]}}}}
13 changes: 13 additions & 0 deletions .clj-kondo/hooks.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,16 @@
(list (kondo/token-node 'def)
nm
(kondo/map-node opts)))})

;; clj-kondo `:analyze-call` hook for `xtdb.vector.reader/def-reader-factory`.
;;
;; Takes the hook argument map, destructures the call node's children as
;; `[macro-sym clazz arrow-vec-binding & methods]`, and returns `{:node ...}`
;; holding a rewritten form for the linter to analyse:
;;
;;   (let [<first child of arrow-vec-binding> nil]
;;     (extend-protocol xtdb.vector.reader/ReaderFactory <clazz>))
;;
;; NOTE(review): presumably `arrow-vec-binding` is a vector node such as
;; `[arrow-vec]` - confirm against the macro definition in xtdb.vector.reader.
;; The trailing method forms are not carried over into the rewritten node
;; (this matches the previous behaviour of this hook).
(defn rdr-def-reader-factory [{{[_def-rdr-factory clazz arrow-vec-binding & _methods] :children} :node}]
  {:node (kondo/list-node
          (list
           (kondo/token-node 'let)
           (kondo/vector-node
            (list
             ;; the binding symbol is the first *child* of the vector node;
             ;; calling `first` on the node itself does not yield a node
             (first (:children arrow-vec-binding))
             ;; must be a call: without the parens the function object and a
             ;; bare nil were spliced into the binding vector
             (kondo/token-node 'nil)))
           (kondo/list-node
            (list (kondo/token-node 'extend-protocol)
                  (kondo/token-node 'xtdb.vector.reader/ReaderFactory)
                  clazz))))})
10 changes: 5 additions & 5 deletions core/src/main/clojure/xtdb/blocks.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
(ns xtdb.blocks
(:require [xtdb.util :as util])
(:import xtdb.ICursor
java.util.Iterator
[org.apache.arrow.vector VectorSchemaRoot]))
(:import [java.util Arrays Iterator]
[org.apache.arrow.vector VectorSchemaRoot]
xtdb.ICursor))

(deftype SliceCursor [^VectorSchemaRoot root
^Iterator row-counts
Expand All @@ -29,5 +29,5 @@
(when current-slice
(.close current-slice))))

(defn ->slices ^xtdb.ICursor [^VectorSchemaRoot root, ^Iterable row-counts]
(SliceCursor. root (.iterator row-counts) 0 nil))
;; Builds a SliceCursor over `root`, driven by an iterator over the
;; `row-counts` int array (obtained via `Arrays/stream`).
;; NOTE(review): the trailing `0` and `nil` are presumably the cursor's initial
;; offset and current slice - confirm against the SliceCursor field list.
(defn ->slices ^xtdb.ICursor [^VectorSchemaRoot root, ^ints row-counts]
  (let [row-count-iter (.iterator (Arrays/stream row-counts))]
    (SliceCursor. root row-count-iter 0 nil)))
25 changes: 12 additions & 13 deletions core/src/main/clojure/xtdb/bloom.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
(:require [xtdb.expression :as expr]
[xtdb.types :as types]
[xtdb.util :as util]
[xtdb.vector.indirect :as iv])
[xtdb.vector.reader :as vr])
(:import java.nio.ByteBuffer
org.apache.arrow.memory.RootAllocator
(org.apache.arrow.memory.util.hash MurmurHasher SimpleHasher)
[org.apache.arrow.vector ValueVector VarBinaryVector]
org.roaringbitmap.buffer.ImmutableRoaringBitmap
org.roaringbitmap.RoaringBitmap
(xtdb.vector IIndirectRelation IIndirectVector IVectorWriter)))
(xtdb.vector RelationReader IVectorReader IVectorWriter)))

(set! *unchecked-math* :warn-on-boxed)

Expand Down Expand Up @@ -55,13 +55,11 @@
;; Cassandra-style hashes:
;; https://www.researchgate.net/publication/220770131_Less_Hashing_Same_Performance_Building_a_Better_Bloom_Filter
(defn bloom-hashes
(^ints [^IIndirectVector col ^long idx]
(^ints [^IVectorReader col ^long idx]
(bloom-hashes col idx bloom-k bloom-bit-mask))
(^ints [^IIndirectVector col ^long idx ^long k ^long mask]
(let [vec (.getVector col)
idx (.getIndex col idx)
hash-1 (.hashCode vec idx SimpleHasher/INSTANCE)
hash-2 (.hashCode vec idx (MurmurHasher. hash-1))
(^ints [^IVectorReader col ^long idx ^long k ^long mask]
(let [hash-1 (.hashCode col idx SimpleHasher/INSTANCE)
hash-2 (.hashCode col idx (MurmurHasher. hash-1))
acc (int-array k)]
(dotimes [n k]
(aset acc n (unchecked-int (bit-and mask (+ hash-1 (* hash-2 n))))))
Expand All @@ -76,30 +74,31 @@
:target-type target-col-type}
{:param-types {param param-type}})
{:keys [writer-bindings write-value-out!]} (expr/write-value-out-code return-type)]
(-> `(fn [~(-> expr/params-sym (expr/with-tag IIndirectRelation))
(-> `(fn [~(-> expr/params-sym (expr/with-tag RelationReader))
~(-> expr/out-vec-sym (expr/with-tag ValueVector))]
(let [~@(expr/batch-bindings emitted-expr)
~@writer-bindings]
~(continue (fn [return-type code]
`(do
~(write-value-out! return-type code)
(bloom-hashes (iv/->direct-vec ~expr/out-vec-sym) 0))))))
(bloom-hashes (vr/vec->reader ~expr/out-vec-sym) 0))))))
#_(doto (clojure.pprint/pprint))
(eval))))
(util/lru-memoize)))

(defn literal-hashes ^ints [^IIndirectRelation params param-expr target-col-type]
(defn literal-hashes ^ints [params param-expr target-col-type]
(let [f (literal-hasher param-expr target-col-type)]
(with-open [allocator (RootAllocator.)
tmp-vec (-> (types/col-type->field target-col-type)
(.createVector allocator))]
(f params tmp-vec))))

(defn write-bloom [^IVectorWriter bloom-wtr, ^IIndirectVector col]
(defn write-bloom [^IVectorWriter bloom-wtr, ^IVectorReader col]
(let [bloom (RoaringBitmap.)]
(dotimes [in-idx (.getValueCount col)]
(dotimes [in-idx (.valueCount col)]
(let [^ints el-hashes (bloom-hashes col in-idx)]
(.add bloom el-hashes)))

(let [buf (ByteBuffer/allocate (.serializedSizeInBytes bloom))]
(.serialize bloom buf)
(.writeBytes bloom-wtr (doto buf .clear)))))
2 changes: 1 addition & 1 deletion core/src/main/clojure/xtdb/cli.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
[clojure.string :as str]
[clojure.tools.cli :as cli]
[clojure.tools.logging :as log]
[xtdb.node :as node]
[xtdb.error :as err]
[xtdb.node :as node]
[xtdb.util :as util])
(:import java.io.File
java.net.URL
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/clojure/xtdb/coalesce.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
(:import java.util.function.Consumer
org.apache.arrow.memory.BufferAllocator
xtdb.ICursor
(xtdb.vector IIndirectRelation IRelationWriter)))
(xtdb.vector RelationReader)))

;; We pass the first 100 results through immediately, so that any limit-like queries don't need to wait for a full block to return rows.
;; Then, we coalesce small blocks together into blocks of at least 100, to share the per-block costs.
Expand All @@ -23,7 +23,7 @@
(let [!passed-on? (volatile! false)
advanced? (.tryAdvance cursor (reify Consumer
(accept [_ read-rel]
(let [^IIndirectRelation read-rel read-rel
(let [^RelationReader read-rel read-rel
row-count (.rowCount read-rel)
seen-rows (.seen-rows this)]
(cond
Expand Down
1 change: 1 addition & 0 deletions core/src/main/clojure/xtdb/datalog.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,7 @@

plan (-> plan
(apply-datalog-specific-rewrites basis wm-src scan-emitter)

#_(doto clojure.pprint/pprint)
#_(->> (binding [*print-meta* true]))
(lp/rewrite-plan {})
Expand Down
34 changes: 17 additions & 17 deletions core/src/main/clojure/xtdb/expression.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[xtdb.rewrite :refer [zmatch]]
[xtdb.types :as types]
[xtdb.util :as util]
[xtdb.vector.indirect :as iv]
[xtdb.vector.reader :as vr]
[xtdb.vector.writer :as vw])
(:import (clojure.lang Keyword MapEntry)
(java.nio ByteBuffer)
Expand All @@ -14,12 +14,12 @@
(java.util Arrays Date List)
(java.util.regex Pattern)
(java.util.stream IntStream)
(org.apache.arrow.vector BitVector PeriodDuration ValueVector)
(org.apache.arrow.vector PeriodDuration ValueVector)
(org.apache.commons.codec.binary Hex)
(xtdb StringUtil)
(xtdb.operator IProjectionSpec IRelationSelector)
(xtdb.types IntervalDayTime IntervalMonthDayNano IntervalYearMonth)
(xtdb.vector IIndirectRelation IIndirectVector IMonoVectorReader IPolyVectorReader IStructValueReader MonoToPolyReader RemappedTypeIdReader)
(xtdb.vector IMonoVectorReader IPolyVectorReader RelationReader IStructValueReader IVectorReader MonoToPolyReader RemappedTypeIdReader)
xtdb.vector.ValueBox))

(set! *unchecked-math* :warn-on-boxed)
Expand Down Expand Up @@ -415,12 +415,12 @@
{:return-type col-type
:batch-bindings (if (types/union? col-type)
(if (and extract-vecs-from-rel? extract-vec-from-rel?)
[[sanitized-var `(.polyReader (.vectorForName ~rel ~(str variable))
[[sanitized-var `(.polyReader (.readerForName ~rel ~(str variable))
'~col-type)]]
[[sanitized-var `(some-> ~sanitized-var (.polyReader '~col-type))]])

(if (and extract-vecs-from-rel? extract-vec-from-rel?)
[[sanitized-var `(.monoReader (.vectorForName ~rel ~(str variable)) '~col-type)]]
[[sanitized-var `(.monoReader (.readerForName ~rel ~(str variable)) '~col-type)]]
[[sanitized-var `(some-> ~sanitized-var (.monoReader '~col-type))]]))
:continue (fn [f]
(continue-read f col-type sanitized-var idx))}))
Expand Down Expand Up @@ -1534,8 +1534,8 @@
{:keys [return-type continue] :as emitted-expr} (codegen-expr expr opts)
{:keys [writer-bindings write-value-out!]} (write-value-out-code return-type)]
{:!projection-fn (delay
(-> `(fn [~(-> rel-sym (with-tag IIndirectRelation))
~(-> params-sym (with-tag IIndirectRelation))
(-> `(fn [~(-> rel-sym (with-tag RelationReader))
~(-> params-sym (with-tag RelationReader))
~(-> out-vec-sym (with-tag ValueVector))]
(let [~@(batch-bindings emitted-expr)
~@writer-bindings
Expand All @@ -1545,18 +1545,19 @@
(write-value-out! t c))))))

#_(doto clojure.pprint/pprint)
#_(->> (binding [*print-meta* true]))
eval))

:return-type return-type}))
(util/lru-memoize)
wrap-zone-id-cache-buster))

(defn ->param-types [^IIndirectRelation params]
(defn ->param-types [^RelationReader params]
(->> params
(into {} (map (fn [^IIndirectVector col]
(into {} (map (fn [^IVectorReader col]
(MapEntry/create
(symbol (.getName col))
(types/field->col-type (.getField (.getVector col)))))))))
(types/field->col-type (.getField col))))))))

(defn ->expression-projection-spec ^xtdb.operator.IProjectionSpec [col-name form {:keys [col-types param-types] :as input-types}]
(let [expr (form->expr form input-types)
Expand All @@ -1572,9 +1573,9 @@

(project [_ allocator in-rel params]
(let [var->col-type (->> (seq in-rel)
(into {} (map (fn [^IIndirectVector iv]
(into {} (map (fn [^IVectorReader iv]
[(symbol (.getName iv))
(types/field->col-type (.getField (.getVector iv)))]))))
(types/field->col-type (.getField iv))]))))

{:keys [return-type !projection-fn]} (emit-projection expr {:param-types (->param-types params)
:var->col-type var->col-type})
Expand All @@ -1586,16 +1587,15 @@
(.allocateNew))
(@!projection-fn in-rel params out-vec)
(.setValueCount out-vec row-count)
(iv/->direct-vec out-vec)))))))
(vr/vec->reader out-vec)))))))

(defn ->expression-relation-selector ^xtdb.operator.IRelationSelector [form input-types]
(let [projector (->expression-projection-spec "select" (list 'boolean form) input-types)]
(reify IRelationSelector
(select [_ al in-rel params]
(with-open [selection (.project projector al in-rel params)]
(let [^BitVector sel-vec (.getVector selection)
res (IntStream/builder)]
(dotimes [idx (.getValueCount selection)]
(when (= 1 (.get sel-vec (.getIndex selection idx)))
(let [res (IntStream/builder)]
(dotimes [idx (.valueCount selection)]
(when (.getBoolean selection idx)
(.add res idx)))
(.toArray (.build res))))))))
15 changes: 7 additions & 8 deletions core/src/main/clojure/xtdb/expression/comparator.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
(:require [xtdb.expression :as expr]
[xtdb.types :as types]
[xtdb.util :as util])
(:import (xtdb.vector IIndirectVector)
java.util.HashMap
java.util.function.IntBinaryOperator))
(:import java.util.function.IntBinaryOperator
(xtdb.vector IVectorReader)))

(set! *unchecked-math* :warn-on-boxed)

Expand Down Expand Up @@ -74,8 +73,8 @@
{:var->col-type {left-col-sym left-col-type, right-col-sym right-col-type}
:extract-vecs-from-rel? false})]

(-> `(fn [~(-> left-col-sym (expr/with-tag IIndirectVector))
~(-> right-col-sym (expr/with-tag IIndirectVector))]
(-> `(fn [~(-> left-col-sym (expr/with-tag IVectorReader))
~(-> right-col-sym (expr/with-tag IVectorReader))]
(let [~@(expr/batch-bindings emitted-expr)]
(reify IntBinaryOperator
(~'applyAsInt [_# ~left-idx-sym ~right-idx-sym]
Expand All @@ -84,9 +83,9 @@
(eval))))
(util/lru-memoize)))

(defn ->comparator ^java.util.function.IntBinaryOperator [^IIndirectVector left-col, ^IIndirectVector right-col, null-ordering]
(let [left-field (.getField (.getVector left-col))
right-field (.getField (.getVector right-col))
(defn ->comparator ^java.util.function.IntBinaryOperator [^IVectorReader left-col, ^IVectorReader right-col, null-ordering]
(let [left-field (.getField left-col)
right-field (.getField right-col)
f (build-comparator (types/field->col-type left-field)
(types/field->col-type right-field)
null-ordering)]
Expand Down
Loading

0 comments on commit 7d02935

Please sign in to comment.