Skip to content

PIP 43: producer send message with different schema

Sijie Guo edited this page Sep 3, 2019 · 3 revisions

Motivation

For now, Pulsar producer can only produce messages of one type of schema which is determined by user when it is created, or by fecthing the latest version of schema from registry if AUTO_PRODUCE_BYTES type is specified. Schema, however, can be updated by external system after producer started, which would lead to inconsistency between messsage payload and schema version metadata. Also some senarios like replicating from kafka require a single producer for replicating messages of different schemas from one Kafka partition to one Pulsar partition to guarantee the order and no duplicates.

Here proposing that messages can indicate the associated schema by itself with two parts of changes to make it clear.

Changes:Part-1

For the part-1, here propose that producer supports to new message specified with schema, particularly, of same POJO type.

Interfaces

For the Producer<T> interface, here propose a new method for newMessage to new a message builder with specified schema with the following signature:

TypedMessageBuilder<T> newMessage(Schema<T> schema);

where the parameterized type T is required to be same with producer.

For AutoProduceBytesSchema specially, user SHOULD new message with actual schema wrapped by auto produce bytes schema. A static method MAY be provided by Schema interface with follwing signature:

static Schema<byte[]> AUTO_PRODUCE_BYTES(Schema<?> schema);

Wire protocols

To guarantee scenario that send message with brand new schema, we also propose a new Command to get schema version, or create one if NOT present.

message CommandGetOrCreateSchema {
    required uint64 request_id = 1;
    required string topic      = 2;
    required Schema schema     = 3;
}

message CommandGetOrCreateSchemaResponse {
    required uint64 request_id      = 1;
    optional ServerError error_code = 2;
    optional string error_message   = 3;

    optional bytes schema_version   = 4;
}

Implementation

Client

The current Schema schema field of Producer would be used as default schema, when producer send messages without specifying schema explicily, with which the default schema would be associated.

Producer SHOULD maintain a local map from schema to schema version, and check the schema version of the associated schema of message before send. If the schema can not be found, producer SHOULD try to register this schema to the registry and get the version of it, then insert the pair to the local map. Hash of Schema same with registry CAN be used as key of map.

Producer SHOULD also attach the actual schema version to the message metadata as it is.

For batch messages with single same metadata, only one schema version is allowed, so before adding one message into batch container, producer SHOULD check the schema version in this batch, and flush the batch if associated with different schema from the message and add it to another batch.

To be seamless for producer not requires this feature, an option CAN be added to producer builder to enable this feature, and disable by default. When this feature is NOT enabled, the action of producer SHOULD keep as it is.

Broker

Server SHOULD handle the register schema command, and put it to registry, or just respond the existing version if present which SHOULD be built on top of compatibility check. The registry backend has alreay implemented this interface.

Functions

Functions MAY inherit the feature gate option and expose it to configuration, same for Sources.

Changes:Part-2

For the part-2, here propose to allow producer to new message with different POJO type of schema. To be noted that, once one producer can send different POJOs, the parameterized type of message involved in methods of Producer SHOULD be changed in some way. The interceptor mechanism would also be affected.

Interfaces

For the Producer<T> interface, here propose to enhance the method propsed in part-1 to accept arbitrary inner type with the following signature:

<V> TypedMessageBuilder<V> newMessage(Schema<V> schema);

where the parameterized type T and V is NOT required to be same.

For ProducerInterceptor<T> interface, provide a method to indicate whether the message is supported by the interceptor instance:

default bool eligible(Message message) {
    return true;
}

which is essential especially when producer can send different type of messages, notes that the message parameter is Message raw type, not required to has same parameterized type with interceptor.

Implementations

Client

The only thing that needs to be pointed out is when the parameterized type of producer and schema/message conflict, the message parameter is allowed to declare with different parameterized type.

For ProducerInterceptors, check if eligible before invoking each interceptor.

Clone this wiki locally