Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka escapeHeaders documentation #4332

Merged
merged 6 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ spec:
value: true
- name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
value: 5m
- name: escapeHeaders # Optional.
value: false
```

## Spec metadata fields
Expand Down Expand Up @@ -99,6 +101,7 @@ spec:
| `consumerFetchDefault` | N | Input/Output | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
| `heartbeatInterval` | N | Input | The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the `sessionTimeout` value. Defaults to `"3s"`. | `"5s"` |
| `sessionTimeout` | N | Input | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to `"10s"`. | `"20s"` |
| `escapeHeaders` | N | Input | Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false`. | `true` |

#### Note
The metadata `version` must be set to `1.0.0` when using Azure EventHubs with Kafka.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ spec:
value: true
- name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
value: 5m
- name: escapeHeaders # Optional.
value: false

```

Expand Down Expand Up @@ -112,6 +114,7 @@ spec:
| consumerFetchDefault | N | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
| heartbeatInterval | N | The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the `sessionTimeout` value. Defaults to "3s". | `"5s"` |
| sessionTimeout | N | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to "10s". | `"20s"` |
| escapeHeaders | N | Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false`. | `true` |

The `secretKeyRef` above is referencing a [kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the tls information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component.

Expand Down Expand Up @@ -485,6 +488,39 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
}'
```

## Receiving message headers with special characters

The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.
HTTP header values must follow specifications, making some characters not allowed. [Learn more about the protocols](https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2).
In this case, you can enable `escapeHeaders` configuration setting, which uses URL escaping to encode header values on the consumer side.

{{% alert title="Note" color="primary" %}}
When using this setting, the received message headers are URL escaped, and you need to URL "un-escape" it to get the original value.
{{% /alert %}}

Set `escapeHeaders` to `true` to URL escape.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-escape-headers
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # Required. Kafka broker connection setting
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # Optional. Used for input bindings.
value: "group1"
- name: clientID # Optional. Used as client tracing ID by Kafka brokers.
value: "my-dapr-app-id"
- name: authType # Required.
value: "none"
- name: escapeHeaders
value: "true"
```

## Avro Schema Registry serialization/deserialization
You can configure pub/sub to publish or consume data encoded using [Avro binary serialization](https://avro.apache.org/docs/), leveraging an [Apache Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/) (for example, [Confluent Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/), [Apicurio](https://www.apicur.io/registry/)).

Expand Down Expand Up @@ -597,6 +633,7 @@ To run Kafka on Kubernetes, you can use any Kafka operator, such as [Strimzi](ht

{{< /tabs >}}


## Related links
- [Basic schema for a Dapr component]({{< ref component-schema >}})
- Read [this guide]({{< ref "howto-publish-subscribe.md##step-1-setup-the-pubsub-component" >}}) for instructions on configuring pub/sub components
Expand Down
Loading