diff --git a/pip/pip-312.md b/pip/pip-312.md new file mode 100644 index 0000000000000..b4b12e6cc5c79 --- /dev/null +++ b/pip/pip-312.md @@ -0,0 +1,150 @@ +# PIP-312: Use StateStoreProvider to manage state in Pulsar Functions endpoints + +# Background knowledge + +States are key-value pairs, where a key is a string and its value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. +Keys are scoped to an individual function and shared between instances of that function. + +Pulsar Functions use `StateStoreProvider` to initialize a `StateStore` to manage state, so it can support multiple state storage backend, such as: +- `BKStateStoreProviderImpl`: use Apache BookKeeper as the backend +- `PulsarMetadataStateStoreProviderImpl`: use Pulsar Metadata as the backend + +Users can also implement their own `StateStoreProvider` to support other state storage backend. + +The Broker also exposes two endpoints to put and query a state key of a function: +- GET /{tenant}/{namespace}/{functionName}/state/{key} +- POST /{tenant}/{namespace}/{functionName}/state/{key} + +Although Pulsar Function supports multiple state storage backend, these two endpoints are still using BookKeeper's `StorageAdminClient` directly to put and query state, +this makes the Pulsar Functions' state store highly coupled with Apache BookKeeper. + +See: [code](https://github.com/apache/pulsar/blob/1a66b640c3cd86bfca75dc9ab37bfdb37427a13f/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java#L1152-L1297) + +# Motivation + +This proposal aims to decouple Pulsar Functions' state store from Apache BookKeeper, so it can support other state storage backend. + +# Goals + +## In Scope + +- Pulsar Functions can use other state storage backend other than Apache BookKeeper. + +## Out of Scope + +None + +# High Level Design + +- Replace the `StorageAdminClient` in `ComponentImpl` with `StateStoreProvider` to manage state. +- Add a `cleanup` method to the `StateStoreProvider` interface + +# Detailed Design + +## Design & Implementation Details + +1. In the `ComponentImpl#getFunctionState` and `ComponentImpl#queryState` methods, replace the `StorageAdminClient` with `StateStoreProvider`: + + ```java + String tableNs = getStateNamespace(tenant, namespace); + String tableName = functionName; + + String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl(); + + if (storageClient.get() == null) { + storageClient.compareAndSet(null, StorageClientBuilder.newBuilder() + .withSettings(StorageClientSettings.newBuilder() + .serviceUri(stateStorageServiceUrl) + .clientName("functions-admin") + .build()) + .withNamespace(tableNs) + .build()); + } + ... + ``` + + Replaced to: + + ```java + DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, name); + ``` + +2. Add a `cleanup` method to the `StateStoreProvider` interface: + + ```java + default void cleanUp(String tenant, String namespace, String name) throws Exception; + ``` + + Because when delete a function, the related state store should also be deleted. + Currently, it's also using BookKeeper's `StorageAdminClient` to delete the state store table: + + ```java + deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName); + + + private void deleteStatestoreTableAsync(String namespace, String table) { + StorageAdminClient adminClient = worker().getStateStoreAdminClient(); + if (adminClient != null) { + adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> { + if ((throwable == null && res) + || ((throwable instanceof NamespaceNotFoundException + || throwable instanceof StreamNotFoundException))) { + log.info("{}/{} table deleted successfully", namespace, table); + } else { + if (throwable != null) { + log.error("{}/{} table deletion failed {} but moving on", namespace, table, throwable); + } else { + log.error("{}/{} table deletion failed but moving on", namespace, table); + } + } + }); + } + } + ``` + + So this proposal will add a `cleanup` method to the `StateStoreProvider` and call it after a function is deleted: + + ```java + worker().getStateStoreProvider().cleanUp(tenant, namespace, hashName); + ``` + +3. Add a new `init` method to `StateStoreProvider` interface: + + The current `init` method requires a `FunctionDetails` parameter, but we cannot get the `FunctionDetails` in the `ComponentImpl` class, + and this parameter is not used either in `BKStateStoreProviderImpl` or in `PulsarMetadataStateStoreProviderImpl`, + but for backward compatibility, instead of updating the `init` method, this proposal will add a new `init` method without `FunctionDetails` parameter: + + ```java + default void init(Map config) throws Exception {} + ``` + +## Public-facing Changes + +None + +# Monitoring + +# Security Considerations + +# Backward & Forward Compatibility + +## Revert + +- Nothing needs to be done if users use the Apache BookKeeper as the state storage backend. +- If users use another state storage backend, they need to change it back to BookKeeper. + +## Upgrade + +Nothing needs to be done. + +# Alternatives + +# General Notes + +# Links + + +* Mailing List discussion thread: https://lists.apache.org/thread/0rz29wotonmdck76pdscwbqo19t3rbds +* Mailing List voting thread: https://lists.apache.org/thread/t8vmyxovrrb5xl8jvrp1om50l6nprdjt