Skip to content

Commit

Permalink
[improve][pip] PIP-312: Use StateStoreProvider to manage state in Pul…
Browse files Browse the repository at this point in the history
…sar Functions endpoints (apache#21438)
  • Loading branch information
jiangpengcheng authored Nov 20, 2023
1 parent 403faa4 commit cbdb19c
Showing 1 changed file with 150 additions and 0 deletions.
150 changes: 150 additions & 0 deletions pip/pip-312.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# PIP-312: Use StateStoreProvider to manage state in Pulsar Functions endpoints

# Background knowledge

States are key-value pairs, where a key is a string and its value is arbitrary binary data - counters are stored as 64-bit big-endian binary values.
Keys are scoped to an individual function and shared between instances of that function.

Pulsar Functions use `StateStoreProvider` to initialize a `StateStore` to manage state, so it can support multiple state storage backend, such as:
- `BKStateStoreProviderImpl`: use Apache BookKeeper as the backend
- `PulsarMetadataStateStoreProviderImpl`: use Pulsar Metadata as the backend

Users can also implement their own `StateStoreProvider` to support other state storage backend.

The Broker also exposes two endpoints to put and query a state key of a function:
- GET /{tenant}/{namespace}/{functionName}/state/{key}
- POST /{tenant}/{namespace}/{functionName}/state/{key}

Although Pulsar Function supports multiple state storage backend, these two endpoints are still using BookKeeper's `StorageAdminClient` directly to put and query state,
this makes the Pulsar Functions' state store highly coupled with Apache BookKeeper.

See: [code](https://github.com/apache/pulsar/blob/1a66b640c3cd86bfca75dc9ab37bfdb37427a13f/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java#L1152-L1297)

# Motivation

This proposal aims to decouple Pulsar Functions' state store from Apache BookKeeper, so it can support other state storage backend.

# Goals

## In Scope

- Pulsar Functions can use other state storage backend other than Apache BookKeeper.

## Out of Scope

None

# High Level Design

- Replace the `StorageAdminClient` in `ComponentImpl` with `StateStoreProvider` to manage state.
- Add a `cleanup` method to the `StateStoreProvider` interface

# Detailed Design

## Design & Implementation Details

1. In the `ComponentImpl#getFunctionState` and `ComponentImpl#queryState` methods, replace the `StorageAdminClient` with `StateStoreProvider`:

```java
String tableNs = getStateNamespace(tenant, namespace);
String tableName = functionName;

String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();

if (storageClient.get() == null) {
storageClient.compareAndSet(null, StorageClientBuilder.newBuilder()
.withSettings(StorageClientSettings.newBuilder()
.serviceUri(stateStorageServiceUrl)
.clientName("functions-admin")
.build())
.withNamespace(tableNs)
.build());
}
...
```

Replaced to:

```java
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, name);
```

2. Add a `cleanup` method to the `StateStoreProvider` interface:

```java
default void cleanUp(String tenant, String namespace, String name) throws Exception;
```

Because when delete a function, the related state store should also be deleted.
Currently, it's also using BookKeeper's `StorageAdminClient` to delete the state store table:

```java
deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);


private void deleteStatestoreTableAsync(String namespace, String table) {
StorageAdminClient adminClient = worker().getStateStoreAdminClient();
if (adminClient != null) {
adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> {
if ((throwable == null && res)
|| ((throwable instanceof NamespaceNotFoundException
|| throwable instanceof StreamNotFoundException))) {
log.info("{}/{} table deleted successfully", namespace, table);
} else {
if (throwable != null) {
log.error("{}/{} table deletion failed {} but moving on", namespace, table, throwable);
} else {
log.error("{}/{} table deletion failed but moving on", namespace, table);
}
}
});
}
}
```

So this proposal will add a `cleanup` method to the `StateStoreProvider` and call it after a function is deleted:

```java
worker().getStateStoreProvider().cleanUp(tenant, namespace, hashName);
```

3. Add a new `init` method to `StateStoreProvider` interface:

The current `init` method requires a `FunctionDetails` parameter, but we cannot get the `FunctionDetails` in the `ComponentImpl` class,
and this parameter is not used either in `BKStateStoreProviderImpl` or in `PulsarMetadataStateStoreProviderImpl`,
but for backward compatibility, instead of updating the `init` method, this proposal will add a new `init` method without `FunctionDetails` parameter:

```java
default void init(Map<String, Object> config) throws Exception {}
```

## Public-facing Changes

None

# Monitoring

# Security Considerations

# Backward & Forward Compatibility

## Revert

- Nothing needs to be done if users use the Apache BookKeeper as the state storage backend.
- If users use another state storage backend, they need to change it back to BookKeeper.

## Upgrade

Nothing needs to be done.

# Alternatives

# General Notes

# Links

<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/0rz29wotonmdck76pdscwbqo19t3rbds
* Mailing List voting thread: https://lists.apache.org/thread/t8vmyxovrrb5xl8jvrp1om50l6nprdjt

0 comments on commit cbdb19c

Please sign in to comment.