Skip to content

Conversation

@fangnx
Copy link
Member

@fangnx fangnx commented Oct 17, 2025

What

Follow-up PR after #2041: adding missing types + correcting existing types, according to mypy static checker, in schema-registry module

What's left are functions that might require refactoring and more thorough investigation to get the types right:

  • common/avro.py, json_schema.py, protobuf.py: schemas are defined as unions, and we need to add guards to verify types accordingly during transformations
  • schema_registry_client: handling None return types for several functions
  • rules/cel: adding types for CEL operations
  • rules/encryption: handling None values propagated through functions. Need to understand the logic better

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required

References

JIRA: https://confluentinc.atlassian.net/browse/DGS-22076

Test & Review

Open questions / Follow-ups

@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@sonarqube-confluent

This comment has been minimized.

@fangnx fangnx changed the title WIP: add type-related fixes in schema-registry module Set up type hinting: add fixes in schema-registry module (mostly already typed) Oct 20, 2025
@fangnx fangnx marked this pull request as ready for review October 20, 2025 22:15
@fangnx fangnx requested review from a team and MSeal as code owners October 20, 2025 22:15
@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.


def __init__(
self, key_uri: Optional[str], token: Optional[str], ns: Optional[str] = None,
self, key_uri: str, token: Optional[str], ns: Optional[str] = None,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the optional part as we are already checking emptiness for key_url in https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/schema_registry/rules/encryption/hcvault/hcvault_driver.py#L53, and I think HcVaultKmsClient is supposed to be only created with a specified key_uri, according to the function doc

Not sure if this is considered a breaking change (if customers initializes HcVaultKmsClient directly in their code). @rayokota would love to hear your thoughts on this


def __init__(
self, key_uri: Optional[str], credentials: TokenCredential
self, key_uri: str, credentials: TokenCredential
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar change as the one in hcvault_client.py

@sonarqube-confluent

This comment has been minimized.

return parsed_schema

named_schemas = _resolve_named_schema(schema, self._registry)
if schema.schema_str is None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice the schema_str field should never be empty, and even it is I think it makes sense to raise the error to fail early here

Copy link
Contributor

@MSeal MSeal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple questions on the PR. We are introducing a lot of ignore comments in places like avro that feel off but maybe not worth tackling in this pass anyway.

referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True)
ref_named_schemas = await _resolve_named_schema(referenced_schema.schema, schema_registry_client)
# References in registered schemas are validated by server to be complete
referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True) # type: ignore[arg-type]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there a need for type ignoring here? Can we set the ref/referenced_schema type to avoid this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice, subject and version of SchemaReference never be None, but probably for historical reason (or tech debt) they have been typed as optional. I think updating the types will be a breaking change

Here, get_version() requires non-empty subject and version, so that's why I added the type ignore there. Alternatively we can do:

            if ref.subject is None or ref.version is None:
               # maybe log something
                continue
            referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True)

schema_name = parsed_schema.get("name", schema_dict.get("type"))
if schema.schema_str is not None:
schema_dict = json.loads(schema.schema_str)
schema_name = parsed_schema.get("name", schema_dict.get("type")) # type: ignore[union-attr]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment that 'type' in the dict is for the schema type and not any language type hinting? Seeing it might be confusing without context right next to a type ignore call.

