-
Hey @suityou01 👋 You may find the correct options here: https://roadrunner.dev/docs/queues-kafka/current/en
-
This makes sense. I have the worker package installed and yet I get this error
{
    "require": {
        "spiral/roadrunner-cli": "^2.6",
        "spiral/roadrunner-http": "^3.3",
        "nyholm/psr7": "^1.8",
        "spiral/roadrunner-worker": "^3.3",
        "spiral/roadrunner-jobs": "^4.3",
        "spiral/roadrunner": "^2023.3"
    }
}
<?php
use Spiral\RoadRunner\Environment;
use Spiral\RoadRunner\Environment\Mode;
// 1. Using global env variable
$isJobsMode = $_SERVER['RR_MODE'] === 'jobs';
// 2. Using RoadRunner's API
$env = Environment::fromGlobals();
$isJobsMode = $env->getMode() === Mode::MODE_JOBS;
$consumer = new Consumer();
/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */
while ($task = $consumer->waitTask()) {
    var_dump($task);
}
Any ideas?
-
Yep, the problem exists between chair and monitor. :-) Working code looks like this:
<?php
use Spiral\RoadRunner\Environment;
use Spiral\RoadRunner\Environment\Mode;
use Spiral\RoadRunner\Jobs\Consumer;
use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface;
include "vendor/autoload.php";
// 1. Using global env variable
$isJobsMode = $_SERVER['RR_MODE'] === 'jobs';
// 2. Using RoadRunner's API
$env = Environment::fromGlobals();
$isJobsMode = $env->getMode() === Mode::MODE_JOBS;
$consumer = new Consumer();
/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */
while ($task = $consumer->waitTask()) {
    var_dump($task);
}
-
Not getting any output when the messages are flowing through kafka though...
version: '3'
rpc:
  listen: tcp://127.0.0.1:6001
server:
  command: "php consumer.php"
#http:
#  address: "0.0.0.0:8080"
kafka:
  brokers: ["broker:9092"]
jobs:
  num_pollers: 1
  pipeline_size: 100
  pool:
    num_workers: 1
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s
  pipelines:
    kafka:
      driver: kafka
      config:
        priority: 1
        brokers: [ broker:9092 ]
        consumer_options:
          topics: [ "feeds", "feeds.feeds.unit", "^[a-zA-Z0-9._-]+$" ]
          consume_regexp: true
logs:
  level: error
-
So here is what I get in my container log:
2023-12-21T12:38:20+0000 DEBUG rpc plugin was started {"address": "tcp://127.0.0.1:6001", "list of the plugins with RPC methods:": ["lock", "resetter", "informer", "jobs", "app"]}
2023-12-21T12:38:20+0000 DEBUG jobs initializing driver {"pipeline": "kafka", "driver": "kafka"}
2023-12-21T12:38:20+0000 DEBUG kafka ping kafka: ok {"driver": "kafka", "pipeline": "kafka"}
2023-12-21T12:38:20+0000 DEBUG jobs driver ready {"pipeline": "kafka", "driver": "kafka", "start": "2023-12-21T12:38:20+0000", "elapsed": "3.843019ms"}
2023-12-21T12:38:20+0000 DEBUG server worker is allocated {"pid": 16, "internal_event_name": "EventWorkerConstruct"}
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T12:38:20+0000 DEBUG jobs exited from jobs pipeline processor
[INFO] RoadRunner server started; version: 2023.3.8, buildtime: 2023-12-14T16:05:26+0000
[INFO] sdnotify: not notified
Loop now looks like
/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */
while ($task = $consumer->waitTask()) {
    var_dump($task);
    $task->complete();
}
Config now looks like
version: '3'
rpc:
  listen: tcp://127.0.0.1:6001
server:
  command: "php consumer.php"
#http:
#  address: "0.0.0.0:8080"
kafka:
  brokers: ["broker:9092"]
jobs:
  num_pollers: 1
  pipeline_size: 100
  pool:
    num_workers: 1
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s
  pipelines:
    kafka:
      driver: kafka
      config:
        priority: 1
        consumer_options:
          topics: [ "feeds", "feeds.feeds.unit", "^[a-zA-Z0-9._-]+$" ]
          consume_regexp: true
logs:
  level: debug
Maybe something is coming through but I cannot see the output from var_dump if it is.
-
I think those debug messages are from startup, so it seems no messages are coming through.
-
Kafka does not report it as a consumer, so it looks like the handshake is failing. How do I get more output from this black box?
-
OK, here is the latest. It is not collecting any messages from kafka.
version: '3'
rpc:
  listen: tcp://127.0.0.1:6001
server:
  command: "php consumer.php"
#http:
#  address: "0.0.0.0:8080"
kafka:
  brokers: ["broker:9092"]
jobs:
  num_pollers: 1
  pipeline_size: 100
  pool:
    num_workers: 1
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s
  pipelines:
    kafka:
      driver: kafka
      config:
        priority: 1
        auto_create_topics_enable: true
        consumer_options:
          topics: [ "feeds", "feeds.feeds.unit", "^[a-zA-Z0-9._-]+$" ]
          consume_regexp: true
        group_options:
          group_id: connect-debezium
  consume:
    [
      "kafka"
    ]
logs:
  level: debug
  output: stdout
I have a sink connector that sinks the messages to Elasticsearch, so I know that Kafka is working, and I can see the messages through kafka-ui. Nothing is reaching the kafka driver in RoadRunner. I have no idea what else to do to get this to work. I think I have followed all your instructions carefully. I really want to use this product in my project, but if I cannot get it to work, or get help to get it to work, then I don't think it's ready for general use.
-
An error message!
2023-12-21T15:38:32+0000 DEBUG rpc plugin was started {"address": "tcp://127.0.0.1:6001", "list of the plugins with RPC methods:": ["lock", "app", "informer", "jobs", "resetter"]}
2023-12-21T15:38:32+0000 DEBUG jobs initializing driver {"pipeline": "kafka", "driver": "kafka"}
2023-12-21T15:38:32+0000 DEBUG kafka ping kafka: ok {"driver": "kafka", "pipeline": "kafka"}
2023-12-21T15:38:32+0000 DEBUG jobs driver ready {"pipeline": "kafka", "driver": "kafka", "start": "2023-12-21T15:38:32+0000", "elapsed": "5.560543ms"}
2023-12-21T15:38:32+0000 DEBUG kafka pipeline was started {"driver": "kafka", "pipeline": "kafka", "start": "2023-12-21T15:38:32+0000", "elapsed": "3.234µs"}
2023-12-21T15:38:32+0000 ERROR kafka non-retriable consumer error {"topic": "", "partition": 0, "code": 23, "description": "The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.", "message": "INCONSISTENT_GROUP_PROTOCOL"}
2023-12-21T15:38:32+0000 DEBUG kafka kafka listener stopped
2023-12-21T15:38:32+0000 ERROR kafka listener error {"error": "unable to join group session: INCONSISTENT_GROUP_PROTOCOL: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list."}
2023-12-21T15:38:32+0000 DEBUG kafka pipeline was stopped {"driver": "kafka", "pipeline": "kafka", "start": "2023-12-21T15:38:32+0000", "elapsed": "3.450471ms"}
2023-12-21T15:38:32+0000 DEBUG server worker is allocated {"pid": 16, "internal_event_name": "EventWorkerConstruct"}
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
2023-12-21T15:38:32+0000 DEBUG jobs exited from jobs pipeline processor
[INFO] RoadRunner server started; version: 2023.3.8, buildtime: 2023-12-14T16:05:26+0000
[INFO] sdnotify: not notified
Process docker container logs smart_planning_engine finished
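One possible reading of the INCONSISTENT_GROUP_PROTOCOL error in this log: Kafka Connect sink connectors conventionally consume under a group named connect-<connector name>, so a group id like connect-debezium may already be held by a Connect sink. A different kind of consumer joining that same group could then be rejected in exactly this way. A minimal sketch of the jobs section with a dedicated group id, assuming that is the cause; roadrunner-feeds is a hypothetical name, not something taken from the thread, and the nesting is assumed from the configs posted earlier:

```yaml
jobs:
  consume: [ "kafka" ]
  pipelines:
    kafka:
      driver: kafka
      config:
        priority: 1
        auto_create_topics_enable: true
        group_options:
          # Hypothetical group id: assumed not to be shared with Kafka Connect
          # or any other consumer group that already exists on the broker.
          group_id: roadrunner-feeds
        consumer_options:
          topics: [ "feeds", "feeds.feeds.unit", "^[a-zA-Z0-9._-]+$" ]
          consume_regexp: true
```

If the error goes away with a fresh group id, the shared group was the likely culprit; if it persists, the cause lies elsewhere and this sketch does not apply.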
-
I am having problems just getting the basic config set up.
Error: "Did not find expected key by parsing a block mapping"
NB: The example has a key called topic at the jobs.pipelines.queue-name.config level, which is confusing,
e.g.
So we have to specify a topic here (a single topic, but which one?),
and for the consumer we have to specify a topics key.
How does this work?
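On the parse error itself: "did not find expected key" while parsing a block mapping is a YAML syntax error rather than a RoadRunner one, and it often comes from a key indented differently from its siblings or from a tab character in the indentation. A sketch of a single pipeline with consistent two-space indentation, using only keys that already appear in this thread; the nesting is an assumption based on the configs posted above, and it does not settle the topic versus topics question:

```yaml
jobs:
  pipelines:
    kafka:                  # pipeline (queue) name
      driver: kafka
      config:               # every key below sits exactly two spaces deeper
        priority: 1
        consumer_options:
          topics: [ "feeds" ]
```

Running the file through any YAML linter before starting RoadRunner should point at the line where the indentation breaks.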