Commit 16faac3

fix

IlyaFaer committed Jan 23, 2024
1 parent ca66822 commit 16faac3

13 changes: 7 additions & 6 deletions docs/website/docs/dlt-ecosystem/verified-sources/kafka.md
@@ -104,7 +104,7 @@ For more information, read the [Walkthrough: Run a pipeline](../../walkthroughs/
`dlt` works on the principle of [sources](../../general-usage/source) and
[resources](../../general-usage/resource).

-### Source `kafka_source`
+### Source `kafka_consumer`

This function retrieves messages from the given Kafka topics.

@@ -127,8 +127,9 @@ the `secrets.toml`. May be used explicitly to pass an initialized
Kafka Consumer object.

`msg_processor`: A function which will be used to process every message
-read from the given topics. Can be used explicitly to pass a custom
-processor. See the [default processor](https://github.com/dlt-hub/verified-sources/blob/fe8ed7abd965d9a0ca76d100551e7b64a0b95744/sources/kafka/helpers.py#L14-L50)
+read from the given topics before saving them in the destination.
+Can be used explicitly to pass a custom processor. See the
+[default processor](https://github.com/dlt-hub/verified-sources/blob/fe8ed7abd965d9a0ca76d100551e7b64a0b95744/sources/kafka/helpers.py#L14-L50)
as an example of how to implement processors.
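As a minimal sketch of such a processor (the function name and returned fields here are illustrative, not dlt's defaults; the message is assumed to expose the `confluent_kafka.Message` interface with `topic()`, `partition()`, `offset()`, and `value()`, and the payload is assumed to be UTF-8 encoded JSON):

```python
import json

def custom_msg_processor(msg):
    """Map one Kafka message to a flat dict (one row in the destination)."""
    return {
        "_kafka": {
            "topic": msg.topic(),          # source topic name
            "partition": msg.partition(),  # partition the message came from
            "offset": msg.offset(),        # position within the partition
        },
        # assumes the message payload is UTF-8 encoded JSON
        "data": json.loads(msg.value().decode("utf-8")),
    }
```

The returned dict becomes the row that is loaded, so whatever keys you emit here define the destination schema.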

`batch_size`: The number of messages to extract from the cluster
@@ -164,7 +165,7 @@ this offset.
topics = ["topic1", "topic2", "topic3"]

source = kafka_consumer(topics)
-load_info = pipeline.run(source, write_disposition="replace")
+pipeline.run(source, write_disposition="replace")
```
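The `batch_size` parameter described above controls how many messages are pulled from the cluster per chunk. A generic illustration of that chunking behavior (a stdlib-only sketch, not dlt's internal code):

```python
from itertools import islice

def in_batches(messages, batch_size):
    """Yield lists of at most batch_size consecutive messages."""
    it = iter(messages)
    while True:
        batch = list(islice(it, batch_size))  # take up to batch_size items
        if not batch:                          # stream exhausted
            return
        yield batch
```

Each yielded batch is processed and handed to the pipeline before the next chunk is read, which keeps memory usage bounded for large topics.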

1. To extract messages and process them in a custom way:
@@ -181,12 +182,12 @@ this offset.
}

data = kafka_consumer("topic", msg_processor=custom_msg_processor)
-load_info = pipeline.run(data)
+pipeline.run(data)
```

1. To extract messages, starting from a timestamp:

```python
data = kafka_consumer("topic", start_from=pendulum.datetime(2023, 12, 15))
-load_info = pipeline.run(data)
+pipeline.run(data)
```
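Conceptually, `start_from` resolves the given timestamp to a position in each partition (presumably via the consumer's timestamp-to-offset lookup) so that only messages at or after it are read. A rough stdlib-only sketch of the underlying filter idea (the helper name and the message shape are illustrative, not dlt's API; Kafka timestamps are epoch milliseconds):

```python
from datetime import datetime, timezone

def since(messages, start_from):
    """Keep messages whose Kafka timestamp (epoch millis) is >= start_from."""
    cutoff_ms = int(start_from.timestamp() * 1000)  # datetime -> epoch millis
    return [m for m in messages if m["timestamp_ms"] >= cutoff_ms]
```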
