-
Hello! I am a little bit lost regarding the consumer kafka topics part: I understand that you defined the topics to be consumed in the config file but, which php class is going to handle it? Let's say I have a topic 'users' with different messages ('user-created', 'user-deleted'...). How do I set 'this topic is going to be handled by this php-class'? I know how to work with Spiral jobs (based on names) but consuming external messages is a mystery to me (currently, I'm dispatching the messages from a python service). Thank you! :) |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 18 replies
-
Hey @konigbach 👋 |
Beta Was this translation helpful? Give feedback.
-
@konigbach Hi. Do you pass the job name in your service that pushes the job to the queue? The name does not necessarily have to be the class name, you can use any string. And you can register a standalone handler for each topic. For example: // app/config/queue.php
return [
'registry' => [
'handlers' => [
'users' => App\Endpoint\Job\UsersJob::class,
// ...
],
],
]; And push jobs specifying this name (instead of the class name). This is an example in PHP, but your Python service should have something similar. $queue->push('users', ['name' => 'value']); |
Beta Was this translation helpful? Give feedback.
-
I am having problems just getting the basic config set up. Should I create a new discussion topic?
Error "Did not find expected key by parsing a block mapping" NB The example has a key called topic at the jobs.pipelines.queue-name.config level which is confusing # Kafka jobs driver
#
# This option is required to use Kafka driver. Addrs can contain any number of addresses separated by comma (127.0.0.1:9092,127.0.0.1:9093,...)
kafka:
addrs: 127.0.0.1:9092
jobs:
num_pollers: 10
pipeline_size: 100000
pool:
num_workers: 10
max_jobs: 0
allocate_timeout: 60s
destroy_timeout: 60s
pipelines:
test-local-6:
# Driver name
#
# This option is required
driver: kafka
# Driver's configuration
#
# Should not be empty
config:
# Pipeline priority
#
# If the job has priority set to 0, it will inherit the pipeline's priority. Default: 10.
priority: 1
# Topic name: https://kafka.apache.org/intro#intro_concepts_and_terms
#
# This option is required and should not be empty.
topic: test-1 So we have to specify a topic here (single topic, which one??) How does this work? |
Beta Was this translation helpful? Give feedback.
@konigbach Hi. Do you pass the job name in your service that pushes the job to the queue? The name does not necessarily have to be the class name, you can use any string.
And you can register a standalone handler for each topic. For example:
And push jobs specifying this name (instead of the class name). This is an example in PHP, but your Python service should have something similar.