Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async KeyValue Interface #1245

Open
davidmcote opened this issue Nov 1, 2024 · 0 comments
Open

Async KeyValue Interface #1245

davidmcote opened this issue Nov 1, 2024 · 0 comments
Labels
proposal Enhancement idea or proposal

Comments

@davidmcote
Copy link
Contributor

davidmcote commented Nov 1, 2024

Proposed change

All the CRUDL operations in the KeyValue interface block their calling thread while waiting for a response from NATS JetStream, which may be multiple network hops away depending on the nats-server topology.

It'd be nice if there were async counterparts which returned their result in the form of a CompleteableFuture, similar to the asynchronous forms of JetStream.publishAsync() and Connection.requestWithTimeout(). And for list methods, a way to be notified as results arrive without needing them all in memory at the same time in a large collection.

TL;DR:

  1. Introduce an async counterpart to singleton methods which return their result as a CompletableFuture.
  2. Update the existing consumeKeys() methods so they return their BlockingQueue before it is fully populated.
  3. Introduce a consumeHistory() method for parity.
  4. Overload the consumeKeys() methods with an asynchronous form that notifies a callback as responses arrive.

Details

For singleton methods, the KeyValue interface could be updated in a fairly straightforward manner:

public interface KeyValue {
    // Existing synchronous accessors
    KeyValueEntry get(String key) throws IOException, JetStreamApiException;
    KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException;
    long put(String key, byte[] value) throws IOException, JetStreamApiException;
    long put(String key, String value) throws IOException, JetStreamApiException;
    long put(String key, Number value) throws IOException, JetStreamApiException;
    long create(String key, byte[] value) throws IOException, JetStreamApiException;
    long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException;
    long update(String key, String value, long expectedRevision) throws IOException, JetStreamApiException;
    void delete(String key) throws IOException, JetStreamApiException;
    void delete(String key, long expectedRevision) throws IOException, JetStreamApiException;
    void purge(String key) throws IOException, JetStreamApiException;
    void purge(String key, long expectedRevision) throws IOException, JetStreamApiException;

    // Proposed asynchronous counterparts
    CompletableFuture<KeyValueEntry> getAsync(String key);
    CompletableFuture<KeyValueEntry getAsync(String key, long revision);
    CompletableFuture<Long> putAsync(String key, byte[] value);
    CompletableFuture<Long> putAsync(String key, String value);
    CompletableFuture<Long> putAsync(String key, Number value);
    CompletableFuture<Long> createAsync(String key, byte[] value) ;
    CompletableFuture<Long> updateAsync(String key, byte[] value, long expectedRevision);
    CompletableFuture<Long> updateAsync(String key, String value, long expectedRevision);
    CompletableFuture<Void> deleteAsync(String key);
    CompletableFuture<Void> deleteAsync(String key, long expectedRevision);
    CompletableFuture<Void> purgeAsync(String key);
    CompletableFuture<Void> purgeAsync(String key, long expectedRevision);
}

There's also a number of synchronous listing methods in the KeyValue interface.

List<String> keys() throws IOException, JetStreamApiException, InterruptedException;
List<String> keys(String filter) throws IOException, JetStreamApiException, InterruptedException;
List<String> keys(List<String> filters) throws IOException, JetStreamApiException, InterruptedException;
List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException;

There are a few listing methods which return a BlockingQueue, suggesting they have decoupled push-pull semantics.

LinkedBlockingQueue<KeyResult> consumeKeys();
LinkedBlockingQueue<KeyResult> consumeKeys(String filter);
LinkedBlockingQueue<KeyResult> consumeKeys(List<String> filters);
// consumeHistory() not part of interface.

But actually these are still synchronous. Visual inspection of the NatsKeyValue impl looks like the whole BlockingQueue is fully populated synchronously before returning it.

These methods could be relaxed to returning the BlockingQueue immediately
and pushing entries in the background as they arrive from the internal JetStream subscription.

Unfortunately, the application would still need spend it's own thread to pull items from it.

Proposed Asynchronous Listing API

With the following overloads, the application would pass in a callback which receives the same "KeyResult" object used in the BlockingQueue forms.

void consumeKeys(Consumer<KeyResult> callback);
void consumeKeys(String filter, Consumer<KeyResult> callback);
void consumeKeys(List<String> filters, Consumer<KeyResult> callback);
void consumeHistory(String key, Consumer<KeyResult> callback);

The callback is invoked by the onMessage handler of the internal JetStream subscription as it receives responses from the JetStream server. At no point is the entire KeyValue keyset held in memory. If the application-provided callback throws, the listing is aborted.

This proposed API would also be easily adapted to RxJava's Observable, should an application be so inclined.

At present, the alternatives to this proposal are:

  1. Accept that calls to KeyValue will block the calling thread.
  2. Schedule synchronous calls to a separate threadpool.
  3. Accept that list calls may construct a very large data-structure in memory before returning it.
  4. Manually implement KeyValue interactions by sidestepping the KeyValue abstraction and using the lower level JetStream interface directly.

Use case

Reduced thread and memory utilization from clients using the KeyValue interface.

Contribution

Things are nuts for me until EOY, but if there is interest for this proposal or parts of it, I could probably break this work into a few increments and submit patches 2025Q1. I am particularly keen on having at least the asynchronous singleton methods sooner than later and they seem the least controversial of this proposal.

@davidmcote davidmcote added the proposal Enhancement idea or proposal label Nov 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
proposal Enhancement idea or proposal
Projects
None yet
Development

No branches or pull requests

1 participant