def field_transformer(rule_ctx, field_transform, msg): return ( # noqa: E731
transform(rule_ctx, parsed_schema, msg, field_transform))
value = self._execute_rules(ctx, subject, RuleMode.WRITE, None,
value = self._execute_rules(ctx, subject, RuleMode.WRITE, None, # type: ignore[arg-type]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should spread this function args out per line if we're type commenting one. It read weird like this and I think is error prone to refactor bugs

buffer = fo.getvalue()

if latest_schema is not None:
if latest_schema is not None and ctx is not None and subject is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changed the logic here. Are we certain it's a correct / tested change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_execute_rules_with_phase() already requires SerializationContext and subject to build RuleContext (

)

Base automatically changed from typehinting-kafka to master October 22, 2025 22:06
Copilot AI review requested due to automatic review settings October 23, 2025 04:15
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent
Copy link

Failed

  • 75.50% Coverage on New Code (is less than 80.00%)

Analysis Details

42 Issues

  • Bug 0 Bugs
  • Vulnerability 0 Vulnerabilities
  • Code Smell 42 Code Smells

Coverage and Duplications

  • Coverage 75.50% Coverage (67.00% Estimated after merge)
  • Duplications No duplication information (5.00% Estimated after merge)

Project ID: confluent-kafka-python

View in SonarQube

ValueError: If ctx is None.
"""
if ctx is None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored the name_strategy functions a bit: they have to follow the same function signature but SerializationContext is only required for some

self.bearer_field_provider = _StaticFieldProvider(static_token, logical_cluster, identity_pool)
if not isinstance(static_token, string_type):
raise TypeError("bearer.auth.token must be a str, not " + str(type(static_token)))
if self.bearer_auth_credentials_source == 'OAUTHBEARER':
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the indentation is intended to address the type issue of logical_cluster and identity_pool for building _AsyncOAuthClient: they must be non-empty, which we already check in line 280 and 284

This doesn't affect code logic: we check self.bearer_auth_credentials_source in {'OAUTHBEARER', 'STATIC_TOKEN'} for the outer block, and those if-else branches are for OAUTHBEARER and STATIC_TOKEN respectively



def get_type(schema: JsonSchema) -> FieldType:
if isinstance(schema, list):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JsonSchema union type is either bool or dict. This was likely coped from avro.py

@fangnx fangnx requested review from MSeal and Copilot October 24, 2025 19:34
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 42 out of 42 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

await self.generate_access_token()

if self.token is None:
raise ValueError("Token is not set after the at")
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected incomplete error message from 'after the at' to 'after the attempt to generate it'.

Suggested change
raise ValueError("Token is not set after the at")
raise ValueError("Token is not set after the attempt to generate it")

Copilot uses AI. Check for mistakes.
self.token_endpoint, logical_cluster, identity_pool,
self.max_retries, self.retries_wait_ms,
self.retries_max_wait_ms)
else: # STATIC_TOKEN
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The elif condition check has been replaced with else without verifying all possible values. This assumes only 'OAUTHBEARER' and 'STATIC_TOKEN' are valid values for bearer_auth_credentials_source. Consider adding an explicit elif condition for 'STATIC_TOKEN' to make the code more maintainable and guard against unexpected values.

Copilot uses AI. Check for mistakes.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already the outer block if self.bearer_auth_credentials_source in {'OAUTHBEARER', 'STATIC_TOKEN'}:

self.token_endpoint, logical_cluster, identity_pool,
self.max_retries, self.retries_wait_ms,
self.retries_max_wait_ms)
else: # STATIC_TOKEN
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The elif condition check has been replaced with else without verifying all possible values. This assumes only 'OAUTHBEARER' and 'STATIC_TOKEN' are valid values for bearer_auth_credentials_source. Consider adding an explicit elif condition for 'STATIC_TOKEN' to make the code more maintainable and guard against unexpected values.

Copilot uses AI. Check for mistakes.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already the outer block if self.bearer_auth_credentials_source in {'OAUTHBEARER', 'STATIC_TOKEN'}: (line 273)

if subject_name is not None:
query['subject'] = subject_name
query: dict[str, Any] = {'offset': offset, 'limit': limit}
if subject_name is not None: query['subject'] = subject_name
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation is incorrect on line 867. The 'query['subject'] = subject_name' statement should be on its own line with proper indentation.

Suggested change
if subject_name is not None: query['subject'] = subject_name
if subject_name is not None:
query['subject'] = subject_name

Copilot uses AI. Check for mistakes.
if subject_name is not None:
query['subject'] = subject_name
query: dict[str, Any] = {'offset': offset, 'limit': limit}
if subject_name is not None: query['subject'] = subject_name
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation is incorrect on line 868. The 'query['subject'] = subject_name' statement should be on its own line with proper indentation.

Suggested change
if subject_name is not None: query['subject'] = subject_name
if subject_name is not None:
query['subject'] = subject_name

Copilot uses AI. Check for mistakes.
Comment on lines 83 to 84
def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
self.topic_partitions = topic_partitions or []
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a mutable default argument (empty list) is dangerous as it will be shared across all instances. Use None as default and initialize inside the function instead.

Suggested change
def __init__(self, topic_partitions: List[TopicPartition] = []) -> None:
self.topic_partitions = topic_partitions or []
def __init__(self, topic_partitions: Optional[List[TopicPartition]] = None) -> None:
self.topic_partitions = topic_partitions if topic_partitions is not None else []

Copilot uses AI. Check for mistakes.
inline_tags = get_inline_tags(reader_schema) if reader_schema is not None else None
obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
reader_schema_raw, obj_dict,
inline_tags,field_transformer)
Copy link

Copilot AI Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space after comma between 'inline_tags' and 'field_transformer' arguments.

Suggested change
inline_tags,field_transformer)
inline_tags, field_transformer)

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants