Skip to content

Commit

Permalink
feat: set kafka keys if setKey is set (#2146)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored Oct 13, 2024
1 parent dd08bca commit bc12925
Show file tree
Hide file tree
Showing 14 changed files with 658 additions and 519 deletions.
4 changes: 4 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -20868,6 +20868,10 @@
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SASL",
"description": "SASL user to configure SASL connection for kafka broker SASL.enable=true default for SASL."
},
"setKey": {
"description": "SetKey sets the Kafka key to the keys passed in the Message. When the key is null (default), the record is sent randomly to one of the available partitions of the topic. If a key exists, Kafka hashes the key, and the result is used to map the message to a specific partition. This ensures that messages with the same key end up in the same partition.",
"type": "boolean"
},
"tls": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.TLS",
"description": "TLS user to configure TLS connection for kafka broker TLS.enable=true default for TLS."
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -20867,6 +20867,10 @@
"description": "SASL user to configure SASL connection for kafka broker SASL.enable=true default for SASL.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SASL"
},
"setKey": {
"description": "SetKey sets the Kafka key to the keys passed in the Message. When the key is null (default), the record is sent randomly to one of the available partitions of the topic. If a key exists, Kafka hashes the key, and the result is used to map the message to a specific partition. This ensures that messages with the same key end up in the same partition.",
"type": "boolean"
},
"tls": {
"description": "TLS user to configure TLS connection for kafka broker TLS.enable=true default for TLS.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.TLS"
Expand Down
4 changes: 4 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_monovertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3579,6 +3579,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -4112,6 +4114,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down
4 changes: 4 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8259,6 +8259,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -8792,6 +8794,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down
4 changes: 4 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3047,6 +3047,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -3580,6 +3582,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down
12 changes: 12 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6765,6 +6765,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -7298,6 +7300,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -17990,6 +17994,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -18523,6 +18529,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -24394,6 +24402,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -24927,6 +24937,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down
12 changes: 12 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6765,6 +6765,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -7298,6 +7300,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -17990,6 +17994,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -18523,6 +18529,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -24394,6 +24402,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down Expand Up @@ -24927,6 +24937,8 @@ spec:
required:
- mechanism
type: object
setKey:
type: boolean
tls:
properties:
caCertSecret:
Expand Down
24 changes: 24 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -5227,6 +5227,30 @@ Description

<td>

<code>setKey</code></br> <em> bool </em>
</td>

<td>

<em>(Optional)</em>
<p>

SetKey sets the Kafka key to the keys passed in the Message. When the
key is null (default), the record is sent randomly to one of the
available partitions of the topic. If a key exists, Kafka hashes the
key, and the result is used to map the message to a specific partition.
This ensures that messages with the same key end up in the same
partition.
</p>

</td>

</tr>

<tr>

<td>

<code>tls</code></br> <em> <a href="#numaflow.numaproj.io/v1alpha1.TLS">
TLS </a> </em>
</td>
Expand Down
Loading

0 comments on commit bc12925

Please sign in to comment.