
"No such method Error" when attempting to parse JSON published to Kafka topic #36

Open
szmejap opened this issue Feb 23, 2022 · 2 comments


szmejap commented Feb 23, 2022

Dear RMLStreamer dev team

When setting up a test deployment of RMLStreamer using the provided docker-compose files, I encounter the following error when publishing JSON data to a Kafka topic:

java.lang.NoSuchMethodError: 'scala.collection.mutable.ArrayOps scala.Predef$.refArrayOps(java.lang.Object[])'
	at io.rml.framework.core.item.json.JSONItem$$anonfun$1.apply(JSONItem.scala:90)
	at io.rml.framework.core.item.json.JSONItem$$anonfun$1.apply(JSONItem.scala:80)
	at scala.collection.immutable.List.flatMap(List.scala:334)
	at io.rml.framework.core.item.json.JSONItem$.fromStringOptionableList(JSONItem.scala:78)
	at io.rml.framework.flink.source.JSONStream$$anonfun$2.apply(JSONStream.scala:99)
	at io.rml.framework.flink.source.JSONStream$$anonfun$2.apply(JSONStream.scala:98)
	at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:648)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Unknown Source)

The stack trace points to this line (JSONItem.scala:90), which, as far as I understand, deserializes individual JSON items.

Test scenario

I verified that the file input/output example works. To set up an analogous Kafka example, I prepared a local Kafka broker with the topics "testInput" and "testOutput" and used the following mapping file (based on the 'scenario-1' example):

@prefix rr: <http://www.w3.org/ns/r2rml#> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix ex: <http://example.com/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix rml: <http://semweb.mmlab.be/ns/rml#> .
@prefix ql: <http://semweb.mmlab.be/ns/ql#> .
@prefix rmls: <http://semweb.mmlab.be/ns/rmls#> .
@base <http://example.com/base/> .

<TriplesMap1> a rr:TriplesMap;
 
  rml:logicalSource [
    rml:source [
      a rmls:KafkaStream ;
      rmls:broker "kafka:9092" ;
      rmls:groupId "0";
      rmls:topic "testInput";
    ];
    rml:referenceFormulation ql:JSONPath;
    rml:iterator "$.persons[*]"
  ];

  rr:subjectMap [ 
    rr:template "http://example.com/{fname};{lname}";
    rr:class foaf:Person;
  ];

  rr:predicateObjectMap [ 
    rr:predicate ex:owes; 
    rr:objectMap [ rml:reference "amount"; ]
  ].

I'm using the Kafka console producer and consumer to publish and consume data:

./kafka-console-producer.sh --broker-list 127.0.0.1:9093 --topic testInput < ../../scenario-1/input.json
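
To read back results I use the matching console consumer (this is the standard Kafka CLI tool; --from-beginning just makes sure no earlier records are missed):

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic testOutput --from-beginning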

The input data:

{"persons": [{ "fname": "Sue", "lname": "Jones", "amount": "20.0E0" }, { "fname": "Bob", "lname": "Smith", "amount": "30.0E0" }]}

Arguments for the Flink job are:

toKafka --mapping-file /mnt/data/kafkaMapping.ttl --broker-list kafka:9092 --topic testOutput
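
i.e. the job is submitted roughly like this (the jar path is a placeholder for whichever RMLStreamer build is deployed):

./bin/flink run /path/to/RMLStreamer.jar toKafka --mapping-file /mnt/data/kafkaMapping.ttl --broker-list kafka:9092 --topic testOutput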

The job is created and runs until something is published on the input topic. Sending any data to the testInput topic results in errors:

  • When sending a plain string, a JSON parsing error is produced (as expected): ERROR io.rml.framework.core.item.json.JSONItem$ [] - Error while parsing JSON: [PLAIN TEXT HERE], and the job continues
  • When sending a JSON string, either piped from a file or pasted in, the job fails with the exception cited above

Environment setup

I'm only including this information for the sake of completeness. I don't think this is relevant to the problem, but I just don't know anymore.

I tried to simplify the setup so that the error can be reproduced. I have local scripts to interact with Kafka, and the rest of the setup is managed with docker-compose, using the following file:

version: "3.9"
services:

  jobmanager:
    image: flink:1.14.0-scala_2.12-java11
    expose:
      - "6123"
    ports:
      - "8081:8081"
    depends_on:
      - kafka
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - data:/mnt/data

  taskmanager:
    image: flink:1.14.0-scala_2.12-java11
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - data:/mnt/data

  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

volumes:
  data:

The broker address localhost:9093 on the host maps to kafka:9092 on the internal Docker network in which the Flink and Kafka containers run, so the different addresses and port numbers still point to the same broker.
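
A quick way to sanity-check that external listener from the host is the standard topic-listing command:

./kafka-topics.sh --bootstrap-server 127.0.0.1:9093 --list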

Summary

Is there a minimal working example of how to use RMLStreamer with Kafka? And how can I access detailed RMLStreamer logs (if available)?

I would be grateful for any assistance you can provide.

Best Regards,
Pawel

Contributor

ghsnd commented Feb 25, 2022

Hi Pawel,
Could you try again and change the Flink container version in your docker-compose file for both the jobmanager and taskmanager to

image: flink:1.14.0-scala_2.11-java11

Scala 2.12 is not supported yet, although I see it in the example docker-compose file; I'll change that as well. (RMLStreamer is built for Scala 2.11, and Scala 2.11 and 2.12 binaries are not compatible, hence the NoSuchMethodError as soon as the job touches Scala collection code.)

Author

szmejap commented Feb 28, 2022

Hi Gerald,

Thank you for your suggestion. I'll try to test it again with the Scala 2.11 image when I have the time (it's not a priority ATM).
