Skip to content

Commit

Permalink
Merge pull request #304 from jmartisk/redis-batching
Browse files Browse the repository at this point in the history
Use batching in Redis to avoid overflowing the connection pool
  • Loading branch information
geoand authored Feb 15, 2024
2 parents 62374e9 + 44e8a8c commit bc722c8
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -25,7 +26,6 @@
import io.quarkiverse.langchain4j.QuarkusJsonCodecFactory;
import io.quarkiverse.langchain4j.redis.runtime.RedisSchema;
import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.json.ReactiveJsonCommands;
import io.quarkus.redis.datasource.keys.KeyScanArgs;
import io.quarkus.redis.datasource.search.CreateArgs;
import io.quarkus.redis.datasource.search.Document;
Expand Down Expand Up @@ -125,9 +125,8 @@ private void addAllInternal(List<String> ids, List<Embedding> embeddings, List<T
if (ids.isEmpty() || ids.size() != embeddings.size() || (embedded != null && embedded.size() != embeddings.size())) {
throw new IllegalArgumentException("ids, embeddings and embedded must be non-empty and of the same size");
}
ReactiveJsonCommands<String> json = ds.json();
int size = ids.size();
Uni[] unis = new Uni[size];
List<Request> commands = new ArrayList<>();
for (int i = 0; i < size; i++) {
String id = ids.get(i);
Embedding embedding = embeddings.get(i);
Expand All @@ -147,9 +146,9 @@ private void addAllInternal(List<String> ids, List<Embedding> embeddings, List<T
fields.putAll(textSegment.metadata().asMap());
}
String key = schema.getPrefix() + id;
unis[i] = json.jsonSet(key, "$", fields);
commands.add(Request.cmd(Command.JSON_SET).arg(key).arg("$").arg(Json.toJson(fields)));
}
Uni.join().all(unis).andFailFast().await().indefinitely();
ds.getRedis().batchAndAwait(commands);
}

@Override
Expand Down

0 comments on commit bc722c8

Please sign in to comment.