Skip to content

Commit affb9d2

Browse files
committedSep 11, 2024·
Support listeners in KvinPartitioned.
1 parent ca1b5ec commit affb9d2

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed
 

‎bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitioned.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private void scheduleCyclicArchival() {
128128
public boolean addListener(KvinListener listener) {
129129
try {
130130
listeners.add(listener);
131-
return true;
131+
return hotStore.addListener(listener);
132132
} catch (Exception e) {
133133
throw new RuntimeException(e);
134134
}
@@ -138,7 +138,7 @@ public boolean addListener(KvinListener listener) {
138138
public boolean removeListener(KvinListener listener) {
139139
try {
140140
listeners.remove(listener);
141-
return true;
141+
return hotStore.removeListener(listener);
142142
} catch (Exception e) {
143143
throw new RuntimeException(e);
144144
}
@@ -164,6 +164,10 @@ public void createNewHotDataStore() throws IOException {
164164
FileUtils.deleteDirectory(this.currentStoreArchivePath);
165165
FileUtils.moveDirectory(this.currentStorePath, this.currentStoreArchivePath);
166166
hotStore = new KvinLevelDb(currentStorePath);
167+
for (KvinListener listener : listeners) {
168+
// register listeners on new hot store
169+
hotStore.addListener(listener);
170+
}
167171
hotStoreArchive = new KvinLevelDb(this.currentStoreArchivePath);
168172
}
169173

‎bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinLevelDb.scala

+8-1
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,11 @@ class KvinLevelDb(path: File) extends KvinLevelDbBase with Kvin {
593593
}
594594

595595
override def put(entries: java.lang.Iterable[KvinTuple]): Unit = {
596+
var notifyTuples = Option.empty[mutable.ArrayBuffer[KvinTuple]]
597+
if (! this.listeners.isEmpty && entries.isInstanceOf[IExtendedIterator[_]]) {
598+
notifyTuples = Some(new mutable.ArrayBuffer[KvinTuple]())
599+
}
600+
596601
val idsBatch = ids.createWriteBatch()
597602
val batch = values.createWriteBatch()
598603
activeWrites.incrementAndGet()
@@ -614,6 +619,8 @@ class KvinLevelDb(path: File) extends KvinLevelDbBase with Kvin {
614619
// remove timed-out entries
615620
ttl(entry.item) map (asyncRemoveByTtl(values, prefix, _))
616621
}
622+
// buffer tuples if entries are given via iterator
623+
notifyTuples.foreach(_.addOne(entry))
617624
}
618625
var writeIds: Future[_] = null
619626
if (idsBatch.size() > 0) {
@@ -632,7 +639,7 @@ class KvinLevelDb(path: File) extends KvinLevelDbBase with Kvin {
632639
uriToIdCacheWrite.invalidateAll()
633640
}
634641
}
635-
entries.asScala.foreach { entry =>
642+
notifyTuples.getOrElse(entries.asScala).foreach { entry =>
636643
for (l <- listeners.asScala) l.valueAdded(entry.item, entry.property, entry.context, entry.time, entry.seqNr, entry.value)
637644
}
638645
}

0 commit comments

Comments
 (0)
Please sign in to comment.