Skip to content

Commit

Permalink
Rather than creating a separate null leg in DUVs if any inputs are nullable, we make the legs themselves nullable. The change is mostly in types/merge-fields. Resolves xtdb#3673.
Browse files Browse the repository at this point in the history
  • Loading branch information
jarohen committed Sep 5, 2024
1 parent a697504 commit 467cba4
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 43 deletions.
72 changes: 36 additions & 36 deletions core/src/main/clojure/xtdb/types.clj
Original file line number Diff line number Diff line change
Expand Up @@ -350,51 +350,49 @@
;; beware that anywhere this is used, naming of the fields (apart from struct subfields) should not matter
(defn merge-fields* [& fields]
(letfn [(merge-field* [acc ^Field field]
(let [arrow-type (.getType field)
nullable? (.isNullable field)
acc (cond-> acc
nullable? (assoc #xt.arrow/type :null nil))]
(condp = (class arrow-type)
ArrowType$Null acc
ArrowType$Union (reduce merge-field* acc (.getChildren field))
ArrowType$List (update acc arrow-type merge-field* (first (.getChildren field)))
SetType (update acc arrow-type merge-field* (first (.getChildren field)))
ArrowType$FixedSizeList (update acc arrow-type merge-field*
(first (.getChildren field)))
ArrowType$Struct (update acc arrow-type
(fn [acc]
(let [default-field-mapping (if acc {#xt.arrow/type :null nil} nil)
children (.getChildren field)]
(as-> acc acc
(reduce (fn [acc ^Field field]
(update acc (.getName field) (fnil merge-field* default-field-mapping) field))
acc
children)
(reduce (fn [acc null-k]
(update acc null-k merge-field* null-field))
acc
(set/difference (set (keys acc))
(set (map #(.getName ^Field %) children))))))))
(assoc acc (.getType field) nil))))

(kv->field [[arrow-type opts] {:keys [nullable?] :or {nullable? false}}]
(let [arrow-type (.getType field)]
(if (instance? ArrowType$Union arrow-type)
(reduce merge-field* acc (.getChildren field))

(-> acc
(update arrow-type
(fn [{:keys [nullable?] :as type-opts}]
(into {:nullable? (or nullable? (.isNullable field))}
(condp = (class arrow-type)
ArrowType$List {:el (merge-field* (:el type-opts) (first (.getChildren field)))}
SetType {:el (merge-field* (:el type-opts) (first (.getChildren field)))}
ArrowType$FixedSizeList {:el (merge-field* (:el type-opts) (first (.getChildren field)))}
ArrowType$Struct {:fields (let [default-field-mapping (if type-opts {#xt.arrow/type :null {:nullable? true}} nil)
children (.getChildren field)]
(as-> (:fields type-opts) fields-acc
(reduce (fn [field-acc ^Field field]
(update field-acc (.getName field) (fnil merge-field* default-field-mapping) field))
fields-acc
children)
(reduce (fn [field-acc null-k]
(update field-acc null-k merge-field* null-field))
fields-acc
(set/difference (set (keys fields-acc))
(set (map #(.getName ^Field %) children))))))}
{}))))))))

(kv->field [[arrow-type {:keys [nullable?] :as type-opts}]]
(condp instance? arrow-type
ArrowType$List (->field-default-name arrow-type nullable? [(map->field opts)])
SetType (->field-default-name arrow-type nullable? [(map->field opts)])
ArrowType$FixedSizeList (->field-default-name arrow-type (or (contains? opts #xt.arrow/type :null) nullable?)
[(map->field (dissoc opts #xt.arrow/type :null))])
ArrowType$List (->field-default-name arrow-type nullable? [(map->field (:el type-opts))])
SetType (->field-default-name arrow-type nullable? [(map->field (:el type-opts))])
ArrowType$FixedSizeList (->field-default-name arrow-type nullable? [(map->field (:el type-opts))])
ArrowType$Struct (->field-default-name arrow-type nullable?
(map (fn [[name opts]]
(let [^Field field (map->field opts)]
(apply ->field name (.getType field) (.isNullable field)
(cond->> (.getChildren field)
(not= ArrowType$Struct (class (.getType field)))
(map ->canonical-field)))))
opts))
(:fields type-opts)))

ArrowType$Null (->field-default-name #xt.arrow/type :null true nil)

TsTzRangeType (->field-default-name #xt.arrow/type :tstz-range true
TsTzRangeType (->field-default-name #xt.arrow/type :tstz-range nullable?
[(->field "$data" temporal-arrow-type false)])

(->field-default-name arrow-type nullable? nil)))
Expand All @@ -404,9 +402,11 @@
nullable? (contains? arrow-type-map #xt.arrow/type :null)]
(case (count without-null)
0 null-field
1 (kv->field (first without-null) {:nullable? nullable?})
1 (kv->field (let [[arrow-type type-opts] (first without-null)]
[arrow-type (cond-> type-opts
nullable? (assoc :nullable? true))]))

(->field-default-name #xt.arrow/type :union false (map #(kv->field % {}) arrow-type-map)))))]
(->field-default-name #xt.arrow/type :union false (map kv->field arrow-type-map)))))]

(-> (transduce (comp (remove nil?) (distinct)) (completing merge-field*) {} fields)
(map->field))))
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/kotlin/xtdb/arrow/DenseUnionVector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class DenseUnionVector(
override fun endList() = writeValueThen().endList()

// Builds a RowCopier that copies rows from `src` through `inner` after calling `writeValueThen()`.
override fun rowCopier0(src: VectorReader): RowCopier {
// NOTE(review): the next two lines both declare `innerCopier`. This looks like a diff view
// showing the removed line (`inner.rowCopier0(src)`) followed by its replacement
// (`src.rowCopier(inner)`) — confirm against the actual file before compiling.
val innerCopier = inner.rowCopier0(src)
val innerCopier = src.rowCopier(inner)
// `also` returns its receiver, so each copy yields the `valueCount` read before the write happens.
return RowCopier { srcIdx -> valueCount.also { writeValueThen(); innerCopier.copyRow(srcIdx) } }
}

Expand Down Expand Up @@ -168,7 +168,9 @@ class DenseUnionVector(
for (i in legVectors.indices) {
val leg = legVectors[i]
if (leg.name == name) {
if (leg.field.fieldType != fieldType) TODO("promotion")
val legFieldType = leg.field.fieldType
if (legFieldType.type != fieldType.type || (fieldType.isNullable && !legFieldType.isNullable))
TODO("promotion")

return LegWriter(i.toByte(), leg)
}
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/kotlin/xtdb/arrow/VectorReader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ interface VectorReader : AutoCloseable {
fun select(idxs: IntArray): VectorReader = IndirectVector(this, selection(idxs))
val asList get() = List(valueCount) { getObject(it) }

// Creates a RowCopier from this reader into `dest`, built on `dest.rowCopier0(this)`.
// NOTE(review): two headers for `rowCopier` appear below. This looks like a diff view where the
// expression-bodied form (next two lines) is the removed version and the block-bodied form
// after it is the replacement — confirm against the actual file.
fun rowCopier(dest: VectorWriter) = dest.rowCopier0(this).let { copier ->
RowCopier { srcIdx ->
fun rowCopier(dest: VectorWriter): RowCopier {
val copier = dest.rowCopier0(this)
// A DenseUnionVector destination gets the raw copier, without the null-check wrapper below.
return if (dest is DenseUnionVector) copier
else RowCopier { srcIdx ->
// Null source rows are written via `dest.writeNull()`; the expression then yields `valueCount`.
if (isNull(srcIdx)) valueCount.also { dest.writeNull() } else copier.copyRow(srcIdx)
}
}
Expand Down
22 changes: 22 additions & 0 deletions core/src/test/kotlin/xtdb/arrow/DenseUnionVectorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,26 @@ class DenseUnionVectorTest {
}
}
}

@Test
fun `nullable mono into DUV`() {
    // Source vector holding a double, a null, and another double.
    DoubleVector(allocator, "dbl", true).use { src ->
        src.writeDouble(3.14)
        src.writeNull()
        src.writeDouble(2.71)

        // Destination union with a single "f64" leg, built with the same flag as the source.
        val legs = listOf(DoubleVector(allocator, "f64", true))

        DenseUnionVector(allocator, "dest", legs).use { dest ->
            val copier = src.rowCopier(dest)
            for (rowIdx in 0..2) copier.copyRow(rowIdx)

            // The null row must come through as a null value in the destination.
            assertEquals(listOf(3.14, null, 2.71), dest.asList)
        }
    }
}
}
28 changes: 26 additions & 2 deletions src/test/clojure/xtdb/compactor_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[xtdb.buffer-pool :as bp]
[xtdb.compactor :as c]
[xtdb.indexer.live-index :as li]
[xtdb.node :as xtn]
[xtdb.test-json :as tj]
[xtdb.test-util :as tu]
[xtdb.time :as time]
Expand Down Expand Up @@ -453,8 +454,31 @@

(t/is (= #{{:xt/id 0, :count 12} {:xt/id 1, :count 12}}
(set (xt/q node "SELECT _id, count(*) count
FROM docs FOR ALL VALID_TIME
GROUP BY _id"))))
FROM docs FOR ALL VALID_TIME
GROUP BY _id"))))

(tj/check-json (.toPath (io/as-file (io/resource "xtdb/compactor-test/lose-data-on-compaction")))
(.resolve node-dir (tables-key "public$docs")) #"log-(.+)\.arrow")))))

(t/deftest test-compaction-promotion-bug-3673
  (letfn [;; submits `docs` to the :foo table, then finishes the chunk and compacts
          (put-and-compact! [node docs]
            (xt/submit-tx node [(into [:put-docs :foo] docs)])
            (tu/finish-chunk! node)
            (c/compact-all! node #xt.time/duration "PT0.5S"))

          ;; the distinct `foo` values currently visible in the :foo table
          (foo-values [node]
            (set (map :foo (xt/q node "SELECT foo FROM foo"))))]

    ;; two rounds of ingest + compaction: a float (plus an absent value) first,
    ;; then an int, another float and another absent value
    (util/with-open [node (xtn/start-node)]
      (put-and-compact! node [{:xt/id 0, :foo 12.0} {:xt/id 1}])
      (put-and-compact! node [{:xt/id 2, :foo 24} {:xt/id 3, :foo 28.1} {:xt/id 4}])

      (t/is (= #{12.0 nil 24 28.1}
               (foo-values node))))

    ;; a single round where the absent value comes before the typed values
    (util/with-open [node (xtn/start-node)]
      (put-and-compact! node [{:xt/id 1} {:foo "foo", :xt/id 0} {:foo 3, :xt/id 2}])

      (t/is (= #{"foo" nil 3}
               (foo-values node))))))
17 changes: 16 additions & 1 deletion src/test/clojure/xtdb/types_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,22 @@
(types/merge-fields (types/col-type->field :null) (types/col-type->field :i64))))

(t/is (= (types/col-type->field "union" [:union #{:null :i64 :f64}])
(types/merge-fields (types/col-type->field :f64) (types/col-type->field :null) (types/col-type->field :i64)))))
(types/merge-fields (types/col-type->field :f64) (types/col-type->field :null) (types/col-type->field :i64))))

(t/testing "nulls kept within the legs they were originally in"
(t/is (= (types/->field "union" #xt.arrow/type :union false
(types/col-type->field :f64)
(types/col-type->field "i64" [:union #{:null :i64}]))
(types/merge-fields (types/col-type->field :f64)
(types/col-type->field [:union #{:null :i64}]))))

(t/is (= (types/->field "union" #xt.arrow/type :union false
(types/col-type->field :f64)
(types/col-type->field :f32)
(types/col-type->field "i64" [:union #{:null :i64}]))
(types/merge-fields (types/col-type->field [:union #{:f64 :f32}])
(types/col-type->field [:union #{:null :i64}])))
"other unions flattened")))

(t/testing "sets"
(t/is (= (types/col-type->field [:set :i64])
Expand Down

0 comments on commit 467cba4

Please sign in to comment.