
Load test with an Avro schema that contains references throws an error #293

SamyLegal opened this issue May 5, 2024 · 2 comments

Hi,

I am currently developing a load test with xk6-kafka that uses an Avro schema with references.
When I try to use a schema with references, I get an error.
The stack trace is at the end of this message.

Kafka Scenario

To give a better understanding of the load test, here is our Kafka use case.
We use Kafka Connect with the MongoDB Kafka source connector to read data from MongoDB collections and insert it into Kafka.
We use this connector for multiple collections, so we have multiple Avro schemas that share the MongoDB record schemas provided by the connector.
To avoid duplicating these MongoDB record schemas, when I create an Avro schema I reference them instead of inlining them.

Avro schema of my main entity

{
  "type": "record",
  "name": "PieceChangeStream",
  "namespace": "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece",
  "fields": [
    {
      "name": "ns",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.Ns"
      ],
      "default": null
    },
    {
      "name": "_id",
      "type": "string"
    },
    {
      "name": "operationType",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "fullDocumentBeforeChange",
      "type": [
        "null",
        "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument"
      ],
      "default": null
    },
    {
      "name": "fullDocument",
      "type": [
        "null",
        "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument"
      ],
      "default": null
    },
    {
      "name": "to",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.To"
      ],
      "default": null
    },
    {
      "name": "documentKey",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.DocumentKey"
      ],
      "default": null
    },
    {
      "name": "updateDescription",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.UpdateDescription"
      ],
      "default": null
    },
    {
      "name": "clusterTime",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "txnNumber",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "lsid",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.Lsid"
      ],
      "default": null
    }
  ]
}

Example of a referenced schema

com.mongodb.kafka.connect.source.Ns

{
  "type": "record",
  "name": "Ns",
  "namespace": "com.mongodb.kafka.connect.source",
  "fields": [
    {
      "name": "db",
      "type": "string"
    },
    {
      "name": "coll",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

We use Apicurio Schema Registry through its Confluent-compatible API (ccompat) so that it works with xk6-kafka.
If I use a fully inlined schema without references, my script works.
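
For reference, the parent schema is stored in the registry together with references metadata, one entry per named type it uses. A minimal sketch of how such a registration could look from a k6 script, assuming xk6-kafka's createSchema accepts the same references property as getSchema (pieceChangeStreamSchemaString stands for the PieceChangeStream JSON above and is an illustrative variable name):

// Register the referenced schema first, under its fully qualified name.
const nsSchema = schemaRegistry.createSchema({
  subject: "com.mongodb.kafka.connect.source.Ns",
  schema: JSON.stringify({
    type: "record",
    name: "Ns",
    namespace: "com.mongodb.kafka.connect.source",
    fields: [
      { name: "db", type: "string" },
      { name: "coll", type: ["null", "string"], default: null },
    ],
  }),
  schemaType: SCHEMA_TYPE_AVRO,
});

// Then register the parent schema with one reference entry per named
// type (the references property here mirrors the getSchema call below
// and is an assumption, not a confirmed createSchema parameter).
const pieceChangeStreamSchema = schemaRegistry.createSchema({
  subject: "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceChangeStream",
  schema: pieceChangeStreamSchemaString,
  schemaType: SCHEMA_TYPE_AVRO,
  references: [
    {
      name: "com.mongodb.kafka.connect.source.Ns",
      subject: "com.mongodb.kafka.connect.source.Ns",
      version: 1,
    },
    // ...one entry per remaining referenced schema
  ],
});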

k6 script

My k6 script is as follows.
I use the getSchema method with the references property to retrieve my schema.

import { uuidv4 } from "https://jslib.k6.io/k6-utils/1.4.0/index.js";

import { check } from "k6";
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  SCHEMA_TYPE_AVRO,
  SCHEMA_TYPE_STRING,
  RECORD_NAME_STRATEGY,
} from "k6/x/kafka"; // import kafka extension

const brokers = ["localhost:9092"];
const topic = "piece";

const writer = new Writer({
  brokers: brokers,
  topic: topic,
  autoCreateTopic: true,
});

const reader = new Reader({
  brokers: brokers,
  topic: topic,
});

const connection = new Connection({
  address: brokers[0],
});

const schemaRegistry = new SchemaRegistry({
  url: "http://localhost:8200/apis/ccompat/v6",
});

if (__VU === 0) {
  connection.createTopic({ topic: topic });
}

const valueSchemaObject = schemaRegistry.getSchema({
  enableCaching: true,
  subject: "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceChangeStream",
  schemaType: SCHEMA_TYPE_AVRO,
  version: 1,
  references: [
    {
      name: "com.mongodb.kafka.connect.source.DocumentKey",
      subject: "com.mongodb.kafka.connect.source.DocumentKey",
      version: 1,
    },
    {
      name: "com.mongodb.kafka.connect.source.Lsid",
      subject: "com.mongodb.kafka.connect.source.Lsid",
      version: 1,
    },
    {
      name: "com.mongodb.kafka.connect.source.Ns",
      subject: "com.mongodb.kafka.connect.source.Ns",
      version: 1,
    },
    {
      name: "com.mongodb.kafka.connect.source.To",
      subject: "com.mongodb.kafka.connect.source.To",
      version: 1,
    },
    {
      name: "com.mongodb.kafka.connect.source.UpdateDescription",
      subject: "com.mongodb.kafka.connect.source.UpdateDescription",
      version: 1,
    },
    {
      name: "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument",
      subject: "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument",
      version: 1,
    },
  ],
});

export default function () {
  for (let index = 0; index < 200; index++) {
    const key = uuidv4();
    let messages = [
      {
        key: schemaRegistry.serialize({
          data: key,
          schemaType: SCHEMA_TYPE_STRING,
        }),
        value: schemaRegistry.serialize({
          data: {
            _id: key,
            operationType: {
              string: "insert",
            },
            fullDocumentBeforeChange: null,
            fullDocument: {
              "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument":
                {
                  actif: null,
                  tenant: {
                    string: "test",
                  },
                  description: {
                    "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.DescriptionRecord":
                      {
                        lang: {
                          string: "fr",
                        },
                        value: "Piece",
                      },
                  },
                  origin: null,
                  libelle: {
                    "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.LibelleRecord":
                      {
                        lang: null,
                        value: "Piece",
                      },
                  },
                  id: {
                    string: "/referentiel-piece/api/tenants/test/piece/" + key,
                  },
                  reference: key,
                  kind: {
                    string: "piece",
                  },
                  title: null,
                  date: {
                    string: "2024-02-21T17:42:36.437Z",
                  },
                  expiration: {
                    string: "2024-02-21T17:42:36.437Z",
                  },
                  statut: {
                    string: "PUBLIE",
                  },
                  nature: {
                    string: "INSTRUCTION",
                  },
                  user: null,
                  version: null,
                  active: {
                    boolean: true,
                  },
                  _id: {
                    string: key,
                  },
                },
            },
            ns: {
              "com.mongodb.kafka.connect.source.Ns": {
                db: "pda",
                coll: {
                  string: "piece",
                },
              },
            },
            to: null,
            documentKey: {
              "com.mongodb.kafka.connect.source.DocumentKey": {
                _id: {
                  string: key,
                },
              },
            },
            updateDescription: null,
            clusterTime: null,
            txnNumber: null,
            lsid: null,
          },
          schema: valueSchemaObject,
          schemaType: SCHEMA_TYPE_AVRO,
        }),
      },
    ];
    writer.produce({ messages: messages });
  }

  // Read back the 200 messages produced in this iteration
  let messages = reader.consume({ limit: 200 });
  check(messages, {
    "200 messages returned": (msgs) => msgs.length === 200
  });
}

export function teardown(data) {
  if (__VU === 0) {
    // Delete the topic
    connection.deleteTopic(topic);
  }
  writer.close();
  reader.close();
  connection.close();
}
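
For completeness, I run the script with a k6 binary built with the xk6-kafka extension (the script filename and version pin here are illustrative):

xk6 build --with github.com/mostafa/xk6-kafka@latest
./k6 run load-test.js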

Stacktrace of the error

ERRO[0000] panic: runtime error: invalid memory address or nil pointer dereference
goroutine 103 [running]:
runtime/debug.Stack()
        runtime/debug/stack.go:24 +0x64
go.k6.io/k6/js/common.RunWithPanicCatching.func1()
        go.k6.io/[email protected]/js/common/util.go:82 +0x180
panic({0x10634cd00?, 0x107514980?})
        runtime/panic.go:770 +0x124
github.com/dop251/goja.(*Runtime).runWrapped.func1()
        github.com/dop251/[email protected]/runtime.go:2442 +0xf4
panic({0x10634cd00?, 0x107514980?})
        runtime/panic.go:770 +0x124
github.com/dop251/goja.(*vm).handleThrow(0x14000952900, {0x10634cd00, 0x107514980})
        github.com/dop251/[email protected]/vm.go:788 +0x3c0
github.com/dop251/goja.(*vm).try.func1()
        github.com/dop251/[email protected]/vm.go:807 +0x48
panic({0x10634cd00?, 0x107514980?})
        runtime/panic.go:770 +0x124
github.com/dop251/goja.(*vm).handleThrow(0x14000952900, {0x10634cd00, 0x107514980})
        github.com/dop251/[email protected]/vm.go:788 +0x3c0
github.com/dop251/goja.(*vm).runTryInner.func1()
        github.com/dop251/[email protected]/vm.go:830 +0x48
panic({0x10634cd00?, 0x107514980?})
        runtime/panic.go:770 +0x124
github.com/linkedin/goavro/v2.(*Codec).NativeFromTextual(0x140009cd8f0?, {0x140026ba580, 0x515, 0x580})
        github.com/linkedin/goavro/[email protected]/codec.go:500 +0x24
github.com/mostafa/xk6-kafka.(*AvroSerde).Serialize(0x106331860?, {0x1063316e0?, 0x14002061ef0?}, 0x140009cd8f0)
        github.com/mostafa/[email protected]/avro.go:14 +0x58
github.com/mostafa/xk6-kafka.(*Kafka).serialize(0x140013ba2c0, 0x14002061e90)
        github.com/mostafa/[email protected]/serdes.go:46 +0xb8
github.com/mostafa/xk6-kafka.(*Kafka).schemaRegistryClientClass.func4({{0x106614ee8, 0x14001fcb110}, {0x1400168fcb8, 0x1, 0x25}})
        github.com/mostafa/[email protected]/schema_registry.go:210 +0x108
github.com/dop251/goja.(*nativeFuncObject).vmCall(0x14001c1e3c0, 0x14000952900, 0x1)
        github.com/dop251/[email protected]/func.go:563 +0x168
github.com/dop251/goja.call.exec(0x952900?, 0x14000952900)
        github.com/dop251/[email protected]/vm.go:3375 +0x74
github.com/dop251/goja.(*vm).run(0x14000952900)
        github.com/dop251/[email protected]/vm.go:582 +0x6c
github.com/dop251/goja.(*vm).runTryInner(0x14000952900?)
        github.com/dop251/[email protected]/vm.go:834 +0x58
github.com/dop251/goja.(*baseJsFuncObject).__call(0x14002d36cc0, {0x1400178d320?, 0x1, 0x104ba84bc?}, {0x0, 0x0}, {0x1066151e0?, 0x1075bc940?})
        github.com/dop251/[email protected]/func.go:426 +0x5d0
github.com/dop251/goja.(*baseJsFuncObject)._call(...)
        github.com/dop251/[email protected]/func.go:442
github.com/dop251/goja.(*baseJsFuncObject).call(0x0?, {{0x1066151e0, 0x1075bc940}, {0x1400178d320, 0x1, 0x1}}, {0x0?, 0x0?})
        github.com/dop251/[email protected]/func.go:450 +0x74
github.com/dop251/goja.(*baseJsFuncObject).Call(...)
        github.com/dop251/[email protected]/func.go:382
github.com/dop251/goja.AssertFunction.func1.1()
        github.com/dop251/[email protected]/runtime.go:2402 +0x68
github.com/dop251/goja.(*vm).try(0x14000952900, 0x14002cac728)
        github.com/dop251/[email protected]/vm.go:811 +0x1e0
github.com/dop251/goja.(*Runtime).runWrapped(0x14001a07c08, 0x0?)
        github.com/dop251/[email protected]/runtime.go:2446 +0x64
github.com/dop251/goja.AssertFunction.func1({0x1066151e0?, 0x1075bc940?}, {0x1400178d320?, 0x1?, 0x1?})
        github.com/dop251/[email protected]/runtime.go:2401 +0x78
go.k6.io/k6/js.(*VU).runFn.func2.1()
        go.k6.io/[email protected]/js/runner.go:866 +0x4c
go.k6.io/k6/js/eventloop.(*EventLoop).Start(0x14002d27ef0, 0x14001fec780)
        go.k6.io/[email protected]/js/eventloop/eventloop.go:177 +0x160
go.k6.io/k6/js.(*VU).runFn.func2()
        go.k6.io/[email protected]/js/runner.go:865 +0xcc
go.k6.io/k6/js/common.RunWithPanicCatching({0x106619a18?, 0x14000341d80?}, 0x107513070?, 0x14000a0f1e0?)
        go.k6.io/[email protected]/js/common/util.go:86 +0x74
go.k6.io/k6/js.(*VU).runFn(0x14001b6a1e0, {0x106602ee8, 0x14000a0f0e0}, 0x1, 0x140001b64f8, 0x1400178d000, {0x1400178d320, 0x1, 0x1})
        go.k6.io/[email protected]/js/runner.go:864 +0x1d8
go.k6.io/k6/js.(*ActiveVU).RunOnce(0x14000a52540)
        go.k6.io/[email protected]/js/runner.go:797 +0x3d4
go.k6.io/k6/lib/executor.PerVUIterations.Run.getIterationRunner.func7({0x106602eb0, 0x14001fe3bf0}, {0x1065efd00?, 0x14000a52540?})
        go.k6.io/[email protected]/lib/executor/helpers.go:108 +0x44
go.k6.io/k6/lib/executor.PerVUIterations.Run.func5({0x1065fae18, 0x14001b6a1e0})
        go.k6.io/[email protected]/lib/executor/per_vu_iterations.go:228 +0x320
created by go.k6.io/k6/lib/executor.PerVUIterations.Run in goroutine 101
        go.k6.io/[email protected]/lib/executor/per_vu_iterations.go:241 +0x8cc

Do you have an idea of the origin of the error? The panic appears to originate in goavro's NativeFromTextual (see the trace above).
Does this scenario work with xk6-kafka?

mostafa (Owner) commented May 5, 2024

Hey @SamyLegal,

Please see FAQ No. 12 in the README.

SamyLegal (Author) commented

Thank you for the answer, @mostafa.

I have read FAQ No. 12 in the README and tried my schema without references using the tool https://github.com/mostafa/nested-avro-schema; everything works fine.

I think my error is not related to nested Avro schemas, but to an Avro schema that contains references to other schemas in a schema registry, like this:

{
  "type": "record",
  "name": "PieceChangeStream",
  "namespace": "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece",
  "fields": [
    {
      "name": "ns",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.Ns"
      ],
      "default": null
    },
    {
      "name": "_id",
      "type": "string"
    },
    {
      "name": "operationType",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "fullDocumentBeforeChange",
      "type": [
        "null",
        "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument"
      ],
      "default": null
    },
    {
      "name": "fullDocument",
      "type": [
        "null",
        "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument"
      ],
      "default": null
    },
    {
      "name": "to",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.To"
      ],
      "default": null
    },
   ...
  ]
}

I have not found a unit test covering this case in the project. Do you think it is supposed to work?

The library "srclient" that you use in this project handle this case.
https://github.com/riferrei/srclient/blob/baa74d8799c9533e679e30739a09346fbd8f6982/schemaRegistryClient_test.go#L291
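
In the meantime, a possible client-side workaround is to inline each referenced schema into the parent schema string before serializing, so the Avro codec receives a self-contained schema. A minimal sketch; resolveReferences is a hypothetical helper of mine (not an xk6-kafka API), it assumes the referenced schemas contain no further references, and it assumes the object returned by getSchema exposes its schema string and references:

function resolveReferences(parentSchemaString, references) {
  let resolved = parentSchemaString;
  for (const ref of references) {
    // Fetch the referenced schema from the registry.
    const refSchema = schemaRegistry.getSchema({
      subject: ref.subject,
      schemaType: SCHEMA_TYPE_AVRO,
      version: ref.version,
    });
    // Replace the first occurrence of the fully qualified name with the
    // full record definition. Later occurrences can keep using the bare
    // name, since Avro allows referencing a named type once defined.
    resolved = resolved.replace(`"${ref.name}"`, refSchema.schema);
  }
  return resolved;
}

// Usage: build a self-contained schema object to pass to serialize().
const resolvedValueSchema = {
  schema: resolveReferences(valueSchemaObject.schema, valueSchemaObject.references),
  schemaType: SCHEMA_TYPE_AVRO,
};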
