From eb1be635dc2c8c1264582db95e56214491ac5216 Mon Sep 17 00:00:00 2001 From: claimundefine Date: Tue, 12 Nov 2024 17:26:42 -0500 Subject: [PATCH 1/2] Add concurrent and sequential performance tests with and without CSFLE --- .../schemaregistry-serialization.spec.ts | 180 +++++++++++++++++- 1 file changed, 171 insertions(+), 9 deletions(-) diff --git a/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts b/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts index a83c3564..4c772549 100644 --- a/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts +++ b/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts @@ -1,7 +1,7 @@ import { AvroSerializer, AvroDeserializer, AvroSerializerConfig, SerdeType, Serializer, Deserializer, JsonSerializer, JsonDeserializer, JsonSerializerConfig, - ClientConfig, SchemaRegistryClient, SchemaInfo + ClientConfig, SchemaRegistryClient, SchemaInfo, Rule, RuleMode, RuleSet } from "@confluentinc/schemaregistry"; import { localAuthCredentials } from "../constants"; import { v4 } from "uuid"; @@ -24,6 +24,25 @@ const avroSchemaString: string = JSON.stringify({ ], }); +let encRule: Rule = { + name: 'EncryptionDemo', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'schemaregistryperf', + 'encrypt.kms.type': 'aws-kms', + 'encrypt.kms.key.id': 'your-kms-key', + }, + onFailure: 'ERROR,NONE' +}; + +let ruleSet: RuleSet = { + domainRules: [encRule] +}; + + const jsonSchemaString: string = JSON.stringify({ "$schema": "http://json-schema.org/draft-07/schema#", "title": "User", @@ -52,6 +71,18 @@ const jsonSchemaInfo: SchemaInfo = { schemaType: 'JSON' }; +const avroSchemaInfoWithRules: SchemaInfo = { + schema: avroSchemaString, + schemaType: 'AVRO', + ruleSet: ruleSet +}; + +const jsonSchemaInfoWithRules: SchemaInfo = { + schema: jsonSchemaString, + schemaType: 'JSON', + ruleSet: ruleSet +}; + const data: { name: string; age: number; address: string; }[] = []; let schemaRegistryClient: SchemaRegistryClient; @@ -66,10 +97,12 @@ function generateData(numRecords: number) { } } -generateData(10000); +const numRecords = 10000; + +generateData(numRecords); async function serializeAndDeserializeSchemas(serializer: Serializer, deserializer: Deserializer, topic: string) { - Promise.all( + await Promise.all( data.map(async (record) => { const serialized = await serializer.serialize(topic, record); await deserializer.deserialize(topic, serialized); @@ -77,7 +110,7 @@ async function serializeAndDeserializeSchemas(serializer: Serializer, deserializ ); } -describe('Serialization Performance Test', () => { +describe('Concurrent Serialization Performance Test', () => { beforeEach(async () => { schemaRegistryClient = new SchemaRegistryClient(clientConfig); @@ -95,7 +128,7 @@ describe('Serialization Performance Test', () => { await serializeAndDeserializeSchemas(jsonSerializer, jsonDeserializer, topic); const end = performance.now(); - console.log(`JSON serialization and deserialization took ${end - start} ms`); + console.log(`Concurrent JSON serialization and deserialization took ${end - start} ms`); }); it("Should measure serialization and deserialization performance for Avro", async () => { @@ -103,14 +136,14 @@ describe('Serialization Performance Test', () => { await schemaRegistryClient.register(topic + "-value", avroSchemaInfo); const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true }; - const serializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); - const deserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); + const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); const start = performance.now(); - await serializeAndDeserializeSchemas(serializer, deserializer, topic); + await serializeAndDeserializeSchemas(avroSerializer, avroDeserializer, topic); const end = performance.now(); - console.log(`Avro serialization and deserialization took ${end - start} ms`); + console.log(`Concurrent Avro serialization and deserialization took ${end - start} ms`); }); // it("Should measure serialization and deserialization performance for Protobuf", async () => { @@ -125,3 +158,132 @@ describe('Serialization Performance Test', () => { // console.log(`Protobuf serialization and deserialization took ${end - start} ms`); // }); }); + +describe('Concurrent Serialization Performance Test with Rules', () => { + beforeEach(async () => { + schemaRegistryClient = new SchemaRegistryClient(clientConfig); + }); + + it("Should measure serialization and deserialization performance for JSON with rules", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", jsonSchemaInfoWithRules); + + const jsonSerializerConfig: JsonSerializerConfig = { useLatestVersion: true }; + const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); + const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const start = performance.now(); + await serializeAndDeserializeSchemas(jsonSerializer, jsonDeserializer, topic); + const end = performance.now(); + + console.log(`Concurrent JSON serialization and deserialization with rules took ${end - start} ms`); + }); + + it("Should measure serialization and deserialization performance for Avro with rules", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", avroSchemaInfoWithRules); + + const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true }; + const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); + const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const start = performance.now(); + await serializeAndDeserializeSchemas(avroSerializer, avroDeserializer, topic); + const end = performance.now(); + + console.log(`Concurrent Avro serialization and deserialization with rules took ${end - start} ms`); + }); +}); + +describe("Sequential Serialization Performance Test", () => { + beforeEach(async () => { + schemaRegistryClient = new SchemaRegistryClient(clientConfig); + }); + + it("Should measure serialization and deserialization performance for JSON", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", jsonSchemaInfo); + + const jsonSerializerConfig: JsonSerializerConfig = { useLatestVersion: true }; + const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); + const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + await jsonSerializer.serialize(topic, data[0]); + + const start = performance.now(); + for (let i = 0; i < numRecords; i++) { + const serialized = await jsonSerializer.serialize(topic, data[i]); + await jsonDeserializer.deserialize(topic, serialized); + } + const end = performance.now(); + + console.log(`Sequential JSON serialization and deserialization took ${end - start} ms`); + }); + + it("Should measure serialization and deserialization performance for Avro", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", avroSchemaInfo); + + const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true }; + const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); + const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + await avroSerializer.serialize(topic, data[0]); + + const start = performance.now(); + for (let i = 0; i < numRecords; i++) { + const serialized = await avroSerializer.serialize(topic, data[i]); + await avroDeserializer.deserialize(topic, serialized); + } + const end = performance.now(); + + console.log(`Sequential Avro serialization and deserialization took ${end - start} ms`); + }); +}); + +describe("Sequential Serialization Performance Test with Rules", () => { + beforeEach(async () => { + schemaRegistryClient = new SchemaRegistryClient(clientConfig); + }); + + it("Should measure serialization and deserialization performance for JSON with rules", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", jsonSchemaInfoWithRules); + + const jsonSerializerConfig: JsonSerializerConfig = { useLatestVersion: true }; + const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); + const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + await jsonSerializer.serialize(topic, data[0]); + + const start = performance.now(); + for (let i = 0; i < numRecords; i++) { + const serialized = await jsonSerializer.serialize(topic, data[i]); + await jsonDeserializer.deserialize(topic, serialized); + } + const end = performance.now(); + + console.log(`Sequential JSON serialization and deserialization with rules took ${end - start} ms`); + }); + + it("Should measure serialization and deserialization performance for Avro with rules", async () => { + const topic = v4(); + await schemaRegistryClient.register(topic + "-value", avroSchemaInfoWithRules); + + const avroSerializerConfig: AvroSerializerConfig = { useLatestVersion: true }; + const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); + const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + await avroSerializer.serialize(topic, data[0]); + + const start = performance.now(); + for (let i = 0; i < numRecords; i++) { + const serialized = await avroSerializer.serialize(topic, data[i]); + await avroDeserializer.deserialize(topic, serialized); + } + const end = performance.now(); + + console.log(`Sequential Avro serialization and deserialization with rules took ${end - start} ms`); + }); + +}); From f16606eb375b88d6e4a8b70aef6a16db7ec7bc2f Mon Sep 17 00:00:00 2001 From: claimundefine Date: Wed, 13 Nov 2024 16:03:25 -0500 Subject: [PATCH 2/2] Add proper CSFLE testing with CCloud instance --- schemaregistry-examples/src/constants.ts | 14 +- .../schemaregistry-serialization.spec.ts | 121 +++++++++++++----- 2 files changed, 101 insertions(+), 34 deletions(-) diff --git a/schemaregistry-examples/src/constants.ts b/schemaregistry-examples/src/constants.ts index a30f5881..7e524efc 100644 --- a/schemaregistry-examples/src/constants.ts +++ b/schemaregistry-examples/src/constants.ts @@ -1,4 +1,4 @@ -import { BasicAuthCredentials } from '@confluentinc/schemaregistry'; +import { BasicAuthCredentials, BearerAuthCredentials } from '@confluentinc/schemaregistry'; const issuerEndpointUrl = ''; // e.g. 'https://dev-123456.okta.com/oauth2/default/v1/token'; const oauthClientId = ''; @@ -22,7 +22,17 @@ const basicAuthCredentials: BasicAuthCredentials = { userInfo: ':', }; +const bearerAuthCredentials: BearerAuthCredentials = { + credentialsSource: 'OAUTHBEARER', + issuerEndpointUrl: issuerEndpointUrl, + clientId: oauthClientId, + clientSecret: oauthClientSecret, + scope: scope, + identityPoolId: identityPoolId, + logicalCluster: schemaRegistryLogicalCluster +}; + export { issuerEndpointUrl, oauthClientId, oauthClientSecret, scope, identityPoolId, kafkaLogicalCluster, schemaRegistryLogicalCluster, - baseUrl, clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials + baseUrl, clusterBootstrapUrl, clusterApiKey, clusterApiSecret, basicAuthCredentials, localAuthCredentials, bearerAuthCredentials }; \ No newline at end of file diff --git a/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts b/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts index 4c772549..cc979c81 100644 --- a/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts +++ b/schemaregistry-examples/src/performance/schemaregistry-serialization.spec.ts @@ -1,17 +1,21 @@ import { AvroSerializer, AvroDeserializer, AvroSerializerConfig, SerdeType, Serializer, Deserializer, JsonSerializer, JsonDeserializer, JsonSerializerConfig, - ClientConfig, SchemaRegistryClient, SchemaInfo, Rule, RuleMode, RuleSet + ClientConfig, SchemaRegistryClient, SchemaInfo, Rule, RuleMode, RuleSet, FieldEncryptionExecutor, AwsKmsDriver } from "@confluentinc/schemaregistry"; -import { localAuthCredentials } from "../constants"; +import { bearerAuthCredentials } from "../constants"; import { v4 } from "uuid"; import { beforeEach, describe, it } from '@jest/globals'; +FieldEncryptionExecutor.register(); +AwsKmsDriver.register(); + const clientConfig: ClientConfig = { - baseURLs: ['http://localhost:8081'], + baseURLs: ["your-base-url"], + isForward: false, cacheCapacity: 512, cacheLatestTtlSecs: 60, - basicAuthCredentials: localAuthCredentials, + bearerAuthCredentials: bearerAuthCredentials, }; const avroSchemaString: string = JSON.stringify({ @@ -24,24 +28,18 @@ const avroSchemaString: string = JSON.stringify({ ], }); -let encRule: Rule = { - name: 'EncryptionDemo', - kind: 'TRANSFORM', - mode: RuleMode.WRITEREAD, - type: 'ENCRYPT', - tags: ['PII'], - params: { - 'encrypt.kek.name': 'schemaregistryperf', - 'encrypt.kms.type': 'aws-kms', - 'encrypt.kms.key.id': 'your-kms-key', - }, - onFailure: 'ERROR,NONE' -}; - -let ruleSet: RuleSet = { - domainRules: [encRule] -}; - +const avroSchemaStringWithTags: string = JSON.stringify({ + type: 'record', + name: 'User', + fields: [ + { name: 'name', type: 'string' }, + { name: 'age', type: 'int' }, + { + name: 'address', type: 'string', + "confluent:tags": ["PII"] + } + ], +}); const jsonSchemaString: string = JSON.stringify({ "$schema": "http://json-schema.org/draft-07/schema#", @@ -55,12 +53,49 @@ const jsonSchemaString: string = JSON.stringify({ "type": "integer" }, "address": { + "type": "string", + } + }, + "required": ["name", "age", "address"] +}); + +const jsonSchemaStringWithTags: string = JSON.stringify({ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "User", + "type": "object", + "properties": { + "name": { "type": "string" + }, + "age": { + "type": "integer" + }, + "address": { + "type": "string", + "confluent:tags": [ "PII" ] } }, "required": ["name", "age", "address"] }); +let encRule: Rule = { + name: 'EncryptionDemo', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT', + tags: ['PII'], + params: { + 'encrypt.kek.name': 'schemaregistrydemo', + 'encrypt.kms.type': 'aws-kms', + 'encrypt.kms.key.id': 'your-kms-key', + }, + onFailure: 'ERROR,NONE' +}; + +let ruleSet: RuleSet = { + domainRules: [encRule] +}; + const avroSchemaInfo: SchemaInfo = { schema: avroSchemaString, schemaType: 'AVRO' @@ -72,18 +107,18 @@ const jsonSchemaInfo: SchemaInfo = { }; const avroSchemaInfoWithRules: SchemaInfo = { - schema: avroSchemaString, + schema: avroSchemaStringWithTags, schemaType: 'AVRO', ruleSet: ruleSet }; const jsonSchemaInfoWithRules: SchemaInfo = { - schema: jsonSchemaString, + schema: jsonSchemaStringWithTags, schemaType: 'JSON', ruleSet: ruleSet }; -const data: { name: string; age: number; address: string; }[] = []; +let data: { name: string; age: number; address: string; }[]; let schemaRegistryClient: SchemaRegistryClient; @@ -97,9 +132,7 @@ function generateData(numRecords: number) { } } -const numRecords = 10000; - -generateData(numRecords); +const numRecords = 1000; async function serializeAndDeserializeSchemas(serializer: Serializer, deserializer: Deserializer, topic: string) { await Promise.all( @@ -114,6 +147,8 @@ describe('Concurrent Serialization Performance Test', () => { beforeEach(async () => { schemaRegistryClient = new SchemaRegistryClient(clientConfig); + data = []; + generateData(numRecords); }); it("Should measure serialization and deserialization performance for JSON", async () => { @@ -124,6 +159,9 @@ describe('Concurrent Serialization Performance Test', () => { const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + const firstSerialized = await jsonSerializer.serialize(topic, data[0]); + await jsonDeserializer.deserialize(topic, firstSerialized); + const start = performance.now(); await serializeAndDeserializeSchemas(jsonSerializer, jsonDeserializer, topic); const end = performance.now(); @@ -139,6 +177,9 @@ describe('Concurrent Serialization Performance Test', () => { const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + const firstSerialized = await avroSerializer.serialize(topic, data[0]); + await avroDeserializer.deserialize(topic, firstSerialized); + const start = performance.now(); await serializeAndDeserializeSchemas(avroSerializer, avroDeserializer, topic); const end = performance.now(); @@ -162,6 +203,8 @@ describe('Concurrent Serialization Performance Test', () => { describe('Concurrent Serialization Performance Test with Rules', () => { beforeEach(async () => { schemaRegistryClient = new SchemaRegistryClient(clientConfig); + data = []; + generateData(numRecords); }); it("Should measure serialization and deserialization performance for JSON with rules", async () => { @@ -172,6 +215,9 @@ describe('Concurrent Serialization Performance Test with Rules', () => { const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + const firstSerialized = await jsonSerializer.serialize(topic, data[0]); + await jsonDeserializer.deserialize(topic, firstSerialized); + const start = performance.now(); await serializeAndDeserializeSchemas(jsonSerializer, jsonDeserializer, topic); const end = performance.now(); @@ -187,6 +233,9 @@ describe('Concurrent Serialization Performance Test with Rules', () => { const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + const firstSerialized = await avroSerializer.serialize(topic, data[0]); + await avroDeserializer.deserialize(topic, firstSerialized); + const start = performance.now(); await serializeAndDeserializeSchemas(avroSerializer, avroDeserializer, topic); const end = performance.now(); @@ -198,6 +247,8 @@ describe('Concurrent Serialization Performance Test with Rules', () => { describe("Sequential Serialization Performance Test", () => { beforeEach(async () => { schemaRegistryClient = new SchemaRegistryClient(clientConfig); + data = []; + generateData(numRecords); }); it("Should measure serialization and deserialization performance for JSON", async () => { @@ -208,7 +259,8 @@ describe("Sequential Serialization Performance Test", () => { const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); - await jsonSerializer.serialize(topic, data[0]); + const firstSerialized = await jsonSerializer.serialize(topic, data[0]); + await jsonDeserializer.deserialize(topic, firstSerialized); const start = performance.now(); for (let i = 0; i < numRecords; i++) { @@ -228,7 +280,8 @@ describe("Sequential Serialization Performance Test", () => { const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); - await avroSerializer.serialize(topic, data[0]); + const firstSerialized = await avroSerializer.serialize(topic, data[0]); + await avroDeserializer.deserialize(topic, firstSerialized); const start = performance.now(); for (let i = 0; i < numRecords; i++) { @@ -244,6 +297,8 @@ describe("Sequential Serialization Performance Test", () => { describe("Sequential Serialization Performance Test with Rules", () => { beforeEach(async () => { schemaRegistryClient = new SchemaRegistryClient(clientConfig); + data = []; + generateData(numRecords); }); it("Should measure serialization and deserialization performance for JSON with rules", async () => { @@ -254,7 +309,8 @@ describe("Sequential Serialization Performance Test with Rules", () => { const jsonSerializer: JsonSerializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, jsonSerializerConfig); const jsonDeserializer: JsonDeserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); - await jsonSerializer.serialize(topic, data[0]); + const firstSerialized = await jsonSerializer.serialize(topic, data[0]); + await jsonDeserializer.deserialize(topic, firstSerialized); const start = performance.now(); for (let i = 0; i < numRecords; i++) { @@ -274,7 +330,8 @@ describe("Sequential Serialization Performance Test with Rules", () => { const avroSerializer: AvroSerializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, avroSerializerConfig); const avroDeserializer: AvroDeserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); - await avroSerializer.serialize(topic, data[0]); + const firstSerialized = await avroSerializer.serialize(topic, data[0]); + await avroDeserializer.deserialize(topic, firstSerialized); const start = performance.now(); for (let i = 0; i < numRecords; i++) {