Better processor ideas #1141
Replies: 5 comments 5 replies
-
Any thoughts on how to make the ergonomics of language processors like JavaScript better? Right now, having to embed the code within the configuration feels less than ideal.
-
Thanks for the list, Lovro! It looks really great. 👍 I'd suggest thinking about two more:
-
Tagging this request from @lyuboxa #1172 as relevant to Better Processors work.
-
For wasm I use Wazero and NATS KV. Wazero is the runner. NATS KV stores the wasm binaries, and the NATS pub/sub system ensures they are where they need to be across any cluster of nodes. NATS looks to repo releases to get the wasm, so it's sort of doing CDC off any GitHub release. A global registry of what to look for can be in git or S3.
Also, the core foundational storage of Conduit uses BadgerDB, PostgreSQL or in-memory. I started working on a NATS JetStream based storage system because the nature of the data is KV, and the NATS KV system gives you streaming KV and easy clustering of that storage, as well as fault tolerance with no need for any load balancers etc.
I really like the things being brought up in this discussion. If I can help I would try to free up some time.
-
Shipped as part of
-
This discussion can serve as a starting point to explore and ultimately choose the specific improvements we want to apply to processors.
Current state
Let's be honest, Conduit processors are currently pretty basic. We would like to make them more powerful to give users more freedom when manipulating in-flight data.
Initially, processors were inspired by single message transforms (SMTs) provided by Kafka Connect. We tried to mimic their behavior as well as the processor names to make Conduit more approachable for new users with prior Kafka Connect experience. A couple of issues arose because of this. The main issue was that Conduit works with a single OpenCDC record that combines the key and payload, whereas Kafka Connect has separate payloads for both. The OpenCDC record additionally provides fields like metadata, position, operation, and two fields for the payload (before and after). Conduit also differentiates between raw data and structured data. All of this meant we tried to jam a square peg into a round hole and ended up with sub-par processors.
Potential improvements
Ability to process any field
In Conduit a single record is always represented as an OpenCDC record, meaning that it has the following structure:
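Roughly, that structure looks like the sketch below, based on the fields described in this discussion; the exact type definitions in the Conduit codebase may differ in detail:

```go
// Sketch of the OpenCDC record shape, based on the fields described in this
// discussion; the real definitions in the Conduit codebase may differ.
type Record struct {
	Position  []byte            // where the record came from in the source
	Operation string            // e.g. create, update, delete, snapshot
	Metadata  map[string]string // arbitrary key/value metadata
	Key       Data              // record key (raw or structured)
	Payload   Change            // payload before and after the change
}

type Change struct {
	Before Data // payload before the change (e.g. for updates)
	After  Data // payload after the change
}

// Data is either raw bytes or structured data.
type Data interface{}
```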
Currently, built-in processors can only manipulate the fields Record.Key and Record.Payload.After. We should change processors so they can be configured to work on any of these fields, maybe even multiple fields at the same time (e.g. copy payload data into metadata).
Better type names
Processor types are currently very hard to read. They are all in lowercase letters without spaces and normally end with "key" or "payload" depending on which field they manipulate (e.g. timestampconverterpayload). We should make processor types easier to read and group together processors that do the same thing but work on different fields. The field could be chosen by a config option (see ability to process any field).
Logging
Processors do not have access to a logger, so we had to resort to workarounds in processors that need it (example). When building a processor we should give it access to a logger.
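As a rough illustration of what that could look like (the constructor name and signature below are made up for the example, not an existing Conduit API):

```go
import "github.com/rs/zerolog"

// Hypothetical constructor: Conduit hands the processor a ready-made logger,
// scoped to the pipeline it runs in, instead of the processor having to work
// around the missing logger itself.
type MaskFieldProcessor struct {
	logger zerolog.Logger
	field  string
}

func NewMaskFieldProcessor(logger zerolog.Logger, config map[string]string) (*MaskFieldProcessor, error) {
	logger.Debug().Str("field", config["field"]).Msg("building maskfield processor")
	return &MaskFieldProcessor{logger: logger, field: config["field"]}, nil
}
```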
Discovering processor types
The UI has poor support for processors because they need to be hard coded. Currently, there is no way to retrieve the available processor types and their specifications.
We should expose an endpoint that returns all processor types and their specifications, similar to how we already expose the list of plugins and their specifications.
For example, something like this:
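(A rough sketch, modeled after the existing connector plugin specifications; the type and field names are illustrative, not an existing API.)

```go
// Illustrative response item for a "list processor types" endpoint.
type ProcessorSpecification struct {
	Name        string               // processor type, e.g. "maskfield"
	Summary     string               // short one-line description
	Description string               // longer description shown in the UI
	Version     string
	Parameters  map[string]Parameter // configuration parameters the processor accepts
}

// Illustrative parameter description, enough for the UI to render a form and
// validate user input.
type Parameter struct {
	Description string
	Default     string
	Required    bool
}
```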
OpenCDC unwrap processor
The unwrap processor currently only supports the formats debezium and kafka-connect. We should also provide support to unwrap an OpenCDC record. This will be particularly useful when reprocessing records coming from the DLQ, since those records contain an OpenCDC record in the payload.
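To sketch the idea, assuming for illustration that the nested record is serialized as JSON in the raw payload (reusing the Record sketch from above; the helper below is not an existing processor):

```go
import (
	"encoding/json"
	"fmt"
)

// Illustrative only: replace a record whose payload carries a serialized
// OpenCDC record (as DLQ records do) with that inner record.
func unwrapOpenCDC(rec Record) (Record, error) {
	raw, ok := rec.Payload.After.([]byte)
	if !ok {
		return Record{}, fmt.Errorf("expected raw payload data")
	}

	var inner Record
	if err := json.Unmarshal(raw, &inner); err != nil {
		return Record{}, fmt.Errorf("payload does not contain an OpenCDC record: %w", err)
	}

	// The inner record continues down the pipeline in place of the outer one.
	return inner, nil
}
```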
Processor lifecycle
A processor is currently just a simple function that gets called for every record. There is no specific method called when initializing or closing a processor. To support more complex processors that need to open/close resources at the start/end, we should introduce more methods. This is a prerequisite if we want to have pluggable processors that need to be cleaned up when the pipeline stops running.
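One possible shape, with names that are purely illustrative:

```go
import "context"

// Hypothetical lifecycle-aware processor interface. Today only the equivalent
// of Process exists; Open and Teardown would let a processor acquire and
// release resources when the pipeline starts and stops.
type Processor interface {
	// Open is called once before the first record is processed.
	Open(ctx context.Context) error
	// Process is called for every record flowing through the pipeline.
	Process(ctx context.Context, rec Record) (Record, error)
	// Teardown is called once after processing stops, even on failure.
	Teardown(ctx context.Context) error
}
```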
Permanent storage
A processor can only store data in memory while it's running; it has no access to permanent storage. If we provided a way to store data permanently, processors would get the ability to do stateful processing (e.g. aggregations) across pipeline restarts.
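For example, processors could be handed a small key-value store whose contents survive restarts. The interface below is made up for illustration, with the data living in whatever database Conduit is already configured to use (BadgerDB, PostgreSQL or in-memory):

```go
import (
	"context"
	"strconv"
)

// Hypothetical store handed to a processor by Conduit; keys would be
// namespaced to the processor so different processors cannot clash.
type Store interface {
	Get(ctx context.Context, key string) ([]byte, error)
	Set(ctx context.Context, key string, value []byte) error
}

// Example: a counter that keeps aggregating across pipeline restarts.
func incrementCount(ctx context.Context, store Store, key string) error {
	raw, _ := store.Get(ctx, key) // in this sketch a missing key yields an empty value
	count, _ := strconv.Atoi(string(raw))
	return store.Set(ctx, key, []byte(strconv.Itoa(count+1)))
}
```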
Splitting and combining records
Current processors are simple functions that take a record and return a record. They can modify the record as they see fit or drop it entirely, but they can't split it into multiple records or combine multiple records into one record.
Giving processors the ability to split records would allow us to denormalize data, while combining records would allow us to aggregate it. For this to work across restarts we would also need to give processors access to permanent storage.
Note that we will need to consider how we would give the processor access to multiple records at once. If the processor relies on being called by the node (as it does now), then it can't combine records based on a time window, as there will not necessarily be a record at the point the time window gets closed.
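In terms of the processing signature, the change could look roughly like this (illustrative only):

```go
import "context"

// Today's shape: one record in, one record out (or an error / a drop).
type ProcessFunc func(ctx context.Context, rec Record) (Record, error)

// A possible future shape: zero, one or many records out, which covers
// dropping, splitting (denormalizing) and, combined with permanent storage,
// aggregating records across calls.
type MultiProcessFunc func(ctx context.Context, rec Record) ([]Record, error)
```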
Pluggable processors
Pluggable processors or bring-your-own-processor (BYOP) is the ability to provide a custom processor implementation, similarly to how we currently allow the user to provide standalone connector implementations. We already provide a similar feature using the JavaScript processor, although it has many drawbacks like poor performance and the fact that users are forced to write JavaScript. We should think about alternatives that would improve the experience of writing your own processor like go-plugin or WebAssembly.
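As a taste of the WebAssembly route, here is a minimal sketch of loading and invoking a user-built module with wazero; the file name, exported function name and the record-passing convention are all assumptions for the example, not an existing Conduit API:

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/tetratelabs/wazero"
	"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
)

func main() {
	ctx := context.Background()

	// The user-built processor, e.g. compiled with TinyGo or Rust.
	wasmBytes, err := os.ReadFile("processor.wasm")
	if err != nil {
		log.Fatal(err)
	}

	r := wazero.NewRuntime(ctx)
	defer r.Close(ctx)

	// Needed for modules compiled with WASI support (e.g. TinyGo).
	wasi_snapshot_preview1.MustInstantiate(ctx, r)

	mod, err := r.Instantiate(ctx, wasmBytes)
	if err != nil {
		log.Fatal(err)
	}

	// A real integration would serialize the OpenCDC record, copy it into the
	// module's linear memory and pass a pointer/length pair; here we only call
	// a parameterless exported function to show the mechanics.
	if _, err := mod.ExportedFunction("process").Call(ctx); err != nil {
		log.Fatal(err)
	}
}
```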