Implement Protobuf support (#296)
Protobuf support
amrutha-shanbhag authored Jan 28, 2022
1 parent 869b9b4 commit 6111c6d
Showing 67 changed files with 8,942 additions and 24 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/tests.yml
@@ -28,6 +28,12 @@ jobs:
- name: Install dependencies
run: python -m pip install -r requirements-dev.txt

- name: Install Protoc
uses: arduino/setup-protoc@v1
with:
version: '3.13.0'


- name: Execute unit-tests
run: make unittest

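The unit-test job now installs protoc 3.13.0 via arduino/setup-protoc before running the tests, so local runs of make unittest need protoc on the PATH as well. A minimal pre-flight check for local runs, using only the Python standard library:

# Illustrative check that protoc is available before running the test suite.
import shutil
import subprocess

protoc = shutil.which("protoc")
if protoc is None:
    raise SystemExit("protoc not found on PATH; install it before running `make unittest`")
version = subprocess.run([protoc, "--version"], capture_output=True, text=True, check=True)
print(version.stdout.strip())  # e.g. "libprotoc 3.13.0"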
27 changes: 27 additions & 0 deletions karapace/compatibility/__init__.py
@@ -12,6 +12,7 @@
SchemaIncompatibilityType
)
from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility
from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility
from karapace.schema_reader import SchemaType, TypedSchema

import logging
@@ -63,6 +64,10 @@ def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Valida
return jsonschema_compatibility(reader, writer)


def check_protobuf_compatibility(reader, writer) -> SchemaCompatibilityResult:
return check_protobuf_schema_compatibility(reader, writer)


def check_compatibility(
old_schema: TypedSchema, new_schema: TypedSchema, compatibility_mode: CompatibilityModes
) -> SchemaCompatibilityResult:
@@ -128,6 +133,28 @@ def check_compatibility(
)
)

elif old_schema.schema_type is SchemaType.PROTOBUF:
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = result.merged_with(check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
))

else:
result = SchemaCompatibilityResult.incompatible(
incompat_type=SchemaIncompatibilityType.type_mismatch,
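As the branches above show, BACKWARD-style modes ask whether the new schema (reader) can read data written with the old one (writer), FORWARD-style modes swap the roles, and FULL-style modes check both directions and merge the results. A minimal sketch of the FULL case, assuming ProtobufSchema can be constructed directly from schema text (the actual parsing path may differ):

# Sketch of the FULL-mode check above: compare in both directions and merge.
# Assumption: ProtobufSchema(schema_text) parses the schema text directly.
from karapace.compatibility import check_protobuf_compatibility
from karapace.protobuf.schema import ProtobufSchema

old = ProtobufSchema('syntax = "proto3"; message Foo { string name = 1; }')
new = ProtobufSchema('syntax = "proto3"; message Foo { string name = 1; int32 id = 2; }')

result = check_protobuf_compatibility(reader=new, writer=old)
result = result.merged_with(check_protobuf_compatibility(reader=old, writer=new))
print(result.compatibility)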
Empty file.
33 changes: 33 additions & 0 deletions karapace/compatibility/protobuf/checks.py
@@ -0,0 +1,33 @@
from karapace.avro_compatibility import SchemaCompatibilityResult, SchemaCompatibilityType
from karapace.protobuf.compare_result import CompareResult
from karapace.protobuf.schema import ProtobufSchema

import logging

log = logging.getLogger(__name__)


def check_protobuf_schema_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult:
result = CompareResult()
log.debug("READER: %s", reader.to_schema())
log.debug("WRITER: %s", writer.to_schema())
writer.compare(reader, result)
log.debug("IS_COMPATIBLE %s", result.is_compatible())
if result.is_compatible():
return SchemaCompatibilityResult.compatible()

incompatibilities = []
locations = set()
messages = set()
for record in result.result:
if not record.modification.is_compatible():
incompatibilities.append(str(record.modification))
locations.add(record.path)
messages.add(record.message)

return SchemaCompatibilityResult(
compatibility=SchemaCompatibilityType.incompatible,
incompatibilities=list(incompatibilities),
locations=set(locations),
messages=set(messages),
)
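A short sketch of calling the new check directly on an incompatible change (a field type alteration), again assuming ProtobufSchema can be built from schema text; the point of interest is how the failure surfaces in the result fields:

# Sketch: an incompatible field type change populates the result fields.
# Assumption: ProtobufSchema(schema_text) parses the schema text directly.
from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility
from karapace.protobuf.schema import ProtobufSchema

writer = ProtobufSchema('syntax = "proto3"; message Foo { string name = 1; }')
reader = ProtobufSchema('syntax = "proto3"; message Foo { int32 name = 1; }')

result = check_protobuf_schema_compatibility(reader=reader, writer=writer)
print(result.compatibility)       # expected: SchemaCompatibilityType.incompatible
print(result.incompatibilities)   # expected to report an incompatible field modification
print(result.locations)           # dotted paths pointing into the schema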
3 changes: 2 additions & 1 deletion karapace/config.py
@@ -52,7 +52,8 @@
"session_timeout_ms": 10000,
"karapace_rest": False,
"karapace_registry": False,
"master_election_strategy": "lowest"
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
}
DEFAULT_LOG_FORMAT_JOURNAL = "%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s"

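The new protobuf_runtime_directory setting defaults to "runtime". A small sketch of the default being applied, assuming set_config_defaults() is the karapace.config helper that fills in missing keys (the helper name is an assumption and may differ):

# Sketch: configs that omit the new key pick up its default value.
# Assumption: set_config_defaults() merges a user config with the DEFAULTS above.
from karapace.config import set_config_defaults

config = set_config_defaults({"karapace_registry": True})
print(config["protobuf_runtime_directory"])  # "runtime"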
8 changes: 4 additions & 4 deletions karapace/kafka_rest_apis/__init__.py
@@ -25,9 +25,9 @@
RECORD_KEYS = ["key", "value", "partition"]
PUBLISH_KEYS = {"records", "value_schema", "value_schema_id", "key_schema", "key_schema_id"}
RECORD_CODES = [42201, 42202]
KNOWN_FORMATS = {"json", "avro", "binary"}
KNOWN_FORMATS = {"json", "avro", "protobuf", "binary"}
OFFSET_RESET_STRATEGIES = {"latest", "earliest"}
SCHEMA_MAPPINGS = {"avro": SchemaType.AVRO, "jsonschema": SchemaType.JSONSCHEMA}
SCHEMA_MAPPINGS = {"avro": SchemaType.AVRO, "jsonschema": SchemaType.JSONSCHEMA, "protobuf": SchemaType.PROTOBUF}
TypedConsumer = namedtuple("TypedConsumer", ["consumer", "serialization_format", "config"])


@@ -536,7 +536,7 @@ async def serialize(
return json.dumps(obj).encode("utf8")
if ser_format == "binary":
return base64.b64decode(obj)
if ser_format in {"avro", "jsonschema"}:
if ser_format in {"avro", "jsonschema", "protobuf"}:
return await self.schema_serialize(obj, schema_id)
raise FormatError(f"Unknown format: {ser_format}")

@@ -565,7 +565,7 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
)
# disallow missing id and schema for any key/value list that has at least one populated element
if formats["embedded_format"] in {"avro", "jsonschema"}:
if formats["embedded_format"] in {"avro", "jsonschema", "protobuf"}:
for prefix, code in zip(RECORD_KEYS, RECORD_CODES):
if self.all_empty(data, prefix):
continue
4 changes: 2 additions & 2 deletions karapace/kafka_rest_apis/consumer_manager.py
@@ -20,7 +20,7 @@
import time
import uuid

KNOWN_FORMATS = {"json", "avro", "binary", "jsonschema"}
KNOWN_FORMATS = {"json", "avro", "binary", "jsonschema", "protobuf"}
OFFSET_RESET_STRATEGIES = {"latest", "earliest"}

TypedConsumer = namedtuple("TypedConsumer", ["consumer", "serialization_format", "config"])
@@ -481,7 +481,7 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats
async def deserialize(self, bytes_: bytes, fmt: str):
if not bytes_:
return None
if fmt in {"avro", "jsonschema"}:
if fmt in {"avro", "jsonschema", "protobuf"}:
return await self.deserializer.deserialize(bytes_)
if fmt == "json":
return json.loads(bytes_.decode('utf-8'))
Empty file added karapace/protobuf/__init__.py
Empty file.
78 changes: 78 additions & 0 deletions karapace/protobuf/compare_result.py
@@ -0,0 +1,78 @@
from dataclasses import dataclass, field
from enum import auto, Enum


class Modification(Enum):
PACKAGE_ALTER = auto()
SYNTAX_ALTER = auto()
MESSAGE_ADD = auto()
MESSAGE_DROP = auto()
MESSAGE_MOVE = auto()
ENUM_CONSTANT_ADD = auto()
ENUM_CONSTANT_ALTER = auto()
ENUM_CONSTANT_DROP = auto()
ENUM_ADD = auto()
ENUM_DROP = auto()
TYPE_ALTER = auto()
FIELD_ADD = auto()
FIELD_DROP = auto()
FIELD_MOVE = auto()
FIELD_LABEL_ALTER = auto()
FIELD_NAME_ALTER = auto()
FIELD_KIND_ALTER = auto()
FIELD_TYPE_ALTER = auto()
ONE_OF_ADD = auto()
ONE_OF_DROP = auto()
ONE_OF_MOVE = auto()
ONE_OF_FIELD_ADD = auto()
ONE_OF_FIELD_DROP = auto()
ONE_OF_FIELD_MOVE = auto()
FEW_FIELDS_CONVERTED_TO_ONE_OF = auto()

# Protobuf compatibility issues are described at
# https://yokota.blog/2021/08/26/understanding-protobuf-compatibility/
def is_compatible(self) -> bool:
return self not in [
self.MESSAGE_MOVE, self.MESSAGE_DROP, self.FIELD_LABEL_ALTER, self.FIELD_KIND_ALTER, self.FIELD_TYPE_ALTER,
self.ONE_OF_FIELD_DROP, self.FEW_FIELDS_CONVERTED_TO_ONE_OF
]


@dataclass
class ModificationRecord:
modification: Modification
path: str
message: str = field(init=False)

def __post_init__(self) -> None:
if self.modification.is_compatible():
self.message = f"Compatible modification {self.modification} found"
else:
self.message = f"Incompatible modification {self.modification} found"

def to_str(self) -> str:
return self.message


class CompareResult:
def __init__(self) -> None:
self.result = []
self.path = []
self.canonical_name = []

def push_path(self, name_element: str, canonical: bool = False) -> None:
if canonical:
self.canonical_name.append(name_element)
self.path.append(name_element)

def pop_path(self, canonical: bool = False) -> None:
if canonical:
self.canonical_name.pop()
self.path.pop()

def add_modification(self, modification: Modification) -> None:
record = ModificationRecord(modification, ".".join(self.path))
self.result.append(record)

def is_compatible(self) -> bool:
return all(record.modification.is_compatible() for record in self.result)
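CompareResult accumulates ModificationRecords while two schemas are walked, pushing and popping path elements as the comparison descends into messages and fields. A small illustrative sketch; the path elements here are made up, real values come from ProtobufSchema.compare():

# Sketch: compatible vs. incompatible modifications recorded during a walk.
from karapace.protobuf.compare_result import CompareResult, Modification

result = CompareResult()
result.push_path("Foo")                                 # enter message Foo
result.push_path("id")                                  # enter field id
result.add_modification(Modification.FIELD_ADD)         # adding a field is compatible
result.pop_path()
result.pop_path()
assert result.is_compatible()

result.add_modification(Modification.FIELD_TYPE_ALTER)  # altering a field type is not
assert not result.is_compatible()
for record in result.result:
    print(record.to_str())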
134 changes: 134 additions & 0 deletions karapace/protobuf/compare_type_storage.py
@@ -0,0 +1,134 @@
from dataclasses import dataclass
from karapace.protobuf.compare_result import CompareResult
from karapace.protobuf.exception import IllegalArgumentException
from karapace.protobuf.proto_type import ProtoType
from karapace.protobuf.type_element import TypeElement
from typing import Dict, List, Optional, TYPE_CHECKING, Union

if TYPE_CHECKING:
from karapace.protobuf.message_element import MessageElement
from karapace.protobuf.field_element import FieldElement


def compute_name(t: ProtoType, result_path: List[str], package_name: str, types: dict) -> Optional[str]:
string = t.string

if string.startswith("."):
name = string[1:]
if types.get(name):
return name
return None
canonical_name = list(result_path)
if package_name:
canonical_name.insert(0, package_name)
while len(canonical_name) > 0:
pretender: str = ".".join(canonical_name) + "." + string
pt = types.get(pretender)
if pt is not None:
return pretender
canonical_name.pop()
if types.get(string):
return string
return None


class CompareTypes:
def __init__(self, self_package_name: str, other_package_name: str, result: CompareResult) -> None:

self.self_package_name = self_package_name
self.other_package_name = other_package_name
self.self_types: Dict[str, Union[TypeRecord, TypeRecordMap]] = {}
self.other_types: Dict[str, Union[TypeRecord, TypeRecordMap]] = {}
self.locked_messages: List['MessageElement'] = []
self.environment: List['MessageElement'] = []
self.result = result

def add_a_type(self, prefix: str, package_name: str, type_element: TypeElement, types: dict) -> None:
name: str
if prefix:
name = prefix + "." + type_element.name
else:
name = type_element.name
from karapace.protobuf.message_element import MessageElement
from karapace.protobuf.field_element import FieldElement

if isinstance(type_element, MessageElement):  # handle MapEntry messages generated for map<k, v> fields
if "map_entry" in type_element.options:
key: Optional[FieldElement] = next((f for f in type_element.fields if f.name == "key"), None)
value: Optional[FieldElement] = next((f for f in type_element.fields if f.name == "value"), None)
types[name] = TypeRecordMap(package_name, type_element, key, value)
else:
types[name] = TypeRecord(package_name, type_element)
else:
types[name] = TypeRecord(package_name, type_element)

for t in type_element.nested_types:
self.add_a_type(name, package_name, t, types)

def add_self_type(self, package_name: str, type_element: TypeElement) -> None:
self.add_a_type(package_name, package_name, type_element, self.self_types)

def add_other_type(self, package_name: str, type_element: TypeElement) -> None:
self.add_a_type(package_name, package_name, type_element, self.other_types)

def get_self_type(self, t: ProtoType) -> Union[None, 'TypeRecord', 'TypeRecordMap']:
name = compute_name(t, self.result.path, self.self_package_name, self.self_types)
if name is not None:
type_record = self.self_types.get(name)
return type_record
return None

def get_other_type(self, t: ProtoType) -> Union[None, 'TypeRecord', 'TypeRecordMap']:
name = compute_name(t, self.result.path, self.other_package_name, self.other_types)
if name is not None:
type_record = self.other_types.get(name)
return type_record
return None

def self_type_short_name(self, t: ProtoType) -> Optional[str]:
name = compute_name(t, self.result.path, self.self_package_name, self.self_types)
if name is None:
raise IllegalArgumentException(f"Cannot determine message type {t}")
type_record: TypeRecord = self.self_types.get(name)
if name.startswith(type_record.package_name):
return name[(len(type_record.package_name) + 1):]
return name

def other_type_short_name(self, t: ProtoType) -> Optional[str]:
name = compute_name(t, self.result.path, self.other_package_name, self.other_types)
if name is None:
raise IllegalArgumentException(f"Cannot determine message type {t}")
type_record: TypeRecord = self.other_types.get(name)
if name.startswith(type_record.package_name):
return name[(len(type_record.package_name) + 1):]
return name

def lock_message(self, message: 'MessageElement') -> bool:
if message in self.locked_messages:
return False
self.locked_messages.append(message)
return True

def unlock_message(self, message: 'MessageElement') -> bool:
if message in self.locked_messages:
self.locked_messages.remove(message)
return True
return False


@dataclass
class TypeRecord:
package_name: str
type_element: TypeElement


class TypeRecordMap(TypeRecord):
def __init__(
self, package_name: str, type_element: TypeElement, key: Optional['FieldElement'], value: Optional['FieldElement']
) -> None:
super().__init__(package_name, type_element)
self.key = key
self.value = value

def map_type(self) -> ProtoType:
return ProtoType.get2(f"map<{self.key.element_type}, {self.value.element_type}>")
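compute_name() resolves a possibly-relative type reference by trying progressively shorter prefixes of the current path (prefixed with the package name) before falling back to the bare name; a leading dot marks an already fully qualified reference. A small sketch with made-up type names, assuming ProtoType.get2() builds a named type whose .string is the given text:

# Sketch of name resolution: a field typed "Inner", seen while comparing
# pkg.Outer, resolves to pkg.Outer.Inner. The dict values only need to be
# non-None stand-ins for TypeRecord entries.
from karapace.protobuf.compare_type_storage import compute_name
from karapace.protobuf.proto_type import ProtoType

types = {"pkg.Outer.Inner": object(), "pkg.Other": object()}

name = compute_name(ProtoType.get2("Inner"), result_path=["Outer"], package_name="pkg", types=types)
assert name == "pkg.Outer.Inner"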