diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/kafka.md b/docs/website/docs/dlt-ecosystem/verified-sources/kafka.md
index fda4c94fdc..1d09907d0a 100644
--- a/docs/website/docs/dlt-ecosystem/verified-sources/kafka.md
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/kafka.md
@@ -104,7 +104,7 @@ For more information, read the [Walkthrough: Run a pipeline](../../walkthroughs/
 
 `dlt` works on the principle of [sources](../../general-usage/source) and
 [resources](../../general-usage/resource).
 
-### Source `kafka_source`
+### Source `kafka_consumer`
 
 This function retrieves messages from the given Kafka topics.
@@ -127,8 +127,9 @@ the `secrets.toml`.
 May be used explicitly to pass an initialized Kafka Consumer object.
 
 `msg_processor`: A function, which'll be used to process every message
-read from the given topics. Can be used explicitly to pass a custom
-processor. See the [default processor](https://github.com/dlt-hub/verified-sources/blob/fe8ed7abd965d9a0ca76d100551e7b64a0b95744/sources/kafka/helpers.py#L14-L50)
+read from the given topics before it is saved to the destination.
+Can be used explicitly to pass a custom processor. See the
+[default processor](https://github.com/dlt-hub/verified-sources/blob/fe8ed7abd965d9a0ca76d100551e7b64a0b95744/sources/kafka/helpers.py#L14-L50)
 as an example of how to implement processors.
 
 `batch_size`: The amount of messages to extract from the cluster
@@ -164,7 +165,7 @@ this offset.
    topics = ["topic1", "topic2", "topic3"]
    source = kafka_consumer(topics)
 
-   load_info = pipeline.run(source, write_disposition="replace")
+   pipeline.run(source, write_disposition="replace")
    ```
 
 1. To extract messages and process them in a custom way:
@@ -181,12 +182,12 @@ this offset.
        }
 
    data = kafka_consumer("topic", msg_processor=custom_msg_processor)
-   load_info = pipeline.run(data)
+   pipeline.run(data)
    ```
 
 1. To extract messages, starting from a timestamp:
 
    ```python
    data = kafka_consumer("topic", start_from=pendulum.datetime(2023, 12, 15))
-   load_info = pipeline.run(data)
+   pipeline.run(data)
    ```
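
Below is a minimal end-to-end sketch of the custom `msg_processor` pattern the revised docs describe. It assumes the verified source has been scaffolded with `dlt init kafka <destination>` (so `kafka_consumer` is importable from the local `kafka` package), that broker credentials sit in `secrets.toml`, and that the pipeline, destination, and dataset names are illustrative; the fields kept under `_kafka` are examples, not a schema the source requires.

```python
from typing import Any, Dict

import confluent_kafka
import dlt
from kafka import kafka_consumer  # local package created by `dlt init kafka <destination>`


def custom_msg_processor(msg: confluent_kafka.Message) -> Dict[str, Any]:
    # Keep a few broker-side attributes alongside the decoded payload.
    # Assumes the message value is UTF-8 text; adapt the decoding otherwise.
    return {
        "_kafka": {
            "topic": msg.topic(),
            "partition": msg.partition(),
            "offset": msg.offset(),
        },
        "data": msg.value().decode("utf-8"),
    }


pipeline = dlt.pipeline(
    pipeline_name="kafka_pipeline",  # illustrative names, not prescribed by the source
    destination="duckdb",
    dataset_name="kafka_messages",
)
data = kafka_consumer("topic", msg_processor=custom_msg_processor)
pipeline.run(data)
```

Returning a flat dict per message lets `dlt` infer the table schema; the normalizer flattens the nested `_kafka` dict into separate columns at the destination.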