Skip to content

Commit

Permalink
reducing async in live-index to try to narrow down the race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
jarohen committed Aug 15, 2024
1 parent 8b31860 commit 3d6bc53
Showing 1 changed file with 30 additions and 36 deletions.
66 changes: 30 additions & 36 deletions core/src/main/clojure/xtdb/indexer/live_index.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
(java.lang AutoCloseable)
(java.time Duration)
(java.util ArrayList HashMap Map)
(java.util.concurrent CompletableFuture CompletionException)
(java.util.concurrent StructuredTaskScope$ShutdownOnFailure StructuredTaskScope$Subtask)
(java.util.concurrent.locks StampedLock)
(java.util.function Function)
(org.apache.arrow.memory BufferAllocator)
(org.apache.arrow.vector.types.pojo Field)
xtdb.IBufferPool
(xtdb.api IndexerConfig TransactionKey)
(xtdb.arrow Relation RelationReader)
xtdb.IBufferPool
xtdb.metadata.IMetadataManager
(xtdb.trie LiveHashTrie)
(xtdb.util RefCounter RowCounter)
Expand All @@ -43,7 +42,7 @@
(^xtdb.indexer.live_index.ILiveTableTx startTx [^xtdb.api.TransactionKey txKey
^boolean newLiveTable])
(^xtdb.watermark.ILiveTableWatermark openWatermark [^boolean retain])
(^java.util.concurrent.CompletableFuture #_<List<Map$Entry>> finishChunk [^long firstRow ^long nextRow])
(^java.util.List #_<Map$Entry> finishChunk [^long firstRow ^long nextRow])
(^void close []))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
Expand Down Expand Up @@ -182,18 +181,15 @@
(.syncRowCount live-rel)
(let [row-count (.getPosition (.writerPosition live-rel))]
(when (pos? row-count)
(let [!fut (CompletableFuture/runAsync
(fn []
(with-open [data-rel (.openAsRelation live-rel)]
(trie/write-live-trie! allocator buffer-pool
(util/table-name->table-path table-name)
(trie/->log-l0-l1-trie-key 0 first-row next-row row-count)
live-trie data-rel))))
table-metadata (MapEntry/create table-name
{:fields (live-rel->fields live-rel)
:row-count row-count})]
(-> !fut
(.thenApply (fn [_] table-metadata)))))))
(with-open [data-rel (.openAsRelation live-rel)]
(trie/write-live-trie! allocator buffer-pool
(util/table-name->table-path table-name)
(trie/->log-l0-l1-trie-key 0 first-row next-row row-count)
live-trie data-rel)

(MapEntry/create table-name
{:fields (live-rel->fields live-rel)
:row-count row-count})))))

(openWatermark [this retain?] (live-table-wm live-rel (.live-trie this) retain?))

Expand Down Expand Up @@ -331,26 +327,24 @@
FinishChunk
(finish-chunk! [this]
(let [chunk-idx (.getChunkIdx row-counter)
next-chunk-idx (+ chunk-idx (.getChunkRowCount row-counter))

futs (->> (for [^ILiveTable table (.values tables)]
(.finishChunk table chunk-idx next-chunk-idx))

(remove nil?)
(into-array CompletableFuture))]

;; TODO currently non interruptible, meaning this waits for the tries to be written
(try
(.join (CompletableFuture/allOf futs))
(catch CompletionException e
(throw (.getCause e))))

(let [table-metadata (-> (into {} (keep deref) futs)
(util/rethrowing-cause))]
(.finishChunk metadata-mgr chunk-idx
{:latest-completed-tx latest-completed-tx
:next-chunk-idx next-chunk-idx
:tables table-metadata}))
next-chunk-idx (+ chunk-idx (.getChunkRowCount row-counter))]

(with-open [scope (StructuredTaskScope$ShutdownOnFailure.)]
(let [tasks (vec (for [^ILiveTable table (.values tables)]
(.fork scope (fn []
(.finishChunk table chunk-idx next-chunk-idx)))))]
(.join scope)

(let [table-metadata (-> (into {} (keep #(try
(.get ^StructuredTaskScope$Subtask %)
(catch Exception _
(throw (.exception ^StructuredTaskScope$Subtask %)))))
tasks)
(util/rethrowing-cause))]
(.finishChunk metadata-mgr chunk-idx
{:latest-completed-tx latest-completed-tx
:next-chunk-idx next-chunk-idx
:tables table-metadata}))))

(.nextChunk row-counter)

Expand Down

0 comments on commit 3d6bc53

Please sign in to comment